Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 297 additions & 62 deletions e2e/service_provisioning_test.go

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions server/internal/database/operations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,21 @@ func mergePartialStates(in [][]*resource.State) []*resource.State {

return out
}

// ServiceResources represents the resources for a single service instance.
//
// Use State to turn the collected resources into a *resource.State.
type ServiceResources struct {
// ServiceInstanceID identifies the service instance these resources belong to.
ServiceInstanceID string
// Resources holds the resource data that State adds to the resulting state.
Resources []*resource.ResourceData
// MonitorResource is optional: State only adds it to the resulting state
// when it is non-nil.
MonitorResource resource.Resource
}

// State assembles a fresh resource state for this service instance.
//
// All entries in Resources are added first. MonitorResource is added
// afterwards, but only when it is set; an error from adding it is returned
// unchanged and no state is returned in that case.
func (s *ServiceResources) State() (*resource.State, error) {
	result := resource.NewState()
	result.Add(s.Resources...)

	// Nothing more to do when the service instance has no monitor.
	if s.MonitorResource == nil {
		return result, nil
	}

	if err := result.AddResource(s.MonitorResource); err != nil {
		return nil, err
	}
	return result, nil
}
10 changes: 9 additions & 1 deletion server/internal/database/operations/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// EndState computes the end state for the database, containing only the given
// nodes.
func EndState(nodes []*NodeResources) (*resource.State, error) {
func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.State, error) {
end := resource.NewState()
for _, node := range nodes {
var resources []resource.Resource
Expand Down Expand Up @@ -59,5 +59,13 @@ func EndState(nodes []*NodeResources) (*resource.State, error) {
}
}

for _, svc := range services {
state, err := svc.State()
if err != nil {
return nil, err
}
end.Merge(state)
}

return end, nil
}
4 changes: 2 additions & 2 deletions server/internal/database/operations/restore_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func RestoreDatabase(
targets []*NodeRestoreResources,
) ([]resource.Plan, error) {
// The other states will be applied on top of this base state.
base, err := EndState(nodes)
base, err := EndState(nodes, nil)
if err != nil {
return nil, err
}
Expand All @@ -118,7 +118,7 @@ func RestoreDatabase(
states[i] = merged
}

end, err := EndState(allNodes)
end, err := EndState(allNodes, nil)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions server/internal/database/operations/update_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func UpdateDatabase(
options UpdateDatabaseOptions,
start *resource.State,
nodes []*NodeResources,
services []*ServiceResources,
) ([]resource.Plan, error) {
update, err := updateFunc(options)
if err != nil {
Expand Down Expand Up @@ -65,7 +66,7 @@ func UpdateDatabase(
}

// Auto-select source node ONLY when both SourceNode and RestoreConfig are empty.
// If no existing nodes (fresh cluster), skip auto-select (dont error).
// If no existing nodes (fresh cluster), skip auto-select (don't error).
if len(adds) > 0 {
var defaultSource string
if len(updates) > 0 {
Expand Down Expand Up @@ -112,7 +113,7 @@ func UpdateDatabase(
}
}

end, err := EndState(nodes)
end, err := EndState(nodes, services)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func TestUpdateDatabase(t *testing.T) {
tc.options,
tc.start,
tc.nodes,
nil,
)
if tc.expectedErr != "" {
assert.Nil(t, plans)
Expand Down
1 change: 1 addition & 0 deletions server/internal/database/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type ServiceInstanceSpec struct {
CohortMemberID string
Credentials *ServiceUser
DatabaseNetworkID string
PostgresHostID string // Host where Postgres instance runs (for ServiceUserRole executor routing)
DatabaseHost string // Postgres instance hostname to connect to
DatabasePort int // Postgres instance port
Port *int // Service instance published port (optional, 0 = random)
Expand Down
12 changes: 11 additions & 1 deletion server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,16 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
ServiceInstanceID: spec.ServiceInstanceID,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
Username: spec.Credentials.Username,
HostID: spec.HostID,
PostgresHostID: spec.PostgresHostID,
ServiceID: spec.ServiceSpec.ServiceID,
}
// Username and Password are populated from existing state during Refresh,
// or generated during Create. Only set if credentials exist (backward
// compatibility with existing state).
if spec.Credentials != nil {
serviceUserRole.Username = spec.Credentials.Username
serviceUserRole.Password = spec.Credentials.Password
}

// Service instance spec resource
Expand All @@ -454,6 +462,8 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
ServiceInstanceID: spec.ServiceInstanceID,
DatabaseID: spec.DatabaseID,
ServiceName: serviceName,
ServiceID: spec.ServiceSpec.ServiceID,
HostID: spec.HostID,
}

orchestratorResources := []resource.Resource{
Expand Down
95 changes: 62 additions & 33 deletions server/internal/orchestrator/swarm/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ServiceInstanceResource struct {
DatabaseID string `json:"database_id"`
ServiceName string `json:"service_name"`
ServiceID string `json:"service_id"`
HostID string `json:"host_id"`
NeedsUpdate bool `json:"needs_update"`
}

Expand All @@ -44,6 +45,7 @@ func (s *ServiceInstanceResource) DiffIgnore() []string {
return []string{
"/database_id",
"/service_id",
"/host_id",
}
}

Expand Down Expand Up @@ -90,6 +92,57 @@ func (s *ServiceInstanceResource) Refresh(ctx context.Context, rc *resource.Cont
}

// Create records the service instance with state "creating" and then deploys
// it.
//
// The record is written before deployment so that the service instance is
// visible in the API even if the deployment fails afterwards. This write used
// to be performed by the StoreServiceInstance activity in ProvisionServices.
//
// It returns an error if the database service cannot be resolved from the
// injector, if the initial record cannot be stored, or if deploy fails.
func (s *ServiceInstanceResource) Create(ctx context.Context, rc *resource.Context) error {
	dbSvc, err := do.Invoke[*database.Service](rc.Injector)
	if err != nil {
		return fmt.Errorf("failed to get database service: %w", err)
	}

	initial := &database.ServiceInstanceUpdateOptions{
		ServiceInstanceID: s.ServiceInstanceID,
		ServiceID:         s.ServiceID,
		DatabaseID:        s.DatabaseID,
		HostID:            s.HostID,
		State:             database.ServiceInstanceStateCreating,
	}
	if err := dbSvc.UpdateServiceInstance(ctx, initial); err != nil {
		return fmt.Errorf("failed to store initial service instance: %w", err)
	}

	return s.deploy(ctx, rc)
}

// Update redeploys the service instance, first removing an existing Docker
// service whose name no longer matches ServiceName.
//
// The existing service is looked up by its pgedge labels. A not-found result
// is not an error: there is simply nothing to remove before deploying. Any
// other inspect failure, or a failure to remove the stale service, aborts the
// update.
func (s *ServiceInstanceResource) Update(ctx context.Context, rc *resource.Context) error {
	client, err := do.Invoke[*docker.Docker](rc.Injector)
	if err != nil {
		return err
	}

	labels := map[string]string{
		"pgedge.component":           "service",
		"pgedge.service.instance.id": s.ServiceInstanceID,
	}
	existing, inspectErr := client.ServiceInspectByLabels(ctx, labels)

	switch {
	case inspectErr == nil:
		// A renamed service has to be removed under its old name so that the
		// deployment below can recreate it under the new one.
		if existing.Spec.Name != s.ServiceName {
			if err := client.ServiceRemove(ctx, existing.Spec.Name); err != nil {
				return fmt.Errorf("failed to remove service instance for service name update: %w", err)
			}
		}
	case !errors.Is(inspectErr, docker.ErrNotFound):
		return fmt.Errorf("failed to inspect service instance: %w", inspectErr)
	}

	return s.deploy(ctx, rc)
}

// deploy handles the shared Docker service deployment logic used by both
// Create and Update. It deploys the service, waits for it to start, and
// transitions the etcd state to "running".
func (s *ServiceInstanceResource) deploy(ctx context.Context, rc *resource.Context) error {
client, err := do.Invoke[*docker.Docker](rc.Injector)
if err != nil {
return err
Expand All @@ -100,6 +153,11 @@ func (s *ServiceInstanceResource) Create(ctx context.Context, rc *resource.Conte
return err
}

dbSvc, err := do.Invoke[*database.Service](rc.Injector)
if err != nil {
return fmt.Errorf("failed to get database service: %w", err)
}

specResourceID := ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID)
spec, err := resource.FromContext[*ServiceInstanceSpecResource](rc, specResourceID)
if err != nil {
Expand Down Expand Up @@ -140,44 +198,15 @@ func (s *ServiceInstanceResource) Create(ctx context.Context, rc *resource.Conte
// Transition state to "running" immediately after successful deployment.
// This mirrors the database instance pattern where state is set
// deterministically within the resource activity, not deferred to the monitor.
dbSvc, err := do.Invoke[*database.Service](rc.Injector)
if err != nil {
logger.Warn().Err(err).Msg("failed to get database service for state update")
} else {
if err := dbSvc.SetServiceInstanceState(ctx, s.DatabaseID, s.ServiceInstanceID, database.ServiceInstanceStateRunning); err != nil {
logger.Warn().Err(err).
Str("service_instance_id", s.ServiceInstanceID).
Msg("failed to update service instance state to running (monitor will handle it)")
}
if err := dbSvc.SetServiceInstanceState(ctx, s.DatabaseID, s.ServiceInstanceID, database.ServiceInstanceStateRunning); err != nil {
logger.Warn().Err(err).
Str("service_instance_id", s.ServiceInstanceID).
Msg("failed to update service instance state to running (monitor will handle it)")
}

return nil
}

// Update removes an existing Docker service whose name differs from
// ServiceName and then delegates to Create.
//
// The existing service is located by its pgedge labels. When no such service
// exists the removal step is skipped; any other inspect error is returned.
func (s *ServiceInstanceResource) Update(ctx context.Context, rc *resource.Context) error {
	client, err := do.Invoke[*docker.Docker](rc.Injector)
	if err != nil {
		return err
	}

	selector := map[string]string{
		"pgedge.component":           "service",
		"pgedge.service.instance.id": s.ServiceInstanceID,
	}
	current, err := client.ServiceInspectByLabels(ctx, selector)
	found := err == nil
	if !found && !errors.Is(err, docker.ErrNotFound) {
		return fmt.Errorf("failed to inspect service instance: %w", err)
	}

	// The old service must be removed under its previous name so that it can
	// be recreated with the new name.
	if found && current.Spec.Name != s.ServiceName {
		if err := client.ServiceRemove(ctx, current.Spec.Name); err != nil {
			return fmt.Errorf("failed to remove service instance for service name update: %w", err)
		}
	}

	return s.Create(ctx, rc)
}

func (s *ServiceInstanceResource) Delete(ctx context.Context, rc *resource.Context) error {
client, err := do.Invoke[*docker.Docker](rc.Injector)
if err != nil {
Expand Down
24 changes: 19 additions & 5 deletions server/internal/orchestrator/swarm/service_instance_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,36 @@ func (s *ServiceInstanceSpecResource) Executor() resource.Executor {
}

// Dependencies returns the identifiers of the resources this spec depends on:
// the database network and the service user role of the same service
// instance.
func (s *ServiceInstanceSpecResource) Dependencies() []resource.Identifier {
	// Service instances depend on the database network and service user role
	networkID := NetworkResourceIdentifier(s.DatabaseNetworkID)
	userRoleID := ServiceUserRoleIdentifier(s.ServiceInstanceID)

	return []resource.Identifier{networkID, userRoleID}
}

// populateCredentials overwrites s.Credentials with the username and password
// of this service instance's ServiceUserRole, looked up in the resource
// context. The role is always set to "pgedge_application_read_only".
//
// It returns an error, leaving s.Credentials untouched, when the
// ServiceUserRole cannot be read from the context.
func (s *ServiceInstanceSpecResource) populateCredentials(rc *resource.Context) error {
	roleID := ServiceUserRoleIdentifier(s.ServiceInstanceID)
	role, err := resource.FromContext[*ServiceUserRole](rc, roleID)
	if err != nil {
		return fmt.Errorf("failed to get service user role from state: %w", err)
	}

	creds := database.ServiceUser{
		Username: role.Username,
		Password: role.Password,
		Role:     "pgedge_application_read_only",
	}
	s.Credentials = &creds
	return nil
}

func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource.Context) error {
network, err := resource.FromContext[*Network](rc, NetworkResourceIdentifier(s.DatabaseNetworkID))
if err != nil {
return fmt.Errorf("failed to get database network from state: %w", err)
}

// DatabaseHost and DatabasePort are populated by the ProvisionServices workflow,
// which prefers a co-located instance for lower latency but falls back to any
// instance in the database when no local instance exists.
// TODO: consider alternatives and discuss with the team
// Populate credentials from the ServiceUserRole resource
if err := s.populateCredentials(rc); err != nil {
return err
}

spec, err := ServiceContainerSpec(&ServiceContainerSpecOptions{
ServiceSpec: s.ServiceSpec,
Expand Down
Loading