diff --git a/README.md b/README.md index cb26d17..ff64c0c 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,19 @@ fe8e2a3c-f91a-11ee-9812-82f5834c1ba7:1-46602355 **Note**: Remember to prepend the prefix `MySQL56/` onto your starting GTIDs. +### 3b. Metadata and delete capture +Each emitted record includes a `_planetscale_operation` field with one of: +- `insert` +- `update` +- `delete` + +When `include_metadata` is enabled, each emitted record also includes a `_planetscale_metadata` object with: +- `vgtid_position`: the Vitess VGTID replication position +- `extracted_at`: the extraction timestamp in nanoseconds +- `sequence_number`: a monotonically increasing per-sync sequence number + +Delete events are only emitted when `capture_deletes` is enabled. + ## Interpreting logs Airbyte logs will include logs from the source (this library), the Airbyte connector, and the destination. All source logs will be prefixed with `[source]` and `PlanetScale Source ::`. @@ -235,5 +248,3 @@ Vitess cells that the Airbyte source will sync from. `Finished reading records for table []` Airbyte source has detected that the client has ended its connection. `recordCount` records were counted as having been sent to connector for table `table`. - - diff --git a/cmd/airbyte-source/check.go b/cmd/airbyte-source/check.go index c06b9b5..b34a94c 100644 --- a/cmd/airbyte-source/check.go +++ b/cmd/airbyte-source/check.go @@ -69,6 +69,9 @@ func parseSource(reader FileReader, configFilePath string) (internal.PlanetScale if err = json.Unmarshal(contents, &psc); err != nil { return psc, err } + if err = psc.Validate(); err != nil { + return psc, err + } return psc, nil } diff --git a/cmd/airbyte-source/check_test.go b/cmd/airbyte-source/check_test.go index 208c4ac..7bdcd4f 100644 --- a/cmd/airbyte-source/check_test.go +++ b/cmd/airbyte-source/check_test.go @@ -45,6 +45,46 @@ func TestCheckInvalidCatalogJSON(t *testing.T) { assert.Equal(t, "FAILED", amsg.ConnectionStatus.Status) } +func TestParseSource_AllowsCaptureDeletesWithoutMetadata(t *testing.T) { + tfr := testFileReader{ + content: []byte("{\"host\":\"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\",\"capture_deletes\":true}"), + } + + psc, err := parseSource(tfr, "source.json") + require.NoError(t, err) + assert.True(t, psc.CaptureDeletes) + assert.False(t, psc.IncludeMetadata) +} + +func TestCheckAllowsCaptureDeletesWithoutMetadata(t *testing.T) { + tfr := testFileReader{ + content: []byte("{\"host\":\"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\",\"capture_deletes\":true}"), + } + + td := testDatabase{ + connectResponse: canConnectResponse{ + err: nil, + }, + } + + checkCommand := CheckCommand(&Helper{ + Database: td, + FileReader: tfr, + Logger: internal.NewLogger(os.Stdout), + }) + b := bytes.NewBufferString("") + checkCommand.SetOut(b) + assert.NoError(t, checkCommand.Flag("config").Value.Set("catalog.json")) + assert.NoError(t, checkCommand.Execute()) + + var amsg internal.AirbyteMessage + err := json.NewDecoder(b).Decode(&amsg) + require.NoError(t, err) + assert.Equal(t, internal.CONNECTION_STATUS, amsg.Type) + require.NotNil(t, amsg.ConnectionStatus) + assert.Equal(t, "SUCCEEDED", amsg.ConnectionStatus.Status) +} + func TestCheckCredentialsInvalid(t *testing.T) { tfr := testFileReader{ content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"), diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index 3d4a8d1..2d14f94 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -66,19 +66,26 @@ "default": false, "order": 7 }, + "capture_deletes": { + "description": "Emit delete events as records using the deleted row values and mark them with `_planetscale_operation = delete`.", + "title": "Capture deletes?", + "type": "boolean", + "default": false, + "order": 8 + }, "starting_gtids": { "type": "string", "title": "Starting GTIDs", "default": "", "description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }", - "order": 8 + "order": 9 }, "max_retries": { "type": "integer", "title": "Max retries", "default": 3, "description": "The max number of times we continue syncing after potential errors", - "order": 9 + "order": 10 }, "timeout_seconds": { "type": "integer", @@ -86,14 +93,14 @@ "default": 300, "minimum": 300, "description": "Timeout in seconds for a sync attempt", - "order": 10 + "order": 11 }, "use_gtid_with_table_pks": { "type": "boolean", "title": "Use GTID with table primary keys", "default": false, "description": "Use GTID position together with table primary keys", - "order": 11 + "order": 12 }, "options": { "type": "object", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index bd7fc44..6112b96 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -20,6 +20,7 @@ type PlanetScaleSource struct { UseReplica bool `json:"use_replica"` UseRdonly bool `json:"use_rdonly"` IncludeMetadata bool `json:"include_metadata"` + CaptureDeletes bool `json:"capture_deletes"` StartingGtids string `json:"starting_gtids"` Options CustomSourceOptions `json:"options"` MaxRetries uint `json:"max_retries"` @@ -144,3 +145,7 @@ func (psc PlanetScaleSource) GetStartingGtids() (StartingGtids, error) { return startingGtids, nil } + +func (psc PlanetScaleSource) Validate() error { + return nil +} diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 1336bc9..5bae75d 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -357,8 +357,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } type rowWithPosition struct { - Result *query.QueryResult - Position string + Result *query.QueryResult + Position string + Operation string } var rows []rowWithPosition for _, event := range res.Events { @@ -398,13 +399,18 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * case binlogdata.VEventType_ROW: // Collect rows for processing for _, change := range event.RowEvent.RowChanges { - if change.After != nil { + row, operation, ok := getRowChangeRecord(change, ps.CaptureDeletes) + if ok { + if operation == "delete" { + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCaptured delete row for emission at position [%s]", preamble, tc.Position)) + } rows = append(rows, rowWithPosition{ Result: &query.QueryResult{ Fields: fields, - Rows: []*query.Row{change.After}, + Rows: []*query.Row{row}, }, - Position: tc.Position, + Position: tc.Position, + Operation: operation, }) } } @@ -444,7 +450,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } sqlResult.Rows = append(sqlResult.Rows, row) // Results queued to Airbyte here, and flushed at the end of sync() - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount) + p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, rwp.Position, rwp.Operation, resultCount) } } } @@ -461,6 +467,22 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * } } +func getRowChangeRecord(change *binlogdata.RowChange, captureDeletes bool) (*query.Row, string, bool) { + switch { + case change.After != nil && change.Before == nil: + return change.After, "insert", true + case change.After != nil && change.Before != nil: + return change.After, "update", true + case change.Before != nil && change.After == nil: + if !captureDeletes { + return nil, "", false + } + return change.Before, "delete", true + default: + return nil, "", false + } +} + func (p PlanetScaleEdgeDatabase) getStopCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) { defer p.Logger.Flush() timeout := 45 * time.Second @@ -548,6 +570,7 @@ func (p PlanetScaleEdgeDatabase) printQueryResult( tableNamespace, tableName string, ps *PlanetScaleSource, position string, + operation string, resultCounter int, ) { data := QueryResultToRecords(qr, ps) @@ -557,6 +580,8 @@ func (p PlanetScaleEdgeDatabase) printQueryResult( continue } + record["_planetscale_operation"] = operation + if ps.IncludeMetadata { // Ensure there's a _metadata field (map[string]interface{}) metadata, ok := record["_planetscale_metadata"].(map[string]interface{}) diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index 7d6eedd..13b55e2 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -22,6 +22,55 @@ import ( "vitess.io/vitess/go/vt/proto/vtgateservice" ) +func metadataFromRecord(t *testing.T, record map[string]interface{}) map[string]interface{} { + t.Helper() + + metadataRaw, ok := record["_planetscale_metadata"] + if !ok { + t.Fatalf("metadata must be present when IncludeMetadata=true") + } + + metadata, ok := metadataRaw.(map[string]interface{}) + if !ok { + t.Fatalf("metadata must be a map[string]interface{}") + } + + return metadata +} + +func productFields(database, table string) []*query.Field { + return []*query.Field{ + { + Name: "pid", + Type: query.Type_INT64, + Table: table, + OrgTable: table, + Database: database, + ColumnLength: 20, + Charset: 63, + ColumnType: "bigint", + }, + { + Name: "description", + Type: query.Type_VARCHAR, + Table: table, + OrgTable: table, + Database: database, + ColumnLength: 1024, + Charset: 255, + ColumnType: "varchar(256)", + }, + } +} + +func productRow(pid int, description string) *query.Row { + pidStr := fmt.Sprintf("%d", pid) + return &query.Row{ + Lengths: []int64{int64(len(pidStr)), int64(len(description))}, + Values: []byte(pidStr + description), + } +} + func TestRead_CanPeekBeforeRead(t *testing.T) { tma := getTestMysqlAccess() b := bytes.NewBufferString("") @@ -727,28 +776,7 @@ func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) { Type: binlogdata.VEventType_FIELD, FieldEvent: &binlogdata.FieldEvent{ TableName: table, - Fields: []*query.Field{ - { - Name: "pid", - Type: query.Type_INT64, - Table: table, - OrgTable: table, - Database: keyspace, - ColumnLength: 20, - Charset: 63, - ColumnType: "bigint", - }, - { - Name: "description", - Type: query.Type_VARCHAR, - Table: table, - OrgTable: table, - Database: keyspace, - ColumnLength: 1024, - Charset: 255, - ColumnType: "varchar(256)", - }, - }, + Fields: productFields(keyspace, table), }, }, }, @@ -866,13 +894,9 @@ func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) { assert.Equal(t, 2, len(tal.records["connect-test.products"])) records := tal.records["connect-test.products"] - for _, r := range records { - metadataRaw, ok := r["_planetscale_metadata"] - assert.True(t, ok, "metadata must be present when IncludeMetadata=true") - - metadata, ok := metadataRaw.(map[string]interface{}) - assert.True(t, ok, "metadata must be a map[string]interface{}") - + for i, r := range records { + metadata := metadataFromRecord(t, r) + assert.Equal(t, "insert", r["_planetscale_operation"], "incorrect operation") pos, hasPos := metadata["vgtid_position"] assert.True(t, hasPos, "missing vgtid_position") assert.Equal(t, middleVGtid, pos, "incorrect vgtid_position") @@ -880,9 +904,618 @@ func TestRead_IncrementalSync_CanIncludesMetadata(t *testing.T) { _, hasExtractedAt := metadata["extracted_at"] assert.True(t, hasExtractedAt, "missing extracted_at") - _, hasSeq := metadata["sequence_number"] + seq, hasSeq := metadata["sequence_number"] assert.True(t, hasSeq, "missing sequence_number") + assert.Equal(t, i+1, seq) + } +} + +func TestRead_IncrementalSync_CanCaptureDeleteAndClassifyOperations(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "products" + startVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-2,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + middleVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-3,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + stopVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-4,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: middleVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: productFields(keyspace, table), + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: productRow(1, "keyboard"), + }, + { + Before: productRow(2, "monitor"), + After: productRow(2, "display"), + }, + { + Before: productRow(3, "mouse"), + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType) + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + IncludeMetadata: true, + CaptureDeletes: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: table, + Namespace: keyspace, + }, + } + + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + assert.NoError(t, err) + assert.NotNil(t, sc) + assert.Equal(t, 3, len(tal.records["connect-test.products"])) + + records := tal.records["connect-test.products"] + assert.Equal(t, 1, records[0]["pid"]) + assert.Equal(t, "keyboard", records[0]["description"]) + assert.Equal(t, 2, records[1]["pid"]) + assert.Equal(t, "display", records[1]["description"]) + assert.Equal(t, 3, records[2]["pid"]) + assert.Equal(t, "mouse", records[2]["description"]) + + expectedOperations := []string{"insert", "update", "delete"} + for i, record := range records { + metadata := metadataFromRecord(t, record) + assert.Equal(t, expectedOperations[i], record["_planetscale_operation"]) + assert.Equal(t, middleVGtid, metadata["vgtid_position"]) + assert.Equal(t, i+1, metadata["sequence_number"]) + } +} + +func TestRead_IncrementalSync_SkipsDeleteRowsWhenCaptureDeletesDisabled(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "products" + startVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-2,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + middleVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-3,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + stopVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-4,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: middleVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: productFields(keyspace, table), + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + Before: productRow(3, "mouse"), + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + IncludeMetadata: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: table, + Namespace: keyspace, + }, + } + + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + assert.NoError(t, err) + assert.NotNil(t, sc) + assert.Equal(t, 0, len(tal.records["connect-test.products"])) +} + +func TestRead_IncrementalSync_CanCaptureDeletesWithoutMetadata(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "products" + startVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-2,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + middleVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-3,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + stopVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-2,e1e896df-dae3-11ef-895b-626e6780cb50:1-4,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-2" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: middleVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: productFields(keyspace, table), + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + Before: productRow(3, "mouse"), + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + CaptureDeletes: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: table, + Namespace: keyspace, + }, + } + + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + assert.NoError(t, err) + assert.NotNil(t, sc) + assert.Equal(t, 1, len(tal.records["connect-test.products"])) + + record := tal.records["connect-test.products"][0] + assert.Equal(t, 3, record["pid"]) + assert.Equal(t, "mouse", record["description"]) + assert.Equal(t, "delete", record["_planetscale_operation"]) + _, hasMetadata := record["_planetscale_metadata"] + assert.False(t, hasMetadata) +} + +func TestRead_FullSync_LabelsCopyRowsAsInsert(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "products" + copyVGtid := "MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-5,e1e896df-dae3-11ef-895b-626e6780cb50:1-5,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-5" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: "", + Keyspace: keyspace, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: copyVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{{ + Shard: shard, + Gtid: copyVGtid, + Keyspace: keyspace, + }}, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: productFields(keyspace, table), + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: productRow(1, "keyboard"), + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_COPY_COMPLETED, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + IncludeMetadata: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: table, + Namespace: keyspace, + }, + } + + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + assert.NoError(t, err) + assert.NotNil(t, sc) + assert.Equal(t, 1, len(tal.records["connect-test.products"])) + + metadata := metadataFromRecord(t, tal.records["connect-test.products"][0]) + assert.Equal(t, "insert", tal.records["connect-test.products"][0]["_planetscale_operation"]) + assert.Equal(t, copyVGtid, metadata["vgtid_position"]) + assert.Equal(t, 1, metadata["sequence_number"]) } // CanReturnNewCursorIfNewFound tests returning the same GTID as stop position diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go index c86df68..5a00964 100644 --- a/cmd/internal/planetscale_edge_mysql.go +++ b/cmd/internal/planetscale_edge_mysql.go @@ -189,6 +189,10 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan return properties, errors.Wrapf(err, "unable to iterate columns for table %s", tableName) } + properties["_planetscale_operation"] = PropertyType{ + Type: []string{"string"}, + } + // Inject metadata column when include_metadata is true. if psc.IncludeMetadata { properties["_planetscale_metadata"] = PropertyType{