From 9ca91be0d31fda2aa9024c28fc30206a03dfba37 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Mar 2026 16:58:03 +0100 Subject: [PATCH 1/5] feat(ledger): add v3 reconciler with Raft StatefulSet support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ledger v3 uses Raft consensus with Pebble embedded storage instead of PostgreSQL. This adds a version-branched reconciler that creates a StatefulSet (with headless service for peer discovery) when the ledger version is >= v3.0.0-alpha. Key changes: - Version gate in Reconcile(): v3+ skips Database/migrations entirely - StatefulSet with OrderedReady policy, 3 PVCs (wal, data, cold-cache) - Headless service (ledger-raft) for Raft peer DNS discovery - ClusterIP service mapping port 8080→9000 for gateway compatibility - Pod entrypoint script: computes node-id from ordinal, bootstrap/join - Settings-driven: replicas, PVC sizes, storage classes, Pebble/Raft tunables - RBAC marker for apps/statefulsets Co-Authored-By: Claude Opus 4.6 --- internal/resources/ledgers/init.go | 7 + internal/resources/ledgers/v3.go | 394 +++++++++++++++++++++++++++++ 2 files changed, 401 insertions(+) create mode 100644 internal/resources/ledgers/v3.go diff --git a/internal/resources/ledgers/init.go b/internal/resources/ledgers/init.go index 53c6e661..dc64ec7f 100644 --- a/internal/resources/ledgers/init.go +++ b/internal/resources/ledgers/init.go @@ -38,8 +38,14 @@ import ( //+kubebuilder:rbac:groups=formance.com,resources=ledgers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=formance.com,resources=ledgers/finalizers,verbs=update //+kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, version string) error { + isV3 := !semver.IsValid(version) || semver.Compare(version, "v3.0.0-alpha") >= 0 + if isV3 { + return reconcileV3(ctx, stack, ledger, version) + } + database, err := databases.Create(ctx, stack, ledger) if err != nil { return err @@ -111,6 +117,7 @@ func init() { Init( WithModuleReconciler(Reconcile, WithOwn[*v1beta1.Ledger](&appsv1.Deployment{}), + WithOwn[*v1beta1.Ledger](&appsv1.StatefulSet{}), WithOwn[*v1beta1.Ledger](&batchv1.Job{}), WithOwn[*v1beta1.Ledger](&corev1.Service{}), WithOwn[*v1beta1.Ledger](&v1beta1.GatewayHTTPAPI{}), diff --git a/internal/resources/ledgers/v3.go b/internal/resources/ledgers/v3.go new file mode 100644 index 00000000..254797aa --- /dev/null +++ b/internal/resources/ledgers/v3.go @@ -0,0 +1,394 @@ +package ledgers + +import ( + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" + "github.com/formancehq/operator/v3/internal/core" + "github.com/formancehq/operator/v3/internal/resources/gatewayhttpapis" + "github.com/formancehq/operator/v3/internal/resources/registries" + "github.com/formancehq/operator/v3/internal/resources/services" + "github.com/formancehq/operator/v3/internal/resources/settings" +) + +const ( + v3PortHTTP = int32(9000) + v3PortGRPC = int32(8888) + v3PortRaft = int32(7777) +) + +func reconcileV3(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, version string) error { + imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "ledger", version) + if err != nil { + return err + } + + if err := gatewayhttpapis.Create(ctx, ledger, gatewayhttpapis.WithHealthCheckEndpoint("health")); err != nil { + return err + } + + if err := createV3HeadlessService(ctx, stack, ledger); err != nil { + return err + } + + // ClusterIP service: port 8080 → container port 9000 (gateway compatibility) + if _, err := services.Create(ctx, ledger, "ledger", services.WithConfig(services.PortConfig{ + ServiceName: "ledger", + PortName: "http", + Port: 8080, + TargetPort: "http", + })); err != nil { + return err + } + + if err := installV3StatefulSet(ctx, stack, ledger, imageConfiguration); err != nil { + return err + } + + return nil +} + +func createV3HeadlessService(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger) error { + headlessSvcName := "ledger-raft" + + _, _, err := core.CreateOrUpdate[*corev1.Service](ctx, types.NamespacedName{ + Name: headlessSvcName, + Namespace: stack.Name, + }, + func(t *corev1.Service) error { + t.Spec = corev1.ServiceSpec{ + ClusterIP: "None", + PublishNotReadyAddresses: true, + Ports: []corev1.ServicePort{ + { + Name: "raft", + Port: v3PortRaft, + Protocol: "TCP", + TargetPort: intstr.FromString("raft"), + }, + { + Name: "grpc", + Port: v3PortGRPC, + Protocol: "TCP", + TargetPort: intstr.FromString("grpc"), + }, + }, + Selector: map[string]string{ + "app.kubernetes.io/name": "ledger", + }, + } + return nil + }, + core.WithController[*corev1.Service](ctx.GetScheme(), ledger), + ) + return err +} + +func installV3StatefulSet(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, image *registries.ImageConfiguration) error { + stackName := stack.Name + + replicas, err := settings.GetInt32OrDefault(ctx, stackName, 3, "ledger", "v3", "replicas") + if err != nil { + return err + } + if replicas%2 == 0 { + return fmt.Errorf("ledger.v3.replicas must be odd, got %d", replicas) + } + + volumeClaims, err := buildV3VolumeClaimTemplates(ctx, stackName) + if err != nil { + return err + } + + podTemplate, err := buildV3PodTemplate(ctx, stack, ledger, image) + if err != nil { + return err + } + + headlessSvcName := "ledger-raft" + stsName := "ledger" + + _, _, err = core.CreateOrUpdate[*appsv1.StatefulSet](ctx, types.NamespacedName{ + Name: stsName, + Namespace: stackName, + }, + func(t *appsv1.StatefulSet) error { + t.Spec = appsv1.StatefulSetSpec{ + Replicas: &replicas, + ServiceName: headlessSvcName, + PodManagementPolicy: appsv1.OrderedReadyPodManagement, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/name": "ledger", + }, + }, + Template: *podTemplate, + VolumeClaimTemplates: volumeClaims, + } + return nil + }, + core.WithController[*appsv1.StatefulSet](ctx.GetScheme(), ledger), + ) + return err +} + +func buildV3PodTemplate(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, image *registries.ImageConfiguration) (*corev1.PodTemplateSpec, error) { + stackName := stack.Name + + otlpEnv, err := settings.GetOTELEnvVars(ctx, stackName, core.LowerCamelCaseKind(ctx, ledger), " ") + if err != nil { + return nil, err + } + + clusterID, err := settings.GetStringOrDefault(ctx, stackName, "default", "ledger", "v3", "cluster-id") + if err != nil { + return nil, err + } + + dataDir := "/data/app" + walDir := "/data/raft" + + env := []corev1.EnvVar{ + core.Env("BIND_ADDR", fmt.Sprintf("0.0.0.0:%d", v3PortRaft)), + core.Env("CLUSTER_ID", clusterID), + core.Env("GRPC_PORT", fmt.Sprint(v3PortGRPC)), + core.Env("HTTP_PORT", fmt.Sprint(v3PortHTTP)), + core.Env("WAL_DIR", walDir), + core.Env("DATA_DIR", dataDir), + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}, + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}, + }, + }, + } + env = append(env, otlpEnv...) + env = append(env, core.GetDevEnvVars(stack, ledger)...) + + // Add pebble settings + pebbleEnv, err := buildV3PebbleEnvVars(ctx, stackName) + if err != nil { + return nil, err + } + env = append(env, pebbleEnv...) + + // Add raft settings + raftEnv, err := buildV3RaftEnvVars(ctx, stackName) + if err != nil { + return nil, err + } + env = append(env, raftEnv...) + + headlessSvcName := "ledger-raft" + command := buildV3Command(headlessSvcName, dataDir) + + container := corev1.Container{ + Name: "ledger", + Image: image.GetFullImageName(), + Command: []string{"/bin/sh", "-c"}, + Args: []string{command}, + Env: env, + Ports: []corev1.ContainerPort{ + {Name: "http", ContainerPort: v3PortHTTP}, + {Name: "grpc", ContainerPort: v3PortGRPC}, + {Name: "raft", ContainerPort: v3PortRaft}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "wal", MountPath: walDir}, + {Name: "data", MountPath: dataDir}, + {Name: "cold-cache", MountPath: "/data/cold-cache"}, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/livez", + Port: intstr.FromString("http"), + }, + }, + FailureThreshold: 20, + }, + StartupProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/livez", + Port: intstr.FromString("http"), + }, + }, + FailureThreshold: 30, + PeriodSeconds: 10, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + Port: intstr.FromString("http"), + }, + }, + }, + } + + return &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app.kubernetes.io/name": "ledger", + }, + }, + Spec: corev1.PodSpec{ + ImagePullSecrets: image.PullSecrets, + Containers: []corev1.Container{container}, + }, + }, nil +} + +func buildV3Command(headlessSvc, dataDir string) string { + // Shell script that computes node-id from the StatefulSet ordinal index, + // builds the advertise-addr from the pod's DNS name within the headless service, + // and decides whether to --bootstrap or --join depending on ordinal. + // + // POD_NAME is like "ledger-0", POD_INDEX is extracted from the suffix. + // pod-0 bootstraps (if no existing state), other pods join pod-0. + lines := []string{ + // Extract the ordinal index from the pod name (e.g. "ledger-2" → "2") + `POD_INDEX=${POD_NAME##*-}`, + // Raft node IDs must be >= 1 + `NODE_ID=$((POD_INDEX + 1))`, + // FQDN within the headless service + fmt.Sprintf(`ADVERTISE_ADDR="${POD_NAME}.%s.${POD_NAMESPACE}.svc.cluster.local:%d"`, headlessSvc, v3PortRaft), + // First pod (ordinal 0) bootstraps if no checkpoint exists yet, otherwise normal start. + // Other pods join pod-0. + fmt.Sprintf(`BOOTSTRAP_ADDR="ledger-0.%s.${POD_NAMESPACE}.svc.cluster.local:%d"`, headlessSvc, v3PortGRPC), + `CLUSTER_FLAG=""`, + fmt.Sprintf(`if [ "$POD_INDEX" = "0" ]; then + if [ ! -d "%s/pebble" ]; then + CLUSTER_FLAG="--bootstrap" + fi +else + CLUSTER_FLAG="--join $BOOTSTRAP_ADDR" +fi`, dataDir), + // Exec into the ledger binary + `exec ./ledger run \`, + ` --node-id "$NODE_ID" \`, + ` --advertise-addr "$ADVERTISE_ADDR" \`, + ` $CLUSTER_FLAG`, + } + + return strings.Join(lines, "\n") +} + +func buildV3VolumeClaimTemplates(ctx core.Context, stackName string) ([]corev1.PersistentVolumeClaim, error) { + type volumeSpec struct { + name string + sizeKey string + defaultSize string + storageClassKey string + } + + specs := []volumeSpec{ + {"wal", "ledger.v3.persistence.wal.size", "5Gi", "ledger.v3.persistence.wal.storage-class"}, + {"data", "ledger.v3.persistence.data.size", "10Gi", "ledger.v3.persistence.data.storage-class"}, + {"cold-cache", "ledger.v3.persistence.cold-cache.size", "10Gi", "ledger.v3.persistence.cold-cache.storage-class"}, + } + + var claims []corev1.PersistentVolumeClaim + for _, s := range specs { + sizeStr, err := settings.GetStringOrDefault(ctx, stackName, s.defaultSize, strings.Split(s.sizeKey, ".")...) + if err != nil { + return nil, err + } + + storageClass, err := settings.GetStringOrEmpty(ctx, stackName, strings.Split(s.storageClassKey, ".")...) + if err != nil { + return nil, err + } + + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.name, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(sizeStr), + }, + }, + }, + } + if storageClass != "" { + pvc.Spec.StorageClassName = &storageClass + } + claims = append(claims, pvc) + } + + return claims, nil +} + +// pebble setting key → env var name +var v3PebbleSettings = []struct { + key string + envVar string +}{ + {"cache-size", "PEBBLE_CACHE_SIZE"}, + {"memtable-size", "PEBBLE_MEMTABLE_SIZE"}, + {"memtable-stop-writes-threshold", "PEBBLE_MEMTABLE_STOP_WRITES_THRESHOLD"}, + {"l0-compaction-threshold", "PEBBLE_L0_COMPACTION_THRESHOLD"}, + {"l0-stop-writes-threshold", "PEBBLE_L0_STOP_WRITES_THRESHOLD"}, + {"lbase-max-bytes", "PEBBLE_LBASE_MAX_BYTES"}, + {"target-file-size", "PEBBLE_TARGET_FILE_SIZE"}, + {"max-concurrent-compactions", "PEBBLE_MAX_CONCURRENT_COMPACTIONS"}, +} + +func buildV3PebbleEnvVars(ctx core.Context, stackName string) ([]corev1.EnvVar, error) { + var envVars []corev1.EnvVar + for _, s := range v3PebbleSettings { + val, err := settings.GetStringOrEmpty(ctx, stackName, "ledger", "v3", "pebble", s.key) + if err != nil { + return nil, err + } + if val != "" { + envVars = append(envVars, core.Env(s.envVar, val)) + } + } + return envVars, nil +} + +var v3RaftSettings = []struct { + key string + envVar string +}{ + {"snapshot-threshold", "RAFT_SNAPSHOT_THRESHOLD"}, + {"election-tick", "RAFT_ELECTION_TICK"}, + {"heartbeat-tick", "RAFT_HEARTBEAT_TICK"}, + {"tick-interval", "RAFT_TICK_INTERVAL"}, + {"max-size-per-msg", "RAFT_MAX_SIZE_PER_MSG"}, + {"max-inflight-msgs", "RAFT_MAX_INFLIGHT_MSGS"}, + {"compaction-margin", "RAFT_COMPACTION_MARGIN"}, +} + +func buildV3RaftEnvVars(ctx core.Context, stackName string) ([]corev1.EnvVar, error) { + var envVars []corev1.EnvVar + for _, s := range v3RaftSettings { + val, err := settings.GetStringOrEmpty(ctx, stackName, "ledger", "v3", "raft", s.key) + if err != nil { + return nil, err + } + if val != "" { + envVars = append(envVars, core.Env(s.envVar, val)) + } + } + return envVars, nil +} From dd0027882d965d0813a3b3bd3c3a58ceefc3dd40 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Mar 2026 17:02:44 +0100 Subject: [PATCH 2/5] docs: add ledger v3 settings documentation Document all v3-specific Settings keys in both: - Settings reference table (01-Settings.md) - Ledger module page (03-Ledger.md) with architecture overview, YAML examples, and tables for persistence/Pebble/Raft tunables Co-Authored-By: Claude Opus 4.6 --- docs/04-Modules/03-Ledger.md | 115 ++++++++++++++++++ .../09-Configuration reference/01-Settings.md | 23 ++++ 2 files changed, 138 insertions(+) diff --git a/docs/04-Modules/03-Ledger.md b/docs/04-Modules/03-Ledger.md index 9bbaea5b..146439a8 100644 --- a/docs/04-Modules/03-Ledger.md +++ b/docs/04-Modules/03-Ledger.md @@ -100,3 +100,118 @@ Available fields: - `push-retry-period`: Retry period for failed pushes - `sync-period`: Synchronization period - `logs-page-size`: Number of logs per page + +## Ledger v3 + +Ledger v3 is an architecturally different version that uses Raft consensus with Pebble embedded storage instead of PostgreSQL. When the version is `>= v3.0.0-alpha`, the operator deploys a **StatefulSet** instead of a Deployment. + +### Requirements + +Ledger v3 does **not** require PostgreSQL or a message broker. Storage is fully embedded (Pebble LSM). + +### Architecture + +The operator creates the following resources for v3: + +| Resource | Purpose | +|----------|---------| +| `StatefulSet/ledger` | Raft cluster nodes with `OrderedReady` pod management | +| `Service/ledger-raft` (headless) | DNS-based peer discovery for Raft consensus | +| `Service/ledger` (ClusterIP) | Gateway-facing service, maps port 8080 to container port 9000 | +| 3 PVCs per pod | `wal`, `data`, `cold-cache` | + +### Cluster Settings + +```yaml +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-replicas +spec: + stacks: ["*"] + key: ledger.v3.replicas + value: "3" +--- +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-cluster-id +spec: + stacks: ["*"] + key: ledger.v3.cluster-id + value: default +``` + +- `ledger.v3.replicas`: Number of Raft nodes. **Must be odd** for quorum (default: 3). +- `ledger.v3.cluster-id`: Raft cluster identifier (default: "default"). + +### Persistence Settings + +Each pod gets three PVCs. Size and storage class are configurable: + +```yaml +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-persistence +spec: + stacks: ["*"] + key: ledger.v3.persistence.wal.size + value: "5Gi" +--- +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-data-size +spec: + stacks: ["*"] + key: ledger.v3.persistence.data.size + value: "10Gi" +--- +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-cold-cache-size +spec: + stacks: ["*"] + key: ledger.v3.persistence.cold-cache.size + value: "10Gi" +``` + +| Key | Default | Description | +|-----|---------|-------------| +| `ledger.v3.persistence.wal.size` | 5Gi | WAL PVC size | +| `ledger.v3.persistence.wal.storage-class` | (cluster default) | WAL storage class | +| `ledger.v3.persistence.data.size` | 10Gi | Pebble data PVC size | +| `ledger.v3.persistence.data.storage-class` | (cluster default) | Data storage class | +| `ledger.v3.persistence.cold-cache.size` | 10Gi | Cold cache PVC size | +| `ledger.v3.persistence.cold-cache.storage-class` | (cluster default) | Cold cache storage class | + +### Pebble Tunables + +All Pebble settings are optional. When unset, the ledger binary defaults apply. + +| Key | Example | Description | +|-----|---------|-------------| +| `ledger.v3.pebble.cache-size` | 1073741824 | Block cache size in bytes | +| `ledger.v3.pebble.memtable-size` | 268435456 | Memtable size in bytes | +| `ledger.v3.pebble.memtable-stop-writes-threshold` | 2 | Memtable count before stopping writes | +| `ledger.v3.pebble.l0-compaction-threshold` | 4 | L0 files to trigger compaction | +| `ledger.v3.pebble.l0-stop-writes-threshold` | 12 | L0 files before stopping writes | +| `ledger.v3.pebble.lbase-max-bytes` | 67108864 | L1 max size in bytes | +| `ledger.v3.pebble.target-file-size` | 67108864 | SST file target size | +| `ledger.v3.pebble.max-concurrent-compactions` | 2 | Compaction parallelism | + +### Raft Tunables + +All Raft settings are optional. When unset, the ledger binary defaults apply. + +| Key | Example | Description | +|-----|---------|-------------| +| `ledger.v3.raft.snapshot-threshold` | 5000 | Log entries before snapshot | +| `ledger.v3.raft.election-tick` | 10 | Election timeout in ticks | +| `ledger.v3.raft.heartbeat-tick` | 1 | Heartbeat interval in ticks | +| `ledger.v3.raft.tick-interval` | 100ms | Duration of one tick | +| `ledger.v3.raft.max-size-per-msg` | 1048576 | Max message size in bytes | +| `ledger.v3.raft.max-inflight-msgs` | 256 | Max in-flight messages | +| `ledger.v3.raft.compaction-margin` | 1000 | Log retention after snapshot | diff --git a/docs/09-Configuration reference/01-Settings.md b/docs/09-Configuration reference/01-Settings.md index 6d56972a..a5594733 100644 --- a/docs/09-Configuration reference/01-Settings.md +++ b/docs/09-Configuration reference/01-Settings.md @@ -32,6 +32,29 @@ While we have some basic types (string, number, bool ...), we also have some com | ledger.worker.async-block-hasher | Map | max-block-size=1000, schedule="0 * * * * *" | Configure async block hasher for the Ledger worker (v2.3+). Fields: `max-block-size`, `schedule` | | ledger.worker.bucket-cleanup | Map | retention-period=720h, schedule="0 0 * * *" | Configure bucket cleanup for the Ledger worker (v2.4+). Fields: `retention-period`, `schedule` | | ledger.worker.pipelines | Map | pull-interval=5s, push-retry-period=10s, sync-period=1m, logs-page-size=100 | Configure pipelines for the Ledger worker (v2.3+). Fields: `pull-interval`, `push-retry-period`, `sync-period`, `logs-page-size` | +| ledger.v3.replicas | Int | 3 | Raft cluster node count (v3+). Must be odd for quorum. Default: 3 | +| ledger.v3.cluster-id | String | default | Raft cluster ID (v3+). Default: "default" | +| ledger.v3.persistence.wal.size | String | 5Gi | PVC size for the Raft write-ahead log (v3+). Default: 5Gi | +| ledger.v3.persistence.wal.storage-class | String | | Storage class for the WAL PVC (v3+). Empty = cluster default | +| ledger.v3.persistence.data.size | String | 10Gi | PVC size for the Pebble data directory (v3+). Default: 10Gi | +| ledger.v3.persistence.data.storage-class | String | | Storage class for the data PVC (v3+). Empty = cluster default | +| ledger.v3.persistence.cold-cache.size | String | 10Gi | PVC size for the cold storage cache (v3+). Default: 10Gi | +| ledger.v3.persistence.cold-cache.storage-class | String | | Storage class for the cold cache PVC (v3+). Empty = cluster default | +| ledger.v3.pebble.cache-size | String | 1073741824 | Pebble block cache size in bytes (v3+) | +| ledger.v3.pebble.memtable-size | String | 268435456 | Pebble memtable size in bytes (v3+) | +| ledger.v3.pebble.memtable-stop-writes-threshold | String | 2 | Pebble memtable count before stopping writes (v3+) | +| ledger.v3.pebble.l0-compaction-threshold | String | 4 | L0 file count to trigger compaction (v3+) | +| ledger.v3.pebble.l0-stop-writes-threshold | String | 12 | L0 file count before stopping writes (v3+) | +| ledger.v3.pebble.lbase-max-bytes | String | 67108864 | L1 max size in bytes (v3+) | +| ledger.v3.pebble.target-file-size | String | 67108864 | SST file target size in bytes (v3+) | +| ledger.v3.pebble.max-concurrent-compactions | String | 2 | Compaction parallelism (v3+) | +| ledger.v3.raft.snapshot-threshold | String | 5000 | Log entries before taking a snapshot (v3+) | +| ledger.v3.raft.election-tick | String | 10 | Election timeout in ticks (v3+) | +| ledger.v3.raft.heartbeat-tick | String | 1 | Heartbeat interval in ticks (v3+) | +| ledger.v3.raft.tick-interval | String | 100ms | Duration of one Raft tick (v3+) | +| ledger.v3.raft.max-size-per-msg | String | 1048576 | Max Raft message size in bytes (v3+) | +| ledger.v3.raft.max-inflight-msgs | String | 256 | Max in-flight Raft messages (v3+) | +| ledger.v3.raft.compaction-margin | String | 1000 | Raft log retention after snapshot (v3+) | | payments.encryption-key | string | | Payments data encryption key | | payments.worker.temporal-max-concurrent-workflow-task-pollers | Int | | Payments worker max concurrent workflow task pollers configuration | | payments.worker.temporal-max-concurrent-activity-task-pollers | Int | | Payments worker max concurrent activity task pollers configuration | From dd2cca4080c042c4c0c8d901952e9adede9f184e Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Mar 2026 17:57:24 +0100 Subject: [PATCH 3/5] feat(ledger): add v3 integration tests and fix settings prefix - Add 20 Ginkgo integration tests for v3 StatefulSet reconciler - Prefix all v3 settings with "module." for consistency - Fix version gate to use semver.Major == "v3" (avoids breaking v2 tests) - Remove redundant services.Create (GatewayHTTPAPI handles ClusterIP service) - Update docs with correct settings key paths Co-Authored-By: Claude Opus 4.6 --- config/rbac/role.yaml | 12 + docs/04-Modules/03-Ledger.md | 56 ++-- .../09-Configuration reference/01-Settings.md | 46 +-- internal/resources/ledgers/init.go | 2 +- internal/resources/ledgers/v3.go | 28 +- internal/tests/ledger_v3_controller_test.go | 306 ++++++++++++++++++ 6 files changed, 380 insertions(+), 70 deletions(-) create mode 100644 internal/tests/ledger_v3_controller_test.go diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index fcd0e7ca..b7b295bc 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -42,6 +42,18 @@ rules: - patch - update - watch +- apiGroups: + - apps + resources: + - statefulsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - batch resources: diff --git a/docs/04-Modules/03-Ledger.md b/docs/04-Modules/03-Ledger.md index 146439a8..383c1d00 100644 --- a/docs/04-Modules/03-Ledger.md +++ b/docs/04-Modules/03-Ledger.md @@ -129,7 +129,7 @@ metadata: name: ledger-v3-replicas spec: stacks: ["*"] - key: ledger.v3.replicas + key: module.ledger.v3.replicas value: "3" --- apiVersion: formance.com/v1beta1 @@ -138,12 +138,12 @@ metadata: name: ledger-v3-cluster-id spec: stacks: ["*"] - key: ledger.v3.cluster-id + key: module.ledger.v3.cluster-id value: default ``` -- `ledger.v3.replicas`: Number of Raft nodes. **Must be odd** for quorum (default: 3). -- `ledger.v3.cluster-id`: Raft cluster identifier (default: "default"). +- `module.ledger.v3.replicas`: Number of Raft nodes. **Must be odd** for quorum (default: 3). +- `module.ledger.v3.cluster-id`: Raft cluster identifier (default: "default"). ### Persistence Settings @@ -156,7 +156,7 @@ metadata: name: ledger-v3-persistence spec: stacks: ["*"] - key: ledger.v3.persistence.wal.size + key: module.ledger.v3.persistence.wal.size value: "5Gi" --- apiVersion: formance.com/v1beta1 @@ -165,7 +165,7 @@ metadata: name: ledger-v3-data-size spec: stacks: ["*"] - key: ledger.v3.persistence.data.size + key: module.ledger.v3.persistence.data.size value: "10Gi" --- apiVersion: formance.com/v1beta1 @@ -174,18 +174,18 @@ metadata: name: ledger-v3-cold-cache-size spec: stacks: ["*"] - key: ledger.v3.persistence.cold-cache.size + key: module.ledger.v3.persistence.cold-cache.size value: "10Gi" ``` | Key | Default | Description | |-----|---------|-------------| -| `ledger.v3.persistence.wal.size` | 5Gi | WAL PVC size | -| `ledger.v3.persistence.wal.storage-class` | (cluster default) | WAL storage class | -| `ledger.v3.persistence.data.size` | 10Gi | Pebble data PVC size | -| `ledger.v3.persistence.data.storage-class` | (cluster default) | Data storage class | -| `ledger.v3.persistence.cold-cache.size` | 10Gi | Cold cache PVC size | -| `ledger.v3.persistence.cold-cache.storage-class` | (cluster default) | Cold cache storage class | +| `module.ledger.v3.persistence.wal.size` | 5Gi | WAL PVC size | +| `module.ledger.v3.persistence.wal.storage-class` | (cluster default) | WAL storage class | +| `module.ledger.v3.persistence.data.size` | 10Gi | Pebble data PVC size | +| `module.ledger.v3.persistence.data.storage-class` | (cluster default) | Data storage class | +| `module.ledger.v3.persistence.cold-cache.size` | 10Gi | Cold cache PVC size | +| `module.ledger.v3.persistence.cold-cache.storage-class` | (cluster default) | Cold cache storage class | ### Pebble Tunables @@ -193,14 +193,14 @@ All Pebble settings are optional. When unset, the ledger binary defaults apply. | Key | Example | Description | |-----|---------|-------------| -| `ledger.v3.pebble.cache-size` | 1073741824 | Block cache size in bytes | -| `ledger.v3.pebble.memtable-size` | 268435456 | Memtable size in bytes | -| `ledger.v3.pebble.memtable-stop-writes-threshold` | 2 | Memtable count before stopping writes | -| `ledger.v3.pebble.l0-compaction-threshold` | 4 | L0 files to trigger compaction | -| `ledger.v3.pebble.l0-stop-writes-threshold` | 12 | L0 files before stopping writes | -| `ledger.v3.pebble.lbase-max-bytes` | 67108864 | L1 max size in bytes | -| `ledger.v3.pebble.target-file-size` | 67108864 | SST file target size | -| `ledger.v3.pebble.max-concurrent-compactions` | 2 | Compaction parallelism | +| `module.ledger.v3.pebble.cache-size` | 1073741824 | Block cache size in bytes | +| `module.ledger.v3.pebble.memtable-size` | 268435456 | Memtable size in bytes | +| `module.ledger.v3.pebble.memtable-stop-writes-threshold` | 2 | Memtable count before stopping writes | +| `module.ledger.v3.pebble.l0-compaction-threshold` | 4 | L0 files to trigger compaction | +| `module.ledger.v3.pebble.l0-stop-writes-threshold` | 12 | L0 files before stopping writes | +| `module.ledger.v3.pebble.lbase-max-bytes` | 67108864 | L1 max size in bytes | +| `module.ledger.v3.pebble.target-file-size` | 67108864 | SST file target size | +| `module.ledger.v3.pebble.max-concurrent-compactions` | 2 | Compaction parallelism | ### Raft Tunables @@ -208,10 +208,10 @@ All Raft settings are optional. When unset, the ledger binary defaults apply. | Key | Example | Description | |-----|---------|-------------| -| `ledger.v3.raft.snapshot-threshold` | 5000 | Log entries before snapshot | -| `ledger.v3.raft.election-tick` | 10 | Election timeout in ticks | -| `ledger.v3.raft.heartbeat-tick` | 1 | Heartbeat interval in ticks | -| `ledger.v3.raft.tick-interval` | 100ms | Duration of one tick | -| `ledger.v3.raft.max-size-per-msg` | 1048576 | Max message size in bytes | -| `ledger.v3.raft.max-inflight-msgs` | 256 | Max in-flight messages | -| `ledger.v3.raft.compaction-margin` | 1000 | Log retention after snapshot | +| `module.ledger.v3.raft.snapshot-threshold` | 5000 | Log entries before snapshot | +| `module.ledger.v3.raft.election-tick` | 10 | Election timeout in ticks | +| `module.ledger.v3.raft.heartbeat-tick` | 1 | Heartbeat interval in ticks | +| `module.ledger.v3.raft.tick-interval` | 100ms | Duration of one tick | +| `module.ledger.v3.raft.max-size-per-msg` | 1048576 | Max message size in bytes | +| `module.ledger.v3.raft.max-inflight-msgs` | 256 | Max in-flight messages | +| `module.ledger.v3.raft.compaction-margin` | 1000 | Log retention after snapshot | diff --git a/docs/09-Configuration reference/01-Settings.md b/docs/09-Configuration reference/01-Settings.md index a5594733..15c99896 100644 --- a/docs/09-Configuration reference/01-Settings.md +++ b/docs/09-Configuration reference/01-Settings.md @@ -32,29 +32,29 @@ While we have some basic types (string, number, bool ...), we also have some com | ledger.worker.async-block-hasher | Map | max-block-size=1000, schedule="0 * * * * *" | Configure async block hasher for the Ledger worker (v2.3+). Fields: `max-block-size`, `schedule` | | ledger.worker.bucket-cleanup | Map | retention-period=720h, schedule="0 0 * * *" | Configure bucket cleanup for the Ledger worker (v2.4+). Fields: `retention-period`, `schedule` | | ledger.worker.pipelines | Map | pull-interval=5s, push-retry-period=10s, sync-period=1m, logs-page-size=100 | Configure pipelines for the Ledger worker (v2.3+). Fields: `pull-interval`, `push-retry-period`, `sync-period`, `logs-page-size` | -| ledger.v3.replicas | Int | 3 | Raft cluster node count (v3+). Must be odd for quorum. Default: 3 | -| ledger.v3.cluster-id | String | default | Raft cluster ID (v3+). Default: "default" | -| ledger.v3.persistence.wal.size | String | 5Gi | PVC size for the Raft write-ahead log (v3+). Default: 5Gi | -| ledger.v3.persistence.wal.storage-class | String | | Storage class for the WAL PVC (v3+). Empty = cluster default | -| ledger.v3.persistence.data.size | String | 10Gi | PVC size for the Pebble data directory (v3+). Default: 10Gi | -| ledger.v3.persistence.data.storage-class | String | | Storage class for the data PVC (v3+). Empty = cluster default | -| ledger.v3.persistence.cold-cache.size | String | 10Gi | PVC size for the cold storage cache (v3+). Default: 10Gi | -| ledger.v3.persistence.cold-cache.storage-class | String | | Storage class for the cold cache PVC (v3+). Empty = cluster default | -| ledger.v3.pebble.cache-size | String | 1073741824 | Pebble block cache size in bytes (v3+) | -| ledger.v3.pebble.memtable-size | String | 268435456 | Pebble memtable size in bytes (v3+) | -| ledger.v3.pebble.memtable-stop-writes-threshold | String | 2 | Pebble memtable count before stopping writes (v3+) | -| ledger.v3.pebble.l0-compaction-threshold | String | 4 | L0 file count to trigger compaction (v3+) | -| ledger.v3.pebble.l0-stop-writes-threshold | String | 12 | L0 file count before stopping writes (v3+) | -| ledger.v3.pebble.lbase-max-bytes | String | 67108864 | L1 max size in bytes (v3+) | -| ledger.v3.pebble.target-file-size | String | 67108864 | SST file target size in bytes (v3+) | -| ledger.v3.pebble.max-concurrent-compactions | String | 2 | Compaction parallelism (v3+) | -| ledger.v3.raft.snapshot-threshold | String | 5000 | Log entries before taking a snapshot (v3+) | -| ledger.v3.raft.election-tick | String | 10 | Election timeout in ticks (v3+) | -| ledger.v3.raft.heartbeat-tick | String | 1 | Heartbeat interval in ticks (v3+) | -| ledger.v3.raft.tick-interval | String | 100ms | Duration of one Raft tick (v3+) | -| ledger.v3.raft.max-size-per-msg | String | 1048576 | Max Raft message size in bytes (v3+) | -| ledger.v3.raft.max-inflight-msgs | String | 256 | Max in-flight Raft messages (v3+) | -| ledger.v3.raft.compaction-margin | String | 1000 | Raft log retention after snapshot (v3+) | +| module.ledger.v3.replicas | Int | 3 | Raft cluster node count (v3+). Must be odd for quorum. Default: 3 | +| module.ledger.v3.cluster-id | String | default | Raft cluster ID (v3+). Default: "default" | +| module.ledger.v3.persistence.wal.size | String | 5Gi | PVC size for the Raft write-ahead log (v3+). Default: 5Gi | +| module.ledger.v3.persistence.wal.storage-class | String | | Storage class for the WAL PVC (v3+). Empty = cluster default | +| module.ledger.v3.persistence.data.size | String | 10Gi | PVC size for the Pebble data directory (v3+). Default: 10Gi | +| module.ledger.v3.persistence.data.storage-class | String | | Storage class for the data PVC (v3+). Empty = cluster default | +| module.ledger.v3.persistence.cold-cache.size | String | 10Gi | PVC size for the cold storage cache (v3+). Default: 10Gi | +| module.ledger.v3.persistence.cold-cache.storage-class | String | | Storage class for the cold cache PVC (v3+). Empty = cluster default | +| module.ledger.v3.pebble.cache-size | String | 1073741824 | Pebble block cache size in bytes (v3+) | +| module.ledger.v3.pebble.memtable-size | String | 268435456 | Pebble memtable size in bytes (v3+) | +| module.ledger.v3.pebble.memtable-stop-writes-threshold | String | 2 | Pebble memtable count before stopping writes (v3+) | +| module.ledger.v3.pebble.l0-compaction-threshold | String | 4 | L0 file count to trigger compaction (v3+) | +| module.ledger.v3.pebble.l0-stop-writes-threshold | String | 12 | L0 file count before stopping writes (v3+) | +| module.ledger.v3.pebble.lbase-max-bytes | String | 67108864 | L1 max size in bytes (v3+) | +| module.ledger.v3.pebble.target-file-size | String | 67108864 | SST file target size in bytes (v3+) | +| module.ledger.v3.pebble.max-concurrent-compactions | String | 2 | Compaction parallelism (v3+) | +| module.ledger.v3.raft.snapshot-threshold | String | 5000 | Log entries before taking a snapshot (v3+) | +| module.ledger.v3.raft.election-tick | String | 10 | Election timeout in ticks (v3+) | +| module.ledger.v3.raft.heartbeat-tick | String | 1 | Heartbeat interval in ticks (v3+) | +| module.ledger.v3.raft.tick-interval | String | 100ms | Duration of one Raft tick (v3+) | +| module.ledger.v3.raft.max-size-per-msg | String | 1048576 | Max Raft message size in bytes (v3+) | +| module.ledger.v3.raft.max-inflight-msgs | String | 256 | Max in-flight Raft messages (v3+) | +| module.ledger.v3.raft.compaction-margin | String | 1000 | Raft log retention after snapshot (v3+) | | payments.encryption-key | string | | Payments data encryption key | | payments.worker.temporal-max-concurrent-workflow-task-pollers | Int | | Payments worker max concurrent workflow task pollers configuration | | payments.worker.temporal-max-concurrent-activity-task-pollers | Int | | Payments worker max concurrent activity task pollers configuration | diff --git a/internal/resources/ledgers/init.go b/internal/resources/ledgers/init.go index dc64ec7f..871b172d 100644 --- a/internal/resources/ledgers/init.go +++ b/internal/resources/ledgers/init.go @@ -41,7 +41,7 @@ import ( //+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, version string) error { - isV3 := !semver.IsValid(version) || semver.Compare(version, "v3.0.0-alpha") >= 0 + isV3 := semver.IsValid(version) && semver.Major(version) == "v3" if isV3 { return reconcileV3(ctx, stack, ledger, version) } diff --git a/internal/resources/ledgers/v3.go b/internal/resources/ledgers/v3.go index 254797aa..e13e34c2 100644 --- a/internal/resources/ledgers/v3.go +++ b/internal/resources/ledgers/v3.go @@ -15,7 +15,6 @@ import ( "github.com/formancehq/operator/v3/internal/core" "github.com/formancehq/operator/v3/internal/resources/gatewayhttpapis" "github.com/formancehq/operator/v3/internal/resources/registries" - "github.com/formancehq/operator/v3/internal/resources/services" "github.com/formancehq/operator/v3/internal/resources/settings" ) @@ -39,15 +38,8 @@ func reconcileV3(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, return err } - // ClusterIP service: port 8080 → container port 9000 (gateway compatibility) - if _, err := services.Create(ctx, ledger, "ledger", services.WithConfig(services.PortConfig{ - ServiceName: "ledger", - PortName: "http", - Port: 8080, - TargetPort: "http", - })); err != nil { - return err - } + // The GatewayHTTPAPI reconciler creates a ClusterIP service "ledger" with port 8080→"http". + // Since our container port named "http" is 9000, the service routes 8080→9000 automatically. if err := installV3StatefulSet(ctx, stack, ledger, imageConfiguration); err != nil { return err @@ -95,12 +87,12 @@ func createV3HeadlessService(ctx core.Context, stack *v1beta1.Stack, ledger *v1b func installV3StatefulSet(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, image *registries.ImageConfiguration) error { stackName := stack.Name - replicas, err := settings.GetInt32OrDefault(ctx, stackName, 3, "ledger", "v3", "replicas") + replicas, err := settings.GetInt32OrDefault(ctx, stackName, 3, "module", "ledger", "v3", "replicas") if err != nil { return err } if replicas%2 == 0 { - return fmt.Errorf("ledger.v3.replicas must be odd, got %d", replicas) + return fmt.Errorf("module.ledger.v3.replicas must be odd, got %d", replicas) } volumeClaims, err := buildV3VolumeClaimTemplates(ctx, stackName) @@ -148,7 +140,7 @@ func buildV3PodTemplate(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1. return nil, err } - clusterID, err := settings.GetStringOrDefault(ctx, stackName, "default", "ledger", "v3", "cluster-id") + clusterID, err := settings.GetStringOrDefault(ctx, stackName, "default", "module", "ledger", "v3", "cluster-id") if err != nil { return nil, err } @@ -298,9 +290,9 @@ func buildV3VolumeClaimTemplates(ctx core.Context, stackName string) ([]corev1.P } specs := []volumeSpec{ - {"wal", "ledger.v3.persistence.wal.size", "5Gi", "ledger.v3.persistence.wal.storage-class"}, - {"data", "ledger.v3.persistence.data.size", "10Gi", "ledger.v3.persistence.data.storage-class"}, - {"cold-cache", "ledger.v3.persistence.cold-cache.size", "10Gi", "ledger.v3.persistence.cold-cache.storage-class"}, + {"wal", "module.ledger.v3.persistence.wal.size", "5Gi", "module.ledger.v3.persistence.wal.storage-class"}, + {"data", "module.ledger.v3.persistence.data.size", "10Gi", "module.ledger.v3.persistence.data.storage-class"}, + {"cold-cache", "module.ledger.v3.persistence.cold-cache.size", "10Gi", "module.ledger.v3.persistence.cold-cache.storage-class"}, } var claims []corev1.PersistentVolumeClaim @@ -355,7 +347,7 @@ var v3PebbleSettings = []struct { func buildV3PebbleEnvVars(ctx core.Context, stackName string) ([]corev1.EnvVar, error) { var envVars []corev1.EnvVar for _, s := range v3PebbleSettings { - val, err := settings.GetStringOrEmpty(ctx, stackName, "ledger", "v3", "pebble", s.key) + val, err := settings.GetStringOrEmpty(ctx, stackName, "module", "ledger", "v3", "pebble", s.key) if err != nil { return nil, err } @@ -382,7 +374,7 @@ var v3RaftSettings = []struct { func buildV3RaftEnvVars(ctx core.Context, stackName string) ([]corev1.EnvVar, error) { var envVars []corev1.EnvVar for _, s := range v3RaftSettings { - val, err := settings.GetStringOrEmpty(ctx, stackName, "ledger", "v3", "raft", s.key) + val, err := settings.GetStringOrEmpty(ctx, stackName, "module", "ledger", "v3", "raft", s.key) if err != nil { return nil, err } diff --git a/internal/tests/ledger_v3_controller_test.go b/internal/tests/ledger_v3_controller_test.go new file mode 100644 index 00000000..651658f9 --- /dev/null +++ b/internal/tests/ledger_v3_controller_test.go @@ -0,0 +1,306 @@ +package tests_test + +import ( + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" + "github.com/formancehq/operator/v3/internal/core" + "github.com/formancehq/operator/v3/internal/resources/settings" + . "github.com/formancehq/operator/v3/internal/tests/internal" +) + +var _ = Describe("LedgerV3Controller", func() { + Context("When creating a Ledger with v3 version", func() { + var ( + stack *v1beta1.Stack + ledger *v1beta1.Ledger + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.StackSpec{Version: "v3.0.0"}, + } + ledger = &v1beta1.Ledger{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.LedgerSpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Expect(Create(ledger)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(ledger)).To(Succeed()) + Expect(Delete(stack)).To(Succeed()) + }) + + It("Should create a StatefulSet", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts).To(BeControlledBy(ledger)) + }) + + It("Should create a StatefulSet with 3 replicas by default", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(*sts.Spec.Replicas).To(Equal(int32(3))) + }) + + It("Should create a StatefulSet with OrderedReady pod management", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.PodManagementPolicy).To(Equal(appsv1.OrderedReadyPodManagement)) + }) + + It("Should create a StatefulSet using the headless service", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.ServiceName).To(Equal("ledger-raft")) + }) + + It("Should create 3 volume claim templates (wal, data, cold-cache)", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.VolumeClaimTemplates).To(HaveLen(3)) + Expect(sts.Spec.VolumeClaimTemplates[0].Name).To(Equal("wal")) + Expect(sts.Spec.VolumeClaimTemplates[1].Name).To(Equal("data")) + Expect(sts.Spec.VolumeClaimTemplates[2].Name).To(Equal("cold-cache")) + }) + + It("Should configure the container with 3 ports (http, grpc, raft)", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.Ports).To(HaveLen(3)) + Expect(container.Ports).To(ContainElements( + HaveField("Name", "http"), + HaveField("Name", "grpc"), + HaveField("Name", "raft"), + )) + }) + + It("Should configure 3 volume mounts", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.VolumeMounts).To(ConsistOf( + corev1.VolumeMount{Name: "wal", MountPath: "/data/raft"}, + corev1.VolumeMount{Name: "data", MountPath: "/data/app"}, + corev1.VolumeMount{Name: "cold-cache", MountPath: "/data/cold-cache"}, + )) + }) + + It("Should configure liveness, readiness, and startup probes", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.LivenessProbe).NotTo(BeNil()) + Expect(container.LivenessProbe.HTTPGet.Path).To(Equal("/livez")) + Expect(container.ReadinessProbe).NotTo(BeNil()) + Expect(container.ReadinessProbe.HTTPGet.Path).To(Equal("/readyz")) + Expect(container.StartupProbe).NotTo(BeNil()) + Expect(container.StartupProbe.HTTPGet.Path).To(Equal("/livez")) + }) + + It("Should set CLUSTER_ID env var with default value", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.Template.Spec.Containers[0].Env).To( + ContainElement(core.Env("CLUSTER_ID", "default")), + ) + }) + + It("Should set downward API env vars (POD_NAME, POD_NAMESPACE)", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + env := sts.Spec.Template.Spec.Containers[0].Env + Expect(env).To(ContainElement(HaveField("Name", "POD_NAME"))) + Expect(env).To(ContainElement(HaveField("Name", "POD_NAMESPACE"))) + }) + + It("Should create a headless service for Raft peer discovery", func() { + svc := &corev1.Service{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger-raft", svc) + }).Should(Succeed()) + Expect(svc).To(BeControlledBy(ledger)) + Expect(svc.Spec.ClusterIP).To(Equal("None")) + Expect(svc.Spec.PublishNotReadyAddresses).To(BeTrue()) + Expect(svc.Spec.Ports).To(ContainElements( + HaveField("Name", "raft"), + HaveField("Name", "grpc"), + )) + }) + + It("Should create a GatewayHTTPAPI with health endpoint", func() { + httpAPI := &v1beta1.GatewayHTTPAPI{} + Eventually(func() error { + return LoadResource("", core.GetObjectName(stack.Name, "ledger"), httpAPI) + }).Should(Succeed()) + Expect(httpAPI.Spec.HealthCheckEndpoint).To(Equal("health")) + }) + + It("Should NOT create a Database object", func() { + Consistently(func() error { + return LoadResource("", core.GetObjectName(stack.Name, "ledger"), &v1beta1.Database{}) + }).ShouldNot(Succeed()) + }) + + It("Should use the correct image", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.Template.Spec.Containers[0].Image).To(ContainSubstring("ledger")) + Expect(sts.Spec.Template.Spec.Containers[0].Image).To(ContainSubstring("v3.0.0")) + }) + + Context("with custom replicas setting", func() { + var replicasSetting *v1beta1.Settings + BeforeEach(func() { + replicasSetting = settings.New(uuid.NewString(), "module.ledger.v3.replicas", "5", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(replicasSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(replicasSetting)).To(Succeed()) + }) + It("Should create a StatefulSet with 5 replicas", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) int32 { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return *sts.Spec.Replicas + }).Should(Equal(int32(5))) + }) + }) + + Context("with custom cluster-id setting", func() { + var clusterIDSetting *v1beta1.Settings + BeforeEach(func() { + clusterIDSetting = settings.New(uuid.NewString(), "module.ledger.v3.cluster-id", "my-cluster", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(clusterIDSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(clusterIDSetting)).To(Succeed()) + }) + It("Should set CLUSTER_ID env var to custom value", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(core.Env("CLUSTER_ID", "my-cluster"))) + }) + }) + + Context("with custom persistence sizes", func() { + var walSizeSetting *v1beta1.Settings + BeforeEach(func() { + walSizeSetting = settings.New(uuid.NewString(), "module.ledger.v3.persistence.wal.size", "20Gi", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(walSizeSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(walSizeSetting)).To(Succeed()) + }) + It("Should create WAL PVC with custom size", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) string { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().String() + }).Should(Equal("20Gi")) + }) + }) + + Context("with pebble settings", func() { + var cacheSizeSetting *v1beta1.Settings + BeforeEach(func() { + cacheSizeSetting = settings.New(uuid.NewString(), "module.ledger.v3.pebble.cache-size", "2147483648", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(cacheSizeSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(cacheSizeSetting)).To(Succeed()) + }) + It("Should set PEBBLE_CACHE_SIZE env var", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(core.Env("PEBBLE_CACHE_SIZE", "2147483648"))) + }) + }) + + Context("with raft settings", func() { + var snapshotSetting *v1beta1.Settings + BeforeEach(func() { + snapshotSetting = settings.New(uuid.NewString(), "module.ledger.v3.raft.snapshot-threshold", "10000", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(snapshotSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(snapshotSetting)).To(Succeed()) + }) + It("Should set RAFT_SNAPSHOT_THRESHOLD env var", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(core.Env("RAFT_SNAPSHOT_THRESHOLD", "10000"))) + }) + }) + + Context("with monitoring enabled", func() { + var otelTracesDSNSetting *v1beta1.Settings + BeforeEach(func() { + otelTracesDSNSetting = settings.New(uuid.NewString(), "opentelemetry.traces.dsn", "grpc://collector", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(otelTracesDSNSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(otelTracesDSNSetting)).To(Succeed()) + }) + It("Should add OTEL env vars to the StatefulSet", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(HaveField("Name", "OTEL_SERVICE_NAME"))) + }) + }) + }) +}) From 24a53f1b007c852a938c72074f2fe529836de176 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Mar 2026 18:15:45 +0100 Subject: [PATCH 4/5] feat(ledger): add preStop hook for proper Raft scale-down On scale-down, the preStop lifecycle hook: 1. Calls POST /_admin/deregister to remove the node from the Raft cluster 2. Cleans the WAL directory so future re-joins start as fresh learners This ensures clean Raft membership management during StatefulSet scaling. Co-Authored-By: Claude Opus 4.6 --- internal/resources/ledgers/v3.go | 23 +++++++++++++++++++++ internal/tests/ledger_v3_controller_test.go | 14 +++++++++++++ 2 files changed, 37 insertions(+) diff --git a/internal/resources/ledgers/v3.go b/internal/resources/ledgers/v3.go index e13e34c2..c7e018a8 100644 --- a/internal/resources/ledgers/v3.go +++ b/internal/resources/ledgers/v3.go @@ -231,6 +231,13 @@ func buildV3PodTemplate(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1. }, }, }, + Lifecycle: &corev1.Lifecycle{ + PreStop: &corev1.LifecycleHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", buildV3PreStopScript(walDir)}, + }, + }, + }, } return &corev1.PodTemplateSpec{ @@ -281,6 +288,22 @@ fi`, dataDir), return strings.Join(lines, "\n") } +// buildV3PreStopScript returns a shell script executed by the Kubernetes preStop +// lifecycle hook before a pod is terminated. It deregisters the local node from +// the Raft cluster and cleans the WAL directory so that a future re-join (after +// scale-up) starts as a fresh learner. +func buildV3PreStopScript(walDir string) string { + lines := []string{ + // Best-effort deregister: call the admin endpoint to remove this node + // from the Raft cluster. Ignore errors (e.g., last node, or already removed). + fmt.Sprintf(`wget --post-data='' -q -O- http://localhost:%d/_admin/deregister || true`, v3PortHTTP), + // Clean WAL so that if this pod restarts (scale-up), it joins as a fresh learner. + fmt.Sprintf(`rm -rf %s/* || true`, walDir), + } + + return strings.Join(lines, "\n") +} + func buildV3VolumeClaimTemplates(ctx core.Context, stackName string) ([]corev1.PersistentVolumeClaim, error) { type volumeSpec struct { name string diff --git a/internal/tests/ledger_v3_controller_test.go b/internal/tests/ledger_v3_controller_test.go index 651658f9..50e2fa23 100644 --- a/internal/tests/ledger_v3_controller_test.go +++ b/internal/tests/ledger_v3_controller_test.go @@ -126,6 +126,20 @@ var _ = Describe("LedgerV3Controller", func() { Expect(container.StartupProbe.HTTPGet.Path).To(Equal("/livez")) }) + It("Should configure a preStop lifecycle hook for Raft deregistration", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.Lifecycle).NotTo(BeNil()) + Expect(container.Lifecycle.PreStop).NotTo(BeNil()) + Expect(container.Lifecycle.PreStop.Exec).NotTo(BeNil()) + Expect(container.Lifecycle.PreStop.Exec.Command).To(HaveLen(3)) + Expect(container.Lifecycle.PreStop.Exec.Command[2]).To(ContainSubstring("/_admin/deregister")) + Expect(container.Lifecycle.PreStop.Exec.Command[2]).To(ContainSubstring("rm -rf")) + }) + It("Should set CLUSTER_ID env var with default value", func() { sts := &appsv1.StatefulSet{} Eventually(func() error { From a9652ecd14e334ce7cc00fddb4ec902b5d0ace0e Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Mar 2026 18:49:55 +0100 Subject: [PATCH 5/5] chore: regenerate helm RBAC template with StatefulSet permissions Co-Authored-By: Claude Opus 4.6 --- ....k8s.io_v1_clusterrole_formance-manager-role.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml b/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml index 8bb7f718..df20410f 100644 --- a/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml +++ b/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml @@ -41,6 +41,18 @@ rules: - patch - update - watch +- apiGroups: + - apps + resources: + - statefulsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - batch resources: