diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 4bc10f84..0d1f0688 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -402,11 +402,17 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS } func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { - // Only MCP service instance generation is currently implemented. - if spec.ServiceSpec.ServiceType != "mcp" { + switch spec.ServiceSpec.ServiceType { + case "mcp": + return o.generateMCPInstanceResources(spec) + case "rag": + return o.generateRAGInstanceResources(spec) + default: return nil, fmt.Errorf("service type %q instance generation is not yet supported", spec.ServiceSpec.ServiceType) } +} +func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { // Get service image based on service type and version serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version) if err != nil { @@ -519,7 +525,33 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn serviceInstance, } - // Convert to resource data + return o.buildServiceInstanceResources(spec, orchestratorResources) +} + +func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { + // RAG service user role (per-host, not replicated by Spock) + ragUserRole := &RAGServiceUserRole{ + ServiceID: spec.ServiceSpec.ServiceID, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + NodeName: spec.NodeName, + } + if spec.Credentials != nil { + ragUserRole.Username = spec.Credentials.Username + ragUserRole.Password = spec.Credentials.Password + } + + // Resource chain: RAGServiceUserRole (container deployment in future PRs) + orchestratorResources := []resource.Resource{ + ragUserRole, + } + + return o.buildServiceInstanceResources(spec, orchestratorResources) +} + +// buildServiceInstanceResources converts a slice of resources into a +// ServiceInstanceResources, shared by all service type generators. +func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInstanceSpec, orchestratorResources []resource.Resource) (*database.ServiceInstanceResources, error) { data := make([]*resource.ResourceData, len(orchestratorResources)) for i, res := range orchestratorResources { d, err := resource.ToResourceData(res) diff --git a/server/internal/orchestrator/swarm/rag_service_user_role.go b/server/internal/orchestrator/swarm/rag_service_user_role.go new file mode 100644 index 00000000..914c3672 --- /dev/null +++ b/server/internal/orchestrator/swarm/rag_service_user_role.go @@ -0,0 +1,199 @@ +package swarm + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + "github.com/samber/do" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" + "github.com/pgEdge/control-plane/server/internal/utils" +) + +var _ resource.Resource = (*RAGServiceUserRole)(nil) + +const ResourceTypeRAGServiceUserRole resource.Type = "swarm.rag_service_user_role" + +func RAGServiceUserRoleIdentifier(serviceID string) resource.Identifier { + return resource.Identifier{ + ID: serviceID, + Type: ResourceTypeRAGServiceUserRole, + } +} + +// RAGServiceUserRole manages the Postgres role for a RAG service. +// The role is created on the primary of the co-located Postgres instance +// and granted the pgedge_application_read_only built-in role. +// Spock replicates the role to every other node because we connect via r.DatabaseName. +type RAGServiceUserRole struct { + ServiceID string `json:"service_id"` + DatabaseID string `json:"database_id"` + DatabaseName string `json:"database_name"` + NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing + Username string `json:"username"` + Password string `json:"password"` // Generated on Create, persisted in state +} + +func (r *RAGServiceUserRole) ResourceVersion() string { + return "1" +} + +func (r *RAGServiceUserRole) DiffIgnore() []string { + return []string{ + "/node_name", + "/username", + "/password", + } +} + +func (r *RAGServiceUserRole) Identifier() resource.Identifier { + return RAGServiceUserRoleIdentifier(r.ServiceID) +} + +func (r *RAGServiceUserRole) Executor() resource.Executor { + return resource.PrimaryExecutor(r.NodeName) +} + +func (r *RAGServiceUserRole) Dependencies() []resource.Identifier { + return nil +} + +func (r *RAGServiceUserRole) TypeDependencies() []resource.Type { + return nil +} + +func (r *RAGServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) error { + if r.Username == "" || r.Password == "" { + return resource.ErrNotFound + } + + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_id", r.ServiceID). + Str("database_id", r.DatabaseID). + Logger() + + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) + if err != nil { + return fmt.Errorf("failed to get primary instance: %w", err) + } + conn, err := primary.Connection(ctx, rc, r.DatabaseName) + if err != nil { + return fmt.Errorf("failed to connect to database: %w", err) + } + defer conn.Close(ctx) + + needsCreate, err := postgres.UserRoleNeedsCreate(r.Username).Scalar(ctx, conn) + if err != nil { + logger.Warn().Err(err).Msg("pg_roles query failed") + return fmt.Errorf("pg_roles query failed: %w", err) + } + if needsCreate { + return resource.ErrNotFound + } + return nil +} + +func (r *RAGServiceUserRole) Create(ctx context.Context, rc *resource.Context) error { + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_id", r.ServiceID). + Str("database_id", r.DatabaseID). + Logger() + logger.Info().Msg("creating RAG service user role") + + r.Username = database.GenerateServiceUsername(r.ServiceID) + if r.Password == "" { + password, err := utils.RandomString(32) + if err != nil { + return fmt.Errorf("failed to generate password: %w", err) + } + r.Password = password + } + + if err := r.createRole(ctx, rc); err != nil { + return fmt.Errorf("failed to create RAG service user role: %w", err) + } + + logger.Info().Str("username", r.Username).Msg("RAG service user role created successfully") + return nil +} + +func (r *RAGServiceUserRole) createRole(ctx context.Context, rc *resource.Context) error { + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) + if err != nil { + return fmt.Errorf("failed to get primary instance: %w", err) + } + conn, err := primary.Connection(ctx, rc, r.DatabaseName) + if err != nil { + return fmt.Errorf("failed to connect to database: %w", err) + } + defer conn.Close(ctx) + + statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{ + Name: r.Username, + Password: r.Password, + DBName: r.DatabaseName, + DBOwner: false, + Attributes: []string{"LOGIN"}, + Roles: []string{"pgedge_application_read_only"}, + }) + if err != nil { + return fmt.Errorf("failed to generate create user role statements: %w", err) + } + + if err := statements.Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to create RAG service user: %w", err) + } + + return nil +} + +func (r *RAGServiceUserRole) Update(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (r *RAGServiceUserRole) Delete(ctx context.Context, rc *resource.Context) error { + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_id", r.ServiceID). + Str("database_id", r.DatabaseID). + Str("username", r.Username). + Logger() + logger.Info().Msg("deleting RAG service user from database") + + primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName) + if err != nil { + // During deletion the database may already be gone or unreachable. + logger.Warn().Err(err).Msg("failed to get primary instance, skipping RAG user deletion") + return nil + } + conn, err := primary.Connection(ctx, rc, r.DatabaseName) + if err != nil { + // During deletion the database may already be gone or unreachable. + logger.Warn().Err(err).Msg("failed to connect to database, skipping RAG user deletion") + return nil + } + defer conn.Close(ctx) + + _, err = conn.Exec(ctx, fmt.Sprintf("DROP ROLE IF EXISTS %s", sanitizeIdentifier(r.Username))) + if err != nil { + logger.Warn().Err(err).Msg("failed to drop RAG user role, continuing anyway") + return nil + } + + logger.Info().Msg("RAG service user deleted successfully") + return nil +} diff --git a/server/internal/orchestrator/swarm/rag_service_user_role_test.go b/server/internal/orchestrator/swarm/rag_service_user_role_test.go new file mode 100644 index 00000000..40e8526f --- /dev/null +++ b/server/internal/orchestrator/swarm/rag_service_user_role_test.go @@ -0,0 +1,217 @@ +package swarm + +import ( + "context" + "testing" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +func TestRAGServiceUserRole_ResourceVersion(t *testing.T) { + r := &RAGServiceUserRole{} + if got := r.ResourceVersion(); got != "1" { + t.Errorf("ResourceVersion() = %q, want %q", got, "1") + } +} + +func TestRAGServiceUserRole_Identifier(t *testing.T) { + r := &RAGServiceUserRole{ServiceID: "rag"} + id := r.Identifier() + if id.ID != "rag" { + t.Errorf("Identifier().ID = %q, want %q", id.ID, "rag") + } + if id.Type != ResourceTypeRAGServiceUserRole { + t.Errorf("Identifier().Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole) + } +} + +func TestRAGServiceUserRole_Executor(t *testing.T) { + r := &RAGServiceUserRole{NodeName: "n1"} + exec := r.Executor() + if exec != resource.PrimaryExecutor("n1") { + t.Errorf("Executor() = %v, want PrimaryExecutor(%q)", exec, "n1") + } +} + +func TestRAGServiceUserRole_DiffIgnore(t *testing.T) { + r := &RAGServiceUserRole{} + ignored := r.DiffIgnore() + want := map[string]bool{ + "/node_name": true, + "/username": true, + "/password": true, + } + if len(ignored) != len(want) { + t.Errorf("DiffIgnore() length = %d, want %d", len(ignored), len(want)) + } + for _, path := range ignored { + if !want[path] { + t.Errorf("unexpected path in DiffIgnore(): %q", path) + } + } +} + +func TestRAGServiceUserRole_RefreshEmptyCredentials(t *testing.T) { + tests := []struct { + name string + username string + password string + }{ + {"empty username", "", "somepassword"}, + {"empty password", "svc_inst", ""}, + {"both empty", "", ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &RAGServiceUserRole{ + ServiceID: "rag", + Username: tt.username, + Password: tt.password, + } + // Refresh with nil rc — the empty-credential guard fires before any + // injection call, so no injector is needed. + err := r.Refresh(context.Background(), nil) + if err != resource.ErrNotFound { + t.Errorf("Refresh() = %v, want ErrNotFound", err) + } + }) + } +} + +func TestRAGServiceUserRoleIdentifier(t *testing.T) { + id := RAGServiceUserRoleIdentifier("my-instance") + if id.ID != "my-instance" { + t.Errorf("ID = %q, want %q", id.ID, "my-instance") + } + if id.Type != ResourceTypeRAGServiceUserRole { + t.Errorf("Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole) + } +} + +func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + if result.ServiceInstance == nil { + t.Fatal("ServiceInstance is nil") + } + if result.ServiceInstance.ServiceInstanceID != spec.ServiceInstanceID { + t.Errorf("ServiceInstance.ServiceInstanceID = %q, want %q", + result.ServiceInstance.ServiceInstanceID, spec.ServiceInstanceID) + } + if result.ServiceInstance.HostID != spec.HostID { + t.Errorf("ServiceInstance.HostID = %q, want %q", + result.ServiceInstance.HostID, spec.HostID) + } + if result.ServiceInstance.State != database.ServiceInstanceStateCreating { + t.Errorf("ServiceInstance.State = %q, want %q", + result.ServiceInstance.State, database.ServiceInstanceStateCreating) + } + + if len(result.Resources) != 1 { + t.Fatalf("len(Resources) = %d, want 1", len(result.Resources)) + } + if result.Resources[0].Identifier.Type != ResourceTypeRAGServiceUserRole { + t.Errorf("Resources[0].Identifier.Type = %q, want %q", + result.Resources[0].Identifier.Type, ResourceTypeRAGServiceUserRole) + } +} + +func TestGenerateRAGInstanceResources_WithCredentials(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "storefront-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "storefront", + DatabaseName: "storefront", + HostID: "host-1", + NodeName: "n1", + Credentials: &database.ServiceUser{ + Username: "svc_storefront_rag_host1", + Password: "supersecret", + }, + } + + result, err := o.generateRAGInstanceResources(spec) + if err != nil { + t.Fatalf("generateRAGInstanceResources() error = %v", err) + } + + // Deserialise the first resource and verify credentials are populated. + role, err := resource.ToResource[*RAGServiceUserRole](result.Resources[0]) + if err != nil { + t.Fatalf("ToResource RAGServiceUserRole: %v", err) + } + if role.Username != spec.Credentials.Username { + t.Errorf("Username = %q, want %q", role.Username, spec.Credentials.Username) + } + if role.Password != spec.Credentials.Password { + t.Errorf("Password = %q, want %q", role.Password, spec.Credentials.Password) + } +} + +func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-rag-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "rag", + ServiceType: "rag", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + result, err := o.GenerateServiceInstanceResources(spec) + if err != nil { + t.Fatalf("GenerateServiceInstanceResources() error = %v", err) + } + if result == nil { + t.Fatal("result is nil") + } +} + +func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) { + o := &Orchestrator{} + spec := &database.ServiceInstanceSpec{ + ServiceInstanceID: "db1-unknown-host1", + ServiceSpec: &database.ServiceSpec{ + ServiceID: "unknown", + ServiceType: "unknown", + Version: "latest", + }, + DatabaseID: "db1", + DatabaseName: "db1", + HostID: "host-1", + NodeName: "n1", + } + + _, err := o.GenerateServiceInstanceResources(spec) + if err == nil { + t.Fatal("expected error for unknown service type, got nil") + } +} diff --git a/server/internal/orchestrator/swarm/resources.go b/server/internal/orchestrator/swarm/resources.go index 4878137f..e4a63a61 100644 --- a/server/internal/orchestrator/swarm/resources.go +++ b/server/internal/orchestrator/swarm/resources.go @@ -21,4 +21,5 @@ func RegisterResourceTypes(registry *resource.Registry) { resource.RegisterResourceType[*Switchover](registry, ResourceTypeSwitchover) resource.RegisterResourceType[*ScaleService](registry, ResourceTypeScaleService) resource.RegisterResourceType[*MCPConfigResource](registry, ResourceTypeMCPConfig) + resource.RegisterResourceType[*RAGServiceUserRole](registry, ResourceTypeRAGServiceUserRole) } diff --git a/server/internal/postgres/roles.go b/server/internal/postgres/roles.go index 29225eed..97c6b799 100644 --- a/server/internal/postgres/roles.go +++ b/server/internal/postgres/roles.go @@ -11,6 +11,17 @@ import ( var defaultSchemas = []string{"public", "spock", "pg_catalog", "information_schema"} var builtinRoles = []string{"pgedge_application", "pgedge_application_read_only", "pgedge_superuser"} +// UserRoleNeedsCreate returns a query that evaluates to true when the named +// role does not yet exist in pg_catalog.pg_roles. +func UserRoleNeedsCreate(name string) Query[bool] { + return Query[bool]{ + SQL: "SELECT NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = @name);", + Args: pgx.NamedArgs{ + "name": name, + }, + } +} + type UserRoleOptions struct { Name string Password string diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index da6c702b..58bd9ea4 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -129,15 +129,22 @@ func (w *Workflows) getServiceResources( return nil, err } - return &operations.ServiceResources{ + svcResources := &operations.ServiceResources{ ServiceInstanceID: serviceInstanceID, Resources: generateOutput.Resources.Resources, - MonitorResource: &monitor.ServiceInstanceMonitorResource{ + } + // Only attach the monitor when the service deploys a Docker container + // (swarm.service_instance). Service types that provision infrastructure + // without a container (e.g. "rag" in its initial phase) must not set this + // dependency, as the planner requires all declared dependencies to exist. + if serviceSpec.ServiceType != "rag" { + svcResources.MonitorResource = &monitor.ServiceInstanceMonitorResource{ DatabaseID: spec.DatabaseID, ServiceInstanceID: serviceInstanceID, HostID: hostID, - }, - }, nil + } + } + return svcResources, nil } // findPostgresInstance resolves the Postgres hostname and port for a service