diff --git a/server/internal/database/service_instance.go b/server/internal/database/service_instance.go index a5e8b0b2..8c849003 100644 --- a/server/internal/database/service_instance.go +++ b/server/internal/database/service_instance.go @@ -166,6 +166,7 @@ type ServiceInstanceSpec struct { DatabaseHosts []ServiceHostEntry // Ordered list of Postgres host:port entries TargetSessionAttrs string // libpq target_session_attrs value Port *int // Service instance published port (optional, 0 = random) + DatabaseNodes []*NodeInstances // All database nodes; used to create per-node ServiceUserRole resources } // storedToServiceInstance converts stored service instance and status to ServiceInstance. diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 982b4d4b..82b9d1ff 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -440,6 +440,10 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn Allocator: o.dbNetworkAllocator, } + // Canonical identifiers for the RO and RW service user roles. + canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO) + canonicalRWID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRW) + // Service user role resources (manages database user lifecycle). // Two roles are created per service: read-only and read-write. serviceUserRoleRO := &ServiceUserRole{ @@ -521,6 +525,32 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn serviceInstance, } + // Append per-node ServiceUserRole resources for each additional database node. + // The canonical resources (above) cover the first node; nodes [1:] each get + // their own RO and RW role that sources credentials from the canonical. + if len(spec.DatabaseNodes) > 1 { + for _, nodeInst := range spec.DatabaseNodes[1:] { + orchestratorResources = append(orchestratorResources, + &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRO, + CredentialSource: &canonicalROID, + }, + &ServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: nodeInst.NodeName, + Mode: ServiceUserRoleRW, + CredentialSource: &canonicalRWID, + }, + ) + } + } + // Convert to resource data data := make([]*resource.ResourceData, len(orchestratorResources)) for i, res := range orchestratorResources { diff --git a/server/internal/orchestrator/swarm/service_user_role.go b/server/internal/orchestrator/swarm/service_user_role.go index 5ac29166..7724690b 100644 --- a/server/internal/orchestrator/swarm/service_user_role.go +++ b/server/internal/orchestrator/swarm/service_user_role.go @@ -2,18 +2,13 @@ package swarm import ( "context" - "errors" "fmt" "strings" - "time" - "github.com/jackc/pgx/v5" "github.com/rs/zerolog" "github.com/samber/do" - "github.com/pgEdge/control-plane/server/internal/certificates" "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/patroni" "github.com/pgEdge/control-plane/server/internal/postgres" "github.com/pgEdge/control-plane/server/internal/resource" "github.com/pgEdge/control-plane/server/internal/utils" @@ -41,6 +36,13 @@ func ServiceUserRoleIdentifier(serviceInstanceID string, mode string) resource.I } } +func ServiceUserRolePerNodeIdentifier(serviceID, mode, nodeName string) resource.Identifier { + return resource.Identifier{ + ID: serviceID + "-" + mode + "-" + nodeName, + Type: ResourceTypeServiceUserRole, + } +} + // ServiceUserRole manages the lifecycle of a database user for a service. // // Two ServiceUserRole resources are created per service: one with Mode set to @@ -55,16 +57,19 @@ func ServiceUserRoleIdentifier(serviceInstanceID string, mode string) resource.I // On subsequent reconciliation cycles, credentials are reused from the persisted // state (no password regeneration). // -// The role is created on the primary instance and Spock replicates it to all -// other nodes automatically. +// When CredentialSource is nil, this is the canonical resource: it generates +// credentials and runs on the first node. When CredentialSource is non-nil, +// this is a per-node resource: it reads credentials from the canonical resource +// and runs on its own node's primary. type ServiceUserRole struct { - ServiceID string `json:"service_id"` - DatabaseID string `json:"database_id"` - DatabaseName string `json:"database_name"` - NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing - Mode string `json:"mode"` // ServiceUserRoleRO or ServiceUserRoleRW - Username string `json:"username"` - Password string `json:"password"` // Generated on Create, persisted in state + ServiceID string `json:"service_id"` + DatabaseID string `json:"database_id"` + DatabaseName string `json:"database_name"` + NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing + Mode string `json:"mode"` // ServiceUserRoleRO or ServiceUserRoleRW + Username string `json:"username"` + Password string `json:"password"` // Generated on Create, persisted in state + CredentialSource *resource.Identifier `json:"credential_source,omitempty"` } func (r *ServiceUserRole) ResourceVersion() string { @@ -77,10 +82,14 @@ func (r *ServiceUserRole) DiffIgnore() []string { "/mode", "/username", "/password", + "/credential_source", } } func (r *ServiceUserRole) Identifier() resource.Identifier { + if r.CredentialSource != nil { + return ServiceUserRolePerNodeIdentifier(r.ServiceID, r.Mode, r.NodeName) + } return ServiceUserRoleIdentifier(r.ServiceID, r.Mode) } @@ -89,8 +98,11 @@ func (r *ServiceUserRole) Executor() resource.Executor { } func (r *ServiceUserRole) Dependencies() []resource.Identifier { - // No dependencies - this resource can be created/deleted independently - return nil + nodeID := database.NodeResourceIdentifier(r.NodeName) + if r.CredentialSource != nil { + return []resource.Identifier{nodeID, *r.CredentialSource} + } + return []resource.Identifier{nodeID} } func (r *ServiceUserRole) TypeDependencies() []resource.Type { @@ -117,13 +129,23 @@ func (r *ServiceUserRole) Create(ctx context.Context, rc *resource.Context) erro Logger() logger.Info().Msg("creating service user role") - // Generate deterministic username and random password - r.Username = database.GenerateServiceUsername(r.ServiceID, r.Mode) - password, err := utils.RandomString(32) - if err != nil { - return fmt.Errorf("failed to generate password: %w", err) + if r.CredentialSource != nil { + // Per-node resource: read credentials from the canonical resource in state. + canonical, err := resource.FromContext[*ServiceUserRole](rc, *r.CredentialSource) + if err != nil { + return fmt.Errorf("canonical service user role %s must be created before per-node role: %w", r.CredentialSource, err) + } + r.Username = canonical.Username + r.Password = canonical.Password + } else { + // Canonical resource: generate credentials. + r.Username = database.GenerateServiceUsername(r.ServiceID, r.Mode) + password, err := utils.RandomString(32) + if err != nil { + return fmt.Errorf("failed to generate password: %w", err) + } + r.Password = password } - r.Password = password if err := r.createUserRole(ctx, rc, logger); err != nil { return fmt.Errorf("failed to create service user role: %w", err) @@ -134,11 +156,13 @@ func (r *ServiceUserRole) Create(ctx context.Context, rc *resource.Context) erro } func (r *ServiceUserRole) createUserRole(ctx context.Context, rc *resource.Context, logger zerolog.Logger) error { - // Connect to the application database so schema-level grants resolve correctly. - // The role is created on the primary and Spock replicates it to other nodes. - conn, err := r.connectToPrimary(ctx, rc, logger, r.DatabaseName) + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) if err != nil { - return err + return fmt.Errorf("failed to get primary instance: %w", err) + } + conn, err := primary.Connection(ctx, rc, "postgres") + if err != nil { + return fmt.Errorf("failed to connect to database postgres on node %s: %w", r.NodeName, err) } defer conn.Close(ctx) @@ -156,7 +180,6 @@ func (r *ServiceUserRole) createUserRole(ctx context.Context, rc *resource.Conte statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{ Name: r.Username, Password: r.Password, - DBName: r.DatabaseName, DBOwner: false, Attributes: []string{"LOGIN"}, Roles: []string{groupRole}, @@ -189,10 +212,15 @@ func (r *ServiceUserRole) Delete(ctx context.Context, rc *resource.Context) erro Logger() logger.Info().Msg("deleting service user from database") - conn, err := r.connectToPrimary(ctx, rc, logger, "postgres") + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) if err != nil { // During deletion, connection failures are non-fatal — the database // may already be gone or unreachable. + logger.Warn().Err(err).Msg("failed to get primary instance, skipping user deletion") + return nil + } + conn, err := primary.Connection(ctx, rc, "postgres") + if err != nil { logger.Warn().Err(err).Msg("failed to connect to primary instance, skipping user deletion") return nil } @@ -211,69 +239,3 @@ func (r *ServiceUserRole) Delete(ctx context.Context, rc *resource.Context) erro logger.Info().Msg("service user deleted successfully") return nil } - -// connectToPrimary finds the primary Postgres instance and returns an -// authenticated connection to it. The caller is responsible for closing -// the connection. -func (r *ServiceUserRole) connectToPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger, dbName string) (*pgx.Conn, error) { - dbSvc, err := do.Invoke[*database.Service](rc.Injector) - if err != nil { - return nil, err - } - - db, err := dbSvc.GetDatabase(ctx, r.DatabaseID) - if err != nil { - if errors.Is(err, database.ErrDatabaseNotFound) { - return nil, fmt.Errorf("database not found: %w", err) - } - return nil, fmt.Errorf("failed to get database: %w", err) - } - - if len(db.Instances) == 0 { - return nil, fmt.Errorf("database has no instances") - } - - // Find primary instance via Patroni - var primaryInstanceID string - for _, inst := range db.Instances { - connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID) - if err != nil { - continue - } - patroniClient := patroni.NewClient(connInfo.PatroniURL(), nil) - primaryID, err := database.GetPrimaryInstanceID(ctx, patroniClient, 10*time.Second) - if err == nil && primaryID != "" { - primaryInstanceID = primaryID - break - } - } - if primaryInstanceID == "" { - primaryInstanceID = db.Instances[0].InstanceID - logger.Warn().Msg("could not determine primary instance, using first available instance") - } - - connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID) - if err != nil { - return nil, fmt.Errorf("failed to get instance connection info: %w", err) - } - - certSvc, err := do.Invoke[*certificates.Service](rc.Injector) - if err != nil { - return nil, fmt.Errorf("failed to get certificate service: %w", err) - } - - tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge") - if err != nil { - return nil, fmt.Errorf("failed to create TLS config: %w", err) - } - - conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{ - DSN: connInfo.AdminDSN(dbName), - TLS: tlsConfig, - }) - if err != nil { - return nil, fmt.Errorf("failed to connect to database: %w", err) - } - - return conn, nil -} diff --git a/server/internal/orchestrator/swarm/service_user_role_test.go b/server/internal/orchestrator/swarm/service_user_role_test.go new file mode 100644 index 00000000..ab23f0c1 --- /dev/null +++ b/server/internal/orchestrator/swarm/service_user_role_test.go @@ -0,0 +1,211 @@ +package swarm + +import ( + "fmt" + "testing" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +func TestServiceUserRoleIdentifier(t *testing.T) { + t.Run("canonical resource uses service+mode identifier", func(t *testing.T) { + r := &ServiceUserRole{ + ServiceID: "svc-abc", + Mode: ServiceUserRoleRO, + } + got := r.Identifier() + want := ServiceUserRoleIdentifier("svc-abc", ServiceUserRoleRO) + if got != want { + t.Errorf("Identifier() = %v, want %v", got, want) + } + }) + + t.Run("per-node resource uses service+mode+node identifier", func(t *testing.T) { + canonicalID := ServiceUserRoleIdentifier("svc-abc", ServiceUserRoleRW) + r := &ServiceUserRole{ + ServiceID: "svc-abc", + Mode: ServiceUserRoleRW, + NodeName: "n2", + CredentialSource: &canonicalID, + } + got := r.Identifier() + want := ServiceUserRolePerNodeIdentifier("svc-abc", ServiceUserRoleRW, "n2") + if got != want { + t.Errorf("Identifier() = %v, want %v", got, want) + } + }) + + t.Run("canonical and per-node identifiers are distinct", func(t *testing.T) { + canonical := ServiceUserRoleIdentifier("svc-abc", ServiceUserRoleRO) + perNode := ServiceUserRolePerNodeIdentifier("svc-abc", ServiceUserRoleRO, "n1") + if canonical == perNode { + t.Errorf("canonical and per-node identifiers should differ, both = %v", canonical) + } + }) +} + +func TestServiceUserRolePerNodeIdentifier(t *testing.T) { + t.Run("format is service-mode-node", func(t *testing.T) { + got := ServiceUserRolePerNodeIdentifier("svc-abc", "ro", "n2") + want := resource.Identifier{ + ID: "svc-abc-ro-n2", + Type: ResourceTypeServiceUserRole, + } + if got != want { + t.Errorf("ServiceUserRolePerNodeIdentifier() = %v, want %v", got, want) + } + }) + + t.Run("different nodes produce different identifiers", func(t *testing.T) { + id1 := ServiceUserRolePerNodeIdentifier("svc-abc", "ro", "n1") + id2 := ServiceUserRolePerNodeIdentifier("svc-abc", "ro", "n2") + if id1 == id2 { + t.Errorf("different nodes should produce different identifiers, both = %v", id1) + } + }) +} + +func TestServiceUserRoleDependencies(t *testing.T) { + t.Run("canonical resource depends only on its node", func(t *testing.T) { + r := &ServiceUserRole{ + ServiceID: "svc-abc", + NodeName: "n1", + Mode: ServiceUserRoleRO, + } + deps := r.Dependencies() + nodeID := database.NodeResourceIdentifier("n1") + if len(deps) != 1 { + t.Fatalf("canonical resource Dependencies() = %v, want 1 dependency", deps) + } + if deps[0] != nodeID { + t.Errorf("canonical resource dependency = %v, want %v", deps[0], nodeID) + } + }) + + t.Run("per-node resource depends on node and canonical", func(t *testing.T) { + canonicalID := ServiceUserRoleIdentifier("svc-abc", ServiceUserRoleRO) + r := &ServiceUserRole{ + ServiceID: "svc-abc", + Mode: ServiceUserRoleRO, + NodeName: "n2", + CredentialSource: &canonicalID, + } + deps := r.Dependencies() + if len(deps) != 2 { + t.Fatalf("per-node resource Dependencies() = %v, want 2 dependencies", deps) + } + nodeID := database.NodeResourceIdentifier("n2") + if deps[0] != nodeID { + t.Errorf("per-node resource deps[0] = %v, want node %v", deps[0], nodeID) + } + if deps[1] != canonicalID { + t.Errorf("per-node resource deps[1] = %v, want canonical %v", deps[1], canonicalID) + } + }) +} + +// buildServiceUserRoles replicates the orchestrator's per-node resource +// construction logic so it can be tested without a full Orchestrator instance. +func buildServiceUserRoles(serviceID, databaseID, databaseName, firstNodeName string, extraNodeNames []string) []*ServiceUserRole { + canonicalROID := ServiceUserRoleIdentifier(serviceID, ServiceUserRoleRO) + canonicalRWID := ServiceUserRoleIdentifier(serviceID, ServiceUserRoleRW) + + roles := []*ServiceUserRole{ + {ServiceID: serviceID, DatabaseID: databaseID, DatabaseName: databaseName, NodeName: firstNodeName, Mode: ServiceUserRoleRO}, + {ServiceID: serviceID, DatabaseID: databaseID, DatabaseName: databaseName, NodeName: firstNodeName, Mode: ServiceUserRoleRW}, + } + for _, nodeName := range extraNodeNames { + roles = append(roles, + &ServiceUserRole{ServiceID: serviceID, DatabaseID: databaseID, DatabaseName: databaseName, NodeName: nodeName, Mode: ServiceUserRoleRO, CredentialSource: &canonicalROID}, + &ServiceUserRole{ServiceID: serviceID, DatabaseID: databaseID, DatabaseName: databaseName, NodeName: nodeName, Mode: ServiceUserRoleRW, CredentialSource: &canonicalRWID}, + ) + } + return roles +} + +func TestServiceUserRolePerNodeProvisioning(t *testing.T) { + tests := []struct { + name string + extraNodes []string + wantTotal int + wantCanonical int + wantPerNode int + }{ + {"single node", nil, 2, 2, 0}, + {"two nodes", []string{"n2"}, 4, 2, 2}, + {"three nodes", []string{"n2", "n3"}, 6, 2, 4}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + roles := buildServiceUserRoles("svc-abc", "db-abc", "appdb", "n1", tt.extraNodes) + + if len(roles) != tt.wantTotal { + t.Errorf("got %d ServiceUserRole resources, want %d", len(roles), tt.wantTotal) + } + + var canonicals, perNodes []*ServiceUserRole + for _, r := range roles { + if r.CredentialSource == nil { + canonicals = append(canonicals, r) + } else { + perNodes = append(perNodes, r) + } + } + + if len(canonicals) != tt.wantCanonical { + t.Errorf("got %d canonical resources, want %d", len(canonicals), tt.wantCanonical) + } + if len(perNodes) != tt.wantPerNode { + t.Errorf("got %d per-node resources, want %d", len(perNodes), tt.wantPerNode) + } + + // Verify per-node resources reference the correct canonical identifier. + canonicalROID := ServiceUserRoleIdentifier("svc-abc", ServiceUserRoleRO) + canonicalRWID := ServiceUserRoleIdentifier("svc-abc", ServiceUserRoleRW) + for _, pn := range perNodes { + var wantSource resource.Identifier + switch pn.Mode { + case ServiceUserRoleRO: + wantSource = canonicalROID + case ServiceUserRoleRW: + wantSource = canonicalRWID + default: + t.Errorf("unexpected mode %q", pn.Mode) + continue + } + if *pn.CredentialSource != wantSource { + t.Errorf("per-node %s CredentialSource = %v, want %v", pn.Mode, *pn.CredentialSource, wantSource) + } + } + + // Verify per-node resources are routed to the correct node. + for i, nodeName := range tt.extraNodes { + roRole := perNodes[i*2] + rwRole := perNodes[i*2+1] + if roRole.NodeName != nodeName { + t.Errorf("per-node RO role[%d].NodeName = %q, want %q", i, roRole.NodeName, nodeName) + } + if rwRole.NodeName != nodeName { + t.Errorf("per-node RW role[%d].NodeName = %q, want %q", i, rwRole.NodeName, nodeName) + } + } + }) + } +} + +func TestServiceUserRolePerNodeIdentifierUniqueness(t *testing.T) { + // All identifiers across a 3-node cluster must be unique. + nodes := []string{"n1", "n2", "n3"} + roles := buildServiceUserRoles("svc-abc", "db-abc", "appdb", nodes[0], nodes[1:]) + + seen := make(map[resource.Identifier]string) + for i, r := range roles { + id := r.Identifier() + if prev, exists := seen[id]; exists { + t.Errorf("duplicate identifier %v: role[%d] and %s", id, i, prev) + } + seen[id] = fmt.Sprintf("role[%d] node=%s mode=%s", i, r.NodeName, r.Mode) + } +} diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index e1c38fce..6a4c5933 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -57,7 +57,7 @@ func (w *Workflows) PlanUpdate(ctx workflow.Context, input *PlanUpdateInput) (*P } // Generate service instance resources. - // Pick any node name for ServiceUserRole PrimaryExecutor routing. + // Use first node as canonical node for ServiceUserRole credential generation. var nodeName string if len(nodeInstances) > 0 { nodeName = nodeInstances[0].NodeName @@ -131,6 +131,7 @@ func (w *Workflows) getServiceResources( DatabaseHosts: connInfo.Hosts, TargetSessionAttrs: connInfo.TargetSessionAttrs, Port: serviceSpec.Port, + DatabaseNodes: nodeInstances, // Credentials: nil — ServiceUserRole.Create() will generate them }