Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/docs/src/content/docs/plugins/backfill.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This document covers practical usage of the optional `backfill` plugin.

- Builds deterministic, immutable backfill plans that divide a time window into chunks.
- Executes backfills against ClickHouse with per-chunk checkpointing, automatic retries, and idempotency tokens.
- Detects materialized views and automatically generates correct CTE-wrapped replay queries.
- Detects materialized views and automatically generates replay queries with direct time-filter injection (no CTE wrapping).
- Supports resume from checkpoint, cancel, status monitoring, and doctor-style diagnostics.
- Integrates with [`chkit check`](/cli/check/) for CI enforcement of pending backfills.
- Persists all state as JSON/NDJSON on disk.
Expand Down Expand Up @@ -84,7 +84,7 @@ The plugin supports two strategies for backfilling data, chosen automatically ba

**Table backfill** (`table` strategy): For direct table targets, inserts data by selecting from the same table within the time window. This is the most common case.

**Materialized view replay** (`mv_replay` strategy): When the target is a materialized view's `to` table, the plugin detects the view's aggregation query and wraps it in a CTE (Common Table Expression). This re-materializes the aggregation for each chunk window, ensuring correctness for aggregate backfills. Requires `requireIdempotencyToken: true` for safe resumable retries.
**Materialized view replay** (`mv_replay` strategy): When the target is a materialized view's `to` table, the plugin detects the view's aggregation query and injects the chunk time filter directly into that query (before trailing clauses like `GROUP BY`). This avoids CTE-related type inference issues while still re-materializing the aggregation for each chunk window. Requires `requireIdempotencyToken: true` for safe resumable retries.

## Time column resolution

Expand Down
22 changes: 19 additions & 3 deletions packages/cli/src/bin/commands/generate/plan-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ import type {

import type { ColumnRenameMapping, TableRenameMapping } from './rename-mappings.js'

// Identifiers made purely of word characters can go into SQL unquoted.
const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/

/**
 * Renders an identifier for generated SQL. A plain word is returned as-is
 * (after trimming surrounding whitespace); anything else is backtick-quoted,
 * with embedded backticks doubled per ClickHouse escaping rules.
 */
function formatIdentifier(value: string): string {
  const candidate = value.trim()
  if (SIMPLE_IDENTIFIER.test(candidate)) return candidate
  const escaped = candidate.split('`').join('``')
  return '`' + escaped + '`'
}

/**
 * Unconditionally backtick-quotes `value` for SQL, doubling any embedded
 * backticks so the quoting cannot be broken out of.
 */
function quoteIdentifier(value: string): string {
  const escaped = value.split('`').join('``')
  return '`' + escaped + '`'
}

/**
 * Builds a dotted `db.name` qualified name, quoting each part only when
 * formatIdentifier deems quoting necessary.
 */
function formatQualifiedName(database: string, name: string): string {
  return [database, name].map(formatIdentifier).join('.')
}

export function applySelectedRenameSuggestions(
plan: MigrationPlan,
selectedSuggestions: ColumnRenameSuggestion[]
Expand Down Expand Up @@ -75,7 +91,7 @@ export function applyExplicitTableRenames(
type: 'create_database',
key: dbKey,
risk: 'safe',
sql: `CREATE DATABASE IF NOT EXISTS ${mapping.newDatabase};`,
sql: `CREATE DATABASE IF NOT EXISTS ${formatIdentifier(mapping.newDatabase)};`,
})
createDatabaseOps.add(dbKey)
}
Expand All @@ -84,7 +100,7 @@ export function applyExplicitTableRenames(
type: 'alter_table_rename_table',
key: `table:${mapping.newDatabase}.${mapping.newName}:rename_table`,
risk: 'caution',
sql: `RENAME TABLE ${mapping.oldDatabase}.${mapping.oldName} TO ${mapping.newDatabase}.${mapping.newName};`,
sql: `RENAME TABLE ${formatQualifiedName(mapping.oldDatabase, mapping.oldName)} TO ${formatQualifiedName(mapping.newDatabase, mapping.newName)};`,
})
}

Expand Down Expand Up @@ -129,7 +145,7 @@ export function buildExplicitColumnRenameSuggestions(
: 'Explicitly confirmed by schema metadata (renamedFrom).',
dropOperationKey,
addOperationKey,
confirmationSQL: `ALTER TABLE ${mapping.database}.${mapping.table} RENAME COLUMN \`${mapping.from}\` TO \`${mapping.to}\`;`,
confirmationSQL: `ALTER TABLE ${formatQualifiedName(mapping.database, mapping.table)} RENAME COLUMN ${quoteIdentifier(mapping.from)} TO ${quoteIdentifier(mapping.to)};`,
})
}

Expand Down
34 changes: 34 additions & 0 deletions packages/cli/src/bin/commands/migrate.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { describe, expect, test } from 'bun:test'
import { mkdtemp, rm, writeFile } from 'node:fs/promises'
import { join } from 'node:path'
import { tmpdir } from 'node:os'

import { __testUtils } from './migrate.js'

describe('migrate scoped selection', () => {
  test('includes unannotated migrations as safety fallback', async () => {
    // Scratch migrations directory holding one unannotated file and one file
    // carrying an operation-metadata comment.
    const workDir = await mkdtemp(join(tmpdir(), 'chkit-migrate-scope-'))
    try {
      const statement = "ALTER TABLE app.events ADD COLUMN IF NOT EXISTS source String;"
      await writeFile(join(workDir, '001_unannotated.sql'), statement)
      await writeFile(
        join(workDir, '002_annotated.sql'),
        `-- operation: alter_table_add_column key=table:app.events:column:source risk=safe\n${statement}`
      )

      const scoped = await __testUtils.filterPendingByScope(
        workDir,
        ['001_unannotated.sql', '002_annotated.sql'],
        new Set(['app.events'])
      )

      // Both files are selected; the unannotated one is included as a
      // safety fallback and surfaces exactly one warning naming the file.
      expect(scoped.files).toEqual(['001_unannotated.sql', '002_annotated.sql'])
      expect(scoped.warnings).toHaveLength(1)
      expect(scoped.warnings[0]).toContain('001_unannotated.sql')
    } finally {
      await rm(workDir, { recursive: true, force: true })
    }
  })
})

35 changes: 30 additions & 5 deletions packages/cli/src/bin/commands/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,22 @@ async function filterPendingByScope(
migrationsDir: string,
pending: string[],
selectedTables: ReadonlySet<string>
): Promise<string[]> {
): Promise<{ files: string[]; warnings: string[] }> {
const selectedDatabases = new Set([...selectedTables].map((table) => table.split('.')[0] ?? ''))

const inScope: string[] = []
const warnings: string[] = []
for (const file of pending) {
const sql = await readFile(join(migrationsDir, file), 'utf8')
const operations = extractMigrationOperationSummaries(sql)
if (operations.length === 0) {
inScope.push(file)
warnings.push(
`Migration "${file}" has no operation metadata comments; included in scoped run as a safety fallback.`
)
continue
}

const matches = operations.some((operation) => {
const tableKey = tableKeyFromOperationKey(operation.key)
if (tableKey) return selectedTables.has(tableKey)
Expand All @@ -91,7 +100,11 @@ async function filterPendingByScope(
if (matches) inScope.push(file)
}

return inScope
return { files: inScope, warnings }
}

// Internal escape hatch so unit tests can exercise filterPendingByScope
// directly; the double-underscore prefix marks it as non-public API.
export const __testUtils = {
  filterPendingByScope,
}

async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
Expand Down Expand Up @@ -161,9 +174,10 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
return
}

const pending = tableScope.enabled
const scopedSelection = tableScope.enabled
? await filterPendingByScope(migrationsDir, pendingAll, new Set(tableScope.matchedTables))
: pendingAll
: { files: pendingAll, warnings: [] }
const pending = scopedSelection.files

if (pending.length === 0) {
if (jsonMode) {
Expand All @@ -172,9 +186,13 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
scope: tableScope,
pending: [],
applied: [],
warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined,
})
} else {
console.log('No pending migrations.')
for (const warning of scopedSelection.warnings) {
console.warn(`Warning: ${warning}`)
}
}
return
}
Expand All @@ -186,7 +204,10 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
}

if (jsonMode && !executeRequested) {
emitJson('migrate', planned)
emitJson('migrate', {
...planned,
warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined,
})
return
}

Expand All @@ -197,6 +218,9 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
}
console.log(`Pending migrations: ${pending.length}`)
for (const file of pending) console.log(`- ${file}`)
for (const warning of scopedSelection.warnings) {
console.warn(`Warning: ${warning}`)
}
}

let shouldExecute = executeRequested
Expand Down Expand Up @@ -315,6 +339,7 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
mode: 'execute',
scope: tableScope,
applied: appliedNow,
warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined,
})
return
}
Expand Down
8 changes: 7 additions & 1 deletion packages/cli/src/bin/journal-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ export interface JournalStore {
}

const DEFAULT_JOURNAL_TABLE = '_chkit_migrations'
// Identifiers that are already plain words can be embedded in SQL as-is.
const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/

/**
 * Quotes the journal-table identifier for SQL when it is not a plain word.
 * Embedded backticks are doubled, following ClickHouse quoting rules.
 * NOTE: unlike the plan-pipeline variant, this one does not trim — callers
 * are expected to pass an already-trimmed value.
 */
function formatIdentifier(value: string): string {
  if (SIMPLE_IDENTIFIER.test(value)) return value
  const escaped = value.split('`').join('``')
  return '`' + escaped + '`'
}

function resolveJournalTableName(): string {
const candidate = process.env.CHKIT_JOURNAL_TABLE?.trim()
Expand All @@ -36,7 +42,7 @@ function isRetryableInsertRace(error: unknown): boolean {
}

export function createJournalStore(db: ClickHouseExecutor): JournalStore {
const journalTable = resolveJournalTableName()
const journalTable = formatIdentifier(resolveJournalTableName())
const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable} (
name String,
applied_at DateTime64(3, 'UTC'),
Expand Down
32 changes: 32 additions & 0 deletions packages/cli/src/drift.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,38 @@ describe('@chkit/cli drift comparer', () => {
expect(result).toBeNull()
})

test('treats wrapped low-cardinality nullable column forms as equivalent', () => {
const expected = table({
database: 'app',
name: 'users',
engine: 'MergeTree()',
columns: [
{ name: 'id', type: 'UInt64' },
{ name: 'region', type: 'LowCardinality(String)', nullable: true },
],
primaryKey: ['id'],
orderBy: ['id'],
})

const result = compareTableShape(expected, {
engine: 'MergeTree()',
primaryKey: '(id)',
orderBy: '(id)',
uniqueKey: undefined,
partitionBy: undefined,
columns: [
{ name: 'id', type: 'UInt64' },
{ name: 'region', type: 'LowCardinality(String)', nullable: true },
],
settings: {},
indexes: [],
projections: [],
ttl: undefined,
})

expect(result).toBeNull()
})

test('emits reason codes for semantic drift', () => {
const expected = table({
database: 'app',
Expand Down
5 changes: 5 additions & 0 deletions packages/clickhouse/src/create-table-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ function extractCreateTableBody(createTableQuery: string | undefined): string |
let stringQuote = "'"
for (let i = openIndex; i < left.length; i += 1) {
const char = left[i]
const next = left[i + 1]
if (!char) continue
if (inString) {
if (char === stringQuote && next === stringQuote) {
i += 1
continue
}
if (char === stringQuote && left[i - 1] !== '\\') {
inString = false
}
Expand Down
40 changes: 40 additions & 0 deletions packages/clickhouse/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
parseSettingsFromCreateTableQuery,
parseTTLFromCreateTableQuery,
parseUniqueKeyFromCreateTableQuery,
parseSystemColumnType,
normalizeSkipIndexType,
} from './index'

describe('@chkit/clickhouse smoke', () => {
Expand Down Expand Up @@ -99,4 +101,42 @@ ORDER BY id;`
},
])
})

test('extracts projection body when column defaults contain doubled quotes', () => {
  // The DEFAULT literal contains a doubled single quote ('') — standard SQL
  // escaping — which must not derail the parser's in-string tracking.
  const ddl = `CREATE TABLE app.events
(
id UInt64,
source String DEFAULT 'it''s fine',
PROJECTION p_by_source (SELECT source, count() GROUP BY source)
)
ENGINE = MergeTree()
ORDER BY id;`

  const projections = parseProjectionsFromCreateTableQuery(ddl)
  expect(projections).toEqual([
    { name: 'p_by_source', query: 'SELECT source, count() GROUP BY source' },
  ])
})

test('parses nullable wrappers for system column types without losing wrappers', () => {
  // Each case pairs a system.columns type string with the expected parse:
  // the outer Nullable(...) is lifted into the flag, but Nullable nested
  // inside Array(...) stays part of the type text.
  const cases: Array<[string, { type: string; nullable: boolean | undefined }]> = [
    ['Nullable(String)', { type: 'String', nullable: true }],
    ['LowCardinality(Nullable(String))', { type: 'LowCardinality(String)', nullable: true }],
    ['Array(Nullable(String))', { type: 'Array(Nullable(String))', nullable: undefined }],
  ]

  for (const [input, parsed] of cases) {
    expect(parseSystemColumnType(input)).toEqual(parsed)
  }
})

test('preserves unknown skip-index types from introspection', () => {
  // Both a well-known type and a never-seen-before one pass through unchanged.
  for (const indexType of ['set', 'new_index_type_v2']) {
    expect(normalizeSkipIndexType(indexType)).toBe(indexType)
  }
})
})
Loading
Loading