Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/backfill-retry-backoff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@chkit/plugin-backfill": patch
---

Fix backfill runtime issues: add exponential backoff between retries (configurable via `defaults.retryDelayMs`, default 1000ms), continue processing remaining chunks after one fails permanently (instead of stopping), and make `resume` automatically retry failed chunks without requiring `--replay-failed`.
8 changes: 5 additions & 3 deletions apps/docs/src/content/docs/plugins/backfill.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export default defineConfig({
chunkHours: 6,
maxParallelChunks: 1,
maxRetriesPerChunk: 3,
retryDelayMs: 1000,
requireIdempotencyToken: true,
timeColumn: 'created_at',
},
Expand Down Expand Up @@ -133,6 +134,7 @@ Configuration is organized into three groups plus a top-level `stateDir`.
| `chunkHours` | `number` | `6` | Hours per chunk |
| `maxParallelChunks` | `number` | `1` | Max concurrent chunks |
| `maxRetriesPerChunk` | `number` | `3` | Retry budget per chunk |
| `retryDelayMs` | `number` | `1000` | Delay between retries of a failed chunk, in milliseconds, applied with exponential backoff; must be a non-negative number |
| `requireIdempotencyToken` | `boolean` | `true` | Generate deterministic tokens |
| `timeColumn` | `string` | auto-detect | Fallback column name for time-based WHERE clause (overridden by schema-level config) |

Expand Down Expand Up @@ -187,13 +189,13 @@ Execute a planned backfill with checkpointed chunk progress.

### `chkit plugin backfill resume`

Resume a backfill run from last checkpoint.
Resume a backfill run from last checkpoint. Automatically retries failed chunks.

| Flag | Required | Description |
|------|----------|-------------|
| `--plan-id <hex16>` | Yes | Plan ID (16-char hex) |
| `--replay-done` | No | Re-execute already-completed chunks |
| `--replay-failed` | No | Re-execute failed chunks |
| `--replay-failed` | No | Re-execute failed chunks (this is the default behavior of `resume`, so the flag is no longer required) |
| `--force-overlap` | No | Allow concurrent runs for the same target |
| `--force-compatibility` | No | Skip compatibility token check |
| `--force-environment` | No | Skip environment mismatch check (plan was created for a different ClickHouse cluster/database) |
Expand Down Expand Up @@ -276,7 +278,7 @@ chkit plugin backfill status --plan-id <planId>
```sh
chkit plugin backfill plan --target analytics.events --from 2025-01-01 --to 2025-02-01
chkit plugin backfill run --plan-id <planId> # some chunks fail
chkit plugin backfill resume --plan-id <planId> --replay-failed
chkit plugin backfill resume --plan-id <planId> # automatically retries failed chunks
```

**CI enforcement:**
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ describe('plugin runtime', () => {
chunkCounts: { done: number; failed: number }
}
expect(failedPayload.status).toBe('failed')
expect(failedPayload.chunkCounts.done).toBe(1)
expect(failedPayload.chunkCounts.done).toBe(2)
expect(failedPayload.chunkCounts.failed).toBe(1)

const resumed = runCli([
Expand Down
10 changes: 10 additions & 0 deletions packages/plugin-backfill/src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const DEFAULT_OPTIONS: NormalizedBackfillPluginOptions = {
chunkHours: 6,
maxParallelChunks: 1,
maxRetriesPerChunk: 3,
retryDelayMs: 1000,
requireIdempotencyToken: true,
},
policy: {
Expand Down Expand Up @@ -36,6 +37,14 @@ function parsePositiveNumber(value: unknown, key: string): number | undefined {
return value
}

/**
 * Validates an optional plugin option that must be a finite number of at least zero.
 *
 * @param value - Raw option value exactly as it appeared in the plugin configuration.
 * @param key - Dotted option path; only used to build the error message.
 * @returns `undefined` when the option was not set, otherwise the validated number.
 * @throws BackfillConfigError when the option is set but is not a finite, non-negative number.
 */
function parseNonNegativeNumber(value: unknown, key: string): number | undefined {
  // An unset option is not an error; the caller decides which default applies.
  if (value === undefined) {
    return undefined
  }

  // Accept path first: NaN and +/-Infinity are rejected by Number.isFinite.
  if (typeof value === 'number' && Number.isFinite(value) && value >= 0) {
    return value
  }

  throw new BackfillConfigError(`Invalid plugin option "${key}". Expected a non-negative number.`)
}

function parseBoolean(value: unknown, key: string): boolean | undefined {
if (value === undefined) return undefined
if (typeof value !== 'boolean') {
Expand Down Expand Up @@ -72,6 +81,7 @@ function normalizeRuntimeOptions(options: Record<string, unknown>): BackfillPlug
options.defaults.maxRetriesPerChunk,
'defaults.maxRetriesPerChunk'
),
retryDelayMs: parseNonNegativeNumber(options.defaults.retryDelayMs, 'defaults.retryDelayMs'),
requireIdempotencyToken: parseBoolean(
options.defaults.requireIdempotencyToken,
'defaults.requireIdempotencyToken'
Expand Down
112 changes: 106 additions & 6 deletions packages/plugin-backfill/src/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ describe('@chkit/plugin-backfill run lifecycle', () => {
defaults: {
chunkHours: 2,
maxRetriesPerChunk: 1,
retryDelayMs: 0,
},
})

Expand Down Expand Up @@ -105,7 +106,7 @@ describe('@chkit/plugin-backfill run lifecycle', () => {
})

expect(firstRun.status.status).toBe('failed')
expect(firstRun.status.totals.done).toBe(1)
expect(firstRun.status.totals.done).toBe(2)
expect(firstRun.status.totals.failed).toBe(1)

const resumed = await resumeBackfillRun({
Expand Down Expand Up @@ -142,10 +143,10 @@ describe('@chkit/plugin-backfill run lifecycle', () => {
metaDir: './chkit/meta',
})
const planOptions = normalizeBackfillOptions({
defaults: { chunkHours: 2, maxRetriesPerChunk: 1 },
defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 },
})
const changedOptions = normalizeBackfillOptions({
defaults: { chunkHours: 2, maxRetriesPerChunk: 5 },
defaults: { chunkHours: 2, maxRetriesPerChunk: 5, retryDelayMs: 0 },
})

const planned = await buildBackfillPlan({
Expand Down Expand Up @@ -228,7 +229,7 @@ describe('@chkit/plugin-backfill run lifecycle', () => {
).rejects.toThrow('already completed')

const options2 = normalizeBackfillOptions({
defaults: { chunkHours: 2, maxRetriesPerChunk: 1 },
defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 },
})
const planned2 = await buildBackfillPlan({
target: 'app.events',
Expand Down Expand Up @@ -324,7 +325,7 @@ describe('@chkit/plugin-backfill execute callback', () => {
metaDir: './chkit/meta',
})
const options = normalizeBackfillOptions({
defaults: { chunkHours: 2, maxRetriesPerChunk: 3 },
defaults: { chunkHours: 2, maxRetriesPerChunk: 3, retryDelayMs: 0 },
})

const planned = await buildBackfillPlan({
Expand Down Expand Up @@ -374,7 +375,7 @@ describe('@chkit/plugin-backfill execute callback', () => {
metaDir: './chkit/meta',
})
const options = normalizeBackfillOptions({
defaults: { chunkHours: 2, maxRetriesPerChunk: 2 },
defaults: { chunkHours: 2, maxRetriesPerChunk: 2, retryDelayMs: 0 },
})

const planned = await buildBackfillPlan({
Expand Down Expand Up @@ -411,6 +412,105 @@ describe('@chkit/plugin-backfill execute callback', () => {
})
})

describe('@chkit/plugin-backfill continue past failures', () => {
  /**
   * Creates a throwaway directory, passes it to `scenario`, and removes it
   * afterwards whether the scenario passed or threw.
   */
  async function withScratchDir(scenario: (scratchDir: string) => Promise<void>): Promise<void> {
    const scratchDir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-'))
    try {
      await scenario(scratchDir)
    } finally {
      await rm(scratchDir, { recursive: true, force: true })
    }
  }

  /**
   * Builds the plan shared by both scenarios: a six-hour window on
   * `app.events` cut into two-hour chunks, one attempt per chunk and no
   * delay between retries so the suite does not wait on backoff.
   */
  async function planSixHourBackfill(scratchDir: string) {
    const configPath = join(scratchDir, 'clickhouse.config.ts')
    const config = resolveConfig({
      schema: './schema.ts',
      metaDir: './chkit/meta',
    })
    const options = normalizeBackfillOptions({
      defaults: { chunkHours: 2, maxRetriesPerChunk: 1, retryDelayMs: 0 },
    })
    const planned = await buildBackfillPlan({
      target: 'app.events',
      from: '2026-01-01T00:00:00.000Z',
      to: '2026-01-01T06:00:00.000Z',
      configPath,
      config,
      options,
    })

    return { configPath, config, options, planned }
  }

  test('continues to remaining chunks after a chunk fails permanently', async () => {
    await withScratchDir(async (scratchDir) => {
      const { configPath, config, options, planned } = await planSixHourBackfill(scratchDir)

      // Fail the very first chunk so that every later chunk has to run after a failure.
      const failChunkId = planned.plan.chunks[0]?.id
      expect(failChunkId).toBeTruthy()

      const outcome = await executeBackfillRun({
        planId: planned.plan.planId,
        configPath,
        config,
        options,
        execution: {
          simulation: { failChunkId, failCount: 1 },
        },
      })

      const { status, run } = outcome
      expect(status.status).toBe('failed')
      expect(status.totals.done).toBe(2)
      expect(status.totals.failed).toBe(1)

      // Chunk-by-chunk: the first one failed, the two after it still completed.
      const expectedChunkStatuses = ['failed', 'done', 'done']
      expectedChunkStatuses.forEach((expectedStatus, index) => {
        expect(run.chunks[index]?.status).toBe(expectedStatus)
      })
    })
  })

  test('resume retries failed chunks without requiring --replay-failed', async () => {
    await withScratchDir(async (scratchDir) => {
      const { configPath, config, options, planned } = await planSixHourBackfill(scratchDir)
      const planId = planned.plan.planId

      const failChunkId = planned.plan.chunks[1]?.id
      expect(failChunkId).toBeTruthy()

      // First pass: the simulated failure leaves one chunk in the failed state.
      await executeBackfillRun({
        planId,
        configPath,
        config,
        options,
        execution: {
          simulation: { failChunkId, failCount: 1 },
        },
      })

      // Resume WITHOUT --replay-failed — should still retry the failed chunk
      const resumed = await resumeBackfillRun({
        planId,
        configPath,
        config,
        options,
      })

      const { status } = resumed
      expect(status.status).toBe('completed')
      expect(status.totals.done).toBe(3)
      expect(status.totals.failed).toBe(0)
    })
  })
})

describe('@chkit/plugin-backfill check integration', () => {
test('reports pending required backfills when plan exists but run is missing', async () => {
const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-'))
Expand Down
Loading