Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/internal/database/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type ServiceInstanceSpec struct {
DatabaseHosts []ServiceHostEntry // Ordered list of Postgres host:port entries
TargetSessionAttrs string // libpq target_session_attrs value
Port *int // Service instance published port (optional, 0 = random)
DatabaseNodes []*NodeInstances // All database nodes; used to create per-node ServiceUserRole resources
}

// storedToServiceInstance converts stored service instance and status to ServiceInstance.
Expand Down
30 changes: 30 additions & 0 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
Allocator: o.dbNetworkAllocator,
}

// Canonical identifiers for the RO and RW service user roles.
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW)

// Service user role resources (manages database user lifecycle).
// Two roles are created per service: read-only and read-write.
serviceUserRoleRO := &ServiceUserRole{
Expand Down Expand Up @@ -521,6 +525,32 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
serviceInstance,
}

// Append per-node ServiceUserRole resources for each additional database node.
// The canonical resources (above) cover the first node; nodes [1:] each get
// their own RO and RW role that sources credentials from the canonical.
if len(spec.DatabaseNodes) > 1 {
for _, nodeInst := range spec.DatabaseNodes[1:] {
orchestratorResources = append(orchestratorResources,
&ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRO,
CredentialSource: &canonicalROID,
},
&ServiceUserRole{
ServiceID: spec.ServiceSpec.ServiceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
NodeName: nodeInst.NodeName,
Mode: ServiceUserRoleRW,
CredentialSource: &canonicalRWID,
},
)
}
}

// Convert to resource data
data := make([]*resource.ResourceData, len(orchestratorResources))
for i, res := range orchestratorResources {
Expand Down
150 changes: 56 additions & 94 deletions server/internal/orchestrator/swarm/service_user_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ package swarm

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/rs/zerolog"
"github.com/samber/do"

"github.com/pgEdge/control-plane/server/internal/certificates"
"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/patroni"
"github.com/pgEdge/control-plane/server/internal/postgres"
"github.com/pgEdge/control-plane/server/internal/resource"
"github.com/pgEdge/control-plane/server/internal/utils"
Expand Down Expand Up @@ -41,6 +36,13 @@ func ServiceUserRoleIdentifier(serviceInstanceID string, mode string) resource.I
}
}

func ServiceUserRolePerNodeIdentifier(serviceID, mode, nodeName string) resource.Identifier {
return resource.Identifier{
ID: serviceID + "-" + mode + "-" + nodeName,
Type: ResourceTypeServiceUserRole,
}
}

// ServiceUserRole manages the lifecycle of a database user for a service.
//
// Two ServiceUserRole resources are created per service: one with Mode set to
Expand All @@ -55,16 +57,19 @@ func ServiceUserRoleIdentifier(serviceInstanceID string, mode string) resource.I
// On subsequent reconciliation cycles, credentials are reused from the persisted
// state (no password regeneration).
//
// The role is created on the primary instance and Spock replicates it to all
// other nodes automatically.
// When CredentialSource is nil, this is the canonical resource: it generates
// credentials and runs on the first node. When CredentialSource is non-nil,
// this is a per-node resource: it reads credentials from the canonical resource
// and runs on its own node's primary.
type ServiceUserRole struct {
ServiceID string `json:"service_id"`
DatabaseID string `json:"database_id"`
DatabaseName string `json:"database_name"`
NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing
Mode string `json:"mode"` // ServiceUserRoleRO or ServiceUserRoleRW
Username string `json:"username"`
Password string `json:"password"` // Generated on Create, persisted in state
ServiceID string `json:"service_id"`
DatabaseID string `json:"database_id"`
DatabaseName string `json:"database_name"`
NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing
Mode string `json:"mode"` // ServiceUserRoleRO or ServiceUserRoleRW
Username string `json:"username"`
Password string `json:"password"` // Generated on Create, persisted in state
CredentialSource *resource.Identifier `json:"credential_source,omitempty"`
}

func (r *ServiceUserRole) ResourceVersion() string {
Expand All @@ -77,10 +82,14 @@ func (r *ServiceUserRole) DiffIgnore() []string {
"/mode",
"/username",
"/password",
"/credential_source",
}
}

func (r *ServiceUserRole) Identifier() resource.Identifier {
if r.CredentialSource != nil {
return ServiceUserRolePerNodeIdentifier(r.ServiceID, r.Mode, r.NodeName)
}
return ServiceUserRoleIdentifier(r.ServiceID, r.Mode)
}

Expand All @@ -89,8 +98,11 @@ func (r *ServiceUserRole) Executor() resource.Executor {
}

func (r *ServiceUserRole) Dependencies() []resource.Identifier {
// No dependencies - this resource can be created/deleted independently
return nil
nodeID := database.NodeResourceIdentifier(r.NodeName)
if r.CredentialSource != nil {
return []resource.Identifier{nodeID, *r.CredentialSource}
}
return []resource.Identifier{nodeID}
}

func (r *ServiceUserRole) TypeDependencies() []resource.Type {
Expand All @@ -117,13 +129,23 @@ func (r *ServiceUserRole) Create(ctx context.Context, rc *resource.Context) erro
Logger()
logger.Info().Msg("creating service user role")

// Generate deterministic username and random password
r.Username = database.GenerateServiceUsername(r.ServiceID, r.Mode)
password, err := utils.RandomString(32)
if err != nil {
return fmt.Errorf("failed to generate password: %w", err)
if r.CredentialSource != nil {
// Per-node resource: read credentials from the canonical resource in state.
canonical, err := resource.FromContext[*ServiceUserRole](rc, *r.CredentialSource)
if err != nil {
return fmt.Errorf("canonical service user role %s must be created before per-node role: %w", r.CredentialSource, err)
}
r.Username = canonical.Username
r.Password = canonical.Password
} else {
// Canonical resource: generate credentials.
r.Username = database.GenerateServiceUsername(r.ServiceID, r.Mode)
password, err := utils.RandomString(32)
if err != nil {
return fmt.Errorf("failed to generate password: %w", err)
}
r.Password = password
}
r.Password = password

if err := r.createUserRole(ctx, rc, logger); err != nil {
return fmt.Errorf("failed to create service user role: %w", err)
Expand All @@ -134,11 +156,13 @@ func (r *ServiceUserRole) Create(ctx context.Context, rc *resource.Context) erro
}

func (r *ServiceUserRole) createUserRole(ctx context.Context, rc *resource.Context, logger zerolog.Logger) error {
// Connect to the application database so schema-level grants resolve correctly.
// The role is created on the primary and Spock replicates it to other nodes.
conn, err := r.connectToPrimary(ctx, rc, logger, r.DatabaseName)
primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName)
if err != nil {
return err
return fmt.Errorf("failed to get primary instance: %w", err)
}
conn, err := primary.Connection(ctx, rc, "postgres")
if err != nil {
return fmt.Errorf("failed to connect to database postgres on node %s: %w", r.NodeName, err)
}
defer conn.Close(ctx)

Expand All @@ -156,7 +180,6 @@ func (r *ServiceUserRole) createUserRole(ctx context.Context, rc *resource.Conte
statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{
Name: r.Username,
Password: r.Password,
DBName: r.DatabaseName,
DBOwner: false,
Attributes: []string{"LOGIN"},
Roles: []string{groupRole},
Expand Down Expand Up @@ -189,10 +212,15 @@ func (r *ServiceUserRole) Delete(ctx context.Context, rc *resource.Context) erro
Logger()
logger.Info().Msg("deleting service user from database")

conn, err := r.connectToPrimary(ctx, rc, logger, "postgres")
primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName)
if err != nil {
// During deletion, connection failures are non-fatal — the database
// may already be gone or unreachable.
logger.Warn().Err(err).Msg("failed to get primary instance, skipping user deletion")
return nil
}
conn, err := primary.Connection(ctx, rc, "postgres")
if err != nil {
logger.Warn().Err(err).Msg("failed to connect to primary instance, skipping user deletion")
return nil
}
Expand All @@ -211,69 +239,3 @@ func (r *ServiceUserRole) Delete(ctx context.Context, rc *resource.Context) erro
logger.Info().Msg("service user deleted successfully")
return nil
}

// connectToPrimary finds the primary Postgres instance and returns an
// authenticated connection to it. The caller is responsible for closing
// the connection.
func (r *ServiceUserRole) connectToPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger, dbName string) (*pgx.Conn, error) {
dbSvc, err := do.Invoke[*database.Service](rc.Injector)
if err != nil {
return nil, err
}

db, err := dbSvc.GetDatabase(ctx, r.DatabaseID)
if err != nil {
if errors.Is(err, database.ErrDatabaseNotFound) {
return nil, fmt.Errorf("database not found: %w", err)
}
return nil, fmt.Errorf("failed to get database: %w", err)
}

if len(db.Instances) == 0 {
return nil, fmt.Errorf("database has no instances")
}

// Find primary instance via Patroni
var primaryInstanceID string
for _, inst := range db.Instances {
connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID)
if err != nil {
continue
}
patroniClient := patroni.NewClient(connInfo.PatroniURL(), nil)
primaryID, err := database.GetPrimaryInstanceID(ctx, patroniClient, 10*time.Second)
if err == nil && primaryID != "" {
primaryInstanceID = primaryID
break
}
}
if primaryInstanceID == "" {
primaryInstanceID = db.Instances[0].InstanceID
logger.Warn().Msg("could not determine primary instance, using first available instance")
}

connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID)
if err != nil {
return nil, fmt.Errorf("failed to get instance connection info: %w", err)
}

certSvc, err := do.Invoke[*certificates.Service](rc.Injector)
if err != nil {
return nil, fmt.Errorf("failed to get certificate service: %w", err)
}

tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge")
if err != nil {
return nil, fmt.Errorf("failed to create TLS config: %w", err)
}

conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{
DSN: connInfo.AdminDSN(dbName),
TLS: tlsConfig,
})
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}

return conn, nil
}
Loading