Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ fe8e2a3c-f91a-11ee-9812-82f5834c1ba7:1-46602355

**Note**: Remember to prepend the prefix `MySQL56/` onto your starting GTIDs.

### 3b. Metadata and delete capture
Each emitted record includes a `_planetscale_operation` field with one of:
- `insert`
- `update`
- `delete`

When `include_metadata` is enabled, each emitted record also includes a `_planetscale_metadata` object with:
- `vgtid_position`: the Vitess VGTID replication position
- `extracted_at`: the extraction timestamp in nanoseconds
- `sequence_number`: a monotonically increasing per-sync sequence number

Delete events are only emitted when `capture_deletes` is enabled.

## Interpreting logs
Airbyte logs will include logs from the source (this library), the Airbyte connector, and the destination. All source logs will be prefixed with `[source]` and `PlanetScale Source ::`.

Expand Down Expand Up @@ -235,5 +248,3 @@ Vitess cells that the Airbyte source will sync from.
`Finished reading <recordCount> records for table [<table>]`
Airbyte source has detected that the client has ended its connection. `recordCount` records were counted as having been sent to connector for table `table`.



3 changes: 3 additions & 0 deletions cmd/airbyte-source/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func parseSource(reader FileReader, configFilePath string) (internal.PlanetScale
if err = json.Unmarshal(contents, &psc); err != nil {
return psc, err
}
if err = psc.Validate(); err != nil {
return psc, err
}

return psc, nil
}
Expand Down
40 changes: 40 additions & 0 deletions cmd/airbyte-source/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,46 @@ func TestCheckInvalidCatalogJSON(t *testing.T) {
assert.Equal(t, "FAILED", amsg.ConnectionStatus.Status)
}

// TestParseSource_AllowsCaptureDeletesWithoutMetadata verifies that a config
// enabling capture_deletes on its own parses successfully and does not
// implicitly switch on include_metadata.
func TestParseSource_AllowsCaptureDeletesWithoutMetadata(t *testing.T) {
	reader := testFileReader{
		content: []byte("{\"host\":\"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\",\"capture_deletes\":true}"),
	}

	source, err := parseSource(reader, "source.json")
	require.NoError(t, err)
	assert.True(t, source.CaptureDeletes)
	assert.False(t, source.IncludeMetadata)
}

// TestCheckAllowsCaptureDeletesWithoutMetadata runs the check command with
// capture_deletes enabled and include_metadata absent, against a database
// stub whose connection test succeeds, and asserts the emitted Airbyte
// message reports a SUCCEEDED connection status.
func TestCheckAllowsCaptureDeletesWithoutMetadata(t *testing.T) {
	reader := testFileReader{
		content: []byte("{\"host\":\"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\",\"capture_deletes\":true}"),
	}
	db := testDatabase{
		connectResponse: canConnectResponse{err: nil},
	}

	cmd := CheckCommand(&Helper{
		Database:   db,
		FileReader: reader,
		Logger:     internal.NewLogger(os.Stdout),
	})
	out := bytes.NewBufferString("")
	cmd.SetOut(out)
	assert.NoError(t, cmd.Flag("config").Value.Set("catalog.json"))
	assert.NoError(t, cmd.Execute())

	var msg internal.AirbyteMessage
	require.NoError(t, json.NewDecoder(out).Decode(&msg))
	assert.Equal(t, internal.CONNECTION_STATUS, msg.Type)
	require.NotNil(t, msg.ConnectionStatus)
	assert.Equal(t, "SUCCEEDED", msg.ConnectionStatus.Status)
}

func TestCheckCredentialsInvalid(t *testing.T) {
tfr := testFileReader{
content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"),
Expand Down
15 changes: 11 additions & 4 deletions cmd/airbyte-source/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,41 @@
"default": false,
"order": 7
},
"capture_deletes": {
"description": "Emit delete events as records using the deleted row values and mark them with `_planetscale_operation = delete`.",
"title": "Capture deletes?",
"type": "boolean",
"default": false,
"order": 8
},
"starting_gtids": {
"type": "string",
"title": "Starting GTIDs",
"default": "",
"description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }",
"order": 8
"order": 9
},
"max_retries": {
"type": "integer",
"title": "Max retries",
"default": 3,
"description": "The max number of times we continue syncing after potential errors",
"order": 9
"order": 10
},
"timeout_seconds": {
"type": "integer",
"title": "Timeout (in seconds)",
"default": 300,
"minimum": 300,
"description": "Timeout in seconds for a sync attempt",
"order": 10
"order": 11
},
"use_gtid_with_table_pks": {
"type": "boolean",
"title": "Use GTID with table primary keys",
"default": false,
"description": "Use GTID position together with table primary keys",
"order": 11
"order": 12
},
"options": {
"type": "object",
Expand Down
5 changes: 5 additions & 0 deletions cmd/internal/planetscale_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type PlanetScaleSource struct {
UseReplica bool `json:"use_replica"`
UseRdonly bool `json:"use_rdonly"`
IncludeMetadata bool `json:"include_metadata"`
CaptureDeletes bool `json:"capture_deletes"`
StartingGtids string `json:"starting_gtids"`
Options CustomSourceOptions `json:"options"`
MaxRetries uint `json:"max_retries"`
Expand Down Expand Up @@ -144,3 +145,7 @@ func (psc PlanetScaleSource) GetStartingGtids() (StartingGtids, error) {

return startingGtids, nil
}

// Validate checks the unmarshalled source configuration for invalid
// settings. It currently accepts every configuration (always returns nil);
// it exists as the hook parseSource invokes after JSON decoding, giving
// future cross-field constraints a single place to live.
// NOTE(review): capture_deletes is deliberately allowed without
// include_metadata — see the check_test.go cases covering that combination.
func (psc PlanetScaleSource) Validate() error {
	return nil
}
37 changes: 31 additions & 6 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
}

type rowWithPosition struct {
Result *query.QueryResult
Position string
Result *query.QueryResult
Position string
Operation string
}
var rows []rowWithPosition
for _, event := range res.Events {
Expand Down Expand Up @@ -398,13 +399,18 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
case binlogdata.VEventType_ROW:
// Collect rows for processing
for _, change := range event.RowEvent.RowChanges {
if change.After != nil {
row, operation, ok := getRowChangeRecord(change, ps.CaptureDeletes)
if ok {
if operation == "delete" {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCaptured delete row for emission at position [%s]", preamble, tc.Position))
}
rows = append(rows, rowWithPosition{
Result: &query.QueryResult{
Fields: fields,
Rows: []*query.Row{change.After},
Rows: []*query.Row{row},
},
Position: tc.Position,
Position: tc.Position,
Operation: operation,
})
}
}
Expand Down Expand Up @@ -444,7 +450,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
}
sqlResult.Rows = append(sqlResult.Rows, row)
// Results queued to Airbyte here, and flushed at the end of sync()
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount)
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, rwp.Position, rwp.Operation, resultCount)
}
}
}
Expand All @@ -461,6 +467,22 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
}
}

// getRowChangeRecord classifies a VStream row change and picks the row image
// to emit. It returns the chosen row, the operation label ("insert",
// "update", or "delete"), and whether the change should be emitted at all.
//
// A change with only an After image is an insert; with both images, an
// update (the After image is emitted); with only a Before image, a delete —
// surfaced only when captureDeletes is enabled. Changes carrying neither
// image are always skipped.
func getRowChangeRecord(change *binlogdata.RowChange, captureDeletes bool) (*query.Row, string, bool) {
	hasBefore := change.Before != nil
	hasAfter := change.After != nil

	if hasAfter {
		if hasBefore {
			return change.After, "update", true
		}
		return change.After, "insert", true
	}
	if hasBefore && captureDeletes {
		return change.Before, "delete", true
	}
	return nil, "", false
}

func (p PlanetScaleEdgeDatabase) getStopCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
defer p.Logger.Flush()
timeout := 45 * time.Second
Expand Down Expand Up @@ -548,6 +570,7 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(
tableNamespace, tableName string,
ps *PlanetScaleSource,
position string,
operation string,
resultCounter int,
) {
data := QueryResultToRecords(qr, ps)
Expand All @@ -557,6 +580,8 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(
continue
}

record["_planetscale_operation"] = operation

if ps.IncludeMetadata {
// Ensure there's a _metadata field (map[string]interface{})
metadata, ok := record["_planetscale_metadata"].(map[string]interface{})
Expand Down
Loading
Loading