diff --git a/cmd/genddl/config_enhanced_ingestion.json.example b/cmd/genddl/config_enhanced_ingestion.json.example new file mode 100644 index 00000000..0818ccab --- /dev/null +++ b/cmd/genddl/config_enhanced_ingestion.json.example @@ -0,0 +1,18 @@ +{ + "mode": "enhanced_ingestion", + "scale_factor": "1000", + "file_format": "parquet", + "compression_method": "uncompressed", + "workload": "tpcds", + "workload_definition": "tpc-ds", + "iceberg": true, + "source_file_format": "CSV", + "source_catalog": "source_catalog_name", + "target_catalog": "target_catalog_name", + "source_schema": "source_schema_name", + "target_schema": "target_schema_name", + "s3_source_location": "s3a://bucket-name/path/to/source/data", + "s3_target_location": "s3a://bucket-name/path/to/target/data", + "engine": "presto", + "session_variables": {} +} \ No newline at end of file diff --git a/cmd/genddl/create_source_table.sql.tmpl b/cmd/genddl/create_source_table.sql.tmpl new file mode 100644 index 00000000..74bacbfc --- /dev/null +++ b/cmd/genddl/create_source_table.sql.tmpl @@ -0,0 +1,45 @@ +{{- /*gotype:pbench/cmd/genddl.Schema*/ -}} +{{- /* This template is used for ENHANCED_INGESTION mode - creates source tables */ -}} +{{- if .SourceSchema }} +{{- if .SourceCatalog }} +CREATE SCHEMA IF NOT EXISTS {{ .SourceCatalog }}.{{ .SourceSchema }} +WITH ( + location = '{{ .S3SourceLocation }}/' +); + +USE {{ .SourceCatalog }}.{{ .SourceSchema }}; +{{- else }} +CREATE SCHEMA IF NOT EXISTS hive.{{ .SourceSchema }} +WITH ( + location = '{{ .S3SourceLocation }}/' +); + +USE hive.{{ .SourceSchema }}; +{{- end }} + +{{ range .Tables }} +CREATE TABLE IF NOT EXISTS {{ .Name }} ( +{{- $first := true }} +{{- range .Columns }} + {{- if $first }} + {{- $first = false }} + {{- else }}, + {{- end }} + {{- if eq $.SourceFileFormat "CSV" }} + {{ .Name }} varchar + {{- else }} + {{ .Name }} {{ .Type }} + {{- end }} +{{- end }}) +WITH ( + external_location = '{{ $.S3SourceLocation }}/{{ .Name }}/', + {{- if eq 
$.SourceFileFormat "CSV" }}
+    format = 'CSV',
+    csv_separator = '|'
+  {{- else }}
+    format = 'TEXTFILE',
+    textfile_field_delim = '|'
+  {{- end }}
+);
+{{ end }}
+{{- end }}
\ No newline at end of file
diff --git a/cmd/genddl/create_target_table.sql.tmpl b/cmd/genddl/create_target_table.sql.tmpl
new file mode 100644
index 00000000..6c93d219
--- /dev/null
+++ b/cmd/genddl/create_target_table.sql.tmpl
@@ -0,0 +1,61 @@
+{{- /*gotype:pbench/cmd/genddl.Schema*/ -}}
+{{- /* This template is used for ENHANCED_INGESTION mode - creates target tables */ -}}
+{{- if .TargetSchema }}
+{{- if .TargetCatalog }}
+CREATE SCHEMA IF NOT EXISTS {{ .TargetCatalog }}.{{ .TargetSchema }}
+WITH (
+    location = '{{ .S3TargetLocation }}/'
+);
+
+USE {{ .TargetCatalog }}.{{ .TargetSchema }};
+{{- else }}
+{{- if .Iceberg }}
+CREATE SCHEMA IF NOT EXISTS iceberg.{{ .TargetSchema }}
+WITH (
+    location = '{{ .S3TargetLocation }}/'
+);
+
+USE iceberg.{{ .TargetSchema }};
+{{- else }}
+CREATE SCHEMA IF NOT EXISTS hive.{{ .TargetSchema }}
+WITH (
+    location = '{{ .S3TargetLocation }}/'
+);
+
+USE hive.{{ .TargetSchema }};
+{{- end }}
+{{- end }}
+
+{{ range .Tables }}
+CREATE TABLE IF NOT EXISTS {{ .Name }} (
+{{- $first := true }}
+{{- range .Columns }}
+    {{- if $first }}
+        {{- $first = false }}
+    {{- else -}}
+        ,
+    {{- end }}
+    {{ .Name }} {{ .Type }}
+{{- end }}
+)
+{{- if eq $.Engine "spark" }}
+USING iceberg
+{{- if .Partitioned }}
+PARTITIONED BY ({{ .LastColumn.Name }})
+{{- end }}
+TBLPROPERTIES ('write.target-file-size-bytes' = 268435456, 'write.parquet.row-group-size-bytes' = 67108864)
+LOCATION '{{ $.S3TargetLocation }}/{{ .Name }}';
+{{- else }}
+WITH (
+    format = 'PARQUET'
+    {{- if $.Partitioned }}
+        {{- /* Check the table-level flag too: in enhanced ingestion mode all tables reach
+             this template, and non-partitioned tables must not get a partitioning clause */ -}}
+        {{- if and $.Iceberg .Partitioned }}
+            , partitioning = array['{{ .LastColumn.Name }}']
+        {{- else if .Partitioned }}
+            , partitioned_by = array['{{ .LastColumn.Name }}']
+        {{- end }}
+    {{- end }}
+);
+{{- end }}
+{{ end }}
+{{- end }}
\ No newline at end of file
diff --git
a/cmd/genddl/insert_table.sql.tmpl b/cmd/genddl/insert_table.sql.tmpl
index 360b4e7c..52c08280 100644
--- a/cmd/genddl/insert_table.sql.tmpl
+++ b/cmd/genddl/insert_table.sql.tmpl
@@ -3,6 +3,48 @@
 SET SESSION {{ $key }}='{{ $value }}';
 {{ end }}
+{{- if .TargetSchema }}
+{{- /* ENHANCED_INGESTION MODE */ -}}
+{{- if .TargetCatalog }}
+USE {{ .TargetCatalog }}.{{ .TargetSchema }};
+{{- else }}
+{{- if .Iceberg }}
+USE iceberg.{{ .TargetSchema }};
+{{- else }}
+USE hive.{{ .TargetSchema }};
+{{- end }}
+{{- end }}
+
+{{ range .InsertTables -}}
+{{- if $.TargetCatalog }}
+INSERT INTO {{ $.TargetCatalog }}.{{ $.TargetSchema }}.{{ .Name }}
+{{- else }}
+INSERT INTO {{ .Name }}
+{{- end }}
+{{- /* Only CSV sources need per-column NULLIF/CAST (source columns are all varchar);
+     every other source format has typed columns, so a plain SELECT * is correct.
+     Using "ne CSV" (instead of "eq TEXTFILE") avoids emitting a column-less SELECT
+     for any format that is neither CSV nor TEXTFILE. */ -}}
+{{- if ne $.SourceFileFormat "CSV" }}
+SELECT * FROM {{ if $.SourceCatalog }}{{ $.SourceCatalog }}.{{ $.SourceSchema }}.{{ .Name }}{{ else }}hive.{{ $.SourceSchema }}.{{ .Name }}{{ end }};
+{{- else }}
+SELECT
+{{- $first := true }}
+{{- range .Columns }}
+    {{- if $first }}
+        {{- $first = false }}
+    {{- else }},
+    {{- end }}
+    {{- if eq $.SourceFileFormat "CSV" }}
+        {{- if .IsVarchar }}
+    NULLIF({{ .Name }}, '') AS {{ .Name }}
+        {{- else }}
+    CAST(NULLIF({{ .Name }}, '') AS {{ .Type }}) AS {{ .Name }}
+        {{- end }}
+    {{- end }}
+{{- end }}
+FROM {{ if $.SourceCatalog }}{{ $.SourceCatalog }}.{{ $.SourceSchema }}.{{ .Name }}{{ else }}hive.{{ $.SourceSchema }}.{{ .Name }}{{ end }};
+{{- end }}
+
+{{ end }}
+{{- else }}
+{{- /* LEGACY MODE (old workflow) */ -}}
 {{- if .Iceberg }}
 USE iceberg.{{ .SchemaName }};
 {{- else }}
@@ -31,6 +73,8 @@ SELECT * FROM {{ if $.Iceberg }}iceberg{{ else }}hive{{ end }}.{{ $.Uncompressed
 {{- end }}
 {{ end }}
+{{- end }}
+
 {{- range .InsertTables -}}
 ANALYZE {{ .Name }};
 {{ end -}}
diff --git a/cmd/genddl/main.go b/cmd/genddl/main.go
index 346f8709..97f92ae9 100644
--- a/cmd/genddl/main.go
+++ b/cmd/genddl/main.go
@@ -31,6 +31,17 @@ type Schema struct {
 	Tables           map[string]*Table  `json:"tables"`
 	InsertTables     map[string]*Table  `json:"insert_tables"`
SessionVariables map[string]string `json:"session_variables"` + + // Enhanced ingestion workflow fields + Mode string `json:"mode"` // "legacy", "ingestion", or "enhanced_ingestion" + S3SourceLocation string `json:"s3_source_location"` // S3 path for source data + S3TargetLocation string `json:"s3_target_location"` // S3 path for target data + SourceSchema string `json:"source_schema"` // Source schema name + TargetSchema string `json:"target_schema"` // Target schema name + SourceFileFormat string `json:"source_file_format"` // "CSV" or "TEXTFILE" + SourceCatalog string `json:"source_catalog"` // Source catalog name (optional) + TargetCatalog string `json:"target_catalog"` // Target catalog name (optional) + Engine string `json:"engine"` // "presto" or "spark" } type Column struct { @@ -54,6 +65,12 @@ type RegisterTable struct { ExternalLocation *string } +// isEnhancedIngestionMode returns true if this schema should use the enhanced ingestion workflow +// (source tables + target tables + INSERT statements with catalog support and CSV/TEXTFILE handling) +func (s *Schema) isEnhancedIngestionMode() bool { + return s.Mode == "enhanced_ingestion" +} + func Run(_ *cobra.Command, args []string) { // Reset template cache for each invocation. 
templateCache = make(map[string]*template.Template) @@ -140,8 +157,12 @@ func generateSchemaFromDef(schema *Schema, defDir string, configDir string, outp registerTable.ExternalLocation = externalLoc schema.RegisterTables = append(schema.RegisterTables, ®isterTable) } else { - tbl.reorderColumns(schema) // Move PartitionKey columns to the bottom - tbl.LastColumn = tbl.Columns[len(tbl.Columns)-1] + // Only reorder columns for Hive tables (Iceberg doesn't require partition columns at end) + if !schema.Iceberg { + tbl.reorderColumns(schema) + } + // Set LastColumn for template rendering (finds partition key or uses last column) + tbl.setLastColumn(schema) schema.Tables[tbl.Name] = tbl } if isInsertTable(tbl, schema) { @@ -161,15 +182,22 @@ func generateSchemaFromDef(schema *Schema, defDir string, configDir string, outp func generateCreateTable(schema *Schema, currDir string, outputDirs []string, step int) { genSubSteps := !schema.Iceberg && schema.Partitioned - tName := "create_table.sql.tmpl" - var fName string - if genSubSteps { - // If there are sub-tasks, prefix the first output file with step a - fName = strconv.Itoa(step+1) + "a-create-" + schema.LocationName + ".sql" + if schema.isEnhancedIngestionMode() { + // Enhanced ingestion mode: generate separate source and target table files + generateSourceTable(schema, currDir, outputDirs, step) + generateTargetTable(schema, currDir, outputDirs, step) } else { - fName = strconv.Itoa(step+1) + "-create-" + schema.LocationName + ".sql" + // Legacy mode: single create table file + tName := "create_table.sql.tmpl" + var fName string + if genSubSteps { + // If there are sub-tasks, prefix the first output file with step a + fName = strconv.Itoa(step+1) + "a-create-" + schema.LocationName + ".sql" + } else { + fName = strconv.Itoa(step+1) + "-create-" + schema.LocationName + ".sql" + } + execTemplate(schema, getTemplate(tName, currDir), fName, outputDirs) } - execTemplate(schema, getTemplate(tName, currDir), fName, 
outputDirs) if genSubSteps { generateAwsS3Mv(schema, currDir, outputDirs, step) // Generate step b @@ -178,6 +206,20 @@ func generateCreateTable(schema *Schema, currDir string, outputDirs []string, st } } +// generateSourceTable generates source table DDL for enhanced ingestion mode +func generateSourceTable(schema *Schema, currDir string, outputDirs []string, step int) { + tName := "create_source_table.sql.tmpl" + fName := strconv.Itoa(step+1) + "a-create-source-" + schema.LocationName + ".sql" + execTemplate(schema, getTemplate(tName, currDir), fName, outputDirs) +} + +// generateTargetTable generates target table DDL for enhanced ingestion mode +func generateTargetTable(schema *Schema, currDir string, outputDirs []string, step int) { + tName := "create_target_table.sql.tmpl" + fName := strconv.Itoa(step+1) + "b-create-target-" + schema.LocationName + ".sql" + execTemplate(schema, getTemplate(tName, currDir), fName, outputDirs) +} + func generateInsertTable(schema *Schema, currDir string, outputDirs []string, step int) { tName := "insert_table.sql.tmpl" fName := strconv.Itoa(step+1) + "-insert-" + schema.LocationName + ".sql" @@ -265,10 +307,20 @@ func cleanOutputDir(dir string) error { } func (s *Schema) shouldGenInsert() bool { + // Enhanced ingestion mode always generates inserts + if s.isEnhancedIngestionMode() { + return true + } + // Legacy mode: only generate inserts for Iceberg return s.Iceberg } func isRegisterTable(table *Table, schema *Schema) bool { + // Enhanced ingestion mode doesn't use register tables + if schema.isEnhancedIngestionMode() { + return false + } + // Legacy mode: register non-partitioned tables in Iceberg partitioned schemas if schema.Iceberg && schema.Partitioned { return !table.Partitioned } @@ -276,6 +328,11 @@ func isRegisterTable(table *Table, schema *Schema) bool { } func isInsertTable(table *Table, schema *Schema) bool { + // Enhanced ingestion mode: all tables get inserts + if schema.isEnhancedIngestionMode() { + return true 
+ } + // Legacy mode: partitioned tables in partitioned schemas, or all tables in non-partitioned if schema.Partitioned { return table.Partitioned } @@ -283,14 +340,26 @@ func isInsertTable(table *Table, schema *Schema) bool { } func getNamedOutput(configData []byte, workload string) (string, error) { - var config map[string]string + // Use interface{} to handle both string and boolean values in config + var config map[string]interface{} if err := json.Unmarshal(configData, &config); err != nil { return "", err } - scaleFactor := config["scale_factor"] - fileFormat := config["file_format"] - compressionMethod := config["compression_method"] + // Extract string values with type assertion (these are always strings in config) + scaleFactor, ok := config["scale_factor"].(string) + if !ok { + return "", fmt.Errorf("scale_factor must be a string") + } + fileFormat, ok := config["file_format"].(string) + if !ok { + return "", fmt.Errorf("file_format must be a string") + } + compressionMethod, ok := config["compression_method"].(string) + if !ok { + return "", fmt.Errorf("compression_method must be a string") + } + var compressionSuffix string if compressionMethod == "uncompressed" { compressionSuffix = "" @@ -323,15 +392,38 @@ func loadSchemas(data []byte) ([]*Schema, error) { if base.WorkloadDefinition == "" { base.WorkloadDefinition = "tpc-ds" } + // Default engine for enhanced ingestion mode + if base.Engine == "" && base.isEnhancedIngestionMode() { + base.Engine = "presto" + } - combinations := []struct { + // In enhanced ingestion mode, detect catalog type and generate only for that type + var combinations []struct { Iceberg bool Partitioned bool - }{ - {true, false}, - {true, true}, - {false, false}, - {false, true}, + } + + if base.isEnhancedIngestionMode() { + // Use the iceberg property from config to determine catalog type + // Generate only for the specified catalog type, with both partitioned variants + combinations = []struct { + Iceberg bool + Partitioned 
bool + }{ + {base.Iceberg, false}, + {base.Iceberg, true}, + } + } else { + // Legacy mode: generate all 4 combinations + combinations = []struct { + Iceberg bool + Partitioned bool + }{ + {true, false}, + {true, true}, + {false, false}, + {false, true}, + } } for _, c := range combinations { @@ -395,6 +487,24 @@ func (t *Table) reorderColumns(s *Schema) { } } +// setLastColumn sets the LastColumn field for template rendering. +// For partitioned tables, it finds the column marked with partition_key=true. +// For non-partitioned tables or if no partition key is found, it uses the last column. +// This works for both Iceberg (partition columns can be anywhere) and Hive (after reordering). +func (t *Table) setLastColumn(s *Schema) { + // Look for partition key column + for _, col := range t.Columns { + if col.PartitionKey != nil && *col.PartitionKey { + t.LastColumn = col + return + } + } + // Fallback to last column if no partition key found + if len(t.Columns) > 0 { + t.LastColumn = t.Columns[len(t.Columns)-1] + } +} + func (s *Schema) setSessionVars() { s.SessionVariables["query_max_execution_time"] = "12h" s.SessionVariables["query_max_run_time"] = "12h"