From edff5e119e072867c54bcca80073494c9b0659a5 Mon Sep 17 00:00:00 2001 From: KeKs0r Date: Sun, 1 Mar 2026 12:22:16 +0800 Subject: [PATCH 1/2] Update backfill docs --- .../docs/src/content/docs/plugins/backfill.md | 4 +- .../bin/commands/generate/plan-pipeline.ts | 22 +++- packages/cli/src/bin/commands/migrate.test.ts | 34 ++++++ packages/cli/src/bin/commands/migrate.ts | 35 +++++- packages/cli/src/bin/journal-store.ts | 8 +- packages/cli/src/drift.test.ts | 32 ++++++ .../clickhouse/src/create-table-parser.ts | 5 + packages/clickhouse/src/index.test.ts | 40 +++++++ packages/clickhouse/src/index.ts | 55 +++++++--- packages/core/src/conformance.test.ts | 75 +++++++++++++ packages/core/src/identifiers.ts | 18 ++++ packages/core/src/index.ts | 1 + packages/core/src/model-types.ts | 9 +- packages/core/src/planner.ts | 13 +-- packages/core/src/sql.ts | 37 +++---- packages/plugin-backfill/src/planner.test.ts | 21 ++++ packages/plugin-backfill/src/planner.ts | 101 ++++++++++++++++-- packages/plugin-pull/src/index.ts | 26 +++-- 18 files changed, 467 insertions(+), 69 deletions(-) create mode 100644 packages/cli/src/bin/commands/migrate.test.ts create mode 100644 packages/core/src/conformance.test.ts create mode 100644 packages/core/src/identifiers.ts diff --git a/apps/docs/src/content/docs/plugins/backfill.md b/apps/docs/src/content/docs/plugins/backfill.md index 13c104f..aff4faf 100644 --- a/apps/docs/src/content/docs/plugins/backfill.md +++ b/apps/docs/src/content/docs/plugins/backfill.md @@ -9,7 +9,7 @@ This document covers practical usage of the optional `backfill` plugin. - Builds deterministic, immutable backfill plans that divide a time window into chunks. - Executes backfills against ClickHouse with per-chunk checkpointing, automatic retries, and idempotency tokens. -- Detects materialized views and automatically generates correct CTE-wrapped replay queries. +- Detects materialized views and automatically generates replay queries with direct time-filter injection (no CTE wrapping). - Supports resume from checkpoint, cancel, status monitoring, and doctor-style diagnostics. - Integrates with [`chkit check`](/cli/check/) for CI enforcement of pending backfills. - Persists all state as JSON/NDJSON on disk. @@ -84,7 +84,7 @@ The plugin supports two strategies for backfilling data, chosen automatically ba **Table backfill** (`table` strategy): For direct table targets, inserts data by selecting from the same table within the time window. This is the most common case. -**Materialized view replay** (`mv_replay` strategy): When the target is a materialized view's `to` table, the plugin detects the view's aggregation query and wraps it in a CTE (Common Table Expression). This re-materializes the aggregation for each chunk window, ensuring correctness for aggregate backfills. Requires `requireIdempotencyToken: true` for safe resumable retries. +**Materialized view replay** (`mv_replay` strategy): When the target is a materialized view's `to` table, the plugin detects the view's aggregation query and injects the chunk time filter directly into that query (before trailing clauses like `GROUP BY`). This avoids CTE-related type inference issues while still re-materializing the aggregation for each chunk window. Requires `requireIdempotencyToken: true` for safe resumable retries. ## Time column resolution diff --git a/packages/cli/src/bin/commands/generate/plan-pipeline.ts b/packages/cli/src/bin/commands/generate/plan-pipeline.ts index 40af566..c61bcef 100644 --- a/packages/cli/src/bin/commands/generate/plan-pipeline.ts +++ b/packages/cli/src/bin/commands/generate/plan-pipeline.ts @@ -8,6 +8,22 @@ import type { import type { ColumnRenameMapping, TableRenameMapping } from './rename-mappings.js' +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function formatIdentifier(value: string): string { + const trimmed = value.trim() + if (SIMPLE_IDENTIFIER.test(trimmed)) return trimmed + return `\`${trimmed.replace(/`/g, '``')}\`` +} + +function quoteIdentifier(value: string): string { + return `\`${value.replace(/`/g, '``')}\`` +} + +function formatQualifiedName(database: string, name: string): string { + return `${formatIdentifier(database)}.${formatIdentifier(name)}` +} + export function applySelectedRenameSuggestions( plan: MigrationPlan, selectedSuggestions: ColumnRenameSuggestion[] @@ -75,7 +91,7 @@ export function applyExplicitTableRenames( type: 'create_database', key: dbKey, risk: 'safe', - sql: `CREATE DATABASE IF NOT EXISTS ${mapping.newDatabase};`, + sql: `CREATE DATABASE IF NOT EXISTS ${formatIdentifier(mapping.newDatabase)};`, }) createDatabaseOps.add(dbKey) } @@ -84,7 +100,7 @@ export function applyExplicitTableRenames( type: 'alter_table_rename_table', key: `table:${mapping.newDatabase}.${mapping.newName}:rename_table`, risk: 'caution', - sql: `RENAME TABLE ${mapping.oldDatabase}.${mapping.oldName} TO ${mapping.newDatabase}.${mapping.newName};`, + sql: `RENAME TABLE ${formatQualifiedName(mapping.oldDatabase, mapping.oldName)} TO ${formatQualifiedName(mapping.newDatabase, mapping.newName)};`, }) } @@ -129,7 +145,7 @@ export function buildExplicitColumnRenameSuggestions( : 'Explicitly confirmed by schema metadata (renamedFrom).', dropOperationKey, addOperationKey, - confirmationSQL: `ALTER TABLE ${mapping.database}.${mapping.table} RENAME COLUMN \`${mapping.from}\` TO \`${mapping.to}\`;`, + confirmationSQL: `ALTER TABLE ${formatQualifiedName(mapping.database, mapping.table)} RENAME COLUMN ${quoteIdentifier(mapping.from)} TO ${quoteIdentifier(mapping.to)};`, }) } diff --git a/packages/cli/src/bin/commands/migrate.test.ts b/packages/cli/src/bin/commands/migrate.test.ts new file mode 100644 index 0000000..cff3cd7 --- /dev/null +++ b/packages/cli/src/bin/commands/migrate.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, test } from 'bun:test' +import { mkdtemp, rm, writeFile } from 'node:fs/promises' +import { join } from 'node:path' +import { tmpdir } from 'node:os' + +import { __testUtils } from './migrate.js' + +describe('migrate scoped selection', () => { + test('includes unannotated migrations as safety fallback', async () => { + const dir = await mkdtemp(join(tmpdir(), 'chkit-migrate-scope-')) + try { + await writeFile( + join(dir, '001_unannotated.sql'), + "ALTER TABLE app.events ADD COLUMN IF NOT EXISTS source String;" + ) + await writeFile( + join(dir, '002_annotated.sql'), + [ + '-- operation: alter_table_add_column key=table:app.events:column:source risk=safe', + "ALTER TABLE app.events ADD COLUMN IF NOT EXISTS source String;", + ].join('\n') + ) + + const result = await __testUtils.filterPendingByScope(dir, ['001_unannotated.sql', '002_annotated.sql'], new Set(['app.events'])) + + expect(result.files).toEqual(['001_unannotated.sql', '002_annotated.sql']) + expect(result.warnings).toHaveLength(1) + expect(result.warnings[0]).toContain('001_unannotated.sql') + } finally { + await rm(dir, { recursive: true, force: true }) + } + }) +}) + diff --git a/packages/cli/src/bin/commands/migrate.ts b/packages/cli/src/bin/commands/migrate.ts index be25881..0716ebc 100644 --- a/packages/cli/src/bin/commands/migrate.ts +++ b/packages/cli/src/bin/commands/migrate.ts @@ -71,13 +71,22 @@ async function filterPendingByScope( migrationsDir: string, pending: string[], selectedTables: ReadonlySet -): Promise { +): Promise<{ files: string[]; warnings: string[] }> { const selectedDatabases = new Set([...selectedTables].map((table) => table.split('.')[0] ?? '')) const inScope: string[] = [] + const warnings: string[] = [] for (const file of pending) { const sql = await readFile(join(migrationsDir, file), 'utf8') const operations = extractMigrationOperationSummaries(sql) + if (operations.length === 0) { + inScope.push(file) + warnings.push( + `Migration "${file}" has no operation metadata comments; included in scoped run as a safety fallback.` + ) + continue + } + const matches = operations.some((operation) => { const tableKey = tableKeyFromOperationKey(operation.key) if (tableKey) return selectedTables.has(tableKey) @@ -91,7 +100,11 @@ async function filterPendingByScope( if (matches) inScope.push(file) } - return inScope + return { files: inScope, warnings } +} + +export const __testUtils = { + filterPendingByScope, } async function cmdMigrate(ctx: CommandRunContext): Promise { @@ -161,9 +174,10 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { return } - const pending = tableScope.enabled + const scopedSelection = tableScope.enabled ? await filterPendingByScope(migrationsDir, pendingAll, new Set(tableScope.matchedTables)) - : pendingAll + : { files: pendingAll, warnings: [] } + const pending = scopedSelection.files if (pending.length === 0) { if (jsonMode) { @@ -172,9 +186,13 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { scope: tableScope, pending: [], applied: [], + warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined, }) } else { console.log('No pending migrations.') + for (const warning of scopedSelection.warnings) { + console.warn(`Warning: ${warning}`) + } } return } @@ -186,7 +204,10 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { } if (jsonMode && !executeRequested) { - emitJson('migrate', planned) + emitJson('migrate', { + ...planned, + warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined, + }) return } @@ -197,6 +218,9 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { } console.log(`Pending migrations: ${pending.length}`) for (const file of pending) console.log(`- ${file}`) + for (const warning of scopedSelection.warnings) { + console.warn(`Warning: ${warning}`) + } } let shouldExecute = executeRequested @@ -315,6 +339,7 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { mode: 'execute', scope: tableScope, applied: appliedNow, + warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined, }) return } diff --git a/packages/cli/src/bin/journal-store.ts b/packages/cli/src/bin/journal-store.ts index 464f250..07d38e4 100644 --- a/packages/cli/src/bin/journal-store.ts +++ b/packages/cli/src/bin/journal-store.ts @@ -10,6 +10,12 @@ export interface JournalStore { } const DEFAULT_JOURNAL_TABLE = '_chkit_migrations' +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function formatIdentifier(value: string): string { + if (SIMPLE_IDENTIFIER.test(value)) return value + return `\`${value.replace(/`/g, '``')}\`` +} function resolveJournalTableName(): string { const candidate = process.env.CHKIT_JOURNAL_TABLE?.trim() @@ -36,7 +42,7 @@ function isRetryableInsertRace(error: unknown): boolean { } export function createJournalStore(db: ClickHouseExecutor): JournalStore { - const journalTable = resolveJournalTableName() + const journalTable = formatIdentifier(resolveJournalTableName()) const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable} ( name String, applied_at DateTime64(3, 'UTC'), diff --git a/packages/cli/src/drift.test.ts b/packages/cli/src/drift.test.ts index 27be4d6..f66eab9 100644 --- a/packages/cli/src/drift.test.ts +++ b/packages/cli/src/drift.test.ts @@ -184,6 +184,38 @@ describe('@chkit/cli drift comparer', () => { expect(result).toBeNull() }) + test('treats wrapped low-cardinality nullable column forms as equivalent', () => { + const expected = table({ + database: 'app', + name: 'users', + engine: 'MergeTree()', + columns: [ + { name: 'id', type: 'UInt64' }, + { name: 'region', type: 'LowCardinality(String)', nullable: true }, + ], + primaryKey: ['id'], + orderBy: ['id'], + }) + + const result = compareTableShape(expected, { + engine: 'MergeTree()', + primaryKey: '(id)', + orderBy: '(id)', + uniqueKey: undefined, + partitionBy: undefined, + columns: [ + { name: 'id', type: 'UInt64' }, + { name: 'region', type: 'LowCardinality(String)', nullable: true }, + ], + settings: {}, + indexes: [], + projections: [], + ttl: undefined, + }) + + expect(result).toBeNull() + }) + test('emits reason codes for semantic drift', () => { const expected = table({ database: 'app', diff --git a/packages/clickhouse/src/create-table-parser.ts b/packages/clickhouse/src/create-table-parser.ts index 2e39f0a..b218677 100644 --- a/packages/clickhouse/src/create-table-parser.ts +++ b/packages/clickhouse/src/create-table-parser.ts @@ -33,8 +33,13 @@ function extractCreateTableBody(createTableQuery: string | undefined): string | let stringQuote = "'" for (let i = openIndex; i < left.length; i += 1) { const char = left[i] + const next = left[i + 1] if (!char) continue if (inString) { + if (char === stringQuote && next === stringQuote) { + i += 1 + continue + } if (char === stringQuote && left[i - 1] !== '\\') { inString = false } diff --git a/packages/clickhouse/src/index.test.ts b/packages/clickhouse/src/index.test.ts index 107c5ec..afd2f40 100644 --- a/packages/clickhouse/src/index.test.ts +++ b/packages/clickhouse/src/index.test.ts @@ -11,6 +11,8 @@ import { parseSettingsFromCreateTableQuery, parseTTLFromCreateTableQuery, parseUniqueKeyFromCreateTableQuery, + parseSystemColumnType, + normalizeSkipIndexType, } from './index' describe('@chkit/clickhouse smoke', () => { @@ -99,4 +101,42 @@ ORDER BY id;` }, ]) }) + + test('extracts projection body when column defaults contain doubled quotes', () => { + const query = `CREATE TABLE app.events +( + id UInt64, + source String DEFAULT 'it''s fine', + PROJECTION p_by_source (SELECT source, count() GROUP BY source) +) +ENGINE = MergeTree() +ORDER BY id;` + + expect(parseProjectionsFromCreateTableQuery(query)).toEqual([ + { + name: 'p_by_source', + query: 'SELECT source, count() GROUP BY source', + }, + ]) + }) + + test('parses nullable wrappers for system column types without losing wrappers', () => { + expect(parseSystemColumnType('Nullable(String)')).toEqual({ + type: 'String', + nullable: true, + }) + expect(parseSystemColumnType('LowCardinality(Nullable(String))')).toEqual({ + type: 'LowCardinality(String)', + nullable: true, + }) + expect(parseSystemColumnType('Array(Nullable(String))')).toEqual({ + type: 'Array(Nullable(String))', + nullable: undefined, + }) + }) + + test('preserves unknown skip-index types from introspection', () => { + expect(normalizeSkipIndexType('set')).toBe('set') + expect(normalizeSkipIndexType('new_index_type_v2')).toBe('new_index_type_v2') + }) }) diff --git a/packages/clickhouse/src/index.ts b/packages/clickhouse/src/index.ts index dcf93aa..b64770e 100644 --- a/packages/clickhouse/src/index.ts +++ b/packages/clickhouse/src/index.ts @@ -74,6 +74,11 @@ export interface IntrospectedTable { ttl?: string } +export interface ParsedSystemColumnType { + type: string + nullable?: boolean +} + export { parseEngineFromCreateTableQuery, parseOrderByFromCreateTableQuery, @@ -92,38 +97,62 @@ export function inferSchemaKindFromEngine(engine: string): SchemaObjectRef['kind return 'table' } +function unwrapTypeWrapper(type: string, wrapper: string): string | null { + const trimmed = type.trim() + const prefix = `${wrapper}(` + if (!trimmed.startsWith(prefix) || !trimmed.endsWith(')')) return null + return trimmed.slice(prefix.length, -1).trim() +} + +export function parseSystemColumnType(type: string): ParsedSystemColumnType { + let normalizedType = type.trim() + let nullable = false + + const outerNullable = unwrapTypeWrapper(normalizedType, 'Nullable') + if (outerNullable) { + normalizedType = outerNullable + nullable = true + } + + const lowCardinality = unwrapTypeWrapper(normalizedType, 'LowCardinality') + if (lowCardinality) { + const lowCardinalityNullable = unwrapTypeWrapper(lowCardinality, 'Nullable') + if (lowCardinalityNullable) { + normalizedType = `LowCardinality(${lowCardinalityNullable})` + nullable = true + } + } + + return { + type: normalizedType, + nullable: nullable || undefined, + } +} function normalizeColumnFromSystemRow(row: SystemColumnRow): ColumnDefinition { - const nullableMatch = row.type.match(/^Nullable\((.+)\)$/) - const type = nullableMatch?.[1] ? nullableMatch[1] : row.type - const nullable = Boolean(nullableMatch?.[1]) + const normalizedType = parseSystemColumnType(row.type) let defaultValue: ColumnDefinition['default'] | undefined if (row.default_expression && row.default_kind === 'DEFAULT') { defaultValue = normalizeSQLFragment(row.default_expression) } return { name: row.name, - type, - nullable: nullable || undefined, + type: normalizedType.type, + nullable: normalizedType.nullable, default: defaultValue, comment: row.comment?.trim() || undefined, } } -function normalizeIndexType(value: string): SkipIndexDefinition['type'] { - if (value === 'minmax') return 'minmax' - if (value === 'set') return 'set' - if (value === 'bloom_filter') return 'bloom_filter' - if (value === 'tokenbf_v1') return 'tokenbf_v1' - if (value === 'ngrambf_v1') return 'ngrambf_v1' - return 'set' +export function normalizeSkipIndexType(value: string): SkipIndexDefinition['type'] { + return value.trim() } function normalizeIndexFromSystemRow(row: SystemSkippingIndexRow): SkipIndexDefinition { return { name: row.name, expression: normalizeSQLFragment(row.expr), - type: normalizeIndexType(row.type), + type: normalizeSkipIndexType(row.type), granularity: row.granularity, } } diff --git a/packages/core/src/conformance.test.ts b/packages/core/src/conformance.test.ts new file mode 100644 index 0000000..32ee377 --- /dev/null +++ b/packages/core/src/conformance.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, test } from 'bun:test' + +import { + canonicalizeDefinitions, + formatIdentifier, + planDiff, + table, + toCreateSQL, + view, + materializedView, +} from './index.js' + +describe('core conformance', () => { + test('canonicalization is idempotent and planner is stable on canonical output', () => { + const definitions = [ + table({ + database: 'app', + name: 'events', + columns: [ + { name: 'id', type: 'UInt64' }, + { name: 'source', type: 'LowCardinality(String)' }, + { name: 'recorded_at', type: 'DateTime64(3)', default: 'fn:now64(3)' }, + ], + engine: 'MergeTree', + primaryKey: ['id'], + orderBy: ['id'], + settings: { + index_granularity: 8192, + min_rows_for_wide_part: 0, + }, + indexes: [ + { name: 'idx_source', expression: 'source', type: 'set', granularity: 1 }, + ], + }), + view({ + database: 'app', + name: 'events_view', + as: 'SELECT id, source FROM app.events', + }), + materializedView({ + database: 'app', + name: 'events_mv', + to: { database: 'app', name: 'events' }, + as: 'SELECT id, source FROM app.events', + }), + ] + + const canonicalA = canonicalizeDefinitions(definitions) + const canonicalB = canonicalizeDefinitions(canonicalA) + expect(canonicalB).toEqual(canonicalA) + + const plan = planDiff(canonicalA, canonicalB) + expect(plan.operations).toEqual([]) + expect(plan.riskSummary).toEqual({ safe: 0, caution: 0, danger: 0 }) + }) + + test('renders quoted identifiers for non-simple names', () => { + const sql = toCreateSQL( + table({ + database: 'analytics-prod', + name: 'event-log', + columns: [{ name: 'user id', type: 'String' }], + engine: 'MergeTree', + primaryKey: ['user id'], + orderBy: ['user id'], + }) + ) + + expect(sql).toContain('CREATE TABLE IF NOT EXISTS `analytics-prod`.`event-log`') + expect(sql).toContain('`user id` String') + expect(formatIdentifier('simple_name')).toBe('simple_name') + expect(formatIdentifier('bad-name')).toBe('`bad-name`') + }) +}) + diff --git a/packages/core/src/identifiers.ts b/packages/core/src/identifiers.ts new file mode 100644 index 0000000..a6cece6 --- /dev/null +++ b/packages/core/src/identifiers.ts @@ -0,0 +1,18 @@ +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +export function quoteIdentifier(value: string): string { + return `\`${value.replace(/`/g, '``')}\`` +} + +export function formatIdentifier(value: string): string { + const trimmed = value.trim() + if (trimmed.length === 0) { + throw new Error('Identifier cannot be empty.') + } + if (SIMPLE_IDENTIFIER.test(trimmed)) return trimmed + return quoteIdentifier(trimmed) +} + +export function formatQualifiedName(database: string, name: string): string { + return `${formatIdentifier(database)}.${formatIdentifier(name)}` +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 3bfcd33..47dc265 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,6 +9,7 @@ export { planDiff } from './planner.js' export { createSnapshot } from './snapshot.js' export { loadSchemaDefinitions, type SchemaLoaderOptions } from './schema-loader.js' export { splitTopLevelComma } from './key-clause.js' +export { formatIdentifier, formatQualifiedName } from './identifiers.js' export { normalizeEngine, normalizeSQLFragment } from './sql-normalizer.js' export { toCreateSQL } from './sql.js' export { assertValidDefinitions, validateDefinitions } from './validate.js' diff --git a/packages/core/src/model-types.ts b/packages/core/src/model-types.ts index 6d991d6..412ea74 100644 --- a/packages/core/src/model-types.ts +++ b/packages/core/src/model-types.ts @@ -29,10 +29,17 @@ export interface ColumnDefinition { comment?: string } +export type KnownSkipIndexType = + | 'minmax' + | 'set' + | 'bloom_filter' + | 'tokenbf_v1' + | 'ngrambf_v1' + export interface SkipIndexDefinition { name: string expression: string - type: 'minmax' | 'set' | 'bloom_filter' | 'tokenbf_v1' | 'ngrambf_v1' + type: KnownSkipIndexType | (string & {}) granularity: number } diff --git a/packages/core/src/planner.ts b/packages/core/src/planner.ts index e715aba..e3fde12 100644 --- a/packages/core/src/planner.ts +++ b/packages/core/src/planner.ts @@ -22,6 +22,7 @@ import { renderAlterResetSetting, toCreateSQL, } from './sql.js' +import { formatIdentifier, formatQualifiedName, quoteIdentifier } from './identifiers.js' import { assertValidDefinitions } from './validate.js' function createMap(definitions: SchemaDefinition[]): Map { @@ -38,7 +39,7 @@ function pushDropOperation( type: 'drop_table', key: definitionKey(def), risk, - sql: `DROP TABLE IF EXISTS ${def.database}.${def.name};`, + sql: `DROP TABLE IF EXISTS ${formatQualifiedName(def.database, def.name)};`, }) return } @@ -47,7 +48,7 @@ function pushDropOperation( type: 'drop_view', key: definitionKey(def), risk, - sql: `DROP VIEW IF EXISTS ${def.database}.${def.name};`, + sql: `DROP VIEW IF EXISTS ${formatQualifiedName(def.database, def.name)};`, }) return } @@ -55,7 +56,7 @@ function pushDropOperation( type: 'drop_materialized_view', key: definitionKey(def), risk, - sql: `DROP TABLE IF EXISTS ${def.database}.${def.name} SYNC;`, + sql: `DROP TABLE IF EXISTS ${formatQualifiedName(def.database, def.name)} SYNC;`, }) } @@ -99,7 +100,7 @@ function pushCreateDatabaseOperation( type: 'create_database', key: `database:${database}`, risk, - sql: `CREATE DATABASE IF NOT EXISTS ${database};`, + sql: `CREATE DATABASE IF NOT EXISTS ${formatIdentifier(database)};`, }) } @@ -137,7 +138,7 @@ function normalizeColumn(column: ColumnDefinition): Omit `\`${column}\``) + .map((column) => quoteIdentifier(column)) .join(', ') } @@ -34,10 +35,10 @@ function renderTableSQL(def: TableDefinition): string { const columns = def.columns.map(renderColumn) const indexes = (def.indexes ?? []).map( (idx) => - `INDEX \`${idx.name}\` (${idx.expression}) TYPE ${idx.type} GRANULARITY ${idx.granularity}` + `INDEX ${quoteIdentifier(idx.name)} (${idx.expression}) TYPE ${idx.type} GRANULARITY ${idx.granularity}` ) const projections = (def.projections ?? []).map( - (projection) => `PROJECTION \`${projection.name}\` (${projection.query})` + (projection) => `PROJECTION ${quoteIdentifier(projection.name)} (${projection.query})` ) const body = [...columns, ...indexes, ...projections].join(',\n ') @@ -58,15 +59,15 @@ function renderTableSQL(def: TableDefinition): string { } if (def.comment) clauses.push(`COMMENT '${def.comment.replace(/'/g, "''")}'`) - return `CREATE TABLE IF NOT EXISTS ${def.database}.${def.name}\n(\n ${body}\n) ENGINE = ${def.engine}\n${clauses.join('\n')};` + return `CREATE TABLE IF NOT EXISTS ${formatQualifiedName(def.database, def.name)}\n(\n ${body}\n) ENGINE = ${def.engine}\n${clauses.join('\n')};` } function renderViewSQL(def: ViewDefinition): string { - return `CREATE VIEW IF NOT EXISTS ${def.database}.${def.name} AS\n${def.as};` + return `CREATE VIEW IF NOT EXISTS ${formatQualifiedName(def.database, def.name)} AS\n${def.as};` } function renderMaterializedViewSQL(def: MaterializedViewDefinition): string { - return `CREATE MATERIALIZED VIEW IF NOT EXISTS ${def.database}.${def.name} TO ${def.to.database}.${def.to.name} AS\n${def.as};` + return `CREATE MATERIALIZED VIEW IF NOT EXISTS ${formatQualifiedName(def.database, def.name)} TO ${formatQualifiedName(def.to.database, def.to.name)} AS\n${def.as};` } export function toCreateSQL(def: SchemaDefinition): string { @@ -77,31 +78,31 @@ export function toCreateSQL(def: SchemaDefinition): string { } export function renderAlterAddColumn(def: TableDefinition, column: ColumnDefinition): string { - return `ALTER TABLE ${def.database}.${def.name} ADD COLUMN IF NOT EXISTS ${renderColumn(column)};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} ADD COLUMN IF NOT EXISTS ${renderColumn(column)};` } export function renderAlterModifyColumn(def: TableDefinition, column: ColumnDefinition): string { - return `ALTER TABLE ${def.database}.${def.name} MODIFY COLUMN ${renderColumn(column)};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} MODIFY COLUMN ${renderColumn(column)};` } export function renderAlterDropColumn(def: TableDefinition, columnName: string): string { - return `ALTER TABLE ${def.database}.${def.name} DROP COLUMN IF EXISTS \`${columnName}\`;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} DROP COLUMN IF EXISTS ${quoteIdentifier(columnName)};` } export function renderAlterAddIndex(def: TableDefinition, index: SkipIndexDefinition): string { - return `ALTER TABLE ${def.database}.${def.name} ADD INDEX IF NOT EXISTS \`${index.name}\` (${index.expression}) TYPE ${index.type} GRANULARITY ${index.granularity};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} ADD INDEX IF NOT EXISTS ${quoteIdentifier(index.name)} (${index.expression}) TYPE ${index.type} GRANULARITY ${index.granularity};` } export function renderAlterDropIndex(def: TableDefinition, indexName: string): string { - return `ALTER TABLE ${def.database}.${def.name} DROP INDEX IF EXISTS \`${indexName}\`;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} DROP INDEX IF EXISTS ${quoteIdentifier(indexName)};` } export function renderAlterAddProjection(def: TableDefinition, projection: { name: string; query: string }): string { - return `ALTER TABLE ${def.database}.${def.name} ADD PROJECTION IF NOT EXISTS \`${projection.name}\` (${projection.query});` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} ADD PROJECTION IF NOT EXISTS ${quoteIdentifier(projection.name)} (${projection.query});` } export function renderAlterDropProjection(def: TableDefinition, projectionName: string): string { - return `ALTER TABLE ${def.database}.${def.name} DROP PROJECTION IF EXISTS \`${projectionName}\`;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} DROP PROJECTION IF EXISTS ${quoteIdentifier(projectionName)};` } export function renderAlterModifySetting( @@ -109,16 +110,16 @@ export function renderAlterModifySetting( key: string, value: string | number | boolean ): string { - return `ALTER TABLE ${def.database}.${def.name} MODIFY SETTING ${key} = ${value};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} MODIFY SETTING ${formatIdentifier(key)} = ${value};` } export function renderAlterResetSetting(def: TableDefinition, key: string): string { - return `ALTER TABLE ${def.database}.${def.name} RESET SETTING ${key};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} RESET SETTING ${formatIdentifier(key)};` } export function renderAlterModifyTTL(def: TableDefinition, ttl: string | undefined): string { if (ttl === undefined) { - return `ALTER TABLE ${def.database}.${def.name} REMOVE TTL;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} REMOVE TTL;` } - return `ALTER TABLE ${def.database}.${def.name} MODIFY TTL ${ttl};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} MODIFY TTL ${ttl};` } diff --git a/packages/plugin-backfill/src/planner.test.ts b/packages/plugin-backfill/src/planner.test.ts index 6489df5..6a264c7 100644 --- a/packages/plugin-backfill/src/planner.test.ts +++ b/packages/plugin-backfill/src/planner.test.ts @@ -456,4 +456,25 @@ describe('injectTimeFilter', () => { // Inner WHERE must remain intact expect(result).toContain('WHERE inner = 1') }) + + test('handles SQL-escaped single quotes while scanning for keywords', () => { + const query = "SELECT * FROM app.events WHERE message = 'it''s fine' ORDER BY ts" + const result = injectTimeFilter(query, 'event_time', from, to) + + expect(result).toContain("message = 'it''s fine'") + expect(result).toContain("AND event_time >= parseDateTimeBestEffort('") + expect(result).toContain("AND event_time < parseDateTimeBestEffort('") + }) + + test('ignores keywords inside comments while choosing insertion point', () => { + const query = `SELECT id +FROM app.events +-- ORDER BY fake_col +/* WHERE fake = 1 */ +GROUP BY id` + const result = injectTimeFilter(query, 'event_time', from, to) + + expect(result).toContain("WHERE event_time >= parseDateTimeBestEffort('") + expect(result.indexOf('WHERE event_time')).toBeLessThan(result.indexOf('GROUP BY id')) + }) }) diff --git a/packages/plugin-backfill/src/planner.ts b/packages/plugin-backfill/src/planner.ts index 7599ba5..6a71225 100644 --- a/packages/plugin-backfill/src/planner.ts +++ b/packages/plugin-backfill/src/planner.ts @@ -21,6 +21,18 @@ import type { NormalizedBackfillPluginOptions, } from './types.js' +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function formatIdentifier(value: string): string { + const trimmed = value.trim() + if (SIMPLE_IDENTIFIER.test(trimmed)) return trimmed + return `\`${trimmed.replace(/`/g, '``')}\`` +} + +function formatQualifiedName(database: string, name: string): string { + return `${formatIdentifier(database)}.${formatIdentifier(name)}` +} + function ensureHoursWithinLimits(input: { from: string to: string @@ -48,6 +60,12 @@ function buildSettingsClause(token: string): string { return `SETTINGS async_insert=0` } +function formatTargetReference(target: string): string { + const [database, table, ...rest] = target.split('.') + if (!database || !table || rest.length > 0) return target + return formatQualifiedName(database, table) +} + /** * Inject a time-range filter directly into a SQL query. * @@ -72,19 +90,79 @@ export function injectTimeFilter( type KWHit = { keyword: string; position: number } const hits: KWHit[] = [] let depth = 0 + let inSingleQuote = false + let inDoubleQuote = false + let inBacktick = false + let inLineComment = false + let inBlockComment = false for (let i = 0; i < trimmed.length; i++) { const ch = trimmed[i] - if (ch === '(') { depth++; continue } - if (ch === ')') { depth--; continue } - if (ch === "'") { - i++ - while (i < trimmed.length && trimmed[i] !== "'") { - if (trimmed[i] === '\\') i++ - i++ + const next = trimmed[i + 1] + if (!ch) continue + + if (inLineComment) { + if (ch === '\n') inLineComment = false + continue + } + + if (inBlockComment) { + if (ch === '*' && next === '/') { + inBlockComment = false + i += 1 + } + continue + } + + if (!inSingleQuote && !inDoubleQuote && !inBacktick) { + if (ch === '-' && next === '-') { + inLineComment = true + i += 1 + continue + } + if (ch === '/' && next === '*') { + inBlockComment = true + i += 1 + continue } + } + + if (inSingleQuote) { + if (ch === "'" && next === "'") { + i += 1 + continue + } + if (ch === "'") inSingleQuote = false + continue + } + if (inDoubleQuote) { + if (ch === '"' && next === '"') { + i += 1 + continue + } + if (ch === '"') inDoubleQuote = false + continue + } + if (inBacktick) { + if (ch === '`') inBacktick = false + continue + } + + if (ch === "'") { + inSingleQuote = true + continue + } + if (ch === '"') { + inDoubleQuote = true continue } + if (ch === '`') { + inBacktick = true + continue + } + + if (ch === '(') { depth++; continue } + if (ch === ')') { depth = Math.max(0, depth - 1); continue } if (depth !== 0) continue // Must be preceded by whitespace or be at start @@ -131,20 +209,21 @@ function buildChunkSqlTemplate(chunk: { }): string { const header = `/* chkit backfill plan=${chunk.planId} chunk=${chunk.chunkId} token=${chunk.token} */` const settings = buildSettingsClause(chunk.token) + const formattedTarget = formatTargetReference(chunk.target) if (chunk.mvAsQuery) { const filtered = injectTimeFilter(chunk.mvAsQuery, chunk.timeColumn, chunk.from, chunk.to) const insertClause = chunk.targetColumns?.length - ? `INSERT INTO ${chunk.target} (${chunk.targetColumns.join(', ')})` - : `INSERT INTO ${chunk.target}` + ? `INSERT INTO ${formattedTarget} (${chunk.targetColumns.map((name) => formatIdentifier(name)).join(', ')})` + : `INSERT INTO ${formattedTarget}` return [header, insertClause, filtered, settings].join('\n') } return [ header, - `INSERT INTO ${chunk.target}`, + `INSERT INTO ${formattedTarget}`, `SELECT *`, - `FROM ${chunk.target}`, + `FROM ${formattedTarget}`, `WHERE ${chunk.timeColumn} >= parseDateTimeBestEffort('${chunk.from}')`, ` AND ${chunk.timeColumn} < parseDateTimeBestEffort('${chunk.to}')`, settings, diff --git a/packages/plugin-pull/src/index.ts b/packages/plugin-pull/src/index.ts index 75d88ae..1ca9c57 100644 --- a/packages/plugin-pull/src/index.ts +++ b/packages/plugin-pull/src/index.ts @@ -298,9 +298,13 @@ async function pullSchema(input: { if (usesDefaultIntrospector || selectedDatabases.length === 0) { const db = createClickHouseExecutor(input.config.clickhouse) - objects = await db.listSchemaObjects() - if (selectedDatabases.length === 0) { - selectedDatabases = [...new Set(objects.map((item) => item.database))].sort() + try { + objects = await db.listSchemaObjects() + if (selectedDatabases.length === 0) { + selectedDatabases = [...new Set(objects.map((item) => item.database))].sort() + } + } finally { + await db.close() } } @@ -329,12 +333,16 @@ async function defaultIntrospector(input: { databases: string[] }): Promise { const db = createClickHouseExecutor(input.config) - const tables = await db.listTableDetails(input.databases) - const nonTableRows = await listNonTableRows(db, input.databases) - const nonTableObjects = nonTableRows - .map(mapSystemTableRowToDefinition) - .filter((definition): definition is Exclude => definition !== null) - return [...tables.map((table) => ({ kind: 'table' as const, ...table })), ...nonTableObjects] + try { + const tables = await db.listTableDetails(input.databases) + const nonTableRows = await listNonTableRows(db, input.databases) + const nonTableObjects = nonTableRows + .map(mapSystemTableRowToDefinition) + .filter((definition): definition is Exclude => definition !== null) + return [...tables.map((table) => ({ kind: 'table' as const, ...table })), ...nonTableObjects] + } finally { + await db.close() + } } function mapIntrospectedTableToDefinition(table: IntrospectedTable): TableDefinition { From fcf4678523173327da4d6a08131585265575f4b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20H=C3=B6ffl?= Date: Sun, 1 Mar 2026 05:21:11 +0100 Subject: [PATCH 2/2] fix(plugin-backfill): rewrite SELECT columns to match target table order in MV replay (#77) ClickHouse's INSERT INTO target (cols) SELECT ... maps columns positionally, not by name. When the materialized view query's SELECT output is in a different column order than the target table, data was being inserted into wrong columns. Now rewriteSelectColumns() parses the SELECT projection to build an alias map, then emits columns in target order. Fixes backfill of materialized view targets with columns in different order. Co-authored-by: Claude Opus 4.6 --- .../fix-mv-backfill-select-column-order.md | 5 + packages/plugin-backfill/src/planner.test.ts | 57 +++++++- packages/plugin-backfill/src/planner.ts | 125 +++++++++++++++++- 3 files changed, 179 insertions(+), 8 deletions(-) create mode 100644 .changeset/fix-mv-backfill-select-column-order.md diff --git a/.changeset/fix-mv-backfill-select-column-order.md b/.changeset/fix-mv-backfill-select-column-order.md new file mode 100644 index 0000000..09f7daa --- /dev/null +++ b/.changeset/fix-mv-backfill-select-column-order.md @@ -0,0 +1,5 @@ +--- +"@chkit/plugin-backfill": patch +--- + +Fix materialized view backfill INSERT by rewriting SELECT column order to match target table. ClickHouse's positional column mapping requires SELECT output columns to be in the same order as the INSERT target columns, not matched by name. diff --git a/packages/plugin-backfill/src/planner.test.ts b/packages/plugin-backfill/src/planner.test.ts index 6a264c7..68b7471 100644 --- a/packages/plugin-backfill/src/planner.test.ts +++ b/packages/plugin-backfill/src/planner.test.ts @@ -6,7 +6,7 @@ import { tmpdir } from 'node:os' import { resolveConfig } from '@chkit/core' import { normalizeBackfillOptions } from './options.js' -import { buildBackfillPlan, injectTimeFilter } from './planner.js' +import { buildBackfillPlan, injectTimeFilter, rewriteSelectColumns } from './planner.js' import { computeBackfillStateDir } from './state.js' describe('@chkit/plugin-backfill planning', () => { @@ -241,7 +241,7 @@ export const events_mv = { } }) - test('MV replay INSERT includes explicit column list from target table', async () => { + test('MV replay rewrites SELECT columns to match target table order', async () => { const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) const configPath = join(dir, 'clickhouse.config.ts') const schemaPath = join(dir, 'schema.ts') @@ -293,9 +293,12 @@ export const sessions_mv = { expect(output.plan.strategy).toBe('mv_replay') const chunk = output.plan.chunks[0] - // INSERT must include explicit column list to avoid positional mismatch + // INSERT should NOT include explicit column list (rewriteSelectColumns handles ordering) + expect(chunk?.sqlTemplate).toContain('INSERT INTO app.session_analytics') + expect(chunk?.sqlTemplate).not.toContain('INSERT INTO app.session_analytics (') + // SELECT must be rewritten with columns in target table order expect(chunk?.sqlTemplate).toContain( - 'INSERT INTO app.session_analytics (session_date, session_id, skills, slash_commands, ingested_at)' + "SELECT session_date, session_id, extractAll(content, 'skill') AS skills, extractAll(content, 'cmd') AS slash_commands, ingested_at" ) } finally { await rm(dir, { recursive: true, force: true }) @@ -369,6 +372,52 @@ export const sessions_mv = { }) }) +describe('rewriteSelectColumns', () => { + test('reorders SELECT columns to match target table order', () => { + const query = 'SELECT *, _foo as bar, _baz as qux FROM source WHERE status = 1' + const result = rewriteSelectColumns(query, ['col_a', 'bar', 'col_b', 'qux']) + + expect(result).toContain('SELECT col_a, _foo as bar, col_b, _baz as qux') + expect(result).toContain('FROM source') + expect(result).toContain('WHERE status = 1') + }) + + test('preserves WITH clause when rewriting SELECT', () => { + const query = [ + 'WITH', + " arrayDistinct(extractAll(content, '\\w+')) AS _skills,", + " toUInt64(JSONExtractFloat(meta, 'input')) AS _input_tokens", + 'SELECT *, _skills as skills, _input_tokens as input_tokens', + 'FROM app.sessions', + 'WHERE length(content) > 0', + ].join('\n') + + const result = rewriteSelectColumns(query, ['session_id', 'skills', 'content', 'input_tokens']) + + expect(result).toContain('arrayDistinct') + expect(result).toContain('_input_tokens') + expect(result).toContain('SELECT session_id, _skills as skills, content, _input_tokens as input_tokens') + expect(result).toContain('FROM app.sessions') + expect(result).toContain('WHERE length(content) > 0') + }) + + test('handles SELECT without star expansion', () => { + const query = 'SELECT toStartOfHour(event_time) AS event_time, count() AS cnt FROM events GROUP BY event_time' + const result = rewriteSelectColumns(query, ['cnt', 'event_time']) + + expect(result).toContain('SELECT count() AS cnt, toStartOfHour(event_time) AS event_time') + expect(result).toContain('FROM events') + expect(result).toContain('GROUP BY event_time') + }) + + test('returns query unchanged when SELECT/FROM cannot be found', () => { + const query = 'INSERT INTO t VALUES (1, 2)' + const result = rewriteSelectColumns(query, ['a', 'b']) + + expect(result).toBe(query) + }) +}) + describe('injectTimeFilter', () => { const from = '2025-01-01T00:00:00.000Z' const to = '2025-01-01T06:00:00.000Z' diff --git a/packages/plugin-backfill/src/planner.ts b/packages/plugin-backfill/src/planner.ts index 6a71225..ff97a06 100644 --- a/packages/plugin-backfill/src/planner.ts +++ b/packages/plugin-backfill/src/planner.ts @@ -196,6 +196,121 @@ export function injectTimeFilter( return `${before}\nWHERE ${timeCondition}${after ? '\n' + after : ''}` } +/** + * Rewrite a SQL query's SELECT projection so its output columns are in the + * same positional order as `targetColumns`. + * + * Parses the existing SELECT clause to build an alias→expression map, + * then emits columns in `targetColumns` order. Columns that came from + * a `*` expansion are emitted as bare names; aliased expressions (e.g. + * `_skills as skills`) are preserved with their original expression. + */ +export function rewriteSelectColumns(query: string, targetColumns: string[]): string { + const trimmed = query.trimEnd() + const upper = trimmed.toUpperCase() + + // Scan for top-level SELECT and FROM positions (outside parens and strings) + let selectPos = -1 + let fromPos = -1 + let depth = 0 + + for (let i = 0; i < trimmed.length; i++) { + const ch = trimmed[i] + if (ch === '(') { depth++; continue } + if (ch === ')') { depth--; continue } + if (ch === "'") { + i++ + while (i < trimmed.length && trimmed[i] !== "'") { + if (trimmed[i] === '\\') i++ + i++ + } + continue + } + if (depth !== 0) continue + + if (i > 0 && /\S/.test(trimmed[i - 1] ?? '')) continue + + const rest = upper.slice(i) + if (selectPos === -1 && rest.startsWith('SELECT') && (i + 6 >= trimmed.length || /\s/.test(trimmed[i + 6] ?? ''))) { + selectPos = i + } else if (selectPos !== -1 && fromPos === -1 && rest.startsWith('FROM') && (i + 4 >= trimmed.length || /\s/.test(trimmed[i + 4] ?? ''))) { + fromPos = i + } + } + + if (selectPos === -1 || fromPos === -1) return query + + const projStart = selectPos + 6 + const projText = trimmed.slice(projStart, fromPos).trim() + + // Split projection by top-level commas + const items: string[] = [] + let itemStart = 0 + depth = 0 + + for (let i = 0; i < projText.length; i++) { + const ch = projText[i] + if (ch === '(') { depth++; continue } + if (ch === ')') { depth--; continue } + if (ch === "'") { + i++ + while (i < projText.length && projText[i] !== "'") { + if (projText[i] === '\\') i++ + i++ + } + continue + } + if (depth === 0 && ch === ',') { + items.push(projText.slice(itemStart, i).trim()) + itemStart = i + 1 + } + } + items.push(projText.slice(itemStart).trim()) + + // Build alias → expression map from non-star items + const aliasMap = new Map() + for (const item of items) { + if (item === '*') continue + + const itemUpper = item.toUpperCase() + let asPos = -1 + let d = 0 + + for (let i = 0; i < item.length; i++) { + const ch = item[i] + if (ch === '(') { d++; continue } + if (ch === ')') { d--; continue } + if (ch === "'") { + i++ + while (i < item.length && item[i] !== "'") { + if (item[i] === '\\') i++ + i++ + } + continue + } + if (d !== 0) continue + if (i > 0 && /\S/.test(item[i - 1] ?? '')) continue + + const rest = itemUpper.slice(i) + if (rest.startsWith('AS') && (i + 2 >= item.length || /\s/.test(item[i + 2] ?? ''))) { + asPos = i + } + } + + if (asPos !== -1) { + const alias = item.slice(asPos + 2).trim() + aliasMap.set(alias, item) + } + } + + // Emit columns in target order + const rewrittenCols = targetColumns.map(col => aliasMap.get(col) ?? col) + + const before = trimmed.slice(0, projStart) + const after = trimmed.slice(fromPos) + return `${before} ${rewrittenCols.join(', ')}\n${after}` +} + function buildChunkSqlTemplate(chunk: { planId: string chunkId: string @@ -213,10 +328,12 @@ function buildChunkSqlTemplate(chunk: { if (chunk.mvAsQuery) { const filtered = injectTimeFilter(chunk.mvAsQuery, chunk.timeColumn, chunk.from, chunk.to) - const insertClause = chunk.targetColumns?.length - ? `INSERT INTO ${formattedTarget} (${chunk.targetColumns.map((name) => formatIdentifier(name)).join(', ')})` - : `INSERT INTO ${formattedTarget}` - return [header, insertClause, filtered, settings].join('\n') + if (chunk.targetColumns?.length) { + const reordered = rewriteSelectColumns(filtered, chunk.targetColumns) + const insertClause = `INSERT INTO ${formattedTarget} (${chunk.targetColumns.map((name) => formatIdentifier(name)).join(', ')})` + return [header, insertClause, reordered, settings].join('\n') + } + return [header, `INSERT INTO ${formattedTarget}`, filtered, settings].join('\n') } return [