diff --git a/.changeset/backfill-retry-backoff.md b/.changeset/backfill-retry-backoff.md new file mode 100644 index 0000000..615b8a1 --- /dev/null +++ b/.changeset/backfill-retry-backoff.md @@ -0,0 +1,5 @@ +--- +"@chkit/plugin-backfill": patch +--- + +Fix backfill runtime issues: add exponential backoff between retries (configurable via `defaults.retryDelayMs`, default 1000ms), continue processing remaining chunks after one fails permanently (instead of stopping), and make `resume` automatically retry failed chunks without requiring `--replay-failed`. diff --git a/apps/docs/src/content/docs/plugins/backfill.md b/apps/docs/src/content/docs/plugins/backfill.md index 6353213..864a0bd 100644 --- a/apps/docs/src/content/docs/plugins/backfill.md +++ b/apps/docs/src/content/docs/plugins/backfill.md @@ -43,6 +43,7 @@ export default defineConfig({ chunkHours: 6, maxParallelChunks: 1, maxRetriesPerChunk: 3, + retryDelayMs: 1000, requireIdempotencyToken: true, timeColumn: 'created_at', }, @@ -133,6 +134,7 @@ Configuration is organized into three groups plus a top-level `stateDir`. | `chunkHours` | `number` | `6` | Hours per chunk | | `maxParallelChunks` | `number` | `1` | Max concurrent chunks | | `maxRetriesPerChunk` | `number` | `3` | Retry budget per chunk | +| `retryDelayMs` | `number` | `1000` | Base delay for exponential backoff between retries, in milliseconds; the wait doubles after each failed attempt and `0` disables it | | `requireIdempotencyToken` | `boolean` | `true` | Generate deterministic tokens | | `timeColumn` | `string` | auto-detect | Fallback column name for time-based WHERE clause (overridden by schema-level config) | @@ -187,13 +189,13 @@ Execute a planned backfill with checkpointed chunk progress. ### `chkit plugin backfill resume` -Resume a backfill run from last checkpoint. +Resume a backfill run from last checkpoint. Automatically retries failed chunks. 
| Flag | Required | Description | |------|----------|-------------| | `--plan-id ` | Yes | Plan ID (16-char hex) | | `--replay-done` | No | Re-execute already-completed chunks | -| `--replay-failed` | No | Re-execute failed chunks | +| `--replay-failed` | No | Re-execute failed chunks (always enabled on resume, so passing the flag is optional) | | `--force-overlap` | No | Allow concurrent runs for the same target | | `--force-compatibility` | No | Skip compatibility token check | | `--force-environment` | No | Skip environment mismatch check (plan was created for a different ClickHouse cluster/database) | @@ -276,7 +278,7 @@ chkit plugin backfill status --plan-id ```sh chkit plugin backfill plan --target analytics.events --from 2025-01-01 --to 2025-02-01 chkit plugin backfill run --plan-id # some chunks fail -chkit plugin backfill resume --plan-id --replay-failed +chkit plugin backfill resume --plan-id # automatically retries failed chunks ``` **CI enforcement:** diff --git a/packages/cli/src/plugin.test.ts b/packages/cli/src/plugin.test.ts index cd55d85..c05cd70 100644 --- a/packages/cli/src/plugin.test.ts +++ b/packages/cli/src/plugin.test.ts @@ -369,7 +369,7 @@ describe('plugin runtime', () => { chunkCounts: { done: number; failed: number } } expect(failedPayload.status).toBe('failed') - expect(failedPayload.chunkCounts.done).toBe(1) + expect(failedPayload.chunkCounts.done).toBe(2) expect(failedPayload.chunkCounts.failed).toBe(1) const resumed = runCli([ diff --git a/packages/plugin-backfill/src/options.ts b/packages/plugin-backfill/src/options.ts index bc6a785..4c8a82f 100644 --- a/packages/plugin-backfill/src/options.ts +++ b/packages/plugin-backfill/src/options.ts @@ -6,6 +6,7 @@ const DEFAULT_OPTIONS: NormalizedBackfillPluginOptions = { chunkHours: 6, maxParallelChunks: 1, maxRetriesPerChunk: 3, + retryDelayMs: 1000, requireIdempotencyToken: true, }, policy: { @@ -36,6 +37,14 @@ function parsePositiveNumber(value: unknown, key: string): number | undefined { return value } +function 
parseNonNegativeNumber(value: unknown, key: string): number | undefined { + if (value === undefined) return undefined + if (typeof value !== 'number' || !Number.isFinite(value) || value < 0) { + throw new BackfillConfigError(`Invalid plugin option "${key}". Expected a non-negative number.`) + } + return value +} + function parseBoolean(value: unknown, key: string): boolean | undefined { if (value === undefined) return undefined if (typeof value !== 'boolean') { @@ -72,6 +81,7 @@ function normalizeRuntimeOptions(options: Record): BackfillPlug options.defaults.maxRetriesPerChunk, 'defaults.maxRetriesPerChunk' ), + retryDelayMs: parseNonNegativeNumber(options.defaults.retryDelayMs, 'defaults.retryDelayMs'), requireIdempotencyToken: parseBoolean( options.defaults.requireIdempotencyToken, 'defaults.requireIdempotencyToken' diff --git a/packages/plugin-backfill/src/runtime.test.ts b/packages/plugin-backfill/src/runtime.test.ts index 19c4119..f03319c 100644 --- a/packages/plugin-backfill/src/runtime.test.ts +++ b/packages/plugin-backfill/src/runtime.test.ts @@ -76,6 +76,7 @@ describe('@chkit/plugin-backfill run lifecycle', () => { defaults: { chunkHours: 2, maxRetriesPerChunk: 1, + retryDelayMs: 0, }, }) @@ -105,7 +106,7 @@ describe('@chkit/plugin-backfill run lifecycle', () => { }) expect(firstRun.status.status).toBe('failed') - expect(firstRun.status.totals.done).toBe(1) + expect(firstRun.status.totals.done).toBe(2) expect(firstRun.status.totals.failed).toBe(1) const resumed = await resumeBackfillRun({ @@ -142,10 +143,10 @@ describe('@chkit/plugin-backfill run lifecycle', () => { metaDir: './chkit/meta', }) const planOptions = normalizeBackfillOptions({ - defaults: { chunkHours: 2, maxRetriesPerChunk: 1 }, + defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 }, }) const changedOptions = normalizeBackfillOptions({ - defaults: { chunkHours: 2, maxRetriesPerChunk: 5 }, + defaults: { chunkHours: 2, maxRetriesPerChunk: 5, retryDelayMs: 0 }, }) const planned = 
await buildBackfillPlan({ @@ -228,7 +229,7 @@ describe('@chkit/plugin-backfill run lifecycle', () => { ).rejects.toThrow('already completed') const options2 = normalizeBackfillOptions({ - defaults: { chunkHours: 2, maxRetriesPerChunk: 1 }, + defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 }, }) const planned2 = await buildBackfillPlan({ target: 'app.events', @@ -324,7 +325,7 @@ describe('@chkit/plugin-backfill execute callback', () => { metaDir: './chkit/meta', }) const options = normalizeBackfillOptions({ - defaults: { chunkHours: 2, maxRetriesPerChunk: 3 }, + defaults: { chunkHours: 2, maxRetriesPerChunk: 3, retryDelayMs: 0 }, }) const planned = await buildBackfillPlan({ @@ -374,7 +375,7 @@ describe('@chkit/plugin-backfill execute callback', () => { metaDir: './chkit/meta', }) const options = normalizeBackfillOptions({ - defaults: { chunkHours: 2, maxRetriesPerChunk: 2 }, + defaults: { chunkHours: 2, maxRetriesPerChunk: 2, retryDelayMs: 0 }, }) const planned = await buildBackfillPlan({ @@ -411,6 +412,105 @@ describe('@chkit/plugin-backfill execute callback', () => { }) }) +describe('@chkit/plugin-backfill continue past failures', () => { + test('continues to remaining chunks after a chunk fails permanently', async () => { + const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) + const configPath = join(dir, 'clickhouse.config.ts') + + try { + const config = resolveConfig({ + schema: './schema.ts', + metaDir: './chkit/meta', + }) + const options = normalizeBackfillOptions({ + defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 }, + }) + + const planned = await buildBackfillPlan({ + target: 'app.events', + from: '2026-01-01T00:00:00.000Z', + to: '2026-01-01T06:00:00.000Z', + configPath, + config, + options, + }) + + const failChunkId = planned.plan.chunks[0]?.id + expect(failChunkId).toBeTruthy() + + const ran = await executeBackfillRun({ + planId: planned.plan.planId, + configPath, + config, + options, + execution: { + 
simulation: { failChunkId, failCount: 1 }, + }, + }) + + expect(ran.status.status).toBe('failed') + expect(ran.status.totals.done).toBe(2) + expect(ran.status.totals.failed).toBe(1) + expect(ran.run.chunks[0]?.status).toBe('failed') + expect(ran.run.chunks[1]?.status).toBe('done') + expect(ran.run.chunks[2]?.status).toBe('done') + } finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('resume retries failed chunks without requiring --replay-failed', async () => { + const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) + const configPath = join(dir, 'clickhouse.config.ts') + + try { + const config = resolveConfig({ + schema: './schema.ts', + metaDir: './chkit/meta', + }) + const options = normalizeBackfillOptions({ + defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 }, + }) + + const planned = await buildBackfillPlan({ + target: 'app.events', + from: '2026-01-01T00:00:00.000Z', + to: '2026-01-01T06:00:00.000Z', + configPath, + config, + options, + }) + + const failChunkId = planned.plan.chunks[1]?.id + expect(failChunkId).toBeTruthy() + + await executeBackfillRun({ + planId: planned.plan.planId, + configPath, + config, + options, + execution: { + simulation: { failChunkId, failCount: 1 }, + }, + }) + + // Resume WITHOUT --replay-failed — should still retry the failed chunk + const resumed = await resumeBackfillRun({ + planId: planned.plan.planId, + configPath, + config, + options, + }) + + expect(resumed.status.status).toBe('completed') + expect(resumed.status.totals.done).toBe(3) + expect(resumed.status.totals.failed).toBe(0) + } finally { + await rm(dir, { recursive: true, force: true }) + } + }) +}) + describe('@chkit/plugin-backfill check integration', () => { test('reports pending required backfills when plan exists but run is missing', async () => { const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) diff --git a/packages/plugin-backfill/src/runtime.ts 
b/packages/plugin-backfill/src/runtime.ts index 8c20a6d..5eda155 100644 --- a/packages/plugin-backfill/src/runtime.ts +++ b/packages/plugin-backfill/src/runtime.ts @@ -116,10 +116,15 @@ export async function evaluateBackfillCheck(input: { } } +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + async function executeChunk(input: { run: BackfillRunState chunk: BackfillRunChunkState maxRetries: number + retryDelayMs: number runPath: string eventPath: string execute?: (sql: string) => Promise @@ -150,88 +155,46 @@ async function executeChunk(input: { const shouldSimulateFailure = input.simulation?.failChunkId === input.chunk.id && input.chunk.attempts <= failureBudget - if (!shouldSimulateFailure) { - let executionError: string | undefined - let executionResult: void | { rowsWritten?: number } | undefined - if (input.execute) { - try { - executionResult = await input.execute(input.chunk.sqlTemplate) - } catch (error) { - executionError = error instanceof Error ? 
error.message : String(error) - } - } + let attemptError: string | undefined + let executionResult: void | { rowsWritten?: number } | undefined - if (!executionError) { - input.chunk.status = 'done' - input.chunk.completedAt = nowIso() - input.chunk.lastError = undefined - if (executionResult && typeof executionResult === 'object' && typeof executionResult.rowsWritten === 'number') { - input.chunk.rowsWritten = executionResult.rowsWritten - } - - await persistRunAndEvent({ - run: input.run, - runPath: input.runPath, - eventPath: input.eventPath, - event: { - type: 'chunk_done', - planId: input.run.planId, - chunkId: input.chunk.id, - attempt: input.chunk.attempts, - }, - }) - - return { ok: true } + if (shouldSimulateFailure) { + attemptError = `Simulated failure for chunk ${input.chunk.id} attempt ${input.chunk.attempts}` + } else if (input.execute) { + try { + executionResult = await input.execute(input.chunk.sqlTemplate) + } catch (error) { + attemptError = error instanceof Error ? error.message : String(error) } + } - input.chunk.lastError = executionError - - if (input.chunk.attempts >= input.maxRetries) { - input.chunk.status = 'failed' - input.run.status = 'failed' - input.run.lastError = executionError - - await persistRunAndEvent({ - run: input.run, - runPath: input.runPath, - eventPath: input.eventPath, - event: { - type: 'chunk_failed_retry_exhausted', - planId: input.run.planId, - chunkId: input.chunk.id, - attempt: input.chunk.attempts, - message: executionError, - }, - }) - - return { ok: false, error: executionError } + if (!attemptError) { + input.chunk.status = 'done' + input.chunk.completedAt = nowIso() + input.chunk.lastError = undefined + if (executionResult && typeof executionResult === 'object' && typeof executionResult.rowsWritten === 'number') { + input.chunk.rowsWritten = executionResult.rowsWritten } - input.chunk.status = 'pending' - await persistRunAndEvent({ run: input.run, runPath: input.runPath, eventPath: input.eventPath, event: { - 
type: 'chunk_retry_scheduled', + type: 'chunk_done', planId: input.run.planId, chunkId: input.chunk.id, attempt: input.chunk.attempts, - nextAttempt: input.chunk.attempts + 1, }, }) - continue + return { ok: true } } - const errorMessage = `Simulated failure for chunk ${input.chunk.id} attempt ${input.chunk.attempts}` - input.chunk.lastError = errorMessage + input.chunk.lastError = attemptError if (input.chunk.attempts >= input.maxRetries) { input.chunk.status = 'failed' - input.run.status = 'failed' - input.run.lastError = errorMessage await persistRunAndEvent({ run: input.run, @@ -242,11 +205,11 @@ async function executeChunk(input: { planId: input.run.planId, chunkId: input.chunk.id, attempt: input.chunk.attempts, - message: errorMessage, + message: attemptError, }, }) - return { ok: false, error: errorMessage } + return { ok: false, error: attemptError } } input.chunk.status = 'pending' @@ -263,6 +226,11 @@ async function executeChunk(input: { nextAttempt: input.chunk.attempts + 1, }, }) + + if (input.retryDelayMs > 0) { + const delay = input.retryDelayMs * Math.pow(2, input.chunk.attempts - 1) + await sleep(delay) + } } return { @@ -279,6 +247,7 @@ async function executeRunLoop(input: { eventPath: string } execution: BackfillExecutionOptions + retryDelayMs: number execute?: (sql: string) => Promise }): Promise { const maxRetries = input.plan.options.maxRetriesPerChunk @@ -312,29 +281,8 @@ async function executeRunLoop(input: { if (chunk.status === 'failed') { if (!input.run.replayFailed) { - input.run.status = 'failed' - input.run.lastError = - chunk.lastError ?? 
- `Chunk ${chunk.id} is failed and resume requires --replay-failed to re-run failed chunks.` - - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'run_blocked_failed_chunk', - planId: input.plan.planId, - chunkId: chunk.id, - message: input.run.lastError, - }, - }) - - return { - run: input.run, - status: summarizeRunStatus(input.run, input.paths.runPath, input.paths.eventPath), - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - } + // Skip previously failed chunk — continue to remaining chunks + continue } chunk.status = 'pending' @@ -352,6 +300,7 @@ async function executeRunLoop(input: { run: input.run, chunk, maxRetries, + retryDelayMs: input.retryDelayMs, runPath: input.paths.runPath, eventPath: input.paths.eventPath, execute: input.execute, @@ -359,13 +308,37 @@ async function executeRunLoop(input: { }) if (!executed.ok) { - input.run.completedAt = nowIso() - return { - run: input.run, - status: summarizeRunStatus(input.run, input.paths.runPath, input.paths.eventPath), - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - } + // Continue processing remaining chunks instead of stopping + continue + } + } + + // Determine final run status after all chunks have been attempted + const failedChunks = input.run.chunks.filter((c) => c.status === 'failed') + + if (!aborted && failedChunks.length > 0) { + input.run.status = 'failed' + input.run.lastError = + failedChunks[failedChunks.length - 1]?.lastError ?? 
'One or more chunks failed' + input.run.completedAt = nowIso() + + await persistRunAndEvent({ + run: input.run, + runPath: input.paths.runPath, + eventPath: input.paths.eventPath, + event: { + type: 'run_completed_with_failures', + planId: input.plan.planId, + failedCount: failedChunks.length, + totalCount: input.run.chunks.length, + }, + }) + + return { + run: input.run, + status: summarizeRunStatus(input.run, input.paths.runPath, input.paths.eventPath), + runPath: input.paths.runPath, + eventPath: input.paths.eventPath, } } @@ -501,6 +474,7 @@ export async function executeBackfillRun(input: { run, paths, execution, + retryDelayMs: input.options.defaults.retryDelayMs, execute: input.execute, }) } @@ -555,11 +529,19 @@ export async function resumeBackfillRun(input: { ) } + // Resume always retries failed chunks — the whole point of resume is to + // recover from failures. Users shouldn't need --replay-failed for this. + const execution: BackfillExecutionOptions = { + ...input.execution, + replayFailed: true, + } + return executeRunLoop({ plan, run, paths, - execution: input.execution ?? {}, + execution, + retryDelayMs: input.options.defaults.retryDelayMs, execute: input.execute, }) } diff --git a/packages/plugin-backfill/src/state.ts b/packages/plugin-backfill/src/state.ts index 5188b75..98896b4 100644 --- a/packages/plugin-backfill/src/state.ts +++ b/packages/plugin-backfill/src/state.ts @@ -44,6 +44,9 @@ export function computeCompatibilityToken(input: { plan: BackfillPlanState options: NormalizedBackfillPluginOptions }): string { + // Exclude retryDelayMs — it's a runtime behavior setting that doesn't + // affect data integrity and shouldn't break compatibility on upgrade. 
+ const { retryDelayMs: _, ...compatDefaults } = input.options.defaults return hashId( stableSerialize({ planId: input.plan.planId, @@ -51,7 +54,7 @@ export function computeCompatibilityToken(input: { from: input.plan.from, to: input.plan.to, planOptions: input.plan.options, - runtimeDefaults: input.options.defaults, + runtimeDefaults: compatDefaults, runtimePolicy: input.options.policy, runtimeLimits: input.options.limits, }) diff --git a/packages/plugin-backfill/src/types.ts b/packages/plugin-backfill/src/types.ts index e4feb74..5254e95 100644 --- a/packages/plugin-backfill/src/types.ts +++ b/packages/plugin-backfill/src/types.ts @@ -4,6 +4,7 @@ export interface BackfillPluginDefaults { chunkHours?: number maxParallelChunks?: number maxRetriesPerChunk?: number + retryDelayMs?: number requireIdempotencyToken?: boolean timeColumn?: string } @@ -31,6 +32,7 @@ export interface NormalizedBackfillDefaults { chunkHours: number maxParallelChunks: number maxRetriesPerChunk: number + retryDelayMs: number requireIdempotencyToken: boolean timeColumn?: string }