diff --git a/apps/docs/src/content/docs/plugins/backfill.md b/apps/docs/src/content/docs/plugins/backfill.md index 2133440..22563ac 100644 --- a/apps/docs/src/content/docs/plugins/backfill.md +++ b/apps/docs/src/content/docs/plugins/backfill.md @@ -9,7 +9,7 @@ This document covers practical usage of the optional `backfill` plugin. - Builds deterministic, immutable backfill plans that divide a time window into chunks. - Executes backfills against ClickHouse with per-chunk checkpointing, automatic retries, and idempotency tokens. -- Detects materialized views and automatically generates correct CTE-wrapped replay queries. +- Detects materialized views and automatically generates replay queries with direct time-filter injection (no CTE wrapping). - Supports resume from checkpoint, cancel, status monitoring, and doctor-style diagnostics. - Integrates with [`chkit check`](/cli/check/) for CI enforcement of pending backfills. - Persists all state as JSON/NDJSON on disk. @@ -84,7 +84,7 @@ The plugin supports two strategies for backfilling data, chosen automatically ba **Table backfill** (`table` strategy): For direct table targets, inserts data by selecting from the same table within the time window. This is the most common case. -**Materialized view replay** (`mv_replay` strategy): When the target is a materialized view's `to` table, the plugin detects the view's aggregation query and wraps it in a CTE (Common Table Expression). This re-materializes the aggregation for each chunk window, ensuring correctness for aggregate backfills. Requires `requireIdempotencyToken: true` for safe resumable retries. +**Materialized view replay** (`mv_replay` strategy): When the target is a materialized view's `to` table, the plugin detects the view's aggregation query and injects the chunk time filter directly into that query (before trailing clauses like `GROUP BY`). This avoids CTE-related type inference issues while still re-materializing the aggregation for each chunk window. Requires `requireIdempotencyToken: true` for safe resumable retries. ## Time column resolution diff --git a/packages/cli/src/bin/commands/generate/plan-pipeline.ts b/packages/cli/src/bin/commands/generate/plan-pipeline.ts index 40af566..c61bcef 100644 --- a/packages/cli/src/bin/commands/generate/plan-pipeline.ts +++ b/packages/cli/src/bin/commands/generate/plan-pipeline.ts @@ -8,6 +8,22 @@ import type { import type { ColumnRenameMapping, TableRenameMapping } from './rename-mappings.js' +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function formatIdentifier(value: string): string { + const trimmed = value.trim() + if (SIMPLE_IDENTIFIER.test(trimmed)) return trimmed + return `\`${trimmed.replace(/`/g, '``')}\`` +} + +function quoteIdentifier(value: string): string { + return `\`${value.replace(/`/g, '``')}\`` +} + +function formatQualifiedName(database: string, name: string): string { + return `${formatIdentifier(database)}.${formatIdentifier(name)}` +} + export function applySelectedRenameSuggestions( plan: MigrationPlan, selectedSuggestions: ColumnRenameSuggestion[] @@ -75,7 +91,7 @@ export function applyExplicitTableRenames( type: 'create_database', key: dbKey, risk: 'safe', - sql: `CREATE DATABASE IF NOT EXISTS ${mapping.newDatabase};`, + sql: `CREATE DATABASE IF NOT EXISTS ${formatIdentifier(mapping.newDatabase)};`, }) createDatabaseOps.add(dbKey) } @@ -84,7 +100,7 @@ export function applyExplicitTableRenames( type: 'alter_table_rename_table', key: `table:${mapping.newDatabase}.${mapping.newName}:rename_table`, risk: 'caution', - sql: `RENAME TABLE ${mapping.oldDatabase}.${mapping.oldName} TO ${mapping.newDatabase}.${mapping.newName};`, + sql: `RENAME TABLE ${formatQualifiedName(mapping.oldDatabase, mapping.oldName)} TO ${formatQualifiedName(mapping.newDatabase, mapping.newName)};`, }) } @@ -129,7 +145,7 @@ export function buildExplicitColumnRenameSuggestions( : 'Explicitly confirmed by schema metadata (renamedFrom).', dropOperationKey, addOperationKey, - confirmationSQL: `ALTER TABLE ${mapping.database}.${mapping.table} RENAME COLUMN \`${mapping.from}\` TO \`${mapping.to}\`;`, + confirmationSQL: `ALTER TABLE ${formatQualifiedName(mapping.database, mapping.table)} RENAME COLUMN ${quoteIdentifier(mapping.from)} TO ${quoteIdentifier(mapping.to)};`, }) } diff --git a/packages/cli/src/bin/commands/migrate.test.ts b/packages/cli/src/bin/commands/migrate.test.ts new file mode 100644 index 0000000..cff3cd7 --- /dev/null +++ b/packages/cli/src/bin/commands/migrate.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, test } from 'bun:test' +import { mkdtemp, rm, writeFile } from 'node:fs/promises' +import { join } from 'node:path' +import { tmpdir } from 'node:os' + +import { __testUtils } from './migrate.js' + +describe('migrate scoped selection', () => { + test('includes unannotated migrations as safety fallback', async () => { + const dir = await mkdtemp(join(tmpdir(), 'chkit-migrate-scope-')) + try { + await writeFile( + join(dir, '001_unannotated.sql'), + "ALTER TABLE app.events ADD COLUMN IF NOT EXISTS source String;" + ) + await writeFile( + join(dir, '002_annotated.sql'), + [ + '-- operation: alter_table_add_column key=table:app.events:column:source risk=safe', + "ALTER TABLE app.events ADD COLUMN IF NOT EXISTS source String;", + ].join('\n') + ) + + const result = await __testUtils.filterPendingByScope(dir, ['001_unannotated.sql', '002_annotated.sql'], new Set(['app.events'])) + + expect(result.files).toEqual(['001_unannotated.sql', '002_annotated.sql']) + expect(result.warnings).toHaveLength(1) + expect(result.warnings[0]).toContain('001_unannotated.sql') + } finally { + await rm(dir, { recursive: true, force: true }) + } + }) +}) + diff --git a/packages/cli/src/bin/commands/migrate.ts b/packages/cli/src/bin/commands/migrate.ts index be25881..0716ebc 100644 --- a/packages/cli/src/bin/commands/migrate.ts +++ b/packages/cli/src/bin/commands/migrate.ts @@ -71,13 +71,22 @@ async function filterPendingByScope( migrationsDir: string, pending: string[], selectedTables: ReadonlySet -): Promise { +): Promise<{ files: string[]; warnings: string[] }> { const selectedDatabases = new Set([...selectedTables].map((table) => table.split('.')[0] ?? '')) const inScope: string[] = [] + const warnings: string[] = [] for (const file of pending) { const sql = await readFile(join(migrationsDir, file), 'utf8') const operations = extractMigrationOperationSummaries(sql) + if (operations.length === 0) { + inScope.push(file) + warnings.push( + `Migration "${file}" has no operation metadata comments; included in scoped run as a safety fallback.` + ) + continue + } + const matches = operations.some((operation) => { const tableKey = tableKeyFromOperationKey(operation.key) if (tableKey) return selectedTables.has(tableKey) @@ -91,7 +100,11 @@ async function filterPendingByScope( if (matches) inScope.push(file) } - return inScope + return { files: inScope, warnings } +} + +export const __testUtils = { + filterPendingByScope, } async function cmdMigrate(ctx: CommandRunContext): Promise { @@ -161,9 +174,10 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { return } - const pending = tableScope.enabled + const scopedSelection = tableScope.enabled ? await filterPendingByScope(migrationsDir, pendingAll, new Set(tableScope.matchedTables)) - : pendingAll + : { files: pendingAll, warnings: [] } + const pending = scopedSelection.files if (pending.length === 0) { if (jsonMode) { @@ -172,9 +186,13 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { scope: tableScope, pending: [], applied: [], + warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined, }) } else { console.log('No pending migrations.') + for (const warning of scopedSelection.warnings) { + console.warn(`Warning: ${warning}`) + } } return } @@ -186,7 +204,10 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { } if (jsonMode && !executeRequested) { - emitJson('migrate', planned) + emitJson('migrate', { + ...planned, + warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined, + }) return } @@ -197,6 +218,9 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { } console.log(`Pending migrations: ${pending.length}`) for (const file of pending) console.log(`- ${file}`) + for (const warning of scopedSelection.warnings) { + console.warn(`Warning: ${warning}`) + } } let shouldExecute = executeRequested @@ -315,6 +339,7 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { mode: 'execute', scope: tableScope, applied: appliedNow, + warnings: scopedSelection.warnings.length > 0 ? scopedSelection.warnings : undefined, }) return } diff --git a/packages/cli/src/bin/journal-store.ts b/packages/cli/src/bin/journal-store.ts index 464f250..07d38e4 100644 --- a/packages/cli/src/bin/journal-store.ts +++ b/packages/cli/src/bin/journal-store.ts @@ -10,6 +10,12 @@ export interface JournalStore { } const DEFAULT_JOURNAL_TABLE = '_chkit_migrations' +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function formatIdentifier(value: string): string { + if (SIMPLE_IDENTIFIER.test(value)) return value + return `\`${value.replace(/`/g, '``')}\`` +} function resolveJournalTableName(): string { const candidate = process.env.CHKIT_JOURNAL_TABLE?.trim() @@ -36,7 +42,7 @@ function isRetryableInsertRace(error: unknown): boolean { } export function createJournalStore(db: ClickHouseExecutor): JournalStore { - const journalTable = resolveJournalTableName() + const journalTable = formatIdentifier(resolveJournalTableName()) const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable} ( name String, applied_at DateTime64(3, 'UTC'), diff --git a/packages/cli/src/drift.test.ts b/packages/cli/src/drift.test.ts index 27be4d6..f66eab9 100644 --- a/packages/cli/src/drift.test.ts +++ b/packages/cli/src/drift.test.ts @@ -184,6 +184,38 @@ describe('@chkit/cli drift comparer', () => { expect(result).toBeNull() }) + test('treats wrapped low-cardinality nullable column forms as equivalent', () => { + const expected = table({ + database: 'app', + name: 'users', + engine: 'MergeTree()', + columns: [ + { name: 'id', type: 'UInt64' }, + { name: 'region', type: 'LowCardinality(String)', nullable: true }, + ], + primaryKey: ['id'], + orderBy: ['id'], + }) + + const result = compareTableShape(expected, { + engine: 'MergeTree()', + primaryKey: '(id)', + orderBy: '(id)', + uniqueKey: undefined, + partitionBy: undefined, + columns: [ + { name: 'id', type: 'UInt64' }, + { name: 'region', type: 'LowCardinality(String)', nullable: true }, + ], + settings: {}, + indexes: [], + projections: [], + ttl: undefined, + }) + + expect(result).toBeNull() + }) + test('emits reason codes for semantic drift', () => { const expected = table({ database: 'app', diff --git a/packages/clickhouse/src/create-table-parser.ts b/packages/clickhouse/src/create-table-parser.ts index 2e39f0a..b218677 100644 --- a/packages/clickhouse/src/create-table-parser.ts +++ b/packages/clickhouse/src/create-table-parser.ts @@ -33,8 +33,13 @@ function extractCreateTableBody(createTableQuery: string | undefined): string | let stringQuote = "'" for (let i = openIndex; i < left.length; i += 1) { const char = left[i] + const next = left[i + 1] if (!char) continue if (inString) { + if (char === stringQuote && next === stringQuote) { + i += 1 + continue + } if (char === stringQuote && left[i - 1] !== '\\') { inString = false } diff --git a/packages/clickhouse/src/index.test.ts b/packages/clickhouse/src/index.test.ts index 107c5ec..afd2f40 100644 --- a/packages/clickhouse/src/index.test.ts +++ b/packages/clickhouse/src/index.test.ts @@ -11,6 +11,8 @@ import { parseSettingsFromCreateTableQuery, parseTTLFromCreateTableQuery, parseUniqueKeyFromCreateTableQuery, + parseSystemColumnType, + normalizeSkipIndexType, } from './index' describe('@chkit/clickhouse smoke', () => { @@ -99,4 +101,42 @@ ORDER BY id;` }, ]) }) + + test('extracts projection body when column defaults contain doubled quotes', () => { + const query = `CREATE TABLE app.events +( + id UInt64, + source String DEFAULT 'it''s fine', + PROJECTION p_by_source (SELECT source, count() GROUP BY source) +) +ENGINE = MergeTree() +ORDER BY id;` + + expect(parseProjectionsFromCreateTableQuery(query)).toEqual([ + { + name: 'p_by_source', + query: 'SELECT source, count() GROUP BY source', + }, + ]) + }) + + test('parses nullable wrappers for system column types without losing wrappers', () => { + expect(parseSystemColumnType('Nullable(String)')).toEqual({ + type: 'String', + nullable: true, + }) + expect(parseSystemColumnType('LowCardinality(Nullable(String))')).toEqual({ + type: 'LowCardinality(String)', + nullable: true, + }) + expect(parseSystemColumnType('Array(Nullable(String))')).toEqual({ + type: 'Array(Nullable(String))', + nullable: undefined, + }) + }) + + test('preserves unknown skip-index types from introspection', () => { + expect(normalizeSkipIndexType('set')).toBe('set') + expect(normalizeSkipIndexType('new_index_type_v2')).toBe('new_index_type_v2') + }) }) diff --git a/packages/clickhouse/src/index.ts b/packages/clickhouse/src/index.ts index dcf93aa..b64770e 100644 --- a/packages/clickhouse/src/index.ts +++ b/packages/clickhouse/src/index.ts @@ -74,6 +74,11 @@ export interface IntrospectedTable { ttl?: string } +export interface ParsedSystemColumnType { + type: string + nullable?: boolean +} + export { parseEngineFromCreateTableQuery, parseOrderByFromCreateTableQuery, @@ -92,38 +97,62 @@ export function inferSchemaKindFromEngine(engine: string): SchemaObjectRef['kind return 'table' } +function unwrapTypeWrapper(type: string, wrapper: string): string | null { + const trimmed = type.trim() + const prefix = `${wrapper}(` + if (!trimmed.startsWith(prefix) || !trimmed.endsWith(')')) return null + return trimmed.slice(prefix.length, -1).trim() +} + +export function parseSystemColumnType(type: string): ParsedSystemColumnType { + let normalizedType = type.trim() + let nullable = false + + const outerNullable = unwrapTypeWrapper(normalizedType, 'Nullable') + if (outerNullable) { + normalizedType = outerNullable + nullable = true + } + + const lowCardinality = unwrapTypeWrapper(normalizedType, 'LowCardinality') + if (lowCardinality) { + const lowCardinalityNullable = unwrapTypeWrapper(lowCardinality, 'Nullable') + if (lowCardinalityNullable) { + normalizedType = `LowCardinality(${lowCardinalityNullable})` + nullable = true + } + } + + return { + type: normalizedType, + nullable: nullable || undefined, + } +} function normalizeColumnFromSystemRow(row: SystemColumnRow): ColumnDefinition { - const nullableMatch = row.type.match(/^Nullable\((.+)\)$/) - const type = nullableMatch?.[1] ? nullableMatch[1] : row.type - const nullable = Boolean(nullableMatch?.[1]) + const normalizedType = parseSystemColumnType(row.type) let defaultValue: ColumnDefinition['default'] | undefined if (row.default_expression && row.default_kind === 'DEFAULT') { defaultValue = normalizeSQLFragment(row.default_expression) } return { name: row.name, - type, - nullable: nullable || undefined, + type: normalizedType.type, + nullable: normalizedType.nullable, default: defaultValue, comment: row.comment?.trim() || undefined, } } -function normalizeIndexType(value: string): SkipIndexDefinition['type'] { - if (value === 'minmax') return 'minmax' - if (value === 'set') return 'set' - if (value === 'bloom_filter') return 'bloom_filter' - if (value === 'tokenbf_v1') return 'tokenbf_v1' - if (value === 'ngrambf_v1') return 'ngrambf_v1' - return 'set' +export function normalizeSkipIndexType(value: string): SkipIndexDefinition['type'] { + return value.trim() } function normalizeIndexFromSystemRow(row: SystemSkippingIndexRow): SkipIndexDefinition { return { name: row.name, expression: normalizeSQLFragment(row.expr), - type: normalizeIndexType(row.type), + type: normalizeSkipIndexType(row.type), granularity: row.granularity, } } diff --git a/packages/core/src/conformance.test.ts b/packages/core/src/conformance.test.ts new file mode 100644 index 0000000..32ee377 --- /dev/null +++ b/packages/core/src/conformance.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, test } from 'bun:test' + +import { + canonicalizeDefinitions, + formatIdentifier, + planDiff, + table, + toCreateSQL, + view, + materializedView, +} from './index.js' + +describe('core conformance', () => { + test('canonicalization is idempotent and planner is stable on canonical output', () => { + const definitions = [ + table({ + database: 'app', + name: 'events', + columns: [ + { name: 'id', type: 'UInt64' }, + { name: 'source', type: 'LowCardinality(String)' }, + { name: 'recorded_at', type: 'DateTime64(3)', default: 'fn:now64(3)' }, + ], + engine: 'MergeTree', + primaryKey: ['id'], + orderBy: ['id'], + settings: { + index_granularity: 8192, + min_rows_for_wide_part: 0, + }, + indexes: [ + { name: 'idx_source', expression: 'source', type: 'set', granularity: 1 }, + ], + }), + view({ + database: 'app', + name: 'events_view', + as: 'SELECT id, source FROM app.events', + }), + materializedView({ + database: 'app', + name: 'events_mv', + to: { database: 'app', name: 'events' }, + as: 'SELECT id, source FROM app.events', + }), + ] + + const canonicalA = canonicalizeDefinitions(definitions) + const canonicalB = canonicalizeDefinitions(canonicalA) + expect(canonicalB).toEqual(canonicalA) + + const plan = planDiff(canonicalA, canonicalB) + expect(plan.operations).toEqual([]) + expect(plan.riskSummary).toEqual({ safe: 0, caution: 0, danger: 0 }) + }) + + test('renders quoted identifiers for non-simple names', () => { + const sql = toCreateSQL( + table({ + database: 'analytics-prod', + name: 'event-log', + columns: [{ name: 'user id', type: 'String' }], + engine: 'MergeTree', + primaryKey: ['user id'], + orderBy: ['user id'], + }) + ) + + expect(sql).toContain('CREATE TABLE IF NOT EXISTS `analytics-prod`.`event-log`') + expect(sql).toContain('`user id` String') + expect(formatIdentifier('simple_name')).toBe('simple_name') + expect(formatIdentifier('bad-name')).toBe('`bad-name`') + }) +}) + diff --git a/packages/core/src/identifiers.ts b/packages/core/src/identifiers.ts new file mode 100644 index 0000000..a6cece6 --- /dev/null +++ b/packages/core/src/identifiers.ts @@ -0,0 +1,18 @@ +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +export function quoteIdentifier(value: string): string { + return `\`${value.replace(/`/g, '``')}\`` +} + +export function formatIdentifier(value: string): string { + const trimmed = value.trim() + if (trimmed.length === 0) { + throw new Error('Identifier cannot be empty.') + } + if (SIMPLE_IDENTIFIER.test(trimmed)) return trimmed + return quoteIdentifier(trimmed) +} + +export function formatQualifiedName(database: string, name: string): string { + return `${formatIdentifier(database)}.${formatIdentifier(name)}` +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 3bfcd33..47dc265 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,6 +9,7 @@ export { planDiff } from './planner.js' export { createSnapshot } from './snapshot.js' export { loadSchemaDefinitions, type SchemaLoaderOptions } from './schema-loader.js' export { splitTopLevelComma } from './key-clause.js' +export { formatIdentifier, formatQualifiedName } from './identifiers.js' export { normalizeEngine, normalizeSQLFragment } from './sql-normalizer.js' export { toCreateSQL } from './sql.js' export { assertValidDefinitions, validateDefinitions } from './validate.js' diff --git a/packages/core/src/model-types.ts b/packages/core/src/model-types.ts index 6d991d6..412ea74 100644 --- a/packages/core/src/model-types.ts +++ b/packages/core/src/model-types.ts @@ -29,10 +29,17 @@ export interface ColumnDefinition { comment?: string } +export type KnownSkipIndexType = + | 'minmax' + | 'set' + | 'bloom_filter' + | 'tokenbf_v1' + | 'ngrambf_v1' + export interface SkipIndexDefinition { name: string expression: string - type: 'minmax' | 'set' | 'bloom_filter' | 'tokenbf_v1' | 'ngrambf_v1' + type: KnownSkipIndexType | (string & {}) granularity: number } diff --git a/packages/core/src/planner.ts b/packages/core/src/planner.ts index e715aba..e3fde12 100644 --- a/packages/core/src/planner.ts +++ b/packages/core/src/planner.ts @@ -22,6 +22,7 @@ import { renderAlterResetSetting, toCreateSQL, } from './sql.js' +import { formatIdentifier, formatQualifiedName, quoteIdentifier } from './identifiers.js' import { assertValidDefinitions } from './validate.js' function createMap(definitions: SchemaDefinition[]): Map { @@ -38,7 +39,7 @@ function pushDropOperation( type: 'drop_table', key: definitionKey(def), risk, - sql: `DROP TABLE IF EXISTS ${def.database}.${def.name};`, + sql: `DROP TABLE IF EXISTS ${formatQualifiedName(def.database, def.name)};`, }) return } @@ -47,7 +48,7 @@ function pushDropOperation( type: 'drop_view', key: definitionKey(def), risk, - sql: `DROP VIEW IF EXISTS ${def.database}.${def.name};`, + sql: `DROP VIEW IF EXISTS ${formatQualifiedName(def.database, def.name)};`, }) return } @@ -55,7 +56,7 @@ function pushDropOperation( type: 'drop_materialized_view', key: definitionKey(def), risk, - sql: `DROP TABLE IF EXISTS ${def.database}.${def.name} SYNC;`, + sql: `DROP TABLE IF EXISTS ${formatQualifiedName(def.database, def.name)} SYNC;`, }) } @@ -99,7 +100,7 @@ function pushCreateDatabaseOperation( type: 'create_database', key: `database:${database}`, risk, - sql: `CREATE DATABASE IF NOT EXISTS ${database};`, + sql: `CREATE DATABASE IF NOT EXISTS ${formatIdentifier(database)};`, }) } @@ -137,7 +138,7 @@ function normalizeColumn(column: ColumnDefinition): Omit `\`${column}\``) + .map((column) => quoteIdentifier(column)) .join(', ') } @@ -34,10 +35,10 @@ function renderTableSQL(def: TableDefinition): string { const columns = def.columns.map(renderColumn) const indexes = (def.indexes ?? []).map( (idx) => - `INDEX \`${idx.name}\` (${idx.expression}) TYPE ${idx.type} GRANULARITY ${idx.granularity}` + `INDEX ${quoteIdentifier(idx.name)} (${idx.expression}) TYPE ${idx.type} GRANULARITY ${idx.granularity}` ) const projections = (def.projections ?? []).map( - (projection) => `PROJECTION \`${projection.name}\` (${projection.query})` + (projection) => `PROJECTION ${quoteIdentifier(projection.name)} (${projection.query})` ) const body = [...columns, ...indexes, ...projections].join(',\n ') @@ -58,15 +59,15 @@ function renderTableSQL(def: TableDefinition): string { } if (def.comment) clauses.push(`COMMENT '${def.comment.replace(/'/g, "''")}'`) - return `CREATE TABLE IF NOT EXISTS ${def.database}.${def.name}\n(\n ${body}\n) ENGINE = ${def.engine}\n${clauses.join('\n')};` + return `CREATE TABLE IF NOT EXISTS ${formatQualifiedName(def.database, def.name)}\n(\n ${body}\n) ENGINE = ${def.engine}\n${clauses.join('\n')};` } function renderViewSQL(def: ViewDefinition): string { - return `CREATE VIEW IF NOT EXISTS ${def.database}.${def.name} AS\n${def.as};` + return `CREATE VIEW IF NOT EXISTS ${formatQualifiedName(def.database, def.name)} AS\n${def.as};` } function renderMaterializedViewSQL(def: MaterializedViewDefinition): string { - return `CREATE MATERIALIZED VIEW IF NOT EXISTS ${def.database}.${def.name} TO ${def.to.database}.${def.to.name} AS\n${def.as};` + return `CREATE MATERIALIZED VIEW IF NOT EXISTS ${formatQualifiedName(def.database, def.name)} TO ${formatQualifiedName(def.to.database, def.to.name)} AS\n${def.as};` } export function toCreateSQL(def: SchemaDefinition): string { @@ -77,31 +78,31 @@ export function toCreateSQL(def: SchemaDefinition): string { } export function renderAlterAddColumn(def: TableDefinition, column: ColumnDefinition): string { - return `ALTER TABLE ${def.database}.${def.name} ADD COLUMN IF NOT EXISTS ${renderColumn(column)};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} ADD COLUMN IF NOT EXISTS ${renderColumn(column)};` } export function renderAlterModifyColumn(def: TableDefinition, column: ColumnDefinition): string { - return `ALTER TABLE ${def.database}.${def.name} MODIFY COLUMN ${renderColumn(column)};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} MODIFY COLUMN ${renderColumn(column)};` } export function renderAlterDropColumn(def: TableDefinition, columnName: string): string { - return `ALTER TABLE ${def.database}.${def.name} DROP COLUMN IF EXISTS \`${columnName}\`;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} DROP COLUMN IF EXISTS ${quoteIdentifier(columnName)};` } export function renderAlterAddIndex(def: TableDefinition, index: SkipIndexDefinition): string { - return `ALTER TABLE ${def.database}.${def.name} ADD INDEX IF NOT EXISTS \`${index.name}\` (${index.expression}) TYPE ${index.type} GRANULARITY ${index.granularity};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} ADD INDEX IF NOT EXISTS ${quoteIdentifier(index.name)} (${index.expression}) TYPE ${index.type} GRANULARITY ${index.granularity};` } export function renderAlterDropIndex(def: TableDefinition, indexName: string): string { - return `ALTER TABLE ${def.database}.${def.name} DROP INDEX IF EXISTS \`${indexName}\`;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} DROP INDEX IF EXISTS ${quoteIdentifier(indexName)};` } export function renderAlterAddProjection(def: TableDefinition, projection: { name: string; query: string }): string { - return `ALTER TABLE ${def.database}.${def.name} ADD PROJECTION IF NOT EXISTS \`${projection.name}\` (${projection.query});` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} ADD PROJECTION IF NOT EXISTS ${quoteIdentifier(projection.name)} (${projection.query});` } export function renderAlterDropProjection(def: TableDefinition, projectionName: string): string { - return `ALTER TABLE ${def.database}.${def.name} DROP PROJECTION IF EXISTS \`${projectionName}\`;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} DROP PROJECTION IF EXISTS ${quoteIdentifier(projectionName)};` } export function renderAlterModifySetting( @@ -109,16 +110,16 @@ export function renderAlterModifySetting( key: string, value: string | number | boolean ): string { - return `ALTER TABLE ${def.database}.${def.name} MODIFY SETTING ${key} = ${value};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} MODIFY SETTING ${formatIdentifier(key)} = ${value};` } export function renderAlterResetSetting(def: TableDefinition, key: string): string { - return `ALTER TABLE ${def.database}.${def.name} RESET SETTING ${key};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} RESET SETTING ${formatIdentifier(key)};` } export function renderAlterModifyTTL(def: TableDefinition, ttl: string | undefined): string { if (ttl === undefined) { - return `ALTER TABLE ${def.database}.${def.name} REMOVE TTL;` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} REMOVE TTL;` } - return `ALTER TABLE ${def.database}.${def.name} MODIFY TTL ${ttl};` + return `ALTER TABLE ${formatQualifiedName(def.database, def.name)} MODIFY TTL ${ttl};` } diff --git a/packages/plugin-backfill/src/planner.test.ts b/packages/plugin-backfill/src/planner.test.ts index edb9a3e..68b7471 100644 --- a/packages/plugin-backfill/src/planner.test.ts +++ b/packages/plugin-backfill/src/planner.test.ts @@ -505,4 +505,25 @@ describe('injectTimeFilter', () => { // Inner WHERE must remain intact expect(result).toContain('WHERE inner = 1') }) + + test('handles SQL-escaped single quotes while scanning for keywords', () => { + const query = "SELECT * FROM app.events WHERE message = 'it''s fine' ORDER BY ts" + const result = injectTimeFilter(query, 'event_time', from, to) + + expect(result).toContain("message = 'it''s fine'") + expect(result).toContain("AND event_time >= parseDateTimeBestEffort('") + expect(result).toContain("AND event_time < parseDateTimeBestEffort('") + }) + + test('ignores keywords inside comments while choosing insertion point', () => { + const query = `SELECT id +FROM app.events +-- ORDER BY fake_col +/* WHERE fake = 1 */ +GROUP BY id` + const result = injectTimeFilter(query, 'event_time', from, to) + + expect(result).toContain("WHERE event_time >= parseDateTimeBestEffort('") + expect(result.indexOf('WHERE event_time')).toBeLessThan(result.indexOf('GROUP BY id')) + }) }) diff --git a/packages/plugin-backfill/src/planner.ts b/packages/plugin-backfill/src/planner.ts index 0466744..f62a961 100644 --- a/packages/plugin-backfill/src/planner.ts +++ b/packages/plugin-backfill/src/planner.ts @@ -22,6 +22,18 @@ import type { NormalizedBackfillPluginOptions, } from './types.js' +const SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function formatIdentifier(value: string): string { + const trimmed = value.trim() + if (SIMPLE_IDENTIFIER.test(trimmed)) return trimmed + return `\`${trimmed.replace(/`/g, '``')}\`` +} + +function formatQualifiedName(database: string, name: string): string { + return `${formatIdentifier(database)}.${formatIdentifier(name)}` +} + function ensureHoursWithinLimits(input: { from: string to: string @@ -49,6 +61,12 @@ function buildSettingsClause(token: string): string { return `SETTINGS async_insert=0` } +function formatTargetReference(target: string): string { + const [database, table, ...rest] = target.split('.') + if (!database || !table || rest.length > 0) return target + return formatQualifiedName(database, table) +} + /** * Inject a time-range filter directly into a SQL query. * @@ -73,19 +91,79 @@ export function injectTimeFilter( type KWHit = { keyword: string; position: number } const hits: KWHit[] = [] let depth = 0 + let inSingleQuote = false + let inDoubleQuote = false + let inBacktick = false + let inLineComment = false + let inBlockComment = false for (let i = 0; i < trimmed.length; i++) { const ch = trimmed[i] - if (ch === '(') { depth++; continue } - if (ch === ')') { depth--; continue } - if (ch === "'") { - i++ - while (i < trimmed.length && trimmed[i] !== "'") { - if (trimmed[i] === '\\') i++ - i++ + const next = trimmed[i + 1] + if (!ch) continue + + if (inLineComment) { + if (ch === '\n') inLineComment = false + continue + } + + if (inBlockComment) { + if (ch === '*' && next === '/') { + inBlockComment = false + i += 1 } continue } + + if (!inSingleQuote && !inDoubleQuote && !inBacktick) { + if (ch === '-' && next === '-') { + inLineComment = true + i += 1 + continue + } + if (ch === '/' && next === '*') { + inBlockComment = true + i += 1 + continue + } + } + + if (inSingleQuote) { + if (ch === "'" && next === "'") { + i += 1 + continue + } + if (ch === "'") inSingleQuote = false + continue + } + if (inDoubleQuote) { + if (ch === '"' && next === '"') { + i += 1 + continue + } + if (ch === '"') inDoubleQuote = false + continue + } + if (inBacktick) { + if (ch === '`') inBacktick = false + continue + } + + if (ch === "'") { + inSingleQuote = true + continue + } + if (ch === '"') { + inDoubleQuote = true + continue + } + if (ch === '`') { + inBacktick = true + continue + } + + if (ch === '(') { depth++; continue } + if (ch === ')') { depth = Math.max(0, depth - 1); continue } if (depth !== 0) continue // Must be preceded by whitespace or be at start @@ -247,21 +325,23 @@ function buildChunkSqlTemplate(chunk: { }): string { const header = `/* chkit backfill plan=${chunk.planId} chunk=${chunk.chunkId} token=${chunk.token} */` const settings = buildSettingsClause(chunk.token) + const formattedTarget = formatTargetReference(chunk.target) if (chunk.mvAsQuery) { const filtered = injectTimeFilter(chunk.mvAsQuery, chunk.timeColumn, chunk.from, chunk.to) if (chunk.targetColumns?.length) { const reordered = rewriteSelectColumns(filtered, chunk.targetColumns) - return [header, `INSERT INTO ${chunk.target}`, reordered, settings].join('\n') + const insertClause = `INSERT INTO ${formattedTarget} (${chunk.targetColumns.map((name) => formatIdentifier(name)).join(', ')})` + return [header, insertClause, reordered, settings].join('\n') } - return [header, `INSERT INTO ${chunk.target}`, filtered, settings].join('\n') + return [header, `INSERT INTO ${formattedTarget}`, filtered, settings].join('\n') } return [ header, - `INSERT INTO ${chunk.target}`, + `INSERT INTO ${formattedTarget}`, `SELECT *`, - `FROM ${chunk.target}`, + `FROM ${formattedTarget}`, `WHERE ${chunk.timeColumn} >= parseDateTimeBestEffort('${chunk.from}')`, ` AND ${chunk.timeColumn} < parseDateTimeBestEffort('${chunk.to}')`, settings, diff --git a/packages/plugin-pull/src/index.ts b/packages/plugin-pull/src/index.ts index 75d88ae..1ca9c57 100644 --- a/packages/plugin-pull/src/index.ts +++ b/packages/plugin-pull/src/index.ts @@ -298,9 +298,13 @@ async function pullSchema(input: { if (usesDefaultIntrospector || selectedDatabases.length === 0) { const db = createClickHouseExecutor(input.config.clickhouse) - objects = await db.listSchemaObjects() - if (selectedDatabases.length === 0) { - selectedDatabases = [...new Set(objects.map((item) => item.database))].sort() + try { + objects = await db.listSchemaObjects() + if (selectedDatabases.length === 0) { + selectedDatabases = [...new Set(objects.map((item) => item.database))].sort() + } + } finally { + await db.close() } } @@ -329,12 +333,16 @@ async function defaultIntrospector(input: { databases: string[] }): Promise { const db = createClickHouseExecutor(input.config) - const tables = await db.listTableDetails(input.databases) - const nonTableRows = await listNonTableRows(db, input.databases) - const nonTableObjects = nonTableRows - .map(mapSystemTableRowToDefinition) - .filter((definition): definition is Exclude => definition !== null) - return [...tables.map((table) => ({ kind: 'table' as const, ...table })), ...nonTableObjects] + try { + const tables = await db.listTableDetails(input.databases) + const nonTableRows = await listNonTableRows(db, input.databases) + const nonTableObjects = nonTableRows + .map(mapSystemTableRowToDefinition) + .filter((definition): definition is Exclude => definition !== null) + return [...tables.map((table) => ({ kind: 'table' as const, ...table })), ...nonTableObjects] + } finally { + await db.close() + } } function mapIntrospectedTableToDefinition(table: IntrospectedTable): TableDefinition {