diff --git a/docs/primitives.md b/docs/primitives.md index 7e6e156e..fec3bb0b 100644 --- a/docs/primitives.md +++ b/docs/primitives.md @@ -156,6 +156,7 @@ have been applied. This means a single mutation can safely add a container and t | `pkg/primitives/statefulset` | Workload | [statefulset.md](primitives/statefulset.md) | | `pkg/primitives/replicaset` | Workload | [replicaset.md](primitives/replicaset.md) | | `pkg/primitives/daemonset` | Workload | [daemonset.md](primitives/daemonset.md) | +| `pkg/primitives/job` | Task | [job.md](primitives/job.md) | | `pkg/primitives/cronjob` | Integration | [cronjob.md](primitives/cronjob.md) | | `pkg/primitives/configmap` | Static | [configmap.md](primitives/configmap.md) | | `pkg/primitives/pdb` | Static | [pdb.md](primitives/pdb.md) | diff --git a/docs/primitives/job.md b/docs/primitives/job.md new file mode 100644 index 00000000..9c6c1ded --- /dev/null +++ b/docs/primitives/job.md @@ -0,0 +1,256 @@ +# Job Primitive + +The `job` primitive is the framework's built-in task abstraction for managing Kubernetes `Job` resources. It integrates +fully with the component lifecycle and provides a rich mutation API for managing containers, pod specs, and metadata — +following the same pod-template mutation pattern as the Deployment primitive. 
+ +## Capabilities + +| Capability | Detail | +| ----------------------- | ----------------------------------------------------------------------------------------------- | +| **Completion tracking** | Monitors Job conditions and reports `Completed`, `TaskRunning`, `TaskPending`, or `TaskFailing` | +| **Suspension** | Sets `spec.suspend=true` or deletes the Job (default); reports `Suspending` / `Suspended` | +| **Mutation pipeline** | Typed editors for metadata, job spec, pod spec, and containers | + +## Building a Job Primitive + +```go +import "github.com/sourcehawk/operator-component-framework/pkg/primitives/job" + +base := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "db-migration", + Namespace: owner.Namespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + {Name: "migrate", Image: "my-app-migration:latest"}, + }, + }, + }, + }, +} + +resource, err := job.NewBuilder(base). + WithMutation(MyFeatureMutation(owner.Spec.Version)). + Build() +``` + +## Mutations + +Mutations are the primary mechanism for modifying a `Job` beyond its baseline. Each mutation is a named function that +receives a `*Mutator` and records edit intent through typed editors. + +The `Feature` field controls when a mutation applies. Leaving it nil applies the mutation unconditionally. A feature +with no version constraints and no `When()` conditions is also always enabled: + +```go +func MyFeatureMutation(version string) job.Mutation { + return job.Mutation{ + Name: "my-feature", + Feature: feature.NewResourceFeature(version, nil), // always enabled + Mutate: func(m *job.Mutator) error { + // record edits here + return nil + }, + } +} +``` + +Mutations are applied in the order they are registered with the builder. If one mutation depends on a change made by +another, register the dependency first. 
+ +### Boolean-gated mutations + +Use `When(bool)` to gate a mutation on a runtime condition: + +```go +func TracingMutation(version string, enabled bool) job.Mutation { + return job.Mutation{ + Name: "tracing", + Feature: feature.NewResourceFeature(version, nil).When(enabled), + Mutate: func(m *job.Mutator) error { + m.EnsureContainerEnvVar(corev1.EnvVar{ + Name: "OTEL_EXPORTER_OTLP_ENDPOINT", + Value: "http://otel-collector:4317", + }) + return nil + }, + } +} +``` + +### Version-gated mutations + +Pass a `[]feature.VersionConstraint` to gate on a semver range: + +```go +var legacyConstraint = mustSemverConstraint("< 2.0.0") + +func LegacyMigrationMutation(version string) job.Mutation { + return job.Mutation{ + Name: "legacy-migration-format", + Feature: feature.NewResourceFeature( + version, + []feature.VersionConstraint{legacyConstraint}, + ), + Mutate: func(m *job.Mutator) error { + m.EditContainers(selectors.ContainerNamed("migrate"), func(e *editors.ContainerEditor) error { + e.EnsureEnvVar(corev1.EnvVar{Name: "MIGRATION_FORMAT", Value: "v1"}) + return nil + }) + return nil + }, + } +} +``` + +All version constraints and `When()` conditions must be satisfied for a mutation to apply. + +## Internal Mutation Ordering + +Within a single mutation, edit operations are grouped into categories and applied in a fixed sequence regardless of the +order they are recorded. This ensures structural consistency across mutations. + +| Step | Category | What it affects | +| ---- | --------------------------- | ----------------------------------------------------------------------- | +| 1 | Job metadata edits | Labels and annotations on the `Job` object | +| 2 | JobSpec edits | Completions, parallelism, backoff limit, deadline, etc. 
| +| 3 | Pod template metadata edits | Labels and annotations on the pod template | +| 4 | Pod spec edits | Volumes, tolerations, node selectors, service account, security context | +| 5 | Regular container presence | Adding or removing containers from `spec.template.spec.containers` | +| 6 | Regular container edits | Env vars, args, resources (snapshot taken after step 5) | +| 7 | Init container presence | Adding or removing containers from `spec.template.spec.initContainers` | +| 8 | Init container edits | Env vars, args, resources (snapshot taken after step 7) | + +Container edits (steps 6 and 8) are evaluated against a snapshot taken _after_ presence operations in the same mutation. +This means a single mutation can add a container and then configure it without selector resolution issues. + +## Editors + +### JobSpecEditor + +Controls job-level settings via `m.EditJobSpec`. + +Available methods: `SetCompletions`, `SetParallelism`, `SetBackoffLimit`, `SetActiveDeadlineSeconds`, +`SetTTLSecondsAfterFinished`, `SetCompletionMode`, `Raw`. + +```go +m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(3) + e.SetActiveDeadlineSeconds(600) + return nil +}) +``` + +For fields not covered by the typed API, use `Raw()`: + +```go +m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.Raw().Suspend = ptr.To(true) + return nil +}) +``` + +### PodSpecEditor + +Manages pod-level configuration via `m.EditPodSpec`. + +Available methods: `SetServiceAccountName`, `EnsureVolume`, `RemoveVolume`, `EnsureToleration`, `RemoveTolerations`, +`EnsureNodeSelector`, `RemoveNodeSelector`, `EnsureImagePullSecret`, `RemoveImagePullSecret`, `SetPriorityClassName`, +`SetHostNetwork`, `SetHostPID`, `SetHostIPC`, `SetSecurityContext`, `Raw`. 
+ +```go +m.EditPodSpec(func(e *editors.PodSpecEditor) error { + e.SetServiceAccountName("migration-sa") + e.EnsureVolume(corev1.Volume{ + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "migration-config"}, + }, + }, + }) + return nil +}) +``` + +### ContainerEditor + +Modifies individual containers via `m.EditContainers` or `m.EditInitContainers`. Always used in combination with a +[selector](../primitives.md#container-selectors). + +Available methods: `EnsureEnvVar`, `EnsureEnvVars`, `RemoveEnvVar`, `RemoveEnvVars`, `EnsureArg`, `EnsureArgs`, +`RemoveArg`, `RemoveArgs`, `SetResourceLimit`, `SetResourceRequest`, `SetResources`, `Raw`. + +```go +m.EditContainers(selectors.ContainerNamed("migrate"), func(e *editors.ContainerEditor) error { + e.EnsureEnvVar(corev1.EnvVar{Name: "DB_HOST", Value: "postgres:5432"}) + e.SetResourceLimit(corev1.ResourceCPU, resource.MustParse("500m")) + return nil +}) +``` + +### ObjectMetaEditor + +Modifies labels and annotations. Use `m.EditObjectMetadata` to target the `Job` object itself, or +`m.EditPodTemplateMetadata` to target the pod template. + +Available methods: `EnsureLabel`, `RemoveLabel`, `EnsureAnnotation`, `RemoveAnnotation`, `Raw`. 
```go +m.EditObjectMetadata(func(e *editors.ObjectMetaEditor) error { + e.EnsureLabel("app.kubernetes.io/version", version) + return nil +}) +``` + +## Convenience Methods + +The `Mutator` exposes convenience wrappers that target all containers at once: + +| Method | Equivalent to | +| ----------------------------- | ------------------------------------------------------------- | +| `EnsureContainerEnvVar(ev)` | `EditContainers(AllContainers(), ...)` → `EnsureEnvVar(ev)` | +| `RemoveContainerEnvVar(name)` | `EditContainers(AllContainers(), ...)` → `RemoveEnvVar(name)` | + +## Suspension + +Jobs use the Task lifecycle for suspension, which differs from Workloads: + +- **Default behavior**: `DefaultDeleteOnSuspendHandler` returns `true`, meaning the Job is deleted from the cluster + during suspension. +- **Suspend mutation**: `DefaultSuspendMutationHandler` sets `spec.suspend=true`, which prevents the Job controller from + creating new pods and causes any already-running pods to be deleted (per Kubernetes semantics, a suspended Job does not let active pods run to completion). +- **Suspension status**: `DefaultSuspensionStatusHandler` checks if `spec.suspend=true` and `status.active=0`. + +Override any of these via the Builder: + +```go +resource, err := job.NewBuilder(base). + WithCustomSuspendDeletionDecision(func(j *batchv1.Job) bool { + return false // Keep the Job in the cluster when suspended + }). + Build() +``` + +## Guidance + +**`Feature: nil` applies unconditionally.** Omit `Feature` (leave it nil) for mutations that should always run. Use +`feature.NewResourceFeature(version, constraints)` when version-based gating is needed, and chain `.When(bool)` for +boolean conditions. + +**Register mutations in dependency order.** If mutation B relies on a container added by mutation A, register A first. +The internal ordering within each mutation handles intra-mutation dependencies automatically.
+ +**Prefer `EnsureContainer` over direct slice manipulation.** The mutator tracks presence operations so that selectors in +the same mutation resolve correctly and reconciliation remains idempotent. + +**Use selectors for precision.** Targeting `AllContainers()` when you only mean to modify the primary container can +cause unexpected behavior if init containers or sidecar containers are present. + +**Jobs are deleted on suspend by default.** Unlike Deployments which scale to zero, Jobs are deleted during suspension. +Override `WithCustomSuspendDeletionDecision` if you need to keep the Job resource in the cluster. diff --git a/examples/job-primitive/README.md b/examples/job-primitive/README.md new file mode 100644 index 00000000..a9532c98 --- /dev/null +++ b/examples/job-primitive/README.md @@ -0,0 +1,36 @@ +# Job Primitive Example + +This example demonstrates the usage of the `job` primitive within the operator component framework. It shows how to +manage a Kubernetes Job as a component of a larger application, utilizing features like: + +- **Base Construction**: Initializing a Job with basic metadata, spec, and restart policy. +- **Feature Mutations**: Applying version-gated or conditional changes (env vars, image version, retry policies) using + the `Mutator`. +- **Custom Status Handlers**: Overriding the default `ConvergingStatus` interface using the `WithCustomConvergeStatus` + builder option. +- **Suspension**: Demonstrating how Jobs are suspended (deleted by default) when the component is suspended. +- **Data Extraction**: Harvesting information from the reconciled resource. + +## Directory Structure + +- `app/`: Defines the mock `ExampleApp` CRD and the controller that uses the component framework. +- `features/`: Contains modular feature definitions: + - `mutations.go`: tracing env vars, retry policies, and version-based image updates. + - `status.go`: implementation of a custom handler for completion status. 
+- `resources/`: Contains the central `NewJobResource` factory that assembles all features using the `job.Builder`. +- `main.go`: A standalone entry point that demonstrates a single reconciliation loop using a fake client. + +## Running the Example + +You can run this example directly using `go run`: + +```bash +go run examples/job-primitive/main.go +``` + +This will: + +1. Initialize a fake Kubernetes client. +2. Create an `ExampleApp` owner object. +3. Reconcile the `ExampleApp` components through multiple spec changes. +4. Print the resulting status conditions after each reconciliation step. diff --git a/examples/job-primitive/app/controller.go b/examples/job-primitive/app/controller.go new file mode 100644 index 00000000..52262e7c --- /dev/null +++ b/examples/job-primitive/app/controller.go @@ -0,0 +1,54 @@ +// Package app provides a sample controller using the job primitive. +package app + +import ( + "context" + + "github.com/sourcehawk/operator-component-framework/pkg/component" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// ExampleController reconciles an ExampleApp object using the component framework. +type ExampleController struct { + client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder + Metrics component.Recorder + + // NewJobResource is a factory function to create the job resource. + // This allows us to inject the resource construction logic. + NewJobResource func(*ExampleApp) (component.Resource, error) +} + +// Reconcile performs the reconciliation for a single ExampleApp. +func (r *ExampleController) Reconcile(ctx context.Context, owner *ExampleApp) error { + // 1. Build the job resource for this owner. + jobResource, err := r.NewJobResource(owner) + if err != nil { + return err + } + + // 2. Build the component that manages the job. + comp, err := component.NewComponentBuilder(). + WithName("example-migration"). + WithConditionType("MigrationReady"). 
+ WithResource(jobResource, component.ResourceOptions{}). + Suspend(owner.Spec.Suspended). + Build() + if err != nil { + return err + } + + // 3. Execute the component reconciliation. + resCtx := component.ReconcileContext{ + Client: r.Client, + Scheme: r.Scheme, + Recorder: r.Recorder, + Metrics: r.Metrics, + Owner: owner, + } + + return comp.Reconcile(ctx, resCtx) +} diff --git a/examples/job-primitive/app/owner.go b/examples/job-primitive/app/owner.go new file mode 100644 index 00000000..6b611a02 --- /dev/null +++ b/examples/job-primitive/app/owner.go @@ -0,0 +1,20 @@ +package app + +import ( + sharedapp "github.com/sourcehawk/operator-component-framework/examples/shared/app" +) + +// ExampleApp re-exports the shared CRD type so callers in this package need no import alias. +type ExampleApp = sharedapp.ExampleApp + +// ExampleAppSpec re-exports the shared spec type. +type ExampleAppSpec = sharedapp.ExampleAppSpec + +// ExampleAppStatus re-exports the shared status type. +type ExampleAppStatus = sharedapp.ExampleAppStatus + +// ExampleAppList re-exports the shared list type. +type ExampleAppList = sharedapp.ExampleAppList + +// AddToScheme registers the ExampleApp types with the given scheme. +var AddToScheme = sharedapp.AddToScheme diff --git a/examples/job-primitive/features/mutations.go b/examples/job-primitive/features/mutations.go new file mode 100644 index 00000000..a5a21b45 --- /dev/null +++ b/examples/job-primitive/features/mutations.go @@ -0,0 +1,70 @@ +// Package features provides sample features for the job primitive. 
+package features + +import ( + "fmt" + + "github.com/sourcehawk/operator-component-framework/pkg/feature" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/editors" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/selectors" + "github.com/sourcehawk/operator-component-framework/pkg/primitives/job" + corev1 "k8s.io/api/core/v1" +) + +// TracingFeature adds tracing environment variables to the job containers. +func TracingFeature(enabled bool) job.Mutation { + return job.Mutation{ + Name: "Tracing", + Feature: feature.NewResourceFeature("any", nil).When(enabled), + Mutate: func(m *job.Mutator) error { + m.EnsureContainerEnvVar(corev1.EnvVar{ + Name: "OTEL_EXPORTER_OTLP_ENDPOINT", + Value: "http://otel-collector:4317", + }) + m.EnsureContainerEnvVar(corev1.EnvVar{ + Name: "OTEL_TRACES_SAMPLER", + Value: "always_on", + }) + + return nil + }, + } +} + +// RetryPolicyFeature configures the job's retry behavior. +func RetryPolicyFeature(version string) job.Mutation { + return job.Mutation{ + Name: "RetryPolicy", + Feature: feature.NewResourceFeature(version, nil), + Mutate: func(m *job.Mutator) error { + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(3) + e.SetActiveDeadlineSeconds(600) + return nil + }) + + return nil + }, + } +} + +// VersionFeature sets the image version and a label. 
+func VersionFeature(version string) job.Mutation { + return job.Mutation{ + Name: "Version", + Feature: feature.NewResourceFeature(version, nil), + Mutate: func(m *job.Mutator) error { + m.EditContainers(selectors.ContainerNamed("migrate"), func(ce *editors.ContainerEditor) error { + ce.Raw().Image = fmt.Sprintf("my-app-migration:%s", version) + return nil + }) + + m.EditObjectMetadata(func(meta *editors.ObjectMetaEditor) error { + meta.EnsureLabel("app.kubernetes.io/version", version) + return nil + }) + + return nil + }, + } +} diff --git a/examples/job-primitive/features/status.go b/examples/job-primitive/features/status.go new file mode 100644 index 00000000..ebdf6fc8 --- /dev/null +++ b/examples/job-primitive/features/status.go @@ -0,0 +1,29 @@ +package features + +import ( + "fmt" + + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + "github.com/sourcehawk/operator-component-framework/pkg/primitives/job" + batchv1 "k8s.io/api/batch/v1" +) + +// CustomConvergeStatus demonstrates a custom handler for job completion status. +func CustomConvergeStatus() func(concepts.ConvergingOperation, *batchv1.Job) (concepts.CompletionStatusWithReason, error) { + return func(op concepts.ConvergingOperation, j *batchv1.Job) (concepts.CompletionStatusWithReason, error) { + // Use the default logic but add a custom reason or additional checks. 
+ status, err := job.DefaultConvergingStatusHandler(op, j) + if err != nil { + return status, err + } + + switch status.Status { + case concepts.CompletionStatusCompleted: + status.Reason = "Migration completed successfully" + case concepts.CompletionStatusRunning: + status.Reason = fmt.Sprintf("Migration in progress: %s", status.Reason) + } + + return status, nil + } +} diff --git a/examples/job-primitive/main.go b/examples/job-primitive/main.go new file mode 100644 index 00000000..7d53ef9b --- /dev/null +++ b/examples/job-primitive/main.go @@ -0,0 +1,118 @@ +// Package main is the entry point for the job primitive example. +package main + +import ( + "context" + "fmt" + "os" + + ocm "github.com/sourcehawk/go-crd-condition-metrics/pkg/crd-condition-metrics" + "github.com/sourcehawk/operator-component-framework/examples/job-primitive/app" + "github.com/sourcehawk/operator-component-framework/examples/job-primitive/resources" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func main() { + // 1. Setup scheme and fake client for the example. + scheme := runtime.NewScheme() + if err := app.AddToScheme(scheme); err != nil { + fmt.Fprintf(os.Stderr, "failed to add to scheme: %v\n", err) + os.Exit(1) + } + if err := batchv1.AddToScheme(scheme); err != nil { + fmt.Fprintf(os.Stderr, "failed to add batch/v1 to scheme: %v\n", err) + os.Exit(1) + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&app.ExampleApp{}). + Build() + + // 2. Create an example Owner object. 
+ owner := &app.ExampleApp{ + Spec: app.ExampleAppSpec{ + Version: "1.2.3", + EnableTracing: true, + EnableMetrics: false, + Suspended: false, + }, + } + owner.Name = "my-example-app" + owner.Namespace = "default" + + if err := fakeClient.Create(context.Background(), owner); err != nil { + fmt.Fprintf(os.Stderr, "failed to create owner: %v\n", err) + os.Exit(1) + } + + // 3. Initialize our controller. + gauge := ocm.NewOperatorConditionsGauge("example") + controller := &app.ExampleController{ + Client: fakeClient, + Scheme: scheme, + Recorder: record.NewFakeRecorder(100), + Metrics: &ocm.ConditionMetricRecorder{ + Controller: "example-controller", + OperatorConditionsGauge: gauge, + }, + + // Pass the job resource factory. + NewJobResource: resources.NewJobResource, + } + + // 4. Run reconciliation with multiple spec versions. + specs := []app.ExampleAppSpec{ + { + Version: "1.2.3", + EnableTracing: true, + Suspended: false, + }, + { + Version: "1.2.4", // Version upgrade + EnableTracing: true, + Suspended: false, + }, + { + Version: "1.2.4", + EnableTracing: false, // Disable tracing + Suspended: false, + }, + { + Version: "1.2.4", + EnableTracing: false, + Suspended: true, // Suspend the job + }, + } + + ctx := context.Background() + + for i, spec := range specs { + fmt.Printf("\n--- Step %d: Applying Spec: Version=%s, Tracing=%v, Suspended=%v ---\n", + i+1, spec.Version, spec.EnableTracing, spec.Suspended) + + // Update owner spec + owner.Spec = spec + if err := fakeClient.Update(ctx, owner); err != nil { + fmt.Fprintf(os.Stderr, "failed to update owner: %v\n", err) + os.Exit(1) + } + + fmt.Println("Running reconciliation...") + if err := controller.Reconcile(ctx, owner); err != nil { + fmt.Fprintf(os.Stderr, "reconciliation failed: %v\n", err) + os.Exit(1) + } + + // Inspect the owner conditions. 
+ for _, cond := range owner.Status.Conditions { + fmt.Printf("Condition: %s, Status: %s, Reason: %s\n", + cond.Type, cond.Status, cond.Reason) + } + } + + fmt.Println("\nReconciliation sequence completed successfully!") +} diff --git a/examples/job-primitive/resources/job.go b/examples/job-primitive/resources/job.go new file mode 100644 index 00000000..e1f85ada --- /dev/null +++ b/examples/job-primitive/resources/job.go @@ -0,0 +1,76 @@ +// Package resources provides resource implementations for the job primitive example. +package resources + +import ( + "fmt" + + "github.com/sourcehawk/operator-component-framework/examples/job-primitive/app" + "github.com/sourcehawk/operator-component-framework/examples/job-primitive/features" + "github.com/sourcehawk/operator-component-framework/pkg/component" + "github.com/sourcehawk/operator-component-framework/pkg/primitives/job" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" +) + +// NewJobResource constructs a job primitive resource with all the features. +func NewJobResource(owner *app.ExampleApp) (component.Resource, error) { + // 1. Create the base Job object. + base := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: owner.Name + "-migration", + Namespace: owner.Namespace, + Labels: map[string]string{ + "app": owner.Name, + }, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": owner.Name, + }, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + Name: "migrate", + Image: "my-app-migration:latest", // Will be overwritten by VersionFeature + }, + }, + }, + }, + }, + } + + // 2. Initialize the job builder. + builder := job.NewBuilder(base) + + // 3. Apply mutations (features) based on the owner spec. 
+ builder.WithMutation(features.VersionFeature(owner.Spec.Version)) + builder.WithMutation(features.TracingFeature(owner.Spec.EnableTracing)) + builder.WithMutation(features.RetryPolicyFeature(owner.Spec.Version)) + + // 4. Configure custom status handler. + builder.WithCustomConvergeStatus(features.CustomConvergeStatus()) + + // 5. Data extraction (optional). + builder.WithDataExtractor(func(j batchv1.Job) error { + fmt.Printf("Reconciling job: %s, active: %d, succeeded: %d, failed: %d\n", + j.Name, j.Status.Active, j.Status.Succeeded, j.Status.Failed) + + // Print the complete job resource object as yaml + y, err := yaml.Marshal(j) + if err != nil { + return fmt.Errorf("failed to marshal job to yaml: %w", err) + } + fmt.Printf("Complete Job Resource:\n---\n%s\n---\n", string(y)) + + return nil + }) + + // 6. Build the final resource. + return builder.Build() +} diff --git a/pkg/mutation/editors/jobspec_test.go b/pkg/mutation/editors/jobspec_test.go index ff421527..a0481a82 100644 --- a/pkg/mutation/editors/jobspec_test.go +++ b/pkg/mutation/editors/jobspec_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" batchv1 "k8s.io/api/batch/v1" ) @@ -12,6 +13,7 @@ func TestJobSpecEditor(t *testing.T) { spec := &batchv1.JobSpec{} editor := NewJobSpecEditor(spec) editor.SetCompletions(5) + require.NotNil(t, spec.Completions) assert.Equal(t, int32(5), *spec.Completions) }) @@ -19,6 +21,7 @@ func TestJobSpecEditor(t *testing.T) { spec := &batchv1.JobSpec{} editor := NewJobSpecEditor(spec) editor.SetParallelism(3) + require.NotNil(t, spec.Parallelism) assert.Equal(t, int32(3), *spec.Parallelism) }) @@ -26,6 +29,7 @@ func TestJobSpecEditor(t *testing.T) { spec := &batchv1.JobSpec{} editor := NewJobSpecEditor(spec) editor.SetBackoffLimit(6) + require.NotNil(t, spec.BackoffLimit) assert.Equal(t, int32(6), *spec.BackoffLimit) }) @@ -33,6 +37,7 @@ func TestJobSpecEditor(t *testing.T) { spec := &batchv1.JobSpec{} 
editor := NewJobSpecEditor(spec) editor.SetActiveDeadlineSeconds(300) + require.NotNil(t, spec.ActiveDeadlineSeconds) assert.Equal(t, int64(300), *spec.ActiveDeadlineSeconds) }) @@ -40,6 +45,7 @@ func TestJobSpecEditor(t *testing.T) { spec := &batchv1.JobSpec{} editor := NewJobSpecEditor(spec) editor.SetTTLSecondsAfterFinished(100) + require.NotNil(t, spec.TTLSecondsAfterFinished) assert.Equal(t, int32(100), *spec.TTLSecondsAfterFinished) }) @@ -47,12 +53,13 @@ func TestJobSpecEditor(t *testing.T) { spec := &batchv1.JobSpec{} editor := NewJobSpecEditor(spec) editor.SetCompletionMode(batchv1.IndexedCompletion) + require.NotNil(t, spec.CompletionMode) assert.Equal(t, batchv1.IndexedCompletion, *spec.CompletionMode) }) - t.Run("Raw", func(t *testing.T) { + t.Run("Raw returns pointer to spec", func(t *testing.T) { spec := &batchv1.JobSpec{} editor := NewJobSpecEditor(spec) - assert.Equal(t, spec, editor.Raw()) + assert.Same(t, spec, editor.Raw()) }) } diff --git a/pkg/primitives/job/builder.go b/pkg/primitives/job/builder.go new file mode 100644 index 00000000..228c4c42 --- /dev/null +++ b/pkg/primitives/job/builder.go @@ -0,0 +1,158 @@ +package job + +import ( + "fmt" + + "github.com/sourcehawk/operator-component-framework/internal/generic" + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + "github.com/sourcehawk/operator-component-framework/pkg/feature" + batchv1 "k8s.io/api/batch/v1" +) + +// Builder is a configuration helper for creating and customizing a Job Resource. +// +// It provides a fluent API for registering mutations, status handlers, and +// data extractors. This builder ensures that the resulting Resource is +// properly initialized and validated before use in a reconciliation loop. +type Builder struct { + base *generic.TaskBuilder[*batchv1.Job, *Mutator] +} + +// NewBuilder initializes a new Builder with the provided Job object. +// +// The Job object passed here serves as the "desired base state". 
During +// reconciliation, the Resource will attempt to make the cluster's state match +// this base state, modified by any registered mutations. +// +// The provided Job must have at least a Name and Namespace set, which +// is validated during the Build() call. +func NewBuilder(job *batchv1.Job) *Builder { + identityFunc := func(j *batchv1.Job) string { + return fmt.Sprintf("batch/v1/Job/%s/%s", j.Namespace, j.Name) + } + + base := generic.NewTaskBuilder[*batchv1.Job, *Mutator]( + job, + identityFunc, + NewMutator, + ) + + base. + WithCustomConvergeStatus(DefaultConvergingStatusHandler). + WithCustomSuspendStatus(DefaultSuspensionStatusHandler). + WithCustomSuspendMutation(DefaultSuspendMutationHandler). + WithCustomSuspendDeletionDecision(DefaultDeleteOnSuspendHandler) + + return &Builder{ + base: base, + } +} + +// WithMutation registers a feature-based mutation for the Job. +// +// Mutations are applied sequentially during the Mutate() phase of reconciliation. +// They are typically used by Features to inject environment variables, +// arguments, or other configuration into the Job's containers. +// +// Since mutations are often version-gated, the provided feature.Mutation +// should contain the logic to determine if and how the mutation is applied +// based on the component's current version or configuration. +func (b *Builder) WithMutation(m Mutation) *Builder { + b.base.WithMutation(feature.Mutation[*Mutator](m)) + return b +} + +// WithCustomConvergeStatus overrides the default logic for determining if the +// Job has completed, is running, or has failed. +// +// The default behavior uses DefaultConvergingStatusHandler, which checks the Job's +// status conditions. Use this method if your Job requires more complex checks, +// such as waiting for specific annotations or external signals. +// +// If you want to augment the default behavior, you can call DefaultConvergingStatusHandler +// within your custom handler. 
+func (b *Builder) WithCustomConvergeStatus( + handler func(concepts.ConvergingOperation, *batchv1.Job) (concepts.CompletionStatusWithReason, error), +) *Builder { + b.base.WithCustomConvergeStatus(handler) + return b +} + +// WithCustomSuspendStatus overrides how the progress of suspension is reported. +// +// The default behavior uses DefaultSuspensionStatusHandler, which reports the +// progress based on the Job's Suspend field and active pod count. Use this if +// your custom suspension strategy involves other measurable states. +// +// If you want to augment the default behavior, you can call DefaultSuspensionStatusHandler +// within your custom handler. +func (b *Builder) WithCustomSuspendStatus( + handler func(*batchv1.Job) (concepts.SuspensionStatusWithReason, error), +) *Builder { + b.base.WithCustomSuspendStatus(handler) + return b +} + +// WithCustomSuspendMutation defines how the Job should be modified when +// the component is suspended. +// +// The default behavior uses DefaultSuspendMutationHandler, which sets the +// Job's Suspend field to true. You might override this if you want to suspend +// the Job by other means. +// +// If you want to augment the default behavior, you can call DefaultSuspendMutationHandler +// within your custom handler. +func (b *Builder) WithCustomSuspendMutation( + handler func(*Mutator) error, +) *Builder { + b.base.WithCustomSuspendMutation(handler) + return b +} + +// WithCustomSuspendDeletionDecision overrides the decision of whether to delete +// the Job when the component is suspended. +// +// The default behavior uses DefaultDeleteOnSuspendHandler, which returns true, +// meaning Jobs are deleted during suspension. Return false from this handler if +// you want the Job to remain in the cluster when suspended. +// +// If you want to augment the default behavior, you can call DefaultDeleteOnSuspendHandler +// within your custom handler. 
+func (b *Builder) WithCustomSuspendDeletionDecision( + handler func(*batchv1.Job) bool, +) *Builder { + b.base.WithCustomSuspendDeletionDecision(handler) + return b +} + +// WithDataExtractor registers a function to harvest information from the +// Job after it has been successfully reconciled. +// +// This is useful for capturing auto-generated fields (like completion status +// or pod names) and making them available to other components or resources via +// the framework's data extraction mechanism. +func (b *Builder) WithDataExtractor( + extractor func(batchv1.Job) error, +) *Builder { + if extractor != nil { + b.base.WithDataExtractor(func(j *batchv1.Job) error { + return extractor(*j) + }) + } + return b +} + +// Build validates the configuration and returns the initialized Resource. +// +// It ensures that: +// - A base Job object was provided. +// - The Job has both a name and a namespace set. +// +// If validation fails, an error is returned and the Resource should not be used. 
+func (b *Builder) Build() (*Resource, error) { + genericRes, err := b.base.Build() + if err != nil { + return nil, err + } + return &Resource{base: genericRes}, nil +} diff --git a/pkg/primitives/job/builder_test.go b/pkg/primitives/job/builder_test.go new file mode 100644 index 00000000..d0133340 --- /dev/null +++ b/pkg/primitives/job/builder_test.go @@ -0,0 +1,213 @@ +package job + +import ( + "errors" + "testing" + + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestBuilder(t *testing.T) { + t.Parallel() + + t.Run("Build validation", func(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + job *batchv1.Job + expectedErr string + }{ + { + name: "nil job", + job: nil, + expectedErr: "object cannot be nil", + }, + { + name: "empty name", + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + }, + }, + expectedErr: "object name cannot be empty", + }, + { + name: "empty namespace", + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + }, + expectedErr: "object namespace cannot be empty", + }, + { + name: "valid job", + job: &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + }, + expectedErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := NewBuilder(tt.job).Build() + if tt.expectedErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedErr) + assert.Nil(t, res) + } else { + require.NoError(t, err) + require.NotNil(t, res) + assert.Equal(t, "batch/v1/Job/test-ns/test-job", res.Identity()) + } + }) + } + }) + + t.Run("WithMutation", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + m := 
Mutation{ + Name: "test-mutation", + } + res, err := NewBuilder(job). + WithMutation(m). + Build() + require.NoError(t, err) + assert.Len(t, res.base.Mutations, 1) + assert.Equal(t, "test-mutation", res.base.Mutations[0].Name) + }) + + t.Run("WithCustomConvergeStatus", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + handler := func(_ concepts.ConvergingOperation, _ *batchv1.Job) (concepts.CompletionStatusWithReason, error) { + return concepts.CompletionStatusWithReason{Status: concepts.CompletionStatusCompleted}, nil + } + res, err := NewBuilder(job). + WithCustomConvergeStatus(handler). + Build() + require.NoError(t, err) + require.NotNil(t, res.base.ConvergingStatusHandler) + status, err := res.base.ConvergingStatusHandler(concepts.ConvergingOperationUpdated, nil) + require.NoError(t, err) + assert.Equal(t, concepts.CompletionStatusCompleted, status.Status) + }) + + t.Run("WithCustomSuspendStatus", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + handler := func(_ *batchv1.Job) (concepts.SuspensionStatusWithReason, error) { + return concepts.SuspensionStatusWithReason{Status: concepts.SuspensionStatusSuspended}, nil + } + res, err := NewBuilder(job). + WithCustomSuspendStatus(handler). + Build() + require.NoError(t, err) + require.NotNil(t, res.base.SuspendStatusHandler) + status, err := res.base.SuspendStatusHandler(nil) + require.NoError(t, err) + assert.Equal(t, concepts.SuspensionStatusSuspended, status.Status) + }) + + t.Run("WithCustomSuspendMutation", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + handler := func(_ *Mutator) error { + return errors.New("suspend error") + } + res, err := NewBuilder(job). + WithCustomSuspendMutation(handler). 
+ Build() + require.NoError(t, err) + require.NotNil(t, res.base.SuspendMutationHandler) + err = res.base.SuspendMutationHandler(nil) + assert.EqualError(t, err, "suspend error") + }) + + t.Run("WithCustomSuspendDeletionDecision", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + handler := func(_ *batchv1.Job) bool { + return false + } + res, err := NewBuilder(job). + WithCustomSuspendDeletionDecision(handler). + Build() + require.NoError(t, err) + require.NotNil(t, res.base.DeleteOnSuspendHandler) + assert.False(t, res.base.DeleteOnSuspendHandler(nil)) + }) + + t.Run("WithDataExtractor", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + called := false + extractor := func(_ batchv1.Job) error { + called = true + return nil + } + res, err := NewBuilder(job). + WithDataExtractor(extractor). + Build() + require.NoError(t, err) + assert.Len(t, res.base.DataExtractors, 1) + err = res.base.DataExtractors[0](&batchv1.Job{}) + require.NoError(t, err) + assert.True(t, called) + }) + + t.Run("WithDataExtractor nil", func(t *testing.T) { + t.Parallel() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + } + res, err := NewBuilder(job). + WithDataExtractor(nil). 
+ Build() + require.NoError(t, err) + assert.Len(t, res.base.DataExtractors, 0) + }) +} diff --git a/pkg/primitives/job/handlers.go b/pkg/primitives/job/handlers.go new file mode 100644 index 00000000..032216c2 --- /dev/null +++ b/pkg/primitives/job/handlers.go @@ -0,0 +1,128 @@ +package job + +import ( + "fmt" + + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/editors" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" +) + +// DefaultConvergingStatusHandler is the default logic for determining if a Job has completed or is still running. +// +// It checks the Job's status conditions for Complete or Failed, and reports the +// appropriate CompletionStatus based on the current state. +// +// This function is used as the default handler by the Resource if no custom handler +// is registered via Builder.WithCustomConvergeStatus. It can be reused within +// custom handlers to augment the default behavior. 
+func DefaultConvergingStatusHandler( + op concepts.ConvergingOperation, job *batchv1.Job, +) (concepts.CompletionStatusWithReason, error) { + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue { + return concepts.CompletionStatusWithReason{ + Status: concepts.CompletionStatusCompleted, + Reason: "Job completed successfully", + }, nil + } + if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue { + reason := "Job failed" + switch { + case cond.Message != "": + reason = fmt.Sprintf("Job failed: %s", cond.Message) + case cond.Reason != "": + reason = fmt.Sprintf("Job failed: %s", cond.Reason) + } + return concepts.CompletionStatusWithReason{ + Status: concepts.CompletionStatusFailing, + Reason: reason, + }, nil + } + } + + if job.Status.Active > 0 { + return concepts.CompletionStatusWithReason{ + Status: concepts.CompletionStatusRunning, + Reason: fmt.Sprintf("Job has %d active pod(s)", job.Status.Active), + }, nil + } + + switch op { + case concepts.ConvergingOperationCreated: + return concepts.CompletionStatusWithReason{ + Status: concepts.CompletionStatusPending, + Reason: "Job was just created, waiting for pods to be scheduled", + }, nil + default: + return concepts.CompletionStatusWithReason{ + Status: concepts.CompletionStatusPending, + Reason: "Job is waiting for pods to be scheduled", + }, nil + } +} + +// DefaultDeleteOnSuspendHandler provides the default decision of whether to delete the Job +// when the parent component is suspended. +// +// It always returns true, meaning the Job is deleted from the cluster during suspension. +// Jobs cannot be meaningfully scaled to zero like Deployments, so deletion is the +// standard approach for suspending a task resource. +// +// This function is used as the default handler by the Resource if no custom handler is +// registered via Builder.WithCustomSuspendDeletionDecision. It can be reused within +// custom handlers. 
+func DefaultDeleteOnSuspendHandler(_ *batchv1.Job) bool { + return true +} + +// DefaultSuspendMutationHandler provides the default mutation applied to a Job when +// the component is suspended. +// +// It sets the Job's Suspend field to true, which prevents the Job controller from +// creating new pods while allowing existing pods to complete. +// +// This function is used as the default handler by the Resource if no custom handler is +// registered via Builder.WithCustomSuspendMutation. It can be reused within custom handlers. +func DefaultSuspendMutationHandler(mutator *Mutator) error { + mutator.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.Raw().Suspend = boolPtr(true) + return nil + }) + return nil +} + +// DefaultSuspensionStatusHandler monitors the progress of the suspension process. +// +// It reports whether the Job has been successfully suspended by checking if the +// Job's Suspend field is true and there are no active pods. +// +// This function is used as the default handler by the Resource if no custom handler is +// registered via Builder.WithCustomSuspendStatus. It can be reused within custom handlers. 
+func DefaultSuspensionStatusHandler(job *batchv1.Job) (concepts.SuspensionStatusWithReason, error) { + isSuspended := job.Spec.Suspend != nil && *job.Spec.Suspend + + if isSuspended && job.Status.Active == 0 { + return concepts.SuspensionStatusWithReason{ + Status: concepts.SuspensionStatusSuspended, + Reason: "Job is suspended", + }, nil + } + + if isSuspended { + return concepts.SuspensionStatusWithReason{ + Status: concepts.SuspensionStatusSuspending, + Reason: fmt.Sprintf("Job is suspending, %d pod(s) still active", job.Status.Active), + }, nil + } + + return concepts.SuspensionStatusWithReason{ + Status: concepts.SuspensionStatusSuspending, + Reason: "Waiting for Job suspend field to be applied", + }, nil +} + +func boolPtr(b bool) *bool { + return &b +} diff --git a/pkg/primitives/job/handlers_test.go b/pkg/primitives/job/handlers_test.go new file mode 100644 index 00000000..f329ee93 --- /dev/null +++ b/pkg/primitives/job/handlers_test.go @@ -0,0 +1,185 @@ +package job + +import ( + "testing" + + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" +) + +func TestDefaultConvergingStatusHandler(t *testing.T) { + tests := []struct { + name string + op concepts.ConvergingOperation + job *batchv1.Job + wantStatus concepts.CompletionStatus + wantReason string + }{ + { + name: "completed", + op: concepts.ConvergingOperationUpdated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + }, + }, + }, + wantStatus: concepts.CompletionStatusCompleted, + wantReason: "Job completed successfully", + }, + { + name: "failed", + op: concepts.ConvergingOperationUpdated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + {Type: batchv1.JobFailed, Status: corev1.ConditionTrue, 
Message: "BackoffLimitExceeded"}, + }, + }, + }, + wantStatus: concepts.CompletionStatusFailing, + wantReason: "Job failed: BackoffLimitExceeded", + }, + { + name: "failed with reason only", + op: concepts.ConvergingOperationUpdated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + {Type: batchv1.JobFailed, Status: corev1.ConditionTrue, Reason: "BackoffLimitExceeded"}, + }, + }, + }, + wantStatus: concepts.CompletionStatusFailing, + wantReason: "Job failed: BackoffLimitExceeded", + }, + { + name: "failed without message or reason", + op: concepts.ConvergingOperationUpdated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + {Type: batchv1.JobFailed, Status: corev1.ConditionTrue}, + }, + }, + }, + wantStatus: concepts.CompletionStatusFailing, + wantReason: "Job failed", + }, + { + name: "running", + op: concepts.ConvergingOperationUpdated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{ + Active: 2, + }, + }, + wantStatus: concepts.CompletionStatusRunning, + wantReason: "Job has 2 active pod(s)", + }, + { + name: "pending after creation", + op: concepts.ConvergingOperationCreated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{}, + }, + wantStatus: concepts.CompletionStatusPending, + wantReason: "Job was just created, waiting for pods to be scheduled", + }, + { + name: "pending after update", + op: concepts.ConvergingOperationUpdated, + job: &batchv1.Job{ + Status: batchv1.JobStatus{}, + }, + wantStatus: concepts.CompletionStatusPending, + wantReason: "Job is waiting for pods to be scheduled", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := DefaultConvergingStatusHandler(tt.op, tt.job) + require.NoError(t, err) + assert.Equal(t, tt.wantStatus, got.Status) + assert.Equal(t, tt.wantReason, got.Reason) + }) + } +} + +func TestDefaultDeleteOnSuspendHandler(t *testing.T) { + job := &batchv1.Job{} + assert.True(t, 
DefaultDeleteOnSuspendHandler(job)) +} + +func TestDefaultSuspendMutationHandler(t *testing.T) { + job := &batchv1.Job{} + mutator := NewMutator(job) + mutator.BeginFeature() + err := DefaultSuspendMutationHandler(mutator) + require.NoError(t, err) + err = mutator.Apply() + require.NoError(t, err) + require.NotNil(t, job.Spec.Suspend) + assert.True(t, *job.Spec.Suspend) +} + +func TestDefaultSuspensionStatusHandler(t *testing.T) { + tests := []struct { + name string + job *batchv1.Job + wantStatus concepts.SuspensionStatus + wantReason string + }{ + { + name: "suspended", + job: &batchv1.Job{ + Spec: batchv1.JobSpec{ + Suspend: boolPtr(true), + }, + Status: batchv1.JobStatus{ + Active: 0, + }, + }, + wantStatus: concepts.SuspensionStatusSuspended, + wantReason: "Job is suspended", + }, + { + name: "suspending with active pods", + job: &batchv1.Job{ + Spec: batchv1.JobSpec{ + Suspend: boolPtr(true), + }, + Status: batchv1.JobStatus{ + Active: 2, + }, + }, + wantStatus: concepts.SuspensionStatusSuspending, + wantReason: "Job is suspending, 2 pod(s) still active", + }, + { + name: "waiting for suspend field", + job: &batchv1.Job{ + Spec: batchv1.JobSpec{}, + Status: batchv1.JobStatus{ + Active: 1, + }, + }, + wantStatus: concepts.SuspensionStatusSuspending, + wantReason: "Waiting for Job suspend field to be applied", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := DefaultSuspensionStatusHandler(tt.job) + require.NoError(t, err) + assert.Equal(t, tt.wantStatus, got.Status) + assert.Equal(t, tt.wantReason, got.Reason) + }) + } +} diff --git a/pkg/primitives/job/mutator.go b/pkg/primitives/job/mutator.go new file mode 100644 index 00000000..3282b4b6 --- /dev/null +++ b/pkg/primitives/job/mutator.go @@ -0,0 +1,360 @@ +// Package job provides a builder and resource for managing Kubernetes Jobs. 
+package job + +import ( + "github.com/sourcehawk/operator-component-framework/pkg/feature" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/editors" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/selectors" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" +) + +// Mutation defines a mutation that is applied to a job Mutator +// only if its associated feature gate is enabled. +type Mutation feature.Mutation[*Mutator] + +type containerEdit struct { + selector selectors.ContainerSelector + edit func(*editors.ContainerEditor) error +} + +type containerPresenceOp struct { + name string + container *corev1.Container // nil for remove +} + +type featurePlan struct { + jobMetadataEdits []func(*editors.ObjectMetaEditor) error + jobSpecEdits []func(*editors.JobSpecEditor) error + podTemplateMetadataEdits []func(*editors.ObjectMetaEditor) error + podSpecEdits []func(*editors.PodSpecEditor) error + containerPresence []containerPresenceOp + containerEdits []containerEdit + initContainerPresence []containerPresenceOp + initContainerEdits []containerEdit +} + +// Mutator is a high-level helper for modifying a Kubernetes Job. +// +// It uses a "plan-and-apply" pattern: mutations are recorded first, then applied +// to the Job in a single controlled pass when Apply() is called. +// +// The Mutator maintains feature boundaries: each feature's mutations are planned +// together and applied in the order the features were registered. +// +// Mutator implements editors.ObjectMutator. +type Mutator struct { + current *batchv1.Job + + plans []featurePlan + active *featurePlan +} + +// NewMutator creates a new Mutator for the given Job. +// +// It is typically used within a Feature's Mutation logic to express desired +// changes to the Job. BeginFeature must be called before registering +// any mutations. 
+func NewMutator(current *batchv1.Job) *Mutator { + return &Mutator{ + current: current, + } +} + +// BeginFeature starts a new feature planning scope. All subsequent mutation +// registrations will be grouped into this feature's plan. +func (m *Mutator) BeginFeature() { + m.plans = append(m.plans, featurePlan{}) + m.active = &m.plans[len(m.plans)-1] +} + +// EditObjectMetadata records a mutation for the Job's own metadata. +// +// Metadata edits are applied before all other categories within the same feature. +// A nil edit function is ignored. +func (m *Mutator) EditObjectMetadata(edit func(*editors.ObjectMetaEditor) error) { + if edit == nil { + return + } + m.active.jobMetadataEdits = append(m.active.jobMetadataEdits, edit) +} + +// EditJobSpec records a mutation for the Job's top-level spec. +// +// Job spec edits are applied after metadata edits but before pod template edits +// within the same feature. A nil edit function is ignored. +func (m *Mutator) EditJobSpec(edit func(*editors.JobSpecEditor) error) { + if edit == nil { + return + } + m.active.jobSpecEdits = append(m.active.jobSpecEdits, edit) +} + +// EditPodTemplateMetadata records a mutation for the Job's pod template metadata. +// +// Pod template metadata edits are applied after job spec edits but before pod spec +// edits within the same feature. A nil edit function is ignored. +func (m *Mutator) EditPodTemplateMetadata(edit func(*editors.ObjectMetaEditor) error) { + if edit == nil { + return + } + m.active.podTemplateMetadataEdits = append(m.active.podTemplateMetadataEdits, edit) +} + +// EditPodSpec records a mutation for the Job's pod spec. +// +// Pod spec edits are applied after pod template metadata edits but before container +// edits within the same feature. A nil edit function is ignored. 
+func (m *Mutator) EditPodSpec(edit func(*editors.PodSpecEditor) error) { + if edit == nil { + return + } + m.active.podSpecEdits = append(m.active.podSpecEdits, edit) +} + +// EditContainers records a mutation for containers matching the given selector. +// +// Edits are applied after container presence operations within the same feature. +// Selector matching is evaluated against a snapshot taken after the current +// feature's container presence operations have been applied. +// +// If either selector or edit function is nil, the registration is ignored. +func (m *Mutator) EditContainers(selector selectors.ContainerSelector, edit func(*editors.ContainerEditor) error) { + if selector == nil || edit == nil { + return + } + m.active.containerEdits = append(m.active.containerEdits, containerEdit{ + selector: selector, + edit: edit, + }) +} + +// EditInitContainers records a mutation for init containers matching the given selector. +// +// Edits are applied after init container presence operations within the same feature. +// Selector matching is evaluated against a snapshot taken after the current +// feature's init container presence operations have been applied. +// +// If either selector or edit function is nil, the registration is ignored. +func (m *Mutator) EditInitContainers(selector selectors.ContainerSelector, edit func(*editors.ContainerEditor) error) { + if selector == nil || edit == nil { + return + } + m.active.initContainerEdits = append(m.active.initContainerEdits, containerEdit{ + selector: selector, + edit: edit, + }) +} + +// EnsureContainer records that a regular container must be present in the Job. +// If a container with the same name exists, it is replaced; otherwise, it is appended. 
+func (m *Mutator) EnsureContainer(container corev1.Container) { + m.active.containerPresence = append(m.active.containerPresence, containerPresenceOp{ + name: container.Name, + container: &container, + }) +} + +// RemoveContainer records that a regular container should be removed by name. +func (m *Mutator) RemoveContainer(name string) { + m.active.containerPresence = append(m.active.containerPresence, containerPresenceOp{ + name: name, + container: nil, + }) +} + +// RemoveContainers records that multiple regular containers should be removed by name. +func (m *Mutator) RemoveContainers(names []string) { + for _, name := range names { + m.RemoveContainer(name) + } +} + +// EnsureInitContainer records that an init container must be present in the Job. +// If an init container with the same name exists, it is replaced; otherwise, it is appended. +func (m *Mutator) EnsureInitContainer(container corev1.Container) { + m.active.initContainerPresence = append(m.active.initContainerPresence, containerPresenceOp{ + name: container.Name, + container: &container, + }) +} + +// RemoveInitContainer records that an init container should be removed by name. +func (m *Mutator) RemoveInitContainer(name string) { + m.active.initContainerPresence = append(m.active.initContainerPresence, containerPresenceOp{ + name: name, + container: nil, + }) +} + +// RemoveInitContainers records that multiple init containers should be removed by name. +func (m *Mutator) RemoveInitContainers(names []string) { + for _, name := range names { + m.RemoveInitContainer(name) + } +} + +// EnsureContainerEnvVar records that an environment variable must be present +// in all containers of the Job. +// +// This is a convenience wrapper over EditContainers. 
+func (m *Mutator) EnsureContainerEnvVar(ev corev1.EnvVar) { + m.EditContainers(selectors.AllContainers(), func(e *editors.ContainerEditor) error { + e.EnsureEnvVar(ev) + return nil + }) +} + +// RemoveContainerEnvVar records that an environment variable should be +// removed from all containers of the Job. +// +// This is a convenience wrapper over EditContainers. +func (m *Mutator) RemoveContainerEnvVar(name string) { + m.EditContainers(selectors.AllContainers(), func(e *editors.ContainerEditor) error { + e.RemoveEnvVar(name) + return nil + }) +} + +// Apply executes all recorded mutation intents on the underlying Job. +// +// Execution order across all registered features: +// +// 1. Object metadata edits +// 2. Job spec edits +// 3. Pod template metadata edits +// 4. Pod spec edits +// 5. Regular container presence operations +// 6. Regular container edits +// 7. Init container presence operations +// 8. Init container edits +// +// Features are applied in the order they were registered. Within each category +// of a single feature, edits are applied in their registration order. +// +// Container selectors are evaluated against a snapshot taken after the current +// feature's container presence operations have been applied. +func (m *Mutator) Apply() error { + for _, plan := range m.plans { + // 1. Object metadata + if len(plan.jobMetadataEdits) > 0 { + editor := editors.NewObjectMetaEditor(&m.current.ObjectMeta) + for _, edit := range plan.jobMetadataEdits { + if err := edit(editor); err != nil { + return err + } + } + } + + // 2. Job spec + if len(plan.jobSpecEdits) > 0 { + editor := editors.NewJobSpecEditor(&m.current.Spec) + for _, edit := range plan.jobSpecEdits { + if err := edit(editor); err != nil { + return err + } + } + } + + // 3. 
Pod template metadata + if len(plan.podTemplateMetadataEdits) > 0 { + editor := editors.NewObjectMetaEditor(&m.current.Spec.Template.ObjectMeta) + for _, edit := range plan.podTemplateMetadataEdits { + if err := edit(editor); err != nil { + return err + } + } + } + + // 4. Pod spec + if len(plan.podSpecEdits) > 0 { + editor := editors.NewPodSpecEditor(&m.current.Spec.Template.Spec) + for _, edit := range plan.podSpecEdits { + if err := edit(editor); err != nil { + return err + } + } + } + + // 5. Regular container presence + for _, op := range plan.containerPresence { + applyPresenceOp(&m.current.Spec.Template.Spec.Containers, op) + } + + // 6. Regular container edits + if len(plan.containerEdits) > 0 { + // Take snapshot of containers AFTER presence ops but BEFORE applying any edits + snapshots := make([]corev1.Container, len(m.current.Spec.Template.Spec.Containers)) + for i := range m.current.Spec.Template.Spec.Containers { + m.current.Spec.Template.Spec.Containers[i].DeepCopyInto(&snapshots[i]) + } + + for i := range m.current.Spec.Template.Spec.Containers { + container := &m.current.Spec.Template.Spec.Containers[i] + snapshot := &snapshots[i] + editor := editors.NewContainerEditor(container) + for _, ce := range plan.containerEdits { + if ce.selector(i, snapshot) { + if err := ce.edit(editor); err != nil { + return err + } + } + } + } + } + + // 7. Init container presence + for _, op := range plan.initContainerPresence { + applyPresenceOp(&m.current.Spec.Template.Spec.InitContainers, op) + } + + // 8. 
Init container edits + if len(plan.initContainerEdits) > 0 { + // Take snapshot of init containers AFTER presence ops but BEFORE applying any edits + snapshots := make([]corev1.Container, len(m.current.Spec.Template.Spec.InitContainers)) + for i := range m.current.Spec.Template.Spec.InitContainers { + m.current.Spec.Template.Spec.InitContainers[i].DeepCopyInto(&snapshots[i]) + } + + for i := range m.current.Spec.Template.Spec.InitContainers { + container := &m.current.Spec.Template.Spec.InitContainers[i] + snapshot := &snapshots[i] + editor := editors.NewContainerEditor(container) + for _, ce := range plan.initContainerEdits { + if ce.selector(i, snapshot) { + if err := ce.edit(editor); err != nil { + return err + } + } + } + } + } + } + + return nil +} + +func applyPresenceOp(containers *[]corev1.Container, op containerPresenceOp) { + found := -1 + for i, c := range *containers { + if c.Name == op.name { + found = i + break + } + } + + if op.container == nil { + // Remove + if found != -1 { + *containers = append((*containers)[:found], (*containers)[found+1:]...) 
+ } + return + } + + // Ensure + if found != -1 { + (*containers)[found] = *op.container + } else { + *containers = append(*containers, *op.container) + } +} diff --git a/pkg/primitives/job/mutator_test.go b/pkg/primitives/job/mutator_test.go new file mode 100644 index 00000000..7d58eed0 --- /dev/null +++ b/pkg/primitives/job/mutator_test.go @@ -0,0 +1,634 @@ +package job + +import ( + "errors" + "testing" + + "github.com/sourcehawk/operator-component-framework/pkg/mutation/editors" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/selectors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestNewMutator(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + assert.NotNil(t, m) + assert.Equal(t, job, m.current) + assert.Empty(t, m.plans, "NewMutator must not create any plans") + assert.Nil(t, m.active, "active plan must not be set") +} + +func TestBeginFeature_AddsExactlyOnePlan(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + + m.BeginFeature() + require.Len(t, m.plans, 1, "BeginFeature must add exactly one plan") + assert.Equal(t, &m.plans[0], m.active, "active must point to the new plan") + + m.BeginFeature() + require.Len(t, m.plans, 2) + assert.Equal(t, &m.plans[1], m.active) +} + +func TestBeginFeature_IsolatesFeaturePlans(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "app"}}, + }, + }, + }, + } + m := NewMutator(job) + + // Record mutations in the first feature plan + m.BeginFeature() + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(3) + return nil + }) + m.EditContainers(selectors.ContainerNamed("app"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "v1" + return nil + }) + + // Start a new feature and record 
different mutations + m.BeginFeature() + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(5) + return nil + }) + + // First plan should have its edits, second plan should have its own + assert.Len(t, m.plans[0].jobSpecEdits, 1, "first plan should have one spec edit") + assert.Len(t, m.plans[0].containerEdits, 1, "first plan should have one container edit") + assert.Len(t, m.plans[1].jobSpecEdits, 1, "second plan should have one spec edit") + assert.Empty(t, m.plans[1].containerEdits, "second plan should have no container edits") +} + +func TestMutator_SingleFeature_PlanCount(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + m.BeginFeature() + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(3) + return nil + }) + + require.NoError(t, m.Apply()) + assert.Len(t, m.plans, 1, "no extra plans should be created during Apply") + assert.Equal(t, int32(3), *job.Spec.BackoffLimit) +} + +func TestMutator_EnvVars(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Env: []corev1.EnvVar{ + {Name: "KEEP", Value: "stay"}, + {Name: "CHANGE", Value: "old"}, + {Name: "REMOVE", Value: "gone"}, + }, + }, + }, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + m.EnsureContainerEnvVar(corev1.EnvVar{Name: "CHANGE", Value: "new"}) + m.EnsureContainerEnvVar(corev1.EnvVar{Name: "ADD", Value: "added"}) + m.RemoveContainerEnvVar("REMOVE") + + err := m.Apply() + require.NoError(t, err) + + env := job.Spec.Template.Spec.Containers[0].Env + assert.Len(t, env, 3) + + findEnv := func(name string) *corev1.EnvVar { + for i := range env { + if env[i].Name == name { + return &env[i] + } + } + return nil + } + + assert.NotNil(t, findEnv("KEEP")) + assert.Equal(t, "stay", findEnv("KEEP").Value) + assert.Equal(t, "new", findEnv("CHANGE").Value) + assert.Equal(t, "added", findEnv("ADD").Value) + 
assert.Nil(t, findEnv("REMOVE")) +} + +func TestMutator_EditContainers(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "c1"}, + {Name: "c2"}, + }, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + m.EditContainers(selectors.ContainerNamed("c1"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "c1-image" + return nil + }) + m.EditContainers(selectors.AllContainers(), func(e *editors.ContainerEditor) error { + e.EnsureEnvVar(corev1.EnvVar{Name: "GLOBAL", Value: "true"}) + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + assert.Equal(t, "c1-image", job.Spec.Template.Spec.Containers[0].Image) + assert.Equal(t, "", job.Spec.Template.Spec.Containers[1].Image) + assert.Equal(t, "GLOBAL", job.Spec.Template.Spec.Containers[0].Env[0].Name) + assert.Equal(t, "GLOBAL", job.Spec.Template.Spec.Containers[1].Env[0].Name) +} + +func TestMutator_EditPodSpec(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + m.BeginFeature() + m.EditPodSpec(func(e *editors.PodSpecEditor) error { + e.Raw().ServiceAccountName = "my-sa" + return nil + }) + + err := m.Apply() + require.NoError(t, err) + assert.Equal(t, "my-sa", job.Spec.Template.Spec.ServiceAccountName) +} + +func TestMutator_EditJobSpec(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + m.BeginFeature() + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(5) + e.SetCompletions(3) + return nil + }) + + err := m.Apply() + require.NoError(t, err) + require.NotNil(t, job.Spec.BackoffLimit) + assert.Equal(t, int32(5), *job.Spec.BackoffLimit) + require.NotNil(t, job.Spec.Completions) + assert.Equal(t, int32(3), *job.Spec.Completions) +} + +func TestMutator_EditMetadata(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + m.BeginFeature() + m.EditObjectMetadata(func(e *editors.ObjectMetaEditor) error { + e.Raw().Labels = 
map[string]string{"job": "label"} + return nil + }) + m.EditPodTemplateMetadata(func(e *editors.ObjectMetaEditor) error { + e.Raw().Annotations = map[string]string{"pod": "ann"} + return nil + }) + + err := m.Apply() + require.NoError(t, err) + assert.Equal(t, "label", job.Labels["job"]) + assert.Equal(t, "ann", job.Spec.Template.Annotations["pod"]) +} + +func TestMutator_Errors(t *testing.T) { + job := &batchv1.Job{} + m := NewMutator(job) + m.BeginFeature() + m.EditPodSpec(func(_ *editors.PodSpecEditor) error { + return errors.New("boom") + }) + + err := m.Apply() + assert.Error(t, err) + assert.Equal(t, "boom", err.Error()) +} + +func TestMutator_Order(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"orig": "label"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "main"}}, + }, + }, + }, + } + + var order []string + + m := NewMutator(job) + m.BeginFeature() + // Register in reverse order to verify fixed category ordering + m.EditContainers(selectors.AllContainers(), func(_ *editors.ContainerEditor) error { + order = append(order, "container") + return nil + }) + m.EditPodSpec(func(_ *editors.PodSpecEditor) error { + order = append(order, "podspec") + return nil + }) + m.EditPodTemplateMetadata(func(_ *editors.ObjectMetaEditor) error { + order = append(order, "podmeta") + return nil + }) + m.EditJobSpec(func(_ *editors.JobSpecEditor) error { + order = append(order, "jobspec") + return nil + }) + m.EditObjectMetadata(func(_ *editors.ObjectMetaEditor) error { + order = append(order, "jobmeta") + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + expected := []string{"jobmeta", "jobspec", "podmeta", "podspec", "container"} + assert.Equal(t, expected, order) +} + +func TestMutator_ContainerPresence(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: 
corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "app", Image: "app-image"}, + {Name: "sidecar", Image: "sidecar-image"}, + }, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + m.EnsureContainer(corev1.Container{Name: "app", Image: "app-new-image"}) + m.RemoveContainer("sidecar") + m.EnsureContainer(corev1.Container{Name: "new-container", Image: "new-image"}) + + err := m.Apply() + require.NoError(t, err) + + require.Len(t, job.Spec.Template.Spec.Containers, 2) + assert.Equal(t, "app", job.Spec.Template.Spec.Containers[0].Name) + assert.Equal(t, "app-new-image", job.Spec.Template.Spec.Containers[0].Image) + assert.Equal(t, "new-container", job.Spec.Template.Spec.Containers[1].Name) + assert.Equal(t, "new-image", job.Spec.Template.Spec.Containers[1].Image) +} + +func TestMutator_InitContainerPresence(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + {Name: "init-1", Image: "init-1-image"}, + }, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + m.EnsureInitContainer(corev1.Container{Name: "init-2", Image: "init-2-image"}) + m.RemoveInitContainers([]string{"init-1"}) + + err := m.Apply() + require.NoError(t, err) + + require.Len(t, job.Spec.Template.Spec.InitContainers, 1) + assert.Equal(t, "init-2", job.Spec.Template.Spec.InitContainers[0].Name) +} + +func TestMutator_SelectorSnapshotSemantics(t *testing.T) { + const appV2 = "app-v2" + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "app", Image: "app-image"}, + }, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + + // First edit renames the container + m.EditContainers(selectors.ContainerNamed("app"), func(e *editors.ContainerEditor) error { + e.Raw().Name = appV2 + return nil + }) + + // Second edit should still match using "app" selector because of 
snapshot + m.EditContainers(selectors.ContainerNamed("app"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "app-image-updated" + return nil + }) + + // Third edit targeting "app-v2" should NOT match in this apply pass + m.EditContainers(selectors.ContainerNamed(appV2), func(e *editors.ContainerEditor) error { + e.Raw().Image = "should-not-be-set" + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + assert.Equal(t, appV2, job.Spec.Template.Spec.Containers[0].Name) + assert.Equal(t, "app-image-updated", job.Spec.Template.Spec.Containers[0].Image) +} + +func TestMutator_NilSafety(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "main"}}, + }, + }, + }, + } + m := NewMutator(job) + m.BeginFeature() + + // These should all be no-ops and not panic + m.EditContainers(nil, func(_ *editors.ContainerEditor) error { return nil }) + m.EditContainers(selectors.AllContainers(), nil) + m.EditInitContainers(nil, func(_ *editors.ContainerEditor) error { return nil }) + m.EditInitContainers(selectors.AllContainers(), nil) + m.EditPodSpec(nil) + m.EditPodTemplateMetadata(nil) + m.EditObjectMetadata(nil) + m.EditJobSpec(nil) + + err := m.Apply() + assert.NoError(t, err) +} + +func TestMutator_CrossFeatureOrdering(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "app", Image: "v1"}}, + }, + }, + }, + } + + m := NewMutator(job) + + // Feature A: sets backoff to 2, image to v2 + m.BeginFeature() + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.SetBackoffLimit(2) + return nil + }) + m.EditContainers(selectors.ContainerNamed("app"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "v2" + return nil + }) + + // Feature B: sets backoff to 3, image to v3 + m.BeginFeature() + m.EditJobSpec(func(e *editors.JobSpecEditor) 
error { + e.SetBackoffLimit(3) + return nil + }) + m.EditContainers(selectors.ContainerNamed("app"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "v3" + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + // Feature B should win + assert.Equal(t, int32(3), *job.Spec.BackoffLimit) + assert.Equal(t, "v3", job.Spec.Template.Spec.Containers[0].Image) +} + +func TestMutator_WithinFeatureCategoryOrdering(t *testing.T) { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "original-name"}, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "app"}}, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + + var executionOrder []string + + // We register them in reverse order of expected execution + m.EditContainers(selectors.AllContainers(), func(_ *editors.ContainerEditor) error { + executionOrder = append(executionOrder, "container") + return nil + }) + m.EditPodSpec(func(_ *editors.PodSpecEditor) error { + executionOrder = append(executionOrder, "podspec") + return nil + }) + m.EditPodTemplateMetadata(func(_ *editors.ObjectMetaEditor) error { + executionOrder = append(executionOrder, "podmeta") + return nil + }) + m.EditJobSpec(func(_ *editors.JobSpecEditor) error { + executionOrder = append(executionOrder, "jobspec") + return nil + }) + m.EditObjectMetadata(func(_ *editors.ObjectMetaEditor) error { + executionOrder = append(executionOrder, "jobmeta") + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + expectedOrder := []string{ + "jobmeta", + "jobspec", + "podmeta", + "podspec", + "container", + } + assert.Equal(t, expectedOrder, executionOrder) +} + +func TestMutator_CrossFeatureVisibility(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "app"}}, + }, + }, + }, + } + + m := NewMutator(job) + + // Feature A renames 
container + m.BeginFeature() + m.EditContainers(selectors.ContainerNamed("app"), func(e *editors.ContainerEditor) error { + e.Raw().Name = "app-v2" + return nil + }) + + // Feature B selects by the new name + m.BeginFeature() + m.EditContainers(selectors.ContainerNamed("app-v2"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "v2-image" + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + assert.Equal(t, "app-v2", job.Spec.Template.Spec.Containers[0].Name) + assert.Equal(t, "v2-image", job.Spec.Template.Spec.Containers[0].Image) +} + +func TestMutator_PresenceBeforeEdit(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{}, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + + // Register edit first + m.EditContainers(selectors.ContainerNamed("new-app"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "edited-image" + return nil + }) + + // Register presence later + m.EnsureContainer(corev1.Container{Name: "new-app", Image: "original-image"}) + + err := m.Apply() + require.NoError(t, err) + + // It should work because presence happens before edits in Apply() + require.Len(t, job.Spec.Template.Spec.Containers, 1) + assert.Equal(t, "edited-image", job.Spec.Template.Spec.Containers[0].Image) +} + +func TestMutator_InitContainers(t *testing.T) { + const newImage = "new-image" + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + {Name: "init-1", Image: "old-image"}, + }, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + m.EditInitContainers(selectors.ContainerNamed("init-1"), func(e *editors.ContainerEditor) error { + e.Raw().Image = newImage + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + assert.Equal(t, newImage, job.Spec.Template.Spec.InitContainers[0].Image) +} + +func 
TestMutator_InitContainer_OrderingAndSnapshots(t *testing.T) { + job := &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{}, + }, + }, + }, + } + + m := NewMutator(job) + m.BeginFeature() + + // 1. Add init-1 + m.EnsureInitContainer(corev1.Container{Name: "init-1", Image: "v1"}) + + // 2. Edit init-1 (it's present in the same feature's phase) + m.EditInitContainers(selectors.ContainerNamed("init-1"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "v1-edited" + return nil + }) + + // 3. Rename it inside the edit phase + m.EditInitContainers(selectors.ContainerNamed("init-1"), func(e *editors.ContainerEditor) error { + e.Raw().Name = "init-1-renamed" + return nil + }) + + // 4. Selector targeting "init-1" should still match because of snapshot in same phase + m.EditInitContainers(selectors.ContainerNamed("init-1"), func(e *editors.ContainerEditor) error { + e.Raw().Image = "v1-final" + return nil + }) + + err := m.Apply() + require.NoError(t, err) + + require.Len(t, job.Spec.Template.Spec.InitContainers, 1) + assert.Equal(t, "init-1-renamed", job.Spec.Template.Spec.InitContainers[0].Name) + assert.Equal(t, "v1-final", job.Spec.Template.Spec.InitContainers[0].Image) +} diff --git a/pkg/primitives/job/resource.go b/pkg/primitives/job/resource.go new file mode 100644 index 00000000..9ed3a9ad --- /dev/null +++ b/pkg/primitives/job/resource.go @@ -0,0 +1,123 @@ +package job + +import ( + "github.com/sourcehawk/operator-component-framework/internal/generic" + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + batchv1 "k8s.io/api/batch/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Resource is a high-level abstraction for managing a Kubernetes Job within a controller's +// reconciliation loop. 
+//
+// It implements several component interfaces to integrate with the operator-component-framework:
+// - component.Resource: for basic identity and mutation behavior.
+// - component.Completable: for run-to-completion tracking.
+// - component.Suspendable: for controlled deactivation (suspend or delete).
+// - component.DataExtractable: for exporting information after successful reconciliation.
+//
+// This resource handles the lifecycle of a Job, including initial creation,
+// updates via feature mutations, and completion status monitoring.
+type Resource struct {
+	base *generic.TaskResource[*batchv1.Job, *Mutator]
+}
+
+// Identity returns a unique identifier for the Job in the format
+// "batch/v1/Job/<namespace>/<name>".
+//
+// This identifier is used by the framework's internal tracking and recording
+// mechanisms to distinguish this specific Job from other resources
+// managed by the same component.
+func (r *Resource) Identity() string {
+	return r.base.Identity()
+}
+
+// Object returns a copy of the underlying Kubernetes Job object.
+//
+// The returned object implements the client.Object interface, making it
+// fully compatible with controller-runtime's Client for operations like
+// Get, Create, Update, and Patch.
+//
+// This method is called by the framework to obtain the current state
+// of the resource before applying mutations.
+func (r *Resource) Object() (client.Object, error) {
+	return r.base.Object()
+}
+
+// Mutate transforms the current state of a Kubernetes Job into the desired state.
+//
+// The mutation process follows a specific order:
+//  1. Core State: The desired base state is applied to the current object.
+//  2. Feature Mutations: All registered feature-based mutations are applied,
+//     allowing for granular, version-gated changes to the Job.
+//  3. Suspension: If the resource is in a suspending state, the suspension
+//     logic (e.g., setting suspend=true) is applied.
+// +// This method is invoked by the framework during the "Update" phase of +// reconciliation. It ensures that the in-memory object reflects all +// configuration and feature requirements before it is sent to the API server. +func (r *Resource) Mutate(current client.Object) error { + return r.base.Mutate(current) +} + +// ConvergingStatus evaluates if the Job has completed, is still running, or has failed. +// +// By default, it uses DefaultConvergingStatusHandler, which checks the Job's status +// conditions for Complete or Failed. +// +// The return value includes a descriptive status (concepts.CompletionStatusCompleted, +// concepts.CompletionStatusRunning, concepts.CompletionStatusPending, or +// concepts.CompletionStatusFailing) and a human-readable reason, which are used to +// update the component's conditions. +func (r *Resource) ConvergingStatus(op concepts.ConvergingOperation) (concepts.CompletionStatusWithReason, error) { + return r.base.ConvergingStatus(op) +} + +// DeleteOnSuspend determines whether the Job should be deleted from the +// cluster when the parent component is suspended. +// +// By default, it uses DefaultDeleteOnSuspendHandler, which returns true, meaning +// the Job is deleted during suspension. Jobs cannot be meaningfully scaled to zero +// like Deployments, so deletion is the standard approach. +// +// A custom decision handler can be registered via the Builder to change this +// behavior based on the current state of the Job. +func (r *Resource) DeleteOnSuspend() bool { + return r.base.DeleteOnSuspend() +} + +// Suspend triggers the deactivation of the Job. +// +// It registers a mutation that will be executed during the next Mutate call. +// The default behavior uses DefaultSuspendMutationHandler to set the Job's +// Suspend field to true, which prevents new pods from being created. +// +// This is typically called by the framework when a component's .spec.suspended +// field is set to true. 
+func (r *Resource) Suspend() error { + return r.base.Suspend() +} + +// SuspensionStatus monitors the progress of the suspension process. +// +// By default, it uses DefaultSuspensionStatusHandler, which reports whether the +// Job has been successfully suspended by checking if the Suspend field is true +// and there are no active pods. The framework uses this to determine when the +// component has reached a fully suspended state. +func (r *Resource) SuspensionStatus() (concepts.SuspensionStatusWithReason, error) { + return r.base.SuspensionStatus() +} + +// ExtractData executes registered data extraction functions to harvest information +// from the reconciled Job. +// +// This is called by the framework after a successful reconciliation of the +// resource. It allows the component to export details (like completion status +// or generated names) that might be needed by other resources or higher-level +// controllers. +// +// Data extractors are provided with a deep copy of the current Job to +// prevent accidental mutations during the extraction process. 
+func (r *Resource) ExtractData() error { + return r.base.ExtractData() +} diff --git a/pkg/primitives/job/resource_test.go b/pkg/primitives/job/resource_test.go new file mode 100644 index 00000000..63c242c7 --- /dev/null +++ b/pkg/primitives/job/resource_test.go @@ -0,0 +1,372 @@ +package job + +import ( + "errors" + "testing" + + "github.com/sourcehawk/operator-component-framework/pkg/component/concepts" + "github.com/sourcehawk/operator-component-framework/pkg/feature" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/editors" + "github.com/sourcehawk/operator-component-framework/pkg/mutation/selectors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func newValidJob() *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "test-ns", + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "worker", Image: "busybox"}, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } +} + +func TestResource_Identity(t *testing.T) { + res, err := NewBuilder(newValidJob()).Build() + require.NoError(t, err) + assert.Equal(t, "batch/v1/Job/test-ns/test-job", res.Identity()) +} + +func TestResource_Object(t *testing.T) { + job := newValidJob() + res, err := NewBuilder(job).Build() + require.NoError(t, err) + + obj, err := res.Object() + require.NoError(t, err) + + got, ok := obj.(*batchv1.Job) + require.True(t, ok) + assert.Equal(t, job.Name, got.Name) + assert.Equal(t, job.Namespace, got.Namespace) + + // Must be a deep copy. 
+ got.Name = "changed" + assert.Equal(t, "test-job", job.Name) +} + +func TestResource_Mutate(t *testing.T) { + desired := newValidJob() + res, err := NewBuilder(desired).Build() + require.NoError(t, err) + + obj, err := res.Object() + require.NoError(t, err) + require.NoError(t, res.Mutate(obj)) + + got := obj.(*batchv1.Job) + assert.Equal(t, "busybox", got.Spec.Template.Spec.Containers[0].Image) +} + +func TestResource_Mutate_WithMutation(t *testing.T) { + desired := newValidJob() + res, err := NewBuilder(desired). + WithMutation(Mutation{ + Name: "add-env", + Feature: feature.NewResourceFeature("v1", nil).When(true), + Mutate: func(m *Mutator) error { + m.EnsureContainerEnvVar(corev1.EnvVar{Name: "FOO", Value: "BAR"}) + return nil + }, + }). + Build() + require.NoError(t, err) + + obj, err := res.Object() + require.NoError(t, err) + require.NoError(t, res.Mutate(obj)) + + got := obj.(*batchv1.Job) + assert.Equal(t, "BAR", got.Spec.Template.Spec.Containers[0].Env[0].Value) +} + +func TestResource_Mutate_FeatureOrdering(t *testing.T) { + desired := newValidJob() + res, err := NewBuilder(desired). + WithMutation(Mutation{ + Name: "feature-a", + Feature: feature.NewResourceFeature("v1", nil).When(true), + Mutate: func(m *Mutator) error { + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.Raw().BackoffLimit = int32Ptr(5) + return nil + }) + return nil + }, + }). + WithMutation(Mutation{ + Name: "feature-b", + Feature: feature.NewResourceFeature("v1", nil).When(true), + Mutate: func(m *Mutator) error { + m.EditJobSpec(func(e *editors.JobSpecEditor) error { + if e.Raw().BackoffLimit != nil && *e.Raw().BackoffLimit == 5 { + e.Raw().BackoffLimit = int32Ptr(10) + } + return nil + }) + return nil + }, + }). 
+ Build() + require.NoError(t, err) + + obj, err := res.Object() + require.NoError(t, err) + require.NoError(t, res.Mutate(obj)) + + got := obj.(*batchv1.Job) + require.NotNil(t, got.Spec.BackoffLimit) + assert.Equal(t, int32(10), *got.Spec.BackoffLimit) +} + +func TestResource_Mutate_CrossMutationSelectorSnapshot(t *testing.T) { + desired := newValidJob() + res, err := NewBuilder(desired). + WithMutation(Mutation{ + Name: "add-sidecar", + Feature: feature.NewResourceFeature("v1", nil).When(true), + Mutate: func(m *Mutator) error { + m.EnsureContainer(corev1.Container{ + Name: "sidecar", + Image: "sidecar:latest", + }) + return nil + }, + }). + WithMutation(Mutation{ + Name: "configure-sidecar", + Feature: feature.NewResourceFeature("v1", nil).When(true), + Mutate: func(m *Mutator) error { + m.EditContainers(selectors.ContainerNamed("sidecar"), func(e *editors.ContainerEditor) error { + e.EnsureEnvVar(corev1.EnvVar{Name: "LOG_LEVEL", Value: "debug"}) + return nil + }) + return nil + }, + }). + Build() + require.NoError(t, err) + + obj, err := res.Object() + require.NoError(t, err) + require.NoError(t, res.Mutate(obj)) + + got := obj.(*batchv1.Job) + // The sidecar container should exist and have the env var from the second mutation. 
+ var sidecar *corev1.Container + for i := range got.Spec.Template.Spec.Containers { + if got.Spec.Template.Spec.Containers[i].Name == "sidecar" { + sidecar = &got.Spec.Template.Spec.Containers[i] + break + } + } + require.NotNil(t, sidecar, "sidecar container should be present") + require.Len(t, sidecar.Env, 1) + assert.Equal(t, "LOG_LEVEL", sidecar.Env[0].Name) + assert.Equal(t, "debug", sidecar.Env[0].Value) +} + +type mockHandlers struct { + mock.Mock +} + +func (m *mockHandlers) ConvergingStatus(op concepts.ConvergingOperation, j *batchv1.Job) (concepts.CompletionStatusWithReason, error) { + args := m.Called(op, j) + return args.Get(0).(concepts.CompletionStatusWithReason), args.Error(1) +} + +func (m *mockHandlers) SuspensionStatus(j *batchv1.Job) (concepts.SuspensionStatusWithReason, error) { + args := m.Called(j) + return args.Get(0).(concepts.SuspensionStatusWithReason), args.Error(1) +} + +func (m *mockHandlers) Suspend(mut *Mutator) error { + args := m.Called(mut) + return args.Error(0) +} + +func (m *mockHandlers) DeleteOnSuspend(j *batchv1.Job) bool { + args := m.Called(j) + return args.Bool(0) +} + +func TestResource_DeleteOnSuspend(t *testing.T) { + job := newValidJob() + + t.Run("calls handler", func(t *testing.T) { + m := &mockHandlers{} + m.On("DeleteOnSuspend", job).Return(false) + + res, err := NewBuilder(job). + WithCustomSuspendDeletionDecision(m.DeleteOnSuspend). 
+ Build() + require.NoError(t, err) + assert.False(t, res.DeleteOnSuspend()) + m.AssertExpectations(t) + }) + + t.Run("uses default", func(t *testing.T) { + res, err := NewBuilder(job).Build() + require.NoError(t, err) + assert.True(t, res.DeleteOnSuspend()) + }) +} + +func TestResource_Suspend(t *testing.T) { + job := newValidJob() + + t.Run("Suspend registers mutation and Mutate applies it using default handler", func(t *testing.T) { + res, err := NewBuilder(job).Build() + require.NoError(t, err) + err = res.Suspend() + require.NoError(t, err) + + current := job.DeepCopy() + err = res.Mutate(current) + require.NoError(t, err) + + require.NotNil(t, current.Spec.Suspend) + assert.True(t, *current.Spec.Suspend) + }) + + t.Run("Suspend uses custom mutation handler", func(t *testing.T) { + m := &mockHandlers{} + m.On("Suspend", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + mut := args.Get(0).(*Mutator) + mut.EditJobSpec(func(e *editors.JobSpecEditor) error { + e.Raw().BackoffLimit = int32Ptr(0) + return nil + }) + }) + + res, err := NewBuilder(job). + WithCustomSuspendMutation(m.Suspend). + Build() + require.NoError(t, err) + err = res.Suspend() + require.NoError(t, err) + + current := job.DeepCopy() + err = res.Mutate(current) + require.NoError(t, err) + + m.AssertExpectations(t) + require.NotNil(t, current.Spec.BackoffLimit) + assert.Equal(t, int32(0), *current.Spec.BackoffLimit) + }) +} + +func TestResource_SuspensionStatus(t *testing.T) { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "test-job", Namespace: "test-ns"}, + Spec: batchv1.JobSpec{ + Suspend: boolPtr(true), + }, + Status: batchv1.JobStatus{ + Active: 0, + }, + } + + t.Run("calls handler", func(t *testing.T) { + m := &mockHandlers{} + statusSuspended := concepts.SuspensionStatusWithReason{Status: concepts.SuspensionStatusSuspended} + m.On("SuspensionStatus", job).Return(statusSuspended, nil) + + res, err := NewBuilder(job). + WithCustomSuspendStatus(m.SuspensionStatus). 
+ Build() + require.NoError(t, err) + status, err := res.SuspensionStatus() + require.NoError(t, err) + m.AssertExpectations(t) + assert.Equal(t, concepts.SuspensionStatusSuspended, status.Status) + }) + + t.Run("uses default", func(t *testing.T) { + res, err := NewBuilder(job).Build() + require.NoError(t, err) + status, err := res.SuspensionStatus() + require.NoError(t, err) + assert.Equal(t, concepts.SuspensionStatusSuspended, status.Status) + }) +} + +func TestResource_ConvergingStatus(t *testing.T) { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "test-job", Namespace: "test-ns"}, + Status: batchv1.JobStatus{ + Active: 1, + }, + } + + t.Run("calls handler", func(t *testing.T) { + m := &mockHandlers{} + statusRunning := concepts.CompletionStatusWithReason{Status: concepts.CompletionStatusRunning} + m.On("ConvergingStatus", concepts.ConvergingOperationUpdated, job).Return(statusRunning, nil) + + res, err := NewBuilder(job). + WithCustomConvergeStatus(m.ConvergingStatus). + Build() + require.NoError(t, err) + status, err := res.ConvergingStatus(concepts.ConvergingOperationUpdated) + require.NoError(t, err) + m.AssertExpectations(t) + assert.Equal(t, concepts.CompletionStatusRunning, status.Status) + }) + + t.Run("uses default", func(t *testing.T) { + res, err := NewBuilder(job).Build() + require.NoError(t, err) + status, err := res.ConvergingStatus(concepts.ConvergingOperationUpdated) + require.NoError(t, err) + assert.Equal(t, concepts.CompletionStatusRunning, status.Status) + }) +} + +func TestResource_ExtractData(t *testing.T) { + job := newValidJob() + + extractedImage := "" + res, err := NewBuilder(job). + WithDataExtractor(func(j batchv1.Job) error { + extractedImage = j.Spec.Template.Spec.Containers[0].Image + return nil + }). 
+ Build() + require.NoError(t, err) + + err = res.ExtractData() + require.NoError(t, err) + assert.Equal(t, "busybox", extractedImage) +} + +func TestResource_ExtractData_Error(t *testing.T) { + res, err := NewBuilder(newValidJob()). + WithDataExtractor(func(_ batchv1.Job) error { + return errors.New("extract error") + }). + Build() + require.NoError(t, err) + + err = res.ExtractData() + require.Error(t, err) + assert.Contains(t, err.Error(), "extract error") +} + +func int32Ptr(i int32) *int32 { + return &i +}