diff --git a/e2e/service_provisioning_test.go b/e2e/service_provisioning_test.go index fe711a68..094c6433 100644 --- a/e2e/service_provisioning_test.go +++ b/e2e/service_provisioning_test.go @@ -4,11 +4,12 @@ package e2e import ( "context" + "errors" "testing" "time" - "github.com/jackc/pgx/v5" controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/pgEdge/control-plane/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -315,11 +316,11 @@ func TestUpdateDatabaseAddService(t *testing.T) { t.Log("Add service to existing database test completed successfully") } -// TestProvisionMCPServiceUnsupportedVersion tests that database creation succeeds -// even when service provisioning fails due to an unsupported image version. +// TestProvisionMCPServiceUnsupportedVersion tests that database creation fails +// when service provisioning fails due to an unsupported image version. // Version "99.99.99" passes API validation (semver pattern) but is not registered -// in ServiceVersions, so GenerateServiceInstanceResources fails. The database -// should still become available and Postgres should be accessible. +// in ServiceVersions, so getServiceResources fails fatally. The workflow should +// fail and the database should be in a "failed" state. func TestProvisionMCPServiceUnsupportedVersion(t *testing.T) { t.Parallel() @@ -330,7 +331,7 @@ func TestProvisionMCPServiceUnsupportedVersion(t *testing.T) { t.Log("Creating database with MCP service using unsupported version") - db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ + createResp, err := fixture.Client.CreateDatabase(ctx, &controlplane.CreateDatabaseRequest{ Spec: &controlplane.DatabaseSpec{ DatabaseName: "test_mcp_unsupported_ver", DatabaseUsers: []*controlplane.DatabaseUserSpec{ @@ -363,36 +364,65 @@ func TestProvisionMCPServiceUnsupportedVersion(t *testing.T) { }, }, }) + require.NoError(t, err, "CreateDatabase API call should succeed") + require.NotNil(t, createResp.Task, "CreateDatabase should return a task") + require.NotNil(t, createResp.Database, "CreateDatabase should return a database") + + dbID := createResp.Database.ID + + // Register cleanup to force-delete the database + t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cleanupCancel() + + t.Logf("cleaning up database %s", dbID) + resp, err := fixture.Client.DeleteDatabase(cleanupCtx, &controlplane.DeleteDatabasePayload{ + DatabaseID: dbID, + Force: true, + }) + if err != nil { + if !errors.Is(err, client.ErrNotFound) { + t.Logf("failed to cleanup database %s: %s", dbID, err) + } + return + } + _, _ = fixture.Client.WaitForDatabaseTask(cleanupCtx, &controlplane.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: resp.Task.TaskID, + }) + }) - t.Log("Database created, verifying database is available despite service failure") + t.Log("Waiting for creation task to complete") - // Database should be available even though service provisioning failed - assert.Equal(t, "available", db.State, "Database should be available despite service provisioning failure") + task, err := fixture.Client.WaitForDatabaseTask(ctx, &controlplane.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: createResp.Task.TaskID, + }) + require.NoError(t, err, "WaitForDatabaseTask should not return a transport error") - // Verify Postgres instances exist and are accessible - require.NotEmpty(t, db.Instances, "Database should have at least one Postgres instance") + // The task should have failed due to the unsupported service version + assert.Equal(t, client.TaskStatusFailed, task.Status, "Task should have failed") + require.NotNil(t, task.Error, "Task should have an error message") + assert.Contains(t, *task.Error, "unsupported version", "Task error should mention unsupported version") - db.WithConnection(ctx, ConnectionOptions{ - Matcher: WithRole("primary"), - Username: "admin", - Password: "testpassword", - }, t, func(conn *pgx.Conn) { - var result int - row := conn.QueryRow(ctx, "SELECT 1") - require.NoError(t, row.Scan(&result)) - assert.Equal(t, 1, result) - t.Log("Postgres is accessible despite service provisioning failure") + t.Logf("Task failed as expected: %s", *task.Error) + + // Verify the database is in a "failed" state + db, err := fixture.Client.GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: dbID, }) + require.NoError(t, err, "GetDatabase should succeed") + assert.Equal(t, "failed", db.State, "Database should be in failed state") t.Log("Unsupported version test completed successfully") } // TestProvisionMCPServiceRecovery tests that a failed service can be recovered // by updating the database with a corrected service version. The sequence is: -// 1. Create database with an unsupported service version (provisioning fails) -// 2. Verify database is available and Postgres is accessible +// 1. Create database with an unsupported service version (workflow fails, database goes to "failed") +// 2. Verify database is in "failed" state // 3. Update database with a corrected service version -// 4. Verify the service instance is created and transitions to running +// 4. Verify the database recovers to "available" and service instances are created and running func TestProvisionMCPServiceRecovery(t *testing.T) { t.Parallel() @@ -401,9 +431,11 @@ func TestProvisionMCPServiceRecovery(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() + // --- Step 1: Create database with unsupported service version (expect failure) --- + t.Log("Creating database with MCP service using unsupported version") - db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ + createResp, err := fixture.Client.CreateDatabase(ctx, &controlplane.CreateDatabaseRequest{ Spec: &controlplane.DatabaseSpec{ DatabaseName: "test_mcp_recovery", DatabaseUsers: []*controlplane.DatabaseUserSpec{ @@ -425,7 +457,7 @@ func TestProvisionMCPServiceRecovery(t *testing.T) { { ServiceID: "mcp-server", ServiceType: "mcp", - Version: "99.99.99", // Unsupported version - service provisioning will fail + Version: "99.99.99", // Unsupported version - workflow will fail HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, Config: map[string]any{ "llm_provider": "anthropic", @@ -436,51 +468,114 @@ func TestProvisionMCPServiceRecovery(t *testing.T) { }, }, }) + require.NoError(t, err, "CreateDatabase API call should succeed") + require.NotNil(t, createResp.Task, "CreateDatabase should return a task") + require.NotNil(t, createResp.Database, "CreateDatabase should return a database") + + dbID := createResp.Database.ID + + // Register cleanup to force-delete the database + t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cleanupCancel() + + t.Logf("cleaning up database %s", dbID) + resp, err := fixture.Client.DeleteDatabase(cleanupCtx, &controlplane.DeleteDatabasePayload{ + DatabaseID: dbID, + Force: true, + }) + if err != nil { + if !errors.Is(err, client.ErrNotFound) { + t.Logf("failed to cleanup database %s: %s", dbID, err) + } + return + } + _, _ = fixture.Client.WaitForDatabaseTask(cleanupCtx, &controlplane.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: resp.Task.TaskID, + }) + }) - // Database should be available despite service failure - assert.Equal(t, "available", db.State, "Database should be available despite service provisioning failure") - t.Log("Database available, now updating with corrected service version") + t.Log("Waiting for creation task to complete (expecting failure)") - // Update database with corrected service version - err := db.Update(ctx, UpdateOptions{ - Spec: &controlplane.DatabaseSpec{ - DatabaseName: "test_mcp_recovery", - DatabaseUsers: []*controlplane.DatabaseUserSpec{ - { - Username: "admin", - Password: pointerTo("testpassword"), - DbOwner: pointerTo(true), - Attributes: []string{"LOGIN", "SUPERUSER"}, + task, err := fixture.Client.WaitForDatabaseTask(ctx, &controlplane.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: createResp.Task.TaskID, + }) + require.NoError(t, err, "WaitForDatabaseTask should not return a transport error") + + // The task should have failed due to the unsupported service version + assert.Equal(t, client.TaskStatusFailed, task.Status, "Task should have failed") + require.NotNil(t, task.Error, "Task should have an error message") + t.Logf("Task failed as expected: %s", *task.Error) + + // Verify the database is in a "failed" state + db, err := fixture.Client.GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: dbID, + }) + require.NoError(t, err, "GetDatabase should succeed") + assert.Equal(t, "failed", db.State, "Database should be in failed state after unsupported version") + + // --- Step 2: Update database with corrected service version (expect recovery) --- + + t.Log("Updating database with corrected service version") + + updateResp, err := fixture.Client.UpdateDatabase(ctx, &controlplane.UpdateDatabasePayload{ + DatabaseID: dbID, + Request: &controlplane.UpdateDatabaseRequest{ + Spec: &controlplane.DatabaseSpec{ + DatabaseName: "test_mcp_recovery", + DatabaseUsers: []*controlplane.DatabaseUserSpec{ + { + Username: "admin", + Password: pointerTo("testpassword"), + DbOwner: pointerTo(true), + Attributes: []string{"LOGIN", "SUPERUSER"}, + }, }, - }, - Port: pointerTo(0), - Nodes: []*controlplane.DatabaseNodeSpec{ - { - Name: "n1", - HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + Port: pointerTo(0), + Nodes: []*controlplane.DatabaseNodeSpec{ + { + Name: "n1", + HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + }, }, - }, - Services: []*controlplane.ServiceSpec{ - { - ServiceID: "mcp-server", - ServiceType: "mcp", - Version: "latest", // Corrected version - HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, - Config: map[string]any{ - "llm_provider": "anthropic", - "llm_model": "claude-sonnet-4-5", - "anthropic_api_key": "sk-ant-test-key-12345", + Services: []*controlplane.ServiceSpec{ + { + ServiceID: "mcp-server", + ServiceType: "mcp", + Version: "latest", // Corrected version + HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + Config: map[string]any{ + "llm_provider": "anthropic", + "llm_model": "claude-sonnet-4-5", + "anthropic_api_key": "sk-ant-test-key-12345", + }, }, }, }, }, }) - require.NoError(t, err, "Failed to update database with corrected service version") + require.NoError(t, err, "UpdateDatabase API call should succeed") + require.NotNil(t, updateResp.Task, "UpdateDatabase should return a task") + + t.Log("Waiting for update task to complete") + + updateTask, err := fixture.Client.WaitForDatabaseTask(ctx, &controlplane.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: updateResp.Task.TaskID, + }) + require.NoError(t, err, "WaitForDatabaseTask should not return a transport error") + require.Equal(t, client.TaskStatusCompleted, updateTask.Status, "Update task should have completed successfully") - t.Log("Database updated, verifying service instance recovered") + t.Log("Update task completed, verifying database recovered") - // Database should still be available - assert.Equal(t, "available", db.State, "Database should remain available after update") + // Verify the database is now available + db, err = fixture.Client.GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: dbID, + }) + require.NoError(t, err, "GetDatabase should succeed after update") + assert.Equal(t, "available", db.State, "Database should be available after recovery update") // Service instance should now exist require.NotNil(t, db.ServiceInstances, "ServiceInstances should not be nil after recovery") @@ -488,7 +583,7 @@ func TestProvisionMCPServiceRecovery(t *testing.T) { serviceInstance := db.ServiceInstances[0] assert.Equal(t, "mcp-server", serviceInstance.ServiceID, "Service ID should match") - assert.Equal(t, host1, serviceInstance.HostID, "Host ID should match") + assert.Equal(t, string(host1), serviceInstance.HostID, "Host ID should match") t.Logf("Service instance created: %s (state: %s)", serviceInstance.ServiceInstanceID, serviceInstance.State) @@ -501,7 +596,9 @@ func TestProvisionMCPServiceRecovery(t *testing.T) { deadline := time.Now().Add(maxWait) for time.Now().Before(deadline) { - err := db.Refresh(ctx) + db, err = fixture.Client.GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: dbID, + }) require.NoError(t, err, "Failed to refresh database") if len(db.ServiceInstances) > 0 && db.ServiceInstances[0].State == "running" { @@ -520,6 +617,144 @@ func TestProvisionMCPServiceRecovery(t *testing.T) { t.Log("Service recovery test completed successfully") } +// TestUpdateDatabaseServiceStable tests that updating a database without changing +// the service spec does not delete or recreate the service instance. This is the +// core behavior that PLAT-412 fixes: service resources participate in the normal +// reconciliation cycle and are left untouched when unchanged. +func TestUpdateDatabaseServiceStable(t *testing.T) { + t.Parallel() + + host1 := fixture.HostIDs()[0] + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + t.Log("Creating database with MCP service") + + db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ + Spec: &controlplane.DatabaseSpec{ + DatabaseName: "test_service_stable", + DatabaseUsers: []*controlplane.DatabaseUserSpec{ + { + Username: "admin", + Password: pointerTo("testpassword"), + DbOwner: pointerTo(true), + Attributes: []string{"LOGIN", "SUPERUSER"}, + }, + }, + Port: pointerTo(0), + Nodes: []*controlplane.DatabaseNodeSpec{ + { + Name: "n1", + HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + }, + }, + Services: []*controlplane.ServiceSpec{ + { + ServiceID: "mcp-server", + ServiceType: "mcp", + Version: "latest", + HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + Config: map[string]any{ + "llm_provider": "anthropic", + "llm_model": "claude-sonnet-4-5", + "anthropic_api_key": "sk-ant-test-key-stable", + }, + }, + }, + }, + }) + + require.Len(t, db.ServiceInstances, 1, "Expected 1 service instance") + + // Wait for service to reach "running" state before recording baseline + if db.ServiceInstances[0].State != "running" { + t.Log("Service not yet running, waiting...") + deadline := time.Now().Add(5 * time.Minute) + for time.Now().Before(deadline) { + time.Sleep(5 * time.Second) + err := db.Refresh(ctx) + require.NoError(t, err, "Failed to refresh database") + if len(db.ServiceInstances) > 0 && db.ServiceInstances[0].State == "running" { + break + } + } + } + require.Equal(t, "running", db.ServiceInstances[0].State, "Service should be running") + + // Record identifiers before the update to verify stability after + serviceInstanceID := db.ServiceInstances[0].ServiceInstanceID + createdAtBefore := db.ServiceInstances[0].CreatedAt + var containerIDBefore string + if db.ServiceInstances[0].Status != nil && db.ServiceInstances[0].Status.ContainerID != nil { + containerIDBefore = *db.ServiceInstances[0].Status.ContainerID + } + + t.Logf("Service instance %s: created_at=%s, container_id=%s", serviceInstanceID, createdAtBefore, containerIDBefore) + + t.Log("Updating database with a postgres config change (service unchanged)") + + // Update database with a non-service change (add a postgresql_conf setting). + // The service spec is identical — the service should not be touched. + err := db.Update(ctx, UpdateOptions{ + Spec: &controlplane.DatabaseSpec{ + DatabaseName: "test_service_stable", + DatabaseUsers: []*controlplane.DatabaseUserSpec{ + { + Username: "admin", + Password: pointerTo("testpassword"), + DbOwner: pointerTo(true), + Attributes: []string{"LOGIN", "SUPERUSER"}, + }, + }, + Port: pointerTo(0), + Nodes: []*controlplane.DatabaseNodeSpec{ + { + Name: "n1", + HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + PostgresqlConf: map[string]any{ + "log_min_duration_statement": "1000", + }, + }, + }, + Services: []*controlplane.ServiceSpec{ + { + ServiceID: "mcp-server", + ServiceType: "mcp", + Version: "latest", + HostIds: []controlplane.Identifier{controlplane.Identifier(host1)}, + Config: map[string]any{ + "llm_provider": "anthropic", + "llm_model": "claude-sonnet-4-5", + "anthropic_api_key": "sk-ant-test-key-stable", + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to update database") + + t.Log("Database updated, verifying service instance was not restarted") + + // Verify the service instance still exists and is running + require.Len(t, db.ServiceInstances, 1, "Should still have 1 service instance") + assert.Equal(t, "running", db.ServiceInstances[0].State, "Service should still be running") + assert.Equal(t, serviceInstanceID, db.ServiceInstances[0].ServiceInstanceID, "Service instance ID should be unchanged") + + // The key assertions: created_at and container ID should be unchanged, + // proving the service was not deleted/recreated during the database update. + // (updated_at is NOT checked because the service instance monitor + // periodically writes health status, which legitimately bumps it.) + assert.Equal(t, createdAtBefore, db.ServiceInstances[0].CreatedAt, "Service created_at should be unchanged (service was not recreated)") + + if containerIDBefore != "" && db.ServiceInstances[0].Status != nil && db.ServiceInstances[0].Status.ContainerID != nil { + assert.Equal(t, containerIDBefore, *db.ServiceInstances[0].Status.ContainerID, "Container ID should be unchanged (container was not restarted)") + } + + t.Logf("Service instance %s stable after database update", serviceInstanceID) + t.Log("Service stability test completed successfully") +} + // TestUpdateDatabaseRemoveService tests removing a service from a database. func TestUpdateDatabaseRemoveService(t *testing.T) { t.Parallel() diff --git a/server/internal/database/operations/common.go b/server/internal/database/operations/common.go index 475933d7..17b35c4c 100644 --- a/server/internal/database/operations/common.go +++ b/server/internal/database/operations/common.go @@ -67,3 +67,21 @@ func mergePartialStates(in [][]*resource.State) []*resource.State { return out } + +// ServiceResources represents the resources for a single service instance. +type ServiceResources struct { + ServiceInstanceID string + Resources []*resource.ResourceData + MonitorResource resource.Resource +} + +func (s *ServiceResources) State() (*resource.State, error) { + state := resource.NewState() + state.Add(s.Resources...) + if s.MonitorResource != nil { + if err := state.AddResource(s.MonitorResource); err != nil { + return nil, err + } + } + return state, nil +} diff --git a/server/internal/database/operations/end.go b/server/internal/database/operations/end.go index a0501445..387b3802 100644 --- a/server/internal/database/operations/end.go +++ b/server/internal/database/operations/end.go @@ -10,7 +10,7 @@ import ( // EndState computes the end state for the database, containing only the given // nodes. -func EndState(nodes []*NodeResources) (*resource.State, error) { +func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.State, error) { end := resource.NewState() for _, node := range nodes { var resources []resource.Resource @@ -59,5 +59,13 @@ func EndState(nodes []*NodeResources) (*resource.State, error) { } } + for _, svc := range services { + state, err := svc.State() + if err != nil { + return nil, err + } + end.Merge(state) + } + return end, nil } diff --git a/server/internal/database/operations/restore_database.go b/server/internal/database/operations/restore_database.go index aa07ff84..d5ed1d0a 100644 --- a/server/internal/database/operations/restore_database.go +++ b/server/internal/database/operations/restore_database.go @@ -91,7 +91,7 @@ func RestoreDatabase( targets []*NodeRestoreResources, ) ([]resource.Plan, error) { // The other states will be applied on top of this base state. - base, err := EndState(nodes) + base, err := EndState(nodes, nil) if err != nil { return nil, err } @@ -118,7 +118,7 @@ func RestoreDatabase( states[i] = merged } - end, err := EndState(allNodes) + end, err := EndState(allNodes, nil) if err != nil { return nil, err } diff --git a/server/internal/database/operations/update_database.go b/server/internal/database/operations/update_database.go index 5be441f5..d6596e57 100644 --- a/server/internal/database/operations/update_database.go +++ b/server/internal/database/operations/update_database.go @@ -36,6 +36,7 @@ func UpdateDatabase( options UpdateDatabaseOptions, start *resource.State, nodes []*NodeResources, + services []*ServiceResources, ) ([]resource.Plan, error) { update, err := updateFunc(options) if err != nil { @@ -65,7 +66,7 @@ func UpdateDatabase( } // Auto-select source node ONLY when both SourceNode and RestoreConfig are empty. - // If no existing nodes (fresh cluster), skip auto-select (don’t error). + // If no existing nodes (fresh cluster), skip auto-select (don't error). if len(adds) > 0 { var defaultSource string if len(updates) > 0 { @@ -112,7 +113,7 @@ func UpdateDatabase( } } - end, err := EndState(nodes) + end, err := EndState(nodes, services) if err != nil { return nil, err } diff --git a/server/internal/database/operations/update_database_test.go b/server/internal/database/operations/update_database_test.go index bc807fdf..3f0034a8 100644 --- a/server/internal/database/operations/update_database_test.go +++ b/server/internal/database/operations/update_database_test.go @@ -337,6 +337,7 @@ func TestUpdateDatabase(t *testing.T) { tc.options, tc.start, tc.nodes, + nil, ) if tc.expectedErr != "" { assert.Nil(t, plans) diff --git a/server/internal/database/service_instance.go b/server/internal/database/service_instance.go index 0ef33026..b339e1af 100644 --- a/server/internal/database/service_instance.go +++ b/server/internal/database/service_instance.go @@ -173,6 +173,7 @@ type ServiceInstanceSpec struct { CohortMemberID string Credentials *ServiceUser DatabaseNetworkID string + PostgresHostID string // Host where Postgres instance runs (for ServiceUserRole executor routing) DatabaseHost string // Postgres instance hostname to connect to DatabasePort int // Postgres instance port Port *int // Service instance published port (optional, 0 = random) diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index 6aaba966..ecea139d 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -426,8 +426,16 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn ServiceInstanceID: spec.ServiceInstanceID, DatabaseID: spec.DatabaseID, DatabaseName: spec.DatabaseName, - Username: spec.Credentials.Username, HostID: spec.HostID, + PostgresHostID: spec.PostgresHostID, + ServiceID: spec.ServiceSpec.ServiceID, + } + // Username and Password are populated from existing state during Refresh, + // or generated during Create. Only set if credentials exist (backward + // compatibility with existing state). + if spec.Credentials != nil { + serviceUserRole.Username = spec.Credentials.Username + serviceUserRole.Password = spec.Credentials.Password } // Service instance spec resource @@ -454,6 +462,8 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn ServiceInstanceID: spec.ServiceInstanceID, DatabaseID: spec.DatabaseID, ServiceName: serviceName, + ServiceID: spec.ServiceSpec.ServiceID, + HostID: spec.HostID, } orchestratorResources := []resource.Resource{ diff --git a/server/internal/orchestrator/swarm/service_instance.go b/server/internal/orchestrator/swarm/service_instance.go index da01d0ab..6d700373 100644 --- a/server/internal/orchestrator/swarm/service_instance.go +++ b/server/internal/orchestrator/swarm/service_instance.go @@ -33,6 +33,7 @@ type ServiceInstanceResource struct { DatabaseID string `json:"database_id"` ServiceName string `json:"service_name"` ServiceID string `json:"service_id"` + HostID string `json:"host_id"` NeedsUpdate bool `json:"needs_update"` } @@ -44,6 +45,7 @@ func (s *ServiceInstanceResource) DiffIgnore() []string { return []string{ "/database_id", "/service_id", + "/host_id", } } @@ -90,6 +92,57 @@ func (s *ServiceInstanceResource) Refresh(ctx context.Context, rc *resource.Cont } func (s *ServiceInstanceResource) Create(ctx context.Context, rc *resource.Context) error { + dbSvc, err := do.Invoke[*database.Service](rc.Injector) + if err != nil { + return fmt.Errorf("failed to get database service: %w", err) + } + + // Store initial service instance record in etcd with state="creating". + // This ensures the service instance is visible in the API even if deployment + // fails later. Previously this was done by the StoreServiceInstance activity + // in ProvisionServices. + err = dbSvc.UpdateServiceInstance(ctx, &database.ServiceInstanceUpdateOptions{ + ServiceInstanceID: s.ServiceInstanceID, + ServiceID: s.ServiceID, + DatabaseID: s.DatabaseID, + HostID: s.HostID, + State: database.ServiceInstanceStateCreating, + }) + if err != nil { + return fmt.Errorf("failed to store initial service instance: %w", err) + } + + return s.deploy(ctx, rc) +} + +func (s *ServiceInstanceResource) Update(ctx context.Context, rc *resource.Context) error { + client, err := do.Invoke[*docker.Docker](rc.Injector) + if err != nil { + return err + } + + resp, err := client.ServiceInspectByLabels(ctx, map[string]string{ + "pgedge.component": "service", + "pgedge.service.instance.id": s.ServiceInstanceID, + }) + if err != nil && !errors.Is(err, docker.ErrNotFound) { + return fmt.Errorf("failed to inspect service instance: %w", err) + } + if err == nil && resp.Spec.Name != s.ServiceName { + // If the service name has changed, we need to remove the service with + // the old name so that it can be recreated with the new name. + if err := client.ServiceRemove(ctx, resp.Spec.Name); err != nil { + return fmt.Errorf("failed to remove service instance for service name update: %w", err) + } + } + + return s.deploy(ctx, rc) +} + +// deploy handles the shared Docker service deployment logic used by both +// Create and Update. It deploys the service, waits for it to start, and +// transitions the etcd state to "running". +func (s *ServiceInstanceResource) deploy(ctx context.Context, rc *resource.Context) error { client, err := do.Invoke[*docker.Docker](rc.Injector) if err != nil { return err @@ -100,6 +153,11 @@ func (s *ServiceInstanceResource) Create(ctx context.Context, rc *resource.Conte return err } + dbSvc, err := do.Invoke[*database.Service](rc.Injector) + if err != nil { + return fmt.Errorf("failed to get database service: %w", err) + } + specResourceID := ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID) spec, err := resource.FromContext[*ServiceInstanceSpecResource](rc, specResourceID) if err != nil { @@ -140,44 +198,15 @@ func (s *ServiceInstanceResource) Create(ctx context.Context, rc *resource.Conte // Transition state to "running" immediately after successful deployment. // This mirrors the database instance pattern where state is set // deterministically within the resource activity, not deferred to the monitor. - dbSvc, err := do.Invoke[*database.Service](rc.Injector) - if err != nil { - logger.Warn().Err(err).Msg("failed to get database service for state update") - } else { - if err := dbSvc.SetServiceInstanceState(ctx, s.DatabaseID, s.ServiceInstanceID, database.ServiceInstanceStateRunning); err != nil { - logger.Warn().Err(err). - Str("service_instance_id", s.ServiceInstanceID). - Msg("failed to update service instance state to running (monitor will handle it)") - } + if err := dbSvc.SetServiceInstanceState(ctx, s.DatabaseID, s.ServiceInstanceID, database.ServiceInstanceStateRunning); err != nil { + logger.Warn().Err(err). + Str("service_instance_id", s.ServiceInstanceID). + Msg("failed to update service instance state to running (monitor will handle it)") } return nil } -func (s *ServiceInstanceResource) Update(ctx context.Context, rc *resource.Context) error { - client, err := do.Invoke[*docker.Docker](rc.Injector) - if err != nil { - return err - } - - resp, err := client.ServiceInspectByLabels(ctx, map[string]string{ - "pgedge.component": "service", - "pgedge.service.instance.id": s.ServiceInstanceID, - }) - if err != nil && !errors.Is(err, docker.ErrNotFound) { - return fmt.Errorf("failed to inspect service instance: %w", err) - } - if err == nil && resp.Spec.Name != s.ServiceName { - // If the service name has changed, we need to remove the service with - // the old name so that it can be recreated with the new name. - if err := client.ServiceRemove(ctx, resp.Spec.Name); err != nil { - return fmt.Errorf("failed to remove service instance for service name update: %w", err) - } - } - - return s.Create(ctx, rc) -} - func (s *ServiceInstanceResource) Delete(ctx context.Context, rc *resource.Context) error { client, err := do.Invoke[*docker.Docker](rc.Injector) if err != nil { diff --git a/server/internal/orchestrator/swarm/service_instance_spec.go b/server/internal/orchestrator/swarm/service_instance_spec.go index 69370e12..8b0192f7 100644 --- a/server/internal/orchestrator/swarm/service_instance_spec.go +++ b/server/internal/orchestrator/swarm/service_instance_spec.go @@ -58,22 +58,36 @@ func (s *ServiceInstanceSpecResource) Executor() resource.Executor { } func (s *ServiceInstanceSpecResource) Dependencies() []resource.Identifier { - // Service instances depend on the database network existing + // Service instances depend on the database network and service user role return []resource.Identifier{ NetworkResourceIdentifier(s.DatabaseNetworkID), + ServiceUserRoleIdentifier(s.ServiceInstanceID), } } +func (s *ServiceInstanceSpecResource) populateCredentials(rc *resource.Context) error { + userRole, err := resource.FromContext[*ServiceUserRole](rc, ServiceUserRoleIdentifier(s.ServiceInstanceID)) + if err != nil { + return fmt.Errorf("failed to get service user role from state: %w", err) + } + s.Credentials = &database.ServiceUser{ + Username: userRole.Username, + Password: userRole.Password, + Role: "pgedge_application_read_only", + } + return nil +} + func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource.Context) error { network, err := resource.FromContext[*Network](rc, NetworkResourceIdentifier(s.DatabaseNetworkID)) if err != nil { return fmt.Errorf("failed to get database network from state: %w", err) } - // DatabaseHost and DatabasePort are populated by the ProvisionServices workflow, - // which prefers a co-located instance for lower latency but falls back to any - // instance in the database when no local instance exists. - // TODO: consider alternatives and discuss with the team + // Populate credentials from the ServiceUserRole resource + if err := s.populateCredentials(rc); err != nil { + return err + } spec, err := ServiceContainerSpec(&ServiceContainerSpecOptions{ ServiceSpec: s.ServiceSpec, diff --git a/server/internal/orchestrator/swarm/service_user_role.go b/server/internal/orchestrator/swarm/service_user_role.go index b06609ad..9b62bde1 100644 --- a/server/internal/orchestrator/swarm/service_user_role.go +++ b/server/internal/orchestrator/swarm/service_user_role.go @@ -7,13 +7,16 @@ import ( "strings" "time" + "github.com/jackc/pgx/v5" "github.com/rs/zerolog" "github.com/samber/do" "github.com/pgEdge/control-plane/server/internal/certificates" "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/postgres" "github.com/pgEdge/control-plane/server/internal/resource" + "github.com/pgEdge/control-plane/server/internal/utils" ) var _ resource.Resource = (*ServiceUserRole)(nil) @@ -35,23 +38,32 @@ func ServiceUserRoleIdentifier(serviceInstanceID string) resource.Identifier { // ServiceUserRole manages the lifecycle of a database user for a service instance. // -// This resource handles cleanup of database users when service instances are deleted. -// User creation is performed by the CreateServiceUser activity during provisioning, -// but deletion requires infrastructure access and is therefore handled by this resource. +// This resource handles creation, verification, and cleanup of database users. +// On Create, it generates a deterministic username and random password, creates +// the Postgres role, and stores the credentials in the resource state. On +// subsequent reconciliation cycles, the credentials are reused from the +// persisted state (no password regeneration). type ServiceUserRole struct { ServiceInstanceID string `json:"service_instance_id"` DatabaseID string `json:"database_id"` DatabaseName string `json:"database_name"` Username string `json:"username"` HostID string `json:"host_id"` + PostgresHostID string `json:"postgres_host_id"` // Host where Postgres runs (for executor routing) + ServiceID string `json:"service_id"` // Needed for username generation + Password string `json:"password"` // Generated on Create, persisted in state } func (r *ServiceUserRole) ResourceVersion() string { - return "1" + return "2" } func (r *ServiceUserRole) DiffIgnore() []string { - return nil + return []string{ + "/postgres_host_id", + "/username", + "/password", + } } func (r *ServiceUserRole) Identifier() resource.Identifier { @@ -59,6 +71,11 @@ func (r *ServiceUserRole) Identifier() resource.Identifier { } func (r *ServiceUserRole) Executor() resource.Executor { + // ServiceUserRole must execute on the host running Postgres, because + // Create/Delete connect via local Docker container inspect. + if r.PostgresHostID != "" { + return resource.HostExecutor(r.PostgresHostID) + } return resource.HostExecutor(r.HostID) } @@ -68,13 +85,55 @@ func (r *ServiceUserRole) Dependencies() []resource.Identifier { } func (r *ServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) error { - // Nothing to refresh - user existence is managed by Create/Delete + // If username or password is empty, the resource state is from before we + // added credential management. Return ErrNotFound to trigger recreation. + if r.Username == "" || r.Password == "" { + return resource.ErrNotFound + } return nil } func (r *ServiceUserRole) Create(ctx context.Context, rc *resource.Context) error { - // User was already created by the CreateServiceUser activity during provisioning. - // This resource only handles deletion cleanup. + logger, err := do.Invoke[zerolog.Logger](rc.Injector) + if err != nil { + return err + } + logger = logger.With(). + Str("service_instance_id", r.ServiceInstanceID). + Str("database_id", r.DatabaseID). + Logger() + logger.Info().Msg("creating service user role") + + // Generate deterministic username and random password + r.Username = database.GenerateServiceUsername(r.ServiceID, r.HostID) + password, err := utils.RandomString(32) + if err != nil { + return fmt.Errorf("failed to generate password: %w", err) + } + r.Password = password + + conn, err := r.connectToPrimary(ctx, rc, logger) + if err != nil { + return err + } + defer conn.Close(ctx) + + statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{ + Name: r.Username, + Password: r.Password, + DBName: r.DatabaseName, + DBOwner: false, + Roles: []string{"pgedge_application_read_only"}, + }) + if err != nil { + return fmt.Errorf("failed to generate create user role statements: %w", err) + } + + if err := statements.Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to create service user: %w", err) + } + + logger.Info().Str("username", r.Username).Msg("service user role created successfully") return nil } @@ -95,39 +154,62 @@ func (r *ServiceUserRole) Delete(ctx context.Context, rc *resource.Context) erro Logger() logger.Info().Msg("deleting service user from database") + conn, err := r.connectToPrimary(ctx, rc, logger) + if err != nil { + // During deletion, connection failures are non-fatal — the database + // may already be gone or unreachable. + logger.Warn().Err(err).Msg("failed to connect to primary instance, skipping user deletion") + return nil + } + defer conn.Close(ctx) + + // Drop the user role + // Using IF EXISTS to handle cases where the user was already dropped manually + _, err = conn.Exec(ctx, fmt.Sprintf("DROP ROLE IF EXISTS %s", sanitizeIdentifier(r.Username))) + if err != nil { + logger.Warn().Err(err).Msg("failed to drop user role, continuing anyway") + // Don't fail the deletion if we can't drop the user - this prevents + // the resource from getting stuck in a failed state + return nil + } + + logger.Info().Msg("service user deleted successfully") + return nil +} + +// connectToPrimary finds the primary Postgres instance and returns an +// authenticated connection to it. The caller is responsible for closing +// the connection. +func (r *ServiceUserRole) connectToPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger) (*pgx.Conn, error) { orch, err := do.Invoke[database.Orchestrator](rc.Injector) if err != nil { - return err + return nil, err } - // Get database service to find an instance to connect to dbSvc, err := do.Invoke[*database.Service](rc.Injector) if err != nil { - return err + return nil, err } db, err := dbSvc.GetDatabase(ctx, r.DatabaseID) if err != nil { if errors.Is(err, database.ErrDatabaseNotFound) { - logger.Info().Msg("database not found, skipping user deletion") - return nil + return nil, fmt.Errorf("database not found: %w", err) } - return fmt.Errorf("failed to get database: %w", err) + return nil, fmt.Errorf("failed to get database: %w", err) } if len(db.Instances) == 0 { - logger.Info().Msg("database has no instances, skipping user deletion") - return nil + return nil, fmt.Errorf("database has no instances") } - // Connect to primary instance (or any available instance) + // Find primary instance via Patroni var primaryInstanceID string for _, inst := range db.Instances { connInfo, err := orch.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID) if err != nil { continue } - patroniClient := patroni.NewClient(connInfo.PatroniURL(), nil) primaryID, err := database.GetPrimaryInstanceID(ctx, patroniClient, 10*time.Second) if err == nil && primaryID != "" { @@ -135,54 +217,33 @@ func (r *ServiceUserRole) Delete(ctx context.Context, rc *resource.Context) erro break } } - if primaryInstanceID == "" { - // Fallback: use first available instance primaryInstanceID = db.Instances[0].InstanceID logger.Warn().Msg("could not determine primary instance, using first available instance") } - // Get connection info for the primary instance connInfo, err := orch.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID) if err != nil { - logger.Warn().Err(err).Msg("failed to get instance connection info, skipping user deletion") - return nil + return nil, fmt.Errorf("failed to get instance connection info: %w", err) } - // Get certificate service for TLS authentication certSvc, err := do.Invoke[*certificates.Service](rc.Injector) if err != nil { - return fmt.Errorf("failed to get certificate service: %w", err) + return nil, fmt.Errorf("failed to get certificate service: %w", err) } - // Create TLS config with pgedge user certificates tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge") if err != nil { - logger.Warn().Err(err).Msg("failed to create TLS config, skipping user deletion") - return nil + return nil, fmt.Errorf("failed to create TLS config: %w", err) } - // Connect to the postgres system database conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{ DSN: connInfo.AdminDSN("postgres"), TLS: tlsConfig, }) if err != nil { - logger.Warn().Err(err).Msg("failed to connect to database, skipping user deletion") - return nil - } - defer conn.Close(ctx) - - // Drop the user role - // Using IF EXISTS to handle cases where the user was already dropped manually - _, err = conn.Exec(ctx, fmt.Sprintf("DROP ROLE IF EXISTS %s", sanitizeIdentifier(r.Username))) - if err != nil { - logger.Warn().Err(err).Msg("failed to drop user role, continuing anyway") - // Don't fail the deletion if we can't drop the user - this prevents - // the resource from getting stuck in a failed state - return nil + return nil, fmt.Errorf("failed to connect to database: %w", err) } - logger.Info().Msg("service user deleted successfully") - return nil + return conn, nil } diff --git a/server/internal/workflows/activities/activities.go b/server/internal/workflows/activities/activities.go index 439b0e26..cc6dda12 100644 --- a/server/internal/workflows/activities/activities.go +++ b/server/internal/workflows/activities/activities.go @@ -25,11 +25,9 @@ func (a *Activities) Register(work *worker.Worker) error { work.RegisterActivity(a.CancelSwitchover), work.RegisterActivity(a.CheckClusterHealth), work.RegisterActivity(a.CreatePgBackRestBackup), - work.RegisterActivity(a.CreateServiceUser), work.RegisterActivity(a.DeleteDbEntities), work.RegisterActivity(a.GenerateServiceInstanceResources), work.RegisterActivity(a.GetCurrentState), - work.RegisterActivity(a.GetServiceInstanceStatus), work.RegisterActivity(a.GetInstanceResources), work.RegisterActivity(a.GetPrimaryInstance), work.RegisterActivity(a.GetRestoreResources), @@ -44,9 +42,7 @@ func (a *Activities) Register(work *worker.Worker) error { work.RegisterActivity(a.SelectCandidate), work.RegisterActivity(a.StartInstance), work.RegisterActivity(a.StopInstance), - work.RegisterActivity(a.StoreServiceInstance), work.RegisterActivity(a.UpdateDbState), - work.RegisterActivity(a.UpdateServiceInstanceState), work.RegisterActivity(a.UpdateTask), work.RegisterActivity(a.ValidateInstanceSpecs), } diff --git a/server/internal/workflows/activities/create_service_user.go b/server/internal/workflows/activities/create_service_user.go deleted file mode 100644 index c7ecc1e6..00000000 --- a/server/internal/workflows/activities/create_service_user.go +++ /dev/null @@ -1,216 +0,0 @@ -package activities - -import ( - "context" - "fmt" - "time" - - "github.com/cschleiden/go-workflows/activity" - "github.com/cschleiden/go-workflows/workflow" - "github.com/samber/do" - - "github.com/pgEdge/control-plane/server/internal/certificates" - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/patroni" - "github.com/pgEdge/control-plane/server/internal/postgres" - "github.com/pgEdge/control-plane/server/internal/utils" -) - -type CreateServiceUserInput struct { - ServiceInstanceID string `json:"service_instance_id"` - DatabaseID string `json:"database_id"` - DatabaseName string `json:"database_name"` - ServiceID string `json:"service_id"` - HostID string `json:"host_id"` -} - -type CreateServiceUserOutput struct { - Credentials *database.ServiceUser `json:"credentials"` -} - -func (a *Activities) ExecuteCreateServiceUser( - ctx workflow.Context, - hostID string, - input *CreateServiceUserInput, -) workflow.Future[*CreateServiceUserOutput] { - options := workflow.ActivityOptions{ - Queue: utils.HostQueue(hostID), - RetryOptions: workflow.RetryOptions{ - MaxAttempts: 1, - }, - } - return workflow.ExecuteActivity[*CreateServiceUserOutput](ctx, options, a.CreateServiceUser, input) -} - -// CreateServiceUser creates dedicated database credentials for a service instance. -// -// # Credential Generation Pattern -// -// Each service instance receives isolated database credentials with read-only access. -// This provides security isolation between service instances and prevents services from -// modifying data. -// -// # Username Generation -// -// Usernames follow the format: "svc_{service_id}_{host_id}" -// Example: For service_id "mcp-server" and host_id "host1", the username is -// "svc_mcp-server_host1". -// -// The username is deterministic: the same service_id + host_id always produces -// the same username. It is truncated to 63 characters for PostgreSQL -// compatibility. -// -// # Password Generation -// -// Passwords are generated by utils.RandomString(32), which reads 32 bytes from -// crypto/rand and base64url-encodes them. The result is a 44-character string -// (256 bits of entropy), making brute-force attacks infeasible. -// -// # Database Permissions -// -// Service users are granted the "pgedge_application_read_only" role, which provides: -// - SELECT privileges on all tables -// - EXECUTE privileges on functions (read-only operations) -// - No INSERT, UPDATE, DELETE, or DDL privileges -// -// This follows the principle of least privilege - services can query data but cannot -// modify it. Any data modification must go through the application layer. -// -// # Credential Lifecycle -// -// 1. Provisioning: Credentials are created during service instance provisioning -// 2. Storage: Passwords are stored in etcd alongside service instance metadata -// 3. Injection: Credentials are injected as environment variables (PGUSER/PGPASSWORD) -// into the service container at startup -// 4. Rotation: Not currently supported (future enhancement) -// 5. Deletion: Credentials are revoked when the service instance is deleted -// -// # Security Considerations -// -// - Credentials are unique per service instance (not shared) -// - Passwords never appear in logs (marked as sensitive) -// - Read-only access prevents data corruption from compromised services -// - Credentials are injected as environment variables into service containers -// -// # Implementation Details -// -// The activity connects to the database's primary instance using admin credentials, -// generates the username and password, then executes CREATE USER and GRANT statements -// via postgres.CreateUserRole(). The generated credentials are returned for storage -// in etcd and injection into the service container. -func (a *Activities) CreateServiceUser(ctx context.Context, input *CreateServiceUserInput) (*CreateServiceUserOutput, error) { - logger := activity.Logger(ctx).With( - "service_instance_id", input.ServiceInstanceID, - "database_id", input.DatabaseID, - ) - logger.Info("creating service user") - - orch, err := do.Invoke[database.Orchestrator](a.Injector) - if err != nil { - return nil, err - } - - // Get database service to find an instance to connect to - dbSvc, err := do.Invoke[*database.Service](a.Injector) - if err != nil { - return nil, err - } - - db, err := dbSvc.GetDatabase(ctx, input.DatabaseID) - if err != nil { - return nil, fmt.Errorf("failed to get database: %w", err) - } - - if len(db.Instances) == 0 { - return nil, fmt.Errorf("database has no instances") - } - - // Connect to any instance (preferably primary, but any will work for user creation) - var primaryInstanceID string - for _, inst := range db.Instances { - connInfo, err := orch.GetInstanceConnectionInfo(ctx, input.DatabaseID, inst.InstanceID) - if err != nil { - continue - } - - patroniClient := patroni.NewClient(connInfo.PatroniURL(), nil) - primaryID, err := database.GetPrimaryInstanceID(ctx, patroniClient, 10*time.Second) - if err == nil && primaryID != "" { - primaryInstanceID = primaryID - break - } - } - - if primaryInstanceID == "" { - // Fallback: use first available instance - primaryInstanceID = db.Instances[0].InstanceID - logger.Warn("could not determine primary instance, using first available instance") - } - - // Get connection info for the primary instance - connInfo, err := orch.GetInstanceConnectionInfo(ctx, input.DatabaseID, primaryInstanceID) - if err != nil { - return nil, fmt.Errorf("failed to get instance connection info: %w", err) - } - - // Get certificate service to create TLS config for pgedge user - // The pg_hba.conf requires SSL certificate-based authentication for admin connections - certSvc, err := do.Invoke[*certificates.Service](a.Injector) - if err != nil { - return nil, fmt.Errorf("failed to get certificate service: %w", err) - } - - // Create TLS config with pgedge user certificates - tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge") - if err != nil { - return nil, fmt.Errorf("failed to create TLS config: %w", err) - } - - // Connect to the postgres system database - // Note: We connect to "postgres" instead of the user database because: - // 1. CREATE ROLE/ALTER ROLE/GRANT are cluster-level operations that work from any database - // 2. The pg_hba.conf may not allow connections from Control Plane to user databases - // 3. This matches the pattern used by other user management activities - conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{ - DSN: connInfo.AdminDSN("postgres"), - TLS: tlsConfig, - }) - if err != nil { - return nil, fmt.Errorf("failed to connect to database: %w", err) - } - defer conn.Close(ctx) - - // Generate credentials - username := database.GenerateServiceUsername(input.ServiceID, input.HostID) - password, err := utils.RandomString(32) - if err != nil { - return nil, fmt.Errorf("failed to generate password: %w", err) - } - - // Create user role with read-only permissions - statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{ - Name: username, - Password: password, - DBName: input.DatabaseName, - DBOwner: false, - Roles: []string{"pgedge_application_read_only"}, - }) - if err != nil { - return nil, fmt.Errorf("failed to generate create user role statements: %w", err) - } - - // Execute statements - if err := statements.Exec(ctx, conn); err != nil { - return nil, fmt.Errorf("failed to create service user: %w", err) - } - - logger.Info("service user created successfully", "username", username) - - return &CreateServiceUserOutput{ - Credentials: &database.ServiceUser{ - Username: username, - Password: password, - Role: "pgedge_application_read_only", - }, - }, nil -} diff --git a/server/internal/workflows/activities/get_service_instance_status.go b/server/internal/workflows/activities/get_service_instance_status.go deleted file mode 100644 index 2f5a3b0c..00000000 --- a/server/internal/workflows/activities/get_service_instance_status.go +++ /dev/null @@ -1,56 +0,0 @@ -package activities - -import ( - "context" - "fmt" - - "github.com/cschleiden/go-workflows/activity" - "github.com/cschleiden/go-workflows/workflow" - "github.com/samber/do" - - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/utils" -) - -type GetServiceInstanceStatusInput struct { - ServiceInstanceID string `json:"service_instance_id"` - HostID string `json:"host_id"` -} - -type GetServiceInstanceStatusOutput struct { - Status *database.ServiceInstanceStatus `json:"status"` -} - -func (a *Activities) ExecuteGetServiceInstanceStatus( - ctx workflow.Context, - hostID string, - input *GetServiceInstanceStatusInput, -) workflow.Future[*GetServiceInstanceStatusOutput] { - options := workflow.ActivityOptions{ - Queue: utils.HostQueue(hostID), - RetryOptions: workflow.RetryOptions{ - MaxAttempts: 3, - }, - } - return workflow.ExecuteActivity[*GetServiceInstanceStatusOutput](ctx, options, a.GetServiceInstanceStatus, input) -} - -func (a *Activities) GetServiceInstanceStatus( - ctx context.Context, - input *GetServiceInstanceStatusInput, -) (*GetServiceInstanceStatusOutput, error) { - logger := activity.Logger(ctx).With("service_instance_id", input.ServiceInstanceID) - logger.Debug("getting service instance status") - - orch, err := do.Invoke[database.Orchestrator](a.Injector) - if err != nil { - return nil, err - } - - status, err := orch.GetServiceInstanceStatus(ctx, input.ServiceInstanceID) - if err != nil { - return nil, fmt.Errorf("failed to get service instance status: %w", err) - } - - return &GetServiceInstanceStatusOutput{Status: status}, nil -} diff --git a/server/internal/workflows/activities/store_service_instance.go b/server/internal/workflows/activities/store_service_instance.go deleted file mode 100644 index fc44e2f2..00000000 --- a/server/internal/workflows/activities/store_service_instance.go +++ /dev/null @@ -1,57 +0,0 @@ -package activities - -import ( - "context" - "fmt" - - "github.com/cschleiden/go-workflows/activity" - "github.com/cschleiden/go-workflows/workflow" - - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/utils" -) - -type StoreServiceInstanceInput struct { - ServiceInstance *database.ServiceInstance `json:"service_instance"` -} - -type StoreServiceInstanceOutput struct{} - -func (a *Activities) ExecuteStoreServiceInstance( - ctx workflow.Context, - input *StoreServiceInstanceInput, -) workflow.Future[*StoreServiceInstanceOutput] { - options := workflow.ActivityOptions{ - Queue: utils.ManagerQueue(), - RetryOptions: workflow.RetryOptions{ - MaxAttempts: 1, - }, - } - return workflow.ExecuteActivity[*StoreServiceInstanceOutput](ctx, options, a.StoreServiceInstance, input) -} - -func (a *Activities) StoreServiceInstance( - ctx context.Context, - input *StoreServiceInstanceInput, -) (*StoreServiceInstanceOutput, error) { - logger := activity.Logger(ctx).With( - "service_instance_id", input.ServiceInstance.ServiceInstanceID, - "database_id", input.ServiceInstance.DatabaseID, - ) - logger.Debug("storing service instance") - - err := a.DatabaseService.UpdateServiceInstance(ctx, &database.ServiceInstanceUpdateOptions{ - ServiceInstanceID: input.ServiceInstance.ServiceInstanceID, - ServiceID: input.ServiceInstance.ServiceID, - DatabaseID: input.ServiceInstance.DatabaseID, - HostID: input.ServiceInstance.HostID, - State: input.ServiceInstance.State, - }) - if err != nil { - return nil, fmt.Errorf("failed to store service instance: %w", err) - } - - logger.Debug("successfully stored service instance") - - return &StoreServiceInstanceOutput{}, nil -} diff --git a/server/internal/workflows/activities/update_service_instance_state.go b/server/internal/workflows/activities/update_service_instance_state.go deleted file mode 100644 index d8a34678..00000000 --- a/server/internal/workflows/activities/update_service_instance_state.go +++ /dev/null @@ -1,60 +0,0 @@ -package activities - -import ( - "context" - "fmt" - - "github.com/cschleiden/go-workflows/activity" - "github.com/cschleiden/go-workflows/workflow" - - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/utils" -) - -type UpdateServiceInstanceStateInput struct { - ServiceInstanceID string `json:"service_instance_id"` - DatabaseID string `json:"database_id,omitempty"` - State database.ServiceInstanceState `json:"state"` - Status *database.ServiceInstanceStatus `json:"status,omitempty"` - Error string `json:"error,omitempty"` -} - -type UpdateServiceInstanceStateOutput struct{} - -func (a *Activities) ExecuteUpdateServiceInstanceState( - ctx workflow.Context, - input *UpdateServiceInstanceStateInput, -) workflow.Future[*UpdateServiceInstanceStateOutput] { - options := workflow.ActivityOptions{ - Queue: utils.ManagerQueue(), - RetryOptions: workflow.RetryOptions{ - MaxAttempts: 1, - }, - } - return workflow.ExecuteActivity[*UpdateServiceInstanceStateOutput](ctx, options, a.UpdateServiceInstanceState, input) -} - -func (a *Activities) UpdateServiceInstanceState( - ctx context.Context, - input *UpdateServiceInstanceStateInput, -) (*UpdateServiceInstanceStateOutput, error) { - logger := activity.Logger(ctx).With( - "service_instance_id", input.ServiceInstanceID, - "state", input.State, - ) - logger.Debug("updating service instance state") - - err := a.DatabaseService.UpdateServiceInstanceState(ctx, input.ServiceInstanceID, &database.ServiceInstanceStateUpdate{ - DatabaseID: input.DatabaseID, - State: input.State, - Status: input.Status, - Error: input.Error, - }) - if err != nil { - return nil, fmt.Errorf("failed to update service instance state: %w", err) - } - - logger.Debug("successfully updated service instance state") - - return &UpdateServiceInstanceStateOutput{}, nil -} diff --git a/server/internal/workflows/delete_database.go b/server/internal/workflows/delete_database.go index cd80b328..ba9bc338 100644 --- a/server/internal/workflows/delete_database.go +++ b/server/internal/workflows/delete_database.go @@ -88,7 +88,7 @@ func (w *Workflows) DeleteDatabase(ctx workflow.Context, input *DeleteDatabaseIn } current := refreshCurrentOutput.State - plans, err := operations.UpdateDatabase(operations.UpdateDatabaseOptions{}, current, nil) + plans, err := operations.UpdateDatabase(operations.UpdateDatabaseOptions{}, current, nil, nil) if err != nil { return nil, fmt.Errorf("failed to plan database delete: %w", err) } diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index 2a1480cc..ef5cfe35 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -7,8 +7,11 @@ import ( "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/database/operations" + "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/monitor" "github.com/pgEdge/control-plane/server/internal/resource" "github.com/pgEdge/control-plane/server/internal/utils" + "github.com/pgEdge/control-plane/server/internal/workflows/activities" ) type PlanUpdateInput struct { @@ -53,7 +56,32 @@ func (w *Workflows) PlanUpdate(ctx workflow.Context, input *PlanUpdateInput) (*P nodeResources[i] = resources } - plans, err := operations.UpdateDatabase(input.Options, input.Current, nodeResources) + // Generate service instance resources. + // Determine a Postgres host for ServiceUserRole executor routing — + // ServiceUserRole.Create() needs local Docker access to the Postgres container. + var postgresHostID string + for _, node := range nodeInstances { + if len(node.Instances) > 0 { + postgresHostID = node.Instances[0].HostID + break + } + } + if postgresHostID == "" && len(input.Spec.Services) > 0 { + return nil, fmt.Errorf("no postgres instances available for service role routing") + } + + var serviceResources []*operations.ServiceResources + for _, serviceSpec := range input.Spec.Services { + for _, hostID := range serviceSpec.HostIDs { + svcRes, err := w.getServiceResources(ctx, input.Spec, serviceSpec, hostID, postgresHostID, nodeInstances) + if err != nil { + return nil, fmt.Errorf("failed to get service resources for %s on %s: %w", serviceSpec.ServiceID, hostID, err) + } + serviceResources = append(serviceResources, svcRes) + } + } + + plans, err := operations.UpdateDatabase(input.Options, input.Current, nodeResources, serviceResources) if err != nil { return nil, fmt.Errorf("failed to plan database update: %w", err) } @@ -62,3 +90,89 @@ func (w *Workflows) PlanUpdate(ctx workflow.Context, input *PlanUpdateInput) (*P return &PlanUpdateOutput{Plans: plans}, nil } + +func (w *Workflows) getServiceResources( + ctx workflow.Context, + spec *database.Spec, + serviceSpec *database.ServiceSpec, + hostID string, + postgresHostID string, + nodeInstances []*database.NodeInstances, +) (*operations.ServiceResources, error) { + serviceInstanceID := database.GenerateServiceInstanceID(spec.DatabaseID, serviceSpec.ServiceID, hostID) + pgEdgeVersion, err := host.NewPgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) + if err != nil { + return nil, fmt.Errorf("failed to parse pgedge version: %w", err) + } + + // Resolve Postgres connection info for the service container. + // Services connect to Postgres via the overlay network using the instance hostname. + databaseHost, databasePort, err := findPostgresInstance(nodeInstances, hostID) + if err != nil { + return nil, fmt.Errorf("failed to resolve postgres instance for service: %w", err) + } + + serviceInstanceSpec := &database.ServiceInstanceSpec{ + ServiceInstanceID: serviceInstanceID, + ServiceSpec: serviceSpec, + PgEdgeVersion: pgEdgeVersion, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + HostID: hostID, + PostgresHostID: postgresHostID, + DatabaseNetworkID: database.GenerateDatabaseNetworkID(spec.DatabaseID), + DatabaseHost: databaseHost, + DatabasePort: databasePort, + Port: serviceSpec.Port, + // Credentials: nil — ServiceUserRole.Create() will generate them + } + + generateInput := &activities.GenerateServiceInstanceResourcesInput{Spec: serviceInstanceSpec} + generateOutput, err := w.Activities.ExecuteGenerateServiceInstanceResources(ctx, generateInput).Get(ctx) + if err != nil { + return nil, err + } + + return &operations.ServiceResources{ + ServiceInstanceID: serviceInstanceID, + Resources: generateOutput.Resources.Resources, + MonitorResource: &monitor.ServiceInstanceMonitorResource{ + DatabaseID: spec.DatabaseID, + ServiceInstanceID: serviceInstanceID, + HostID: hostID, + }, + }, nil +} + +// findPostgresInstance resolves the Postgres hostname and port for a service +// container from the database spec. It prefers a co-located instance (same host +// as the service) for lower latency, falling back to any instance in the database. +// The hostname follows the swarm orchestrator convention: "postgres-{instanceID}". +func findPostgresInstance(nodeInstances []*database.NodeInstances, serviceHostID string) (string, int, error) { + const defaultPort = 5432 + + instancePort := func(inst *database.InstanceSpec) int { + if inst.Port != nil { + return *inst.Port + } + return defaultPort + } + + var fallback *database.InstanceSpec + for _, node := range nodeInstances { + for _, inst := range node.Instances { + if fallback == nil { + fallback = inst + } + if inst.HostID == serviceHostID { + return fmt.Sprintf("postgres-%s", inst.InstanceID), instancePort(inst), nil + } + } + } + + if fallback != nil { + return fmt.Sprintf("postgres-%s", fallback.InstanceID), instancePort(fallback), nil + } + + return "", 0, fmt.Errorf("no postgres instances found for service host %s", serviceHostID) +} diff --git a/server/internal/workflows/provision_services.go b/server/internal/workflows/provision_services.go deleted file mode 100644 index 36835b69..00000000 --- a/server/internal/workflows/provision_services.go +++ /dev/null @@ -1,429 +0,0 @@ -package workflows - -import ( - "fmt" - - "github.com/cschleiden/go-workflows/workflow" - "github.com/google/uuid" - - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/host" - "github.com/pgEdge/control-plane/server/internal/monitor" - "github.com/pgEdge/control-plane/server/internal/resource" - "github.com/pgEdge/control-plane/server/internal/task" - "github.com/pgEdge/control-plane/server/internal/utils" - "github.com/pgEdge/control-plane/server/internal/workflows/activities" -) - -type ProvisionServicesInput struct { - TaskID uuid.UUID `json:"task_id"` - Spec *database.Spec `json:"spec"` -} - -type ProvisionServicesOutput struct { -} - -func (w *Workflows) ExecuteProvisionServices( - ctx workflow.Context, - input *ProvisionServicesInput, -) workflow.Future[*ProvisionServicesOutput] { - options := workflow.SubWorkflowOptions{ - Queue: utils.HostQueue(w.Config.HostID), - RetryOptions: workflow.RetryOptions{ - MaxAttempts: 1, - }, - } - return workflow.CreateSubWorkflowInstance[*ProvisionServicesOutput](ctx, options, w.ProvisionServices, input) -} - -func (w *Workflows) ProvisionServices(ctx workflow.Context, input *ProvisionServicesInput) (*ProvisionServicesOutput, error) { - logger := workflow.Logger(ctx).With("database_id", input.Spec.DatabaseID) - logger.With("service_count", len(input.Spec.Services)).Info("ProvisionServices workflow started") - - if len(input.Spec.Services) == 0 { - logger.Info("no services to provision - returning early") - return &ProvisionServicesOutput{}, nil - } - - // Parse database version for service compatibility validation - pgEdgeVersion, err := host.NewPgEdgeVersion(input.Spec.PostgresVersion, input.Spec.SpockVersion) - if err != nil { - return nil, fmt.Errorf("failed to parse pgedge version: %w", err) - } - - // Log task start - start := workflow.Now(ctx) - err = w.logTaskEvent(ctx, - task.ScopeDatabase, - input.Spec.DatabaseID, - input.TaskID, - task.LogEntry{ - Message: fmt.Sprintf("provisioning %d service(s)", len(input.Spec.Services)), - Fields: map[string]any{ - "service_count": len(input.Spec.Services), - }, - }, - ) - if err != nil { - return nil, err - } - - // Get existing database state - getCurrentInput := &activities.GetCurrentStateInput{ - DatabaseID: input.Spec.DatabaseID, - } - getCurrentOutput, err := w.Activities.ExecuteGetCurrentState(ctx, getCurrentInput).Get(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get current state: %w", err) - } - accumulatedState := getCurrentOutput.State - - // Track prepared service instances for parallel deployment - type serviceInstancePrep struct { - serviceInstanceID string - serviceSpec *database.ServiceSpec - hostID string - resources []*resource.ResourceData - logFields []any // Logger fields for this instance - } - var preparedInstances []serviceInstancePrep - - // Phase 1: Prepare all service instances (create users, generate resources) - // This must be done serially because CreateServiceUser connects to Postgres - for _, serviceSpec := range input.Spec.Services { - logger := logger.With("service_id", serviceSpec.ServiceID) - - // Log service provisioning start - err := w.logTaskEvent(ctx, - task.ScopeDatabase, - input.Spec.DatabaseID, - input.TaskID, - task.LogEntry{ - Message: fmt.Sprintf("provisioning service '%s' on %d host(s)", serviceSpec.ServiceID, len(serviceSpec.HostIDs)), - Fields: map[string]any{ - "service_id": serviceSpec.ServiceID, - "service_type": serviceSpec.ServiceType, - "version": serviceSpec.Version, - "host_count": len(serviceSpec.HostIDs), - }, - }, - ) - if err != nil { - return nil, err - } - - // Prepare each service instance on each host - for _, hostID := range serviceSpec.HostIDs { - serviceInstanceID := database.GenerateServiceInstanceID(input.Spec.DatabaseID, serviceSpec.ServiceID, hostID) - instanceLogger := logger.With("service_instance_id", serviceInstanceID, "host_id", hostID) - - // Store service instance immediately with state="creating" - // This ensures it's visible even if provisioning fails later - storeInitialInput := &activities.StoreServiceInstanceInput{ - ServiceInstance: &database.ServiceInstance{ - ServiceInstanceID: serviceInstanceID, - ServiceID: serviceSpec.ServiceID, - DatabaseID: input.Spec.DatabaseID, - HostID: hostID, - State: database.ServiceInstanceStateCreating, - CreatedAt: workflow.Now(ctx), - UpdatedAt: workflow.Now(ctx), - }, - } - _, err := w.Activities.ExecuteStoreServiceInstance(ctx, storeInitialInput).Get(ctx) - if err != nil { - instanceLogger.With("error", err).Error("failed to store initial service instance") - return nil, fmt.Errorf("failed to store service instance %s: %w", serviceInstanceID, err) - } - - instanceLogger.Info("stored service instance with state=creating") - - // Error handler to mark service instance as failed and continue to next instance - // Can only be used AFTER the service instance is stored above - handleError := func(cause error) { - instanceLogger.With("error", cause).Error("failed to prepare service instance") - - // Mark service instance as failed - updateInstanceInput := &activities.UpdateServiceInstanceStateInput{ - ServiceInstanceID: serviceInstanceID, - DatabaseID: input.Spec.DatabaseID, - State: database.ServiceInstanceStateFailed, - Error: cause.Error(), - } - _, stateErr := w.Activities.ExecuteUpdateServiceInstanceState(ctx, updateInstanceInput).Get(ctx) - if stateErr != nil { - instanceLogger.With("error", stateErr).Warn("failed to update service instance state to failed") - } - } - - // Find any Postgres instance in this database to get connection details - // Services can be on different hosts than database instances, they just need - // database network connectivity. We prefer an instance on the same host for - // lower latency, but any instance will work. - var instanceHostname string - var instancePort = 5432 // Default Postgres port - var instanceHostID string // Host where the instance is running (needed for CreateServiceUser) - instanceResources := accumulatedState.GetAll(database.ResourceTypeInstance) - - // First try to find an instance on the same host (preferred for latency) - for _, instanceData := range instanceResources { - instance, err := resource.ToResource[*database.InstanceResource](instanceData) - if err != nil { - continue - } - if instance.Spec.DatabaseID == input.Spec.DatabaseID && instance.Spec.HostID == hostID { - instanceHostname = instance.InstanceHostname - instanceHostID = instance.Spec.HostID - if instance.Spec.Port != nil { - instancePort = *instance.Spec.Port - } - break - } - } - - // If no instance on same host, use any instance in the database - if instanceHostname == "" { - for _, instanceData := range instanceResources { - instance, err := resource.ToResource[*database.InstanceResource](instanceData) - if err != nil { - continue - } - if instance.Spec.DatabaseID == input.Spec.DatabaseID { - instanceHostname = instance.InstanceHostname - instanceHostID = instance.Spec.HostID - if instance.Spec.Port != nil { - instancePort = *instance.Spec.Port - } - break - } - } - } - - if instanceHostname == "" { - handleError(fmt.Errorf("no postgres instance found for database %s", input.Spec.DatabaseID)) - continue - } - - // Create database credentials for this service instance - // IMPORTANT: Execute on instanceHostID (where Postgres is running), not hostID (where service will run) - // CreateServiceUser needs to connect to the local Postgres container via Docker - createUserInput := &activities.CreateServiceUserInput{ - DatabaseID: input.Spec.DatabaseID, - DatabaseName: input.Spec.DatabaseName, - ServiceInstanceID: serviceInstanceID, - ServiceID: serviceSpec.ServiceID, - HostID: hostID, - } - createUserOutput, err := w.Activities.ExecuteCreateServiceUser(ctx, instanceHostID, createUserInput).Get(ctx) - if err != nil { - handleError(fmt.Errorf("failed to create service user for instance %s: %w", serviceInstanceID, err)) - continue - } - - instanceLogger.With("username", createUserOutput.Credentials.Username).Info("created service instance credentials") - - // Generate service instance resources - // Note: CohortMemberID is populated by the orchestrator using its swarmNodeID - serviceInstanceSpec := &database.ServiceInstanceSpec{ - ServiceInstanceID: serviceInstanceID, - ServiceSpec: serviceSpec, - PgEdgeVersion: pgEdgeVersion, - DatabaseID: input.Spec.DatabaseID, - DatabaseName: input.Spec.DatabaseName, - HostID: hostID, - Credentials: createUserOutput.Credentials, - DatabaseNetworkID: database.GenerateDatabaseNetworkID(input.Spec.DatabaseID), - DatabaseHost: instanceHostname, - DatabasePort: instancePort, - Port: serviceSpec.Port, - } - - generateInput := &activities.GenerateServiceInstanceResourcesInput{ - Spec: serviceInstanceSpec, - } - generateOutput, err := w.Activities.ExecuteGenerateServiceInstanceResources(ctx, generateInput).Get(ctx) - if err != nil { - handleError(fmt.Errorf("failed to generate service instance resources: %w", err)) - continue - } - - instanceLogger.With("resource_count", len(generateOutput.Resources.Resources)).Info("generated service instance resources") - - // Add monitor resource to track service instance state transitions - monitorResource := &monitor.ServiceInstanceMonitorResource{ - DatabaseID: input.Spec.DatabaseID, - ServiceInstanceID: serviceInstanceID, - HostID: hostID, - } - monitorResourceData, err := resource.ToResourceData(monitorResource) - if err != nil { - handleError(fmt.Errorf("failed to convert monitor resource to resource data: %w", err)) - continue - } - generateOutput.Resources.Resources = append(generateOutput.Resources.Resources, monitorResourceData) - - instanceLogger.With("resource_count", len(generateOutput.Resources.Resources)).Info("generated service instance resources with monitor") - - // Add to prepared instances for parallel deployment - preparedInstances = append(preparedInstances, serviceInstancePrep{ - serviceInstanceID: serviceInstanceID, - serviceSpec: serviceSpec, - hostID: hostID, - resources: generateOutput.Resources.Resources, - logFields: []any{"service_instance_id", serviceInstanceID, "host_id", hostID}, - }) - } - - } - - // Phase 2 & 3: Deploy all service instances in parallel - if len(preparedInstances) > 0 { - // Accumulate all service instance resources into desired state - serviceDesiredState := resource.NewState() - for _, prep := range preparedInstances { - serviceDesiredState.Add(prep.resources...) - } - - // Plan all service resources together (shares network, etc.) - serviceCurrentState := resource.NewState() - servicePlan, err := serviceCurrentState.Plan(resource.PlanOptions{}, serviceDesiredState) - if err != nil { - return nil, fmt.Errorf("failed to plan service instance resources: %w", err) - } - - // Apply all service resources in parallel (same pattern as database instances) - err = w.applyEvents(ctx, input.Spec.DatabaseID, input.TaskID, serviceCurrentState, servicePlan) - if err != nil { - logger.With("error", err).Warn("some service instances failed to deploy") - - // Check for resource errors and mark service instances as failed - // applyEvents may have aborted before creating monitors, so we must handle state transitions here - for _, prep := range preparedInstances { - // Get the ServiceInstance resource from current state to check for errors - // Use the same identifier format as swarm.ServiceInstanceResourceIdentifier but without import cycle - serviceInstanceIdentifier := resource.Identifier{ - ID: prep.serviceInstanceID, - Type: resource.Type(database.ResourceTypeServiceInstance), - } - serviceInstanceResourceData, found := serviceCurrentState.Get(serviceInstanceIdentifier) - - if found && serviceInstanceResourceData != nil && serviceInstanceResourceData.Error != "" { - // ServiceInstance deployment failed - mark as failed in etcd - updateInput := &activities.UpdateServiceInstanceStateInput{ - ServiceInstanceID: prep.serviceInstanceID, - DatabaseID: input.Spec.DatabaseID, - State: database.ServiceInstanceStateFailed, - Error: serviceInstanceResourceData.Error, - } - - _, updateErr := w.Activities.ExecuteUpdateServiceInstanceState(ctx, updateInput).Get(ctx) - if updateErr != nil { - logger.With("error", updateErr, "service_instance_id", prep.serviceInstanceID). - Error("failed to update service instance state to failed") - } else { - logger.With("service_instance_id", prep.serviceInstanceID). - Info("marked service instance as failed due to deployment error") - } - } - } - } - - // Merge service resources into accumulated state - for _, resourcesByID := range serviceCurrentState.Resources { - for _, res := range resourcesByID { - accumulatedState.Add(res) - } - } - - // Phase 4: Update statuses for all service instances - for _, prep := range preparedInstances { - instanceLogger := logger.With(prep.logFields...) - - // Get service instance status (connection info, ports) - statusInput := &activities.GetServiceInstanceStatusInput{ - ServiceInstanceID: prep.serviceInstanceID, - HostID: prep.hostID, - } - statusOutput, err := w.Activities.ExecuteGetServiceInstanceStatus(ctx, prep.hostID, statusInput).Get(ctx) - if err != nil { - instanceLogger.With("error", err).Warn("failed to get service instance status (monitor will enrich)") - continue - } - - // If status is nil, the container is still starting - leave state as "creating" - // The instance monitor will update it to "running" once the container is ready - if statusOutput.Status == nil { - instanceLogger.Info("service container still starting - status will be populated by monitoring") - continue - } - - // Update service instance state to "running" with connection info - updateInstanceInput := &activities.UpdateServiceInstanceStateInput{ - ServiceInstanceID: prep.serviceInstanceID, - DatabaseID: input.Spec.DatabaseID, - State: database.ServiceInstanceStateRunning, - Status: statusOutput.Status, - } - _, err = w.Activities.ExecuteUpdateServiceInstanceState(ctx, updateInstanceInput).Get(ctx) - if err != nil { - instanceLogger.With("error", err).Error("failed to update service instance state") - continue - } - - instanceLogger.Info("service instance provisioned successfully") - } - - } - - // Log overall service provisioning completion - for _, serviceSpec := range input.Spec.Services { - err = w.logTaskEvent(ctx, - task.ScopeDatabase, - input.Spec.DatabaseID, - input.TaskID, - task.LogEntry{ - Message: fmt.Sprintf("provisioned service '%s' on %d host(s)", serviceSpec.ServiceID, len(serviceSpec.HostIDs)), - Fields: map[string]any{ - "service_id": serviceSpec.ServiceID, - "host_count": len(serviceSpec.HostIDs), - }, - }, - ) - if err != nil { - return nil, err - } - } - - // Persist the complete state with all database and service instance resources - persistInput := &activities.PersistStateInput{ - DatabaseID: input.Spec.DatabaseID, - State: accumulatedState, - } - _, err = w.Activities.ExecutePersistState(ctx, persistInput).Get(ctx) - if err != nil { - logger.With("error", err).Error("failed to persist service instance state") - return nil, fmt.Errorf("failed to persist service instance state: %w", err) - } - - // Log task completion - duration := workflow.Now(ctx).Sub(start) - err = w.logTaskEvent(ctx, - task.ScopeDatabase, - input.Spec.DatabaseID, - input.TaskID, - task.LogEntry{ - Message: fmt.Sprintf("finished provisioning %d service(s) (took %s)", len(input.Spec.Services), duration), - Fields: map[string]any{ - "service_count": len(input.Spec.Services), - "duration_ms": duration.Milliseconds(), - }, - }, - ) - if err != nil { - return nil, err - } - - logger.Info("successfully provisioned all services") - - return &ProvisionServicesOutput{}, nil -} diff --git a/server/internal/workflows/update_database.go b/server/internal/workflows/update_database.go index 6ed666b7..e1ae9a72 100644 --- a/server/internal/workflows/update_database.go +++ b/server/internal/workflows/update_database.go @@ -119,39 +119,6 @@ func (w *Workflows) UpdateDatabase(ctx workflow.Context, input *UpdateDatabaseIn return nil, handleError(err) } - // Provision services after database resources are applied - logger.With("service_count_in_spec", len(input.Spec.Services)).Info("checking if we need to provision services") - if len(input.Spec.Services) > 0 { - provisionServicesInput := &ProvisionServicesInput{ - TaskID: input.TaskID, - Spec: input.Spec, - } - - logger.With("service_count", len(input.Spec.Services)).Info("calling ProvisionServices workflow") - - _, err = w.ExecuteProvisionServices(ctx, provisionServicesInput).Get(ctx) - if err != nil { - // Log service provisioning failure but allow database to succeed - // Service instances will be marked as "failed" with error details - logger.With("error", err).Error("failed to provision services - database will be available but services degraded") - - err = w.logTaskEvent(ctx, - task.ScopeDatabase, - input.Spec.DatabaseID, - input.TaskID, - task.LogEntry{ - Message: "service provisioning failed - database available but services unavailable", - Fields: map[string]any{ - "error": err.Error(), - }, - }, - ) - if err != nil { - logger.With("error", err).Warn("failed to log service provisioning error") - } - } - } - updateStateInput := &activities.UpdateDbStateInput{ DatabaseID: input.Spec.DatabaseID, State: database.DatabaseStateAvailable, diff --git a/server/internal/workflows/workflows.go b/server/internal/workflows/workflows.go index c812f9f9..21f94199 100644 --- a/server/internal/workflows/workflows.go +++ b/server/internal/workflows/workflows.go @@ -27,7 +27,6 @@ func (w *Workflows) Register(work *worker.Worker) error { work.RegisterWorkflow(w.PgBackRestRestore), work.RegisterWorkflow(w.PlanRestore), work.RegisterWorkflow(w.PlanUpdate), - work.RegisterWorkflow(w.ProvisionServices), work.RegisterWorkflow(w.RefreshCurrentState), work.RegisterWorkflow(w.RemoveHost), work.RegisterWorkflow(w.RestartInstance),