diff --git a/bridge/cgo_sqldiff.go b/bridge/cgo_sqldiff.go
new file mode 100644
index 0000000..9b1d649
--- /dev/null
+++ b/bridge/cgo_sqldiff.go
@@ -0,0 +1,127 @@
+package bridge
+
+/*
+#cgo CFLAGS: -I${SRCDIR}
+#cgo LDFLAGS: -lsqlite3
+
+#include <stdlib.h>
+#include <string.h>
+#include "sqldiff_wrapper.h"
+*/
+import "C"
+import (
+	"fmt"
+	"unsafe"
+)
+
+// DiffResult contains the result of a diff operation.
+type DiffResult struct {
+	SQL        string               // SQL statements to transform db1 to db2
+	HasChanges bool                 // Whether there are any differences
+	Operations []DiffOperation      // Parsed operations from the diff
+	Conflicts  []PrimaryKeyConflict // Detected primary key conflicts
+}
+
+// DiffOperation represents a single SQL operation from the diff.
+type DiffOperation struct {
+	Type       string                 // INSERT, UPDATE, DELETE
+	Table      string                 // Table name
+	PrimaryKey map[string]interface{} // Primary key values
+	SQL        string                 // The actual SQL statement
+}
+
+// PrimaryKeyConflict represents a conflict on the same primary key.
+type PrimaryKeyConflict struct {
+	Table      string
+	PrimaryKey map[string]interface{}
+	Operation1 string // First operation (from db1)
+	Operation2 string // Second operation (from db2)
+}
+
+// RunSQLDiff compares two SQLite databases and returns the differences.
+//
+// db1Path is the path to the first database (baseline/old version);
+// db2Path is the path to the second database (new version with changes).
+// The returned DiffResult carries the SQL statements that would
+// transform db1 into db2.
+func RunSQLDiff(db1Path, db2Path string) (*DiffResult, error) {
+	cDb1 := C.CString(db1Path)
+	cDb2 := C.CString(db2Path)
+	defer C.free(unsafe.Pointer(cDb1))
+	defer C.free(unsafe.Pointer(cDb2))
+
+	var cResult *C.char
+	var cError *C.char
+
+	// Call the C wrapper. Both out-parameters are allocated on the C
+	// heap; free them on every return path so that neither the result
+	// nor the error message leaks.
+	rc := C.sqldiff_run(cDb1, cDb2, &cResult, &cError)
+	defer func() {
+		if cResult != nil {
+			C.free(unsafe.Pointer(cResult))
+		}
+		if cError != nil {
+			C.free(unsafe.Pointer(cError))
+		}
+	}()
+
+	if rc != 0 {
+		if cError != nil {
+			return nil, fmt.Errorf("sqldiff failed: %s", C.GoString(cError))
+		}
+		return nil, fmt.Errorf("sqldiff failed with code %d", int(rc))
+	}
+
+	result := &DiffResult{
+		Operations: []DiffOperation{},
+		Conflicts:  []PrimaryKeyConflict{},
+	}
+
+	if cResult != nil {
+		result.SQL = C.GoString(cResult)
+		result.HasChanges = len(result.SQL) > 0
+
+		// Parse the SQL to extract operations.
+		result.Operations = parseSQL(result.SQL)
+
+		// Detect primary key conflicts.
+		result.Conflicts = detectConflicts(result.Operations)
+	}
+
+	return result, nil
+}
+
+// parseSQL parses SQL diff output into individual operations.
+func parseSQL(sql string) []DiffOperation {
+	// TODO: Implement SQL parsing.
+	// This would parse INSERT, UPDATE, DELETE statements and extract
+	// table names and primary keys. For now, return an empty slice.
+	return []DiffOperation{}
+}
+
+// detectConflicts finds operations that conflict on the same primary key.
+func detectConflicts(operations []DiffOperation) []PrimaryKeyConflict {
+	// TODO: Implement conflict detection.
+	// This would find cases where:
+	//   - the same PK has multiple UPDATE/DELETE operations
+	//   - an INSERT targets an existing PK
+	// etc.
+	return []PrimaryKeyConflict{}
+}
+
+// ApplyDiff applies a diff to a database by executing the SQL statements
+// from a DiffResult. It refuses to apply a diff that carries unresolved
+// primary key conflicts.
+func ApplyDiff(dbPath string, diff *DiffResult) error {
+	if !diff.HasChanges {
+		return nil // Nothing to apply
+	}
+
+	if len(diff.Conflicts) > 0 {
+		return fmt.Errorf("cannot apply diff with %d conflicts", len(diff.Conflicts))
+	}
+
+	// TODO: Open the database and execute diff.SQL using the bridge.
+	return fmt.Errorf("ApplyDiff not yet implemented")
+}
diff --git a/bridge/sqldiff.c b/bridge/sqldiff.c
new file mode 100644
index 0000000..6596c63
--- /dev/null
+++ b/bridge/sqldiff.c
@@ -0,0 +1,2396 @@
+//go:build ignore
+/*
+** 2015-04-06
+**
+** The author disclaims copyright to this source code. In place of
+** a legal notice, here is a blessing:
+**
+** May you do good and not evil. 
+** May you find forgiveness for yourself and forgive others. +** May you share freely, never taking more than you give. +** +************************************************************************* +** +** This is a utility program that computes the differences in content +** between two SQLite databases. +** +** To compile, simply link against SQLite. (Windows builds must also link +** against ext/misc/sqlite3_stdio.c.) +** +** See the showHelp() routine below for a brief description of how to +** run the utility. +*/ +#include +#include +#include +#include +#include +#include +#include "sqlite3.h" +#include "sqlite3_stdio.h" + +/* +** All global variables are gathered into the "g" singleton. +*/ +struct GlobalVars +{ + const char *zArgv0; /* Name of program */ + int bSchemaOnly; /* Only show schema differences */ + int bSchemaPK; /* Use the schema-defined PK, not the true PK */ + int bHandleVtab; /* Handle fts3, fts4, fts5 and rtree vtabs */ + unsigned fDebug; /* Debug flags */ + int bSchemaCompare; /* Doing single-table sqlite_schema compare */ + sqlite3 *db; /* The database connection */ +} g; + +/* +** Allowed values for g.fDebug +*/ +#define DEBUG_COLUMN_NAMES 0x000001 +#define DEBUG_DIFF_SQL 0x000002 + +/* +** Clear and free an sqlite3_str object +*/ +static void strFree(sqlite3_str *pStr) +{ + sqlite3_free(sqlite3_str_finish(pStr)); +} + +/* +** Print an error resulting from faulting command-line arguments and +** abort the program. +*/ +static void cmdlineError(const char *zFormat, ...) +{ + sqlite3_str *pOut = sqlite3_str_new(0); + va_list ap; + va_start(ap, zFormat); + sqlite3_str_vappendf(pOut, zFormat, ap); + va_end(ap); + sqlite3_fprintf(stderr, "%s: %s\n", g.zArgv0, sqlite3_str_value(pOut)); + strFree(pOut); + sqlite3_fprintf(stderr, "\"%s --help\" for more help\n", g.zArgv0); + exit(1); +} + +/* +** Print an error message for an error that occurs at runtime, then +** abort the program. +*/ +static void runtimeError(const char *zFormat, ...) 
+{ + sqlite3_str *pOut = sqlite3_str_new(0); + va_list ap; + va_start(ap, zFormat); + sqlite3_str_vappendf(pOut, zFormat, ap); + va_end(ap); + sqlite3_fprintf(stderr, "%s: %s\n", g.zArgv0, sqlite3_str_value(pOut)); + strFree(pOut); + exit(1); +} + +/* Safely quote an SQL identifier. Use the minimum amount of transformation +** necessary to allow the string to be used with %s. +** +** Space to hold the returned string is obtained from sqlite3_malloc(). The +** caller is responsible for ensuring this space is freed when no longer +** needed. +*/ +static char *safeId(const char *zId) +{ + int i, x; + char c; + if (zId[0] == 0) + return sqlite3_mprintf("\"\""); + for (i = x = 0; (c = zId[i]) != 0; i++) + { + if (!isalpha(c) && c != '_') + { + if (i > 0 && isdigit(c)) + { + x++; + } + else + { + return sqlite3_mprintf("\"%w\"", zId); + } + } + } + if (x || !sqlite3_keyword_check(zId, i)) + { + return sqlite3_mprintf("%s", zId); + } + return sqlite3_mprintf("\"%w\"", zId); +} + +/* +** Prepare a new SQL statement. Print an error and abort if anything +** goes wrong. +*/ +static sqlite3_stmt *db_vprepare(const char *zFormat, va_list ap) +{ + char *zSql; + int rc; + sqlite3_stmt *pStmt; + + zSql = sqlite3_vmprintf(zFormat, ap); + if (zSql == 0) + runtimeError("out of memory"); + rc = sqlite3_prepare_v2(g.db, zSql, -1, &pStmt, 0); + if (rc) + { + runtimeError("SQL statement error: %s\n\"%s\"", sqlite3_errmsg(g.db), + zSql); + } + sqlite3_free(zSql); + return pStmt; +} +static sqlite3_stmt *db_prepare(const char *zFormat, ...) +{ + va_list ap; + sqlite3_stmt *pStmt; + va_start(ap, zFormat); + pStmt = db_vprepare(zFormat, ap); + va_end(ap); + return pStmt; +} + +/* +** Free a list of strings +*/ +static void namelistFree(char **az) +{ + if (az) + { + int i; + for (i = 0; az[i]; i++) + sqlite3_free(az[i]); + sqlite3_free(az); + } +} + +/* +** Return a list of column names [a] for the table zDb.zTab. 
Space to +** hold the list is obtained from sqlite3_malloc() and should released +** using namelistFree() when no longer needed. +** +** Primary key columns are listed first, followed by data columns. +** The number of columns in the primary key is returned in *pnPkey. +** +** Normally [a], the "primary key" in the previous sentence is the true +** primary key - the rowid or INTEGER PRIMARY KEY for ordinary tables +** or the declared PRIMARY KEY for WITHOUT ROWID tables. However, if +** the g.bSchemaPK flag is set, then the schema-defined PRIMARY KEY is +** used in all cases. In that case, entries that have NULL values in +** any of their primary key fields will be excluded from the analysis. +** +** If the primary key for a table is the rowid but rowid is inaccessible, +** then this routine returns a NULL pointer. +** +** [a. If the lone, named table is "sqlite_schema", "rootpage" column is +** omitted and the "type" and "name" columns are made to be the PK.] +** +** Examples: +** CREATE TABLE t1(a INT UNIQUE, b INTEGER, c TEXT, PRIMARY KEY(c)); +** *pnPKey = 1; +** az = { "rowid", "a", "b", "c", 0 } // Normal case +** az = { "c", "a", "b", 0 } // g.bSchemaPK==1 +** +** CREATE TABLE t2(a INT UNIQUE, b INTEGER, c TEXT, PRIMARY KEY(b)); +** *pnPKey = 1; +** az = { "b", "a", "c", 0 } +** +** CREATE TABLE t3(x,y,z,PRIMARY KEY(y,z)); +** *pnPKey = 1 // Normal case +** az = { "rowid", "x", "y", "z", 0 } // Normal case +** *pnPKey = 2 // g.bSchemaPK==1 +** az = { "y", "x", "z", 0 } // g.bSchemaPK==1 +** +** CREATE TABLE t4(x,y,z,PRIMARY KEY(y,z)) WITHOUT ROWID; +** *pnPKey = 2 +** az = { "y", "z", "x", 0 } +** +** CREATE TABLE t5(rowid,_rowid_,oid); +** az = 0 // The rowid is not accessible +*/ +static char **columnNames( + const char *zDb, /* Database ("main" or "aux") to query */ + const char *zTab, /* Name of table to return details of */ + int *pnPKey, /* OUT: Number of PK columns */ + int *pbRowid /* OUT: True if PK is an implicit rowid */ +) +{ + char **az = 0; /* 
List of column names to be returned */ + int naz = 0; /* Number of entries in az[] */ + sqlite3_stmt *pStmt; /* SQL statement being run */ + char *zPkIdxName = 0; /* Name of the PRIMARY KEY index */ + int truePk = 0; /* PRAGMA table_info identifies the PK to use */ + int nPK = 0; /* Number of PRIMARY KEY columns */ + int i, j; /* Loop counters */ + + if (g.bSchemaPK == 0) + { + /* Normal case: Figure out what the true primary key is for the table. + ** * For WITHOUT ROWID tables, the true primary key is the same as + ** the schema PRIMARY KEY, which is guaranteed to be present. + ** * For rowid tables with an INTEGER PRIMARY KEY, the true primary + ** key is the INTEGER PRIMARY KEY. + ** * For all other rowid tables, the rowid is the true primary key. + */ + pStmt = db_prepare("PRAGMA %s.index_list=%Q", zDb, zTab); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + if (sqlite3_stricmp((const char *)sqlite3_column_text(pStmt, 3), "pk") == 0) + { + zPkIdxName = sqlite3_mprintf("%s", sqlite3_column_text(pStmt, 1)); + break; + } + } + sqlite3_finalize(pStmt); + if (zPkIdxName) + { + int nKey = 0; + int nCol = 0; + truePk = 0; + pStmt = db_prepare("PRAGMA %s.index_xinfo=%Q", zDb, zPkIdxName); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + nCol++; + if (sqlite3_column_int(pStmt, 5)) + { + nKey++; + continue; + } + if (sqlite3_column_int(pStmt, 1) >= 0) + truePk = 1; + } + if (nCol == nKey) + truePk = 1; + if (truePk) + { + nPK = nKey; + } + else + { + nPK = 1; + } + sqlite3_finalize(pStmt); + sqlite3_free(zPkIdxName); + } + else + { + truePk = 1; + nPK = 1; + } + pStmt = db_prepare("PRAGMA %s.table_info=%Q", zDb, zTab); + } + else + { + /* The g.bSchemaPK==1 case: Use whatever primary key is declared + ** in the schema. The "rowid" will still be used as the primary key + ** if the table definition does not contain a PRIMARY KEY. 
+ */ + nPK = 0; + pStmt = db_prepare("PRAGMA %s.table_info=%Q", zDb, zTab); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + if (sqlite3_column_int(pStmt, 5) > 0) + nPK++; + } + sqlite3_reset(pStmt); + if (nPK == 0) + nPK = 1; + truePk = 1; + } + if (g.bSchemaCompare) + { + assert(sqlite3_stricmp(zTab, "sqlite_schema") == 0 || sqlite3_stricmp(zTab, "sqlite_master") == 0); + /* For sqlite_schema, will use type and name as the PK. */ + nPK = 2; + truePk = 0; + } + *pnPKey = nPK; + naz = nPK; + az = sqlite3_malloc(sizeof(char *) * (nPK + 1)); + if (az == 0) + runtimeError("out of memory"); + memset(az, 0, sizeof(char *) * (nPK + 1)); + if (g.bSchemaCompare) + { + az[0] = sqlite3_mprintf("%s", "type"); + az[1] = sqlite3_mprintf("%s", "name"); + } + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + char *sid = safeId((char *)sqlite3_column_text(pStmt, 1)); + int iPKey; + if (truePk && (iPKey = sqlite3_column_int(pStmt, 5)) > 0) + { + az[iPKey - 1] = sid; + } + else + { + if (!g.bSchemaCompare || !(strcmp(sid, "rootpage") == 0 || strcmp(sid, "name") == 0 || strcmp(sid, "type") == 0)) + { + az = sqlite3_realloc(az, sizeof(char *) * (naz + 2)); + if (az == 0) + runtimeError("out of memory"); + az[naz++] = sid; + } + } + } + sqlite3_finalize(pStmt); + if (az) + az[naz] = 0; + + /* If it is non-NULL, set *pbRowid to indicate whether or not the PK of + ** this table is an implicit rowid (*pbRowid==1) or not (*pbRowid==0). */ + if (pbRowid) + *pbRowid = (az[0] == 0); + + /* If this table has an implicit rowid for a PK, figure out how to refer + ** to it. There are usually three options - "rowid", "_rowid_" and "oid". + ** Any of these will work, unless the table has an explicit column of the + ** same name or the sqlite_schema tables are to be compared. In the latter + ** case, pretend that the "true" primary key is the name column, which + ** avoids extraneous diffs against the schemas due to rowid variance. 
*/ + if (az[0] == 0) + { + const char *azRowid[] = {"rowid", "_rowid_", "oid"}; + for (i = 0; i < sizeof(azRowid) / sizeof(azRowid[0]); i++) + { + for (j = 1; j < naz; j++) + { + if (sqlite3_stricmp(az[j], azRowid[i]) == 0) + break; + } + if (j >= naz) + { + az[0] = sqlite3_mprintf("%s", azRowid[i]); + break; + } + } + if (az[0] == 0) + { + for (i = 1; i < naz; i++) + sqlite3_free(az[i]); + sqlite3_free(az); + az = 0; + } + } + return az; +} + +/* +** Print the sqlite3_value X as an SQL literal. +*/ +static void printQuoted(FILE *out, sqlite3_value *X) +{ + switch (sqlite3_value_type(X)) + { + case SQLITE_FLOAT: + { + double r1; + char zBuf[50]; + r1 = sqlite3_value_double(X); + sqlite3_snprintf(sizeof(zBuf), zBuf, "%!.15g", r1); + sqlite3_fprintf(out, "%s", zBuf); + break; + } + case SQLITE_INTEGER: + { + sqlite3_fprintf(out, "%lld", sqlite3_value_int64(X)); + break; + } + case SQLITE_BLOB: + { + const unsigned char *zBlob = sqlite3_value_blob(X); + int nBlob = sqlite3_value_bytes(X); + if (zBlob) + { + int i; + sqlite3_fprintf(out, "x'"); + for (i = 0; i < nBlob; i++) + { + sqlite3_fprintf(out, "%02x", zBlob[i]); + } + sqlite3_fprintf(out, "'"); + } + else + { + /* Could be an OOM, could be a zero-byte blob */ + sqlite3_fprintf(out, "X''"); + } + break; + } + case SQLITE_TEXT: + { + const unsigned char *zArg = sqlite3_value_text(X); + + if (zArg == 0) + { + sqlite3_fprintf(out, "NULL"); + } + else + { + int inctl = 0; + int i, j; + sqlite3_fprintf(out, "'"); + for (i = j = 0; zArg[i]; i++) + { + char c = zArg[i]; + int ctl = iscntrl((unsigned char)c); + if (ctl > inctl) + { + inctl = ctl; + sqlite3_fprintf(out, "%.*s'||X'%02x", i - j, &zArg[j], c); + j = i + 1; + } + else if (ctl) + { + sqlite3_fprintf(out, "%02x", c); + j = i + 1; + } + else + { + if (inctl) + { + inctl = 0; + sqlite3_fprintf(out, "'\n||'"); + } + if (c == '\'') + { + sqlite3_fprintf(out, "%.*s'", i - j + 1, &zArg[j]); + j = i + 1; + } + } + } + sqlite3_fprintf(out, "%s'", &zArg[j]); + } + 
break; + } + case SQLITE_NULL: + { + sqlite3_fprintf(out, "NULL"); + break; + } + } +} + +/* +** Output SQL that will recreate the aux.zTab table. +*/ +static void dump_table(const char *zTab, FILE *out) +{ + char *zId = safeId(zTab); /* Name of the table */ + char **az = 0; /* List of columns */ + int nPk; /* Number of true primary key columns */ + int nCol; /* Number of data columns */ + int i; /* Loop counter */ + sqlite3_stmt *pStmt; /* SQL statement */ + const char *zSep; /* Separator string */ + sqlite3_str *pIns; /* Beginning of the INSERT statement */ + + pStmt = db_prepare("SELECT sql FROM aux.sqlite_schema WHERE name=%Q", zTab); + if (SQLITE_ROW == sqlite3_step(pStmt)) + { + sqlite3_fprintf(out, "%s;\n", sqlite3_column_text(pStmt, 0)); + } + sqlite3_finalize(pStmt); + if (!g.bSchemaOnly) + { + az = columnNames("aux", zTab, &nPk, 0); + pIns = sqlite3_str_new(0); + if (az == 0) + { + pStmt = db_prepare("SELECT * FROM aux.%s", zId); + sqlite3_str_appendf(pIns, "INSERT INTO %s VALUES", zId); + } + else + { + sqlite3_str *pSql = sqlite3_str_new(0); + zSep = "SELECT"; + for (i = 0; az[i]; i++) + { + sqlite3_str_appendf(pSql, "%s %s", zSep, az[i]); + zSep = ","; + } + sqlite3_str_appendf(pSql, " FROM aux.%s", zId); + zSep = " ORDER BY"; + for (i = 1; i <= nPk; i++) + { + sqlite3_str_appendf(pSql, "%s %d", zSep, i); + zSep = ","; + } + pStmt = db_prepare("%s", sqlite3_str_value(pSql)); + strFree(pSql); + sqlite3_str_appendf(pIns, "INSERT INTO %s", zId); + zSep = "("; + for (i = 0; az[i]; i++) + { + sqlite3_str_appendf(pIns, "%s%s", zSep, az[i]); + zSep = ","; + } + sqlite3_str_appendf(pIns, ") VALUES"); + namelistFree(az); + } + nCol = sqlite3_column_count(pStmt); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + sqlite3_fprintf(out, "%s", sqlite3_str_value(pIns)); + zSep = "("; + for (i = 0; i < nCol; i++) + { + sqlite3_fprintf(out, "%s", zSep); + printQuoted(out, sqlite3_column_value(pStmt, i)); + zSep = ","; + } + sqlite3_fprintf(out, ");\n"); + } + 
sqlite3_finalize(pStmt); + strFree(pIns); + } /* endif !g.bSchemaOnly */ + pStmt = db_prepare("SELECT sql FROM aux.sqlite_schema" + " WHERE type='index' AND tbl_name=%Q AND sql IS NOT NULL", + zTab); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + sqlite3_fprintf(out, "%s;\n", sqlite3_column_text(pStmt, 0)); + } + sqlite3_finalize(pStmt); + sqlite3_free(zId); +} + +/* +** Compute all differences for a single table, except if the +** table name is sqlite_schema, ignore the rootpage column. +*/ +static void diff_one_table(const char *zTab, FILE *out) +{ + char *zId = safeId(zTab); /* Name of table (translated for us in SQL) */ + char **az = 0; /* Columns in main */ + char **az2 = 0; /* Columns in aux */ + int nPk; /* Primary key columns in main */ + int nPk2; /* Primary key columns in aux */ + int n = 0; /* Number of columns in main */ + int n2; /* Number of columns in aux */ + int nQ; /* Number of output columns in the diff query */ + int i; /* Loop counter */ + const char *zSep; /* Separator string */ + sqlite3_str *pSql; /* Comparison query */ + sqlite3_stmt *pStmt; /* Query statement to do the diff */ + const char *zLead = /* Becomes line-comment for sqlite_schema */ + (g.bSchemaCompare) ? "-- " : ""; + + pSql = sqlite3_str_new(0); + if (g.fDebug == DEBUG_COLUMN_NAMES) + { + /* Simply run columnNames() on all tables of the origin + ** database and show the results. This is used for testing + ** and debugging of the columnNames() function. 
+ */ + az = columnNames("aux", zTab, &nPk, 0); + if (az == 0) + { + sqlite3_fprintf(stdout, "Rowid not accessible for %s\n", zId); + } + else + { + sqlite3_fprintf(stdout, "%s:", zId); + for (i = 0; az[i]; i++) + { + sqlite3_fprintf(stdout, " %s", az[i]); + if (i + 1 == nPk) + sqlite3_fprintf(stdout, " *"); + } + sqlite3_fprintf(stdout, "\n"); + } + goto end_diff_one_table; + } + + if (sqlite3_table_column_metadata(g.db, "aux", zTab, 0, 0, 0, 0, 0, 0)) + { + if (!sqlite3_table_column_metadata(g.db, "main", zTab, 0, 0, 0, 0, 0, 0)) + { + /* Table missing from second database. */ + if (g.bSchemaCompare) + sqlite3_fprintf(out, "-- 2nd DB has no %s table\n", zTab); + else + sqlite3_fprintf(out, "DROP TABLE %s;\n", zId); + } + goto end_diff_one_table; + } + + if (sqlite3_table_column_metadata(g.db, "main", zTab, 0, 0, 0, 0, 0, 0)) + { + /* Table missing from source */ + if (g.bSchemaCompare) + { + sqlite3_fprintf(out, "-- 1st DB has no %s table\n", zTab); + } + else + { + dump_table(zTab, out); + } + goto end_diff_one_table; + } + + az = columnNames("main", zTab, &nPk, 0); + az2 = columnNames("aux", zTab, &nPk2, 0); + if (az && az2) + { + for (n = 0; az[n] && az2[n]; n++) + { + if (sqlite3_stricmp(az[n], az2[n]) != 0) + break; + } + } + if (az == 0 || az2 == 0 || nPk != nPk2 || az[n]) + { + /* Schema mismatch */ + sqlite3_fprintf(out, "%sDROP TABLE %s; -- due to schema mismatch\n", zLead, zId); + dump_table(zTab, out); + goto end_diff_one_table; + } + + /* Build the comparison query */ + for (n2 = n; az2[n2]; n2++) + { + char *zNTab = safeId(az2[n2]); + sqlite3_fprintf(out, "ALTER TABLE %s ADD COLUMN %s;\n", zId, zNTab); + sqlite3_free(zNTab); + } + nQ = nPk2 + 1 + 2 * (n2 - nPk2); + if (n2 > nPk2) + { + zSep = "SELECT "; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%sB.%s", zSep, az[i]); + zSep = ", "; + } + sqlite3_str_appendf(pSql, ", 1 /* changed row */"); + while (az[i]) + { + sqlite3_str_appendf(pSql, ", A.%s IS NOT B.%s, B.%s", + az[i], az2[i], 
az2[i]); + i++; + } + while (az2[i]) + { + sqlite3_str_appendf(pSql, ", B.%s IS NOT NULL, B.%s", + az2[i], az2[i]); + i++; + } + sqlite3_str_appendf(pSql, "\n FROM main.%s A, aux.%s B\n", zId, zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", zSep, az[i], az[i]); + zSep = " AND"; + } + zSep = "\n AND ("; + while (az[i]) + { + sqlite3_str_appendf(pSql, "%sA.%s IS NOT B.%s%s\n", + zSep, az[i], az2[i], az2[i + 1] == 0 ? ")" : ""); + zSep = " OR "; + i++; + } + while (az2[i]) + { + sqlite3_str_appendf(pSql, "%sB.%s IS NOT NULL%s\n", + zSep, az2[i], az2[i + 1] == 0 ? ")" : ""); + zSep = " OR "; + i++; + } + sqlite3_str_appendf(pSql, " UNION ALL\n"); + } + zSep = "SELECT "; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%sA.%s", zSep, az[i]); + zSep = ", "; + } + sqlite3_str_appendf(pSql, ", 2 /* deleted row */"); + while (az2[i]) + { + sqlite3_str_appendf(pSql, ", NULL, NULL"); + i++; + } + sqlite3_str_appendf(pSql, "\n FROM main.%s A\n", zId); + sqlite3_str_appendf(pSql, " WHERE NOT EXISTS(SELECT 1 FROM aux.%s B\n", zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", zSep, az[i], az[i]); + zSep = " AND"; + } + sqlite3_str_appendf(pSql, ")\n"); + zSep = " UNION ALL\nSELECT "; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%sB.%s", zSep, az[i]); + zSep = ", "; + } + sqlite3_str_appendf(pSql, ", 3 /* inserted row */"); + while (az2[i]) + { + sqlite3_str_appendf(pSql, ", 1, B.%s", az2[i]); + i++; + } + sqlite3_str_appendf(pSql, "\n FROM aux.%s B\n", zId); + sqlite3_str_appendf(pSql, " WHERE NOT EXISTS(SELECT 1 FROM main.%s A\n", zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", zSep, az[i], az[i]); + zSep = " AND"; + } + sqlite3_str_appendf(pSql, ")\n ORDER BY"); + zSep = " "; + for (i = 1; i <= nPk; i++) + { + sqlite3_str_appendf(pSql, "%s%d", zSep, i); + zSep = ", "; + } + 
sqlite3_str_appendf(pSql, ";\n"); + + if (g.fDebug & DEBUG_DIFF_SQL) + { + printf("SQL for %s:\n%s\n", zId, sqlite3_str_value(pSql)); + goto end_diff_one_table; + } + + /* Drop indexes that are missing in the destination */ + pStmt = db_prepare( + "SELECT name FROM main.sqlite_schema" + " WHERE type='index' AND tbl_name=%Q" + " AND sql IS NOT NULL" + " AND sql NOT IN (SELECT sql FROM aux.sqlite_schema" + " WHERE type='index' AND tbl_name=%Q" + " AND sql IS NOT NULL)", + zTab, zTab); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + char *z = safeId((const char *)sqlite3_column_text(pStmt, 0)); + sqlite3_fprintf(out, "DROP INDEX %s;\n", z); + sqlite3_free(z); + } + sqlite3_finalize(pStmt); + + /* Run the query and output differences */ + if (!g.bSchemaOnly) + { + pStmt = db_prepare("%s", sqlite3_str_value(pSql)); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + int iType = sqlite3_column_int(pStmt, nPk); + if (iType == 1 || iType == 2) + { + if (iType == 1) + { /* Change the content of a row */ + sqlite3_fprintf(out, "%sUPDATE %s", zLead, zId); + zSep = " SET"; + for (i = nPk + 1; i < nQ; i += 2) + { + if (sqlite3_column_int(pStmt, i) == 0) + continue; + sqlite3_fprintf(out, "%s %s=", zSep, az2[(i + nPk - 1) / 2]); + zSep = ","; + printQuoted(out, sqlite3_column_value(pStmt, i + 1)); + } + } + else + { /* Delete a row */ + sqlite3_fprintf(out, "%sDELETE FROM %s", zLead, zId); + } + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_fprintf(out, "%s %s=", zSep, az2[i]); + printQuoted(out, sqlite3_column_value(pStmt, i)); + zSep = " AND"; + } + sqlite3_fprintf(out, ";\n"); + } + else + { /* Insert a row */ + sqlite3_fprintf(out, "%sINSERT INTO %s(%s", zLead, zId, az2[0]); + for (i = 1; az2[i]; i++) + sqlite3_fprintf(out, ",%s", az2[i]); + sqlite3_fprintf(out, ") VALUES"); + zSep = "("; + for (i = 0; i < nPk2; i++) + { + sqlite3_fprintf(out, "%s", zSep); + zSep = ","; + printQuoted(out, sqlite3_column_value(pStmt, i)); + } + for (i = nPk2 + 2; i < nQ; i += 2) 
+ { + sqlite3_fprintf(out, ","); + printQuoted(out, sqlite3_column_value(pStmt, i)); + } + sqlite3_fprintf(out, ");\n"); + } + } + sqlite3_finalize(pStmt); + } /* endif !g.bSchemaOnly */ + + /* Create indexes that are missing in the source */ + pStmt = db_prepare( + "SELECT sql FROM aux.sqlite_schema" + " WHERE type='index' AND tbl_name=%Q" + " AND sql IS NOT NULL" + " AND sql NOT IN (SELECT sql FROM main.sqlite_schema" + " WHERE type='index' AND tbl_name=%Q" + " AND sql IS NOT NULL)", + zTab, zTab); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + sqlite3_fprintf(out, "%s;\n", sqlite3_column_text(pStmt, 0)); + } + sqlite3_finalize(pStmt); + +end_diff_one_table: + strFree(pSql); + sqlite3_free(zId); + namelistFree(az); + namelistFree(az2); + return; +} + +/* +** Check that table zTab exists and has the same schema in both the "main" +** and "aux" databases currently opened by the global db handle. If they +** do not, output an error message on stderr and exit(1). Otherwise, if +** the schemas do match, return control to the caller. +*/ +static void checkSchemasMatch(const char *zTab) +{ + sqlite3_stmt *pStmt = db_prepare( + "SELECT A.sql=B.sql FROM main.sqlite_schema A, aux.sqlite_schema B" + " WHERE A.name=%Q AND B.name=%Q", + zTab, zTab); + if (SQLITE_ROW == sqlite3_step(pStmt)) + { + if (sqlite3_column_int(pStmt, 0) == 0) + { + runtimeError("schema changes for table %s", safeId(zTab)); + } + } + else + { + runtimeError("table %s missing from one or both databases", safeId(zTab)); + } + sqlite3_finalize(pStmt); +} + +/************************************************************************** +** The following code is copied from fossil. It is used to generate the +** fossil delta blobs sometimes used in RBU update records. +*/ + +typedef unsigned short u16; +typedef unsigned int u32; +typedef unsigned char u8; + +/* +** The width of a hash window in bytes. The algorithm only works if this +** is a power of 2. 
+*/ +#define NHASH 16 + +/* +** The current state of the rolling hash. +** +** z[] holds the values that have been hashed. z[] is a circular buffer. +** z[i] is the first entry and z[(i+NHASH-1)%NHASH] is the last entry of +** the window. +** +** Hash.a is the sum of all elements of hash.z[]. Hash.b is a weighted +** sum. Hash.b is z[i]*NHASH + z[i+1]*(NHASH-1) + ... + z[i+NHASH-1]*1. +** (Each index for z[] should be module NHASH, of course. The %NHASH operator +** is omitted in the prior expression for brevity.) +*/ +typedef struct hash hash; +struct hash +{ + u16 a, b; /* Hash values */ + u16 i; /* Start of the hash window */ + char z[NHASH]; /* The values that have been hashed */ +}; + +/* +** Initialize the rolling hash using the first NHASH characters of z[] +*/ +static void hash_init(hash *pHash, const char *z) +{ + u16 a, b, i; + a = b = 0; + for (i = 0; i < NHASH; i++) + { + a += z[i]; + b += (NHASH - i) * z[i]; + pHash->z[i] = z[i]; + } + pHash->a = a & 0xffff; + pHash->b = b & 0xffff; + pHash->i = 0; +} + +/* +** Advance the rolling hash by a single character "c" +*/ +static void hash_next(hash *pHash, int c) +{ + u16 old = pHash->z[pHash->i]; + pHash->z[pHash->i] = (char)c; + pHash->i = (pHash->i + 1) & (NHASH - 1); + pHash->a = pHash->a - old + (char)c; + pHash->b = pHash->b - NHASH * old + pHash->a; +} + +/* +** Return a 32-bit hash value +*/ +static u32 hash_32bit(hash *pHash) +{ + return (pHash->a & 0xffff) | (((u32)(pHash->b & 0xffff)) << 16); +} + +/* +** Write an base-64 integer into the given buffer. 
+*/ +static void putInt(unsigned int v, char **pz) +{ + static const char zDigits[] = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~"; + /* 123456789 123456789 123456789 123456789 123456789 123456789 123 */ + int i, j; + char zBuf[20]; + if (v == 0) + { + *(*pz)++ = '0'; + return; + } + for (i = 0; v > 0; i++, v >>= 6) + { + zBuf[i] = zDigits[v & 0x3f]; + } + for (j = i - 1; j >= 0; j--) + { + *(*pz)++ = zBuf[j]; + } +} + +/* +** Return the number digits in the base-64 representation of a positive integer +*/ +static int digit_count(int v) +{ + unsigned int i, x; + for (i = 1, x = 64; (unsigned int)v >= x; i++, x <<= 6) + { + } + return i; +} + +/* +** Compute a 32-bit checksum on the N-byte buffer. Return the result. +*/ +static unsigned int checksum(const char *zIn, size_t N) +{ + const unsigned char *z = (const unsigned char *)zIn; + unsigned sum0 = 0; + unsigned sum1 = 0; + unsigned sum2 = 0; + unsigned sum3 = 0; + while (N >= 16) + { + sum0 += ((unsigned)z[0] + z[4] + z[8] + z[12]); + sum1 += ((unsigned)z[1] + z[5] + z[9] + z[13]); + sum2 += ((unsigned)z[2] + z[6] + z[10] + z[14]); + sum3 += ((unsigned)z[3] + z[7] + z[11] + z[15]); + z += 16; + N -= 16; + } + while (N >= 4) + { + sum0 += z[0]; + sum1 += z[1]; + sum2 += z[2]; + sum3 += z[3]; + z += 4; + N -= 4; + } + sum3 += (sum2 << 8) + (sum1 << 16) + (sum0 << 24); + switch (N) + { + case 3: + sum3 += (z[2] << 8); + case 2: + sum3 += (z[1] << 16); + case 1: + sum3 += (z[0] << 24); + default:; + } + return sum3; +} + +/* +** Create a new delta. +** +** The delta is written into a preallocated buffer, zDelta, which +** should be at least 60 bytes longer than the target file, zOut. +** The delta string will be NUL-terminated, but it might also contain +** embedded NUL characters if either the zSrc or zOut files are +** binary. This function returns the length of the delta string +** in bytes, excluding the final NUL terminator character. 
+** +** Output Format: +** +** The delta begins with a base64 number followed by a newline. This +** number is the number of bytes in the TARGET file. Thus, given a +** delta file z, a program can compute the size of the output file +** simply by reading the first line and decoding the base-64 number +** found there. The delta_output_size() routine does exactly this. +** +** After the initial size number, the delta consists of a series of +** literal text segments and commands to copy from the SOURCE file. +** A copy command looks like this: +** +** NNN@MMM, +** +** where NNN is the number of bytes to be copied and MMM is the offset +** into the source file of the first byte (both base-64). If NNN is 0 +** it means copy the rest of the input file. Literal text is like this: +** +** NNN:TTTTT +** +** where NNN is the number of bytes of text (base-64) and TTTTT is the text. +** +** The last term is of the form +** +** NNN; +** +** In this case, NNN is a 32-bit bigendian checksum of the output file +** that can be used to verify that the delta applied correctly. All +** numbers are in base-64. +** +** Pure text files generate a pure text delta. Binary files generate a +** delta that may contain some binary data. +** +** Algorithm: +** +** The encoder first builds a hash table to help it find matching +** patterns in the source file. 16-byte chunks of the source file +** sampled at evenly spaced intervals are used to populate the hash +** table. +** +** Next we begin scanning the target file using a sliding 16-byte +** window. The hash of the 16-byte window in the target is used to +** search for a matching section in the source file. When a match +** is found, a copy command is added to the delta. An effort is +** made to extend the matching section to regions that come before +** and after the 16-byte hash window. A copy command is only issued +** if the result would use less space that just quoting the text +** literally. 
Literal text is added to the delta for sections that +** do not match or which can not be encoded efficiently using copy +** commands. +*/ +static int rbuDeltaCreate( + const char *zSrc, /* The source or pattern file */ + unsigned int lenSrc, /* Length of the source file */ + const char *zOut, /* The target file */ + unsigned int lenOut, /* Length of the target file */ + char *zDelta /* Write the delta into this buffer */ +) +{ + unsigned int i, base; + char *zOrigDelta = zDelta; + hash h; + int nHash; /* Number of hash table entries */ + int *landmark; /* Primary hash table */ + int *collide; /* Collision chain */ + int lastRead = -1; /* Last byte of zSrc read by a COPY command */ + + /* Add the target file size to the beginning of the delta + */ + putInt(lenOut, &zDelta); + *(zDelta++) = '\n'; + + /* If the source file is very small, it means that we have no + ** chance of ever doing a copy command. Just output a single + ** literal segment for the entire target and exit. + */ + if (lenSrc <= NHASH) + { + putInt(lenOut, &zDelta); + *(zDelta++) = ':'; + memcpy(zDelta, zOut, lenOut); + zDelta += lenOut; + putInt(checksum(zOut, lenOut), &zDelta); + *(zDelta++) = ';'; + return (int)(zDelta - zOrigDelta); + } + + /* Compute the hash table used to locate matching sections in the + ** source file. + */ + nHash = lenSrc / NHASH; + collide = sqlite3_malloc(nHash * 2 * sizeof(int)); + landmark = &collide[nHash]; + memset(landmark, -1, nHash * sizeof(int)); + memset(collide, -1, nHash * sizeof(int)); + for (i = 0; i < lenSrc - NHASH; i += NHASH) + { + int hv; + hash_init(&h, &zSrc[i]); + hv = hash_32bit(&h) % nHash; + collide[i / NHASH] = landmark[hv]; + landmark[hv] = i / NHASH; + } + + /* Begin scanning the target file and generating copy commands and + ** literal sections of the delta. 
+ */ + base = 0; /* We have already generated everything before zOut[base] */ + while (base + NHASH < lenOut) + { + int iSrc, iBlock; + int bestCnt, bestOfst = 0, bestLitsz = 0; + hash_init(&h, &zOut[base]); + i = 0; /* Trying to match a landmark against zOut[base+i] */ + bestCnt = 0; + while (1) + { + int hv; + int limit = 250; + + hv = hash_32bit(&h) % nHash; + iBlock = landmark[hv]; + while (iBlock >= 0 && (limit--) > 0) + { + /* + ** The hash window has identified a potential match against + ** landmark block iBlock. But we need to investigate further. + ** + ** Look for a region in zOut that matches zSrc. Anchor the search + ** at zSrc[iSrc] and zOut[base+i]. Do not include anything prior to + ** zOut[base] or after zOut[outLen] nor anything after zSrc[srcLen]. + ** + ** Set cnt equal to the length of the match and set ofst so that + ** zSrc[ofst] is the first element of the match. litsz is the number + ** of characters between zOut[base] and the beginning of the match. + ** sz will be the overhead (in bytes) needed to encode the copy + ** command. Only generate copy command if the overhead of the + ** copy command is less than the amount of literal text to be copied. + */ + int cnt, ofst, litsz; + int j, k, x, y; + int sz; + + /* Beginning at iSrc, match forwards as far as we can. j counts + ** the number of characters that match */ + iSrc = iBlock * NHASH; + for ( + j = 0, x = iSrc, y = base + i; + (unsigned int)x < lenSrc && (unsigned int)y < lenOut; + j++, x++, y++) + { + if (zSrc[x] != zOut[y]) + break; + } + j--; + + /* Beginning at iSrc-1, match backwards as far as we can. 
k counts + ** the number of characters that match */ + for (k = 1; k < iSrc && (unsigned int)k <= i; k++) + { + if (zSrc[iSrc - k] != zOut[base + i - k]) + break; + } + k--; + + /* Compute the offset and size of the matching region */ + ofst = iSrc - k; + cnt = j + k + 1; + litsz = i - k; /* Number of bytes of literal text before the copy */ + /* sz will hold the number of bytes needed to encode the "insert" + ** command and the copy command, not counting the "insert" text */ + sz = digit_count(i - k) + digit_count(cnt) + digit_count(ofst) + 3; + if (cnt >= sz && cnt > bestCnt) + { + /* Remember this match only if it is the best so far and it + ** does not increase the file size */ + bestCnt = cnt; + bestOfst = iSrc - k; + bestLitsz = litsz; + } + + /* Check the next matching block */ + iBlock = collide[iBlock]; + } + + /* We have a copy command that does not cause the delta to be larger + ** than a literal insert. So add the copy command to the delta. + */ + if (bestCnt > 0) + { + if (bestLitsz > 0) + { + /* Add an insert command before the copy */ + putInt(bestLitsz, &zDelta); + *(zDelta++) = ':'; + memcpy(zDelta, &zOut[base], bestLitsz); + zDelta += bestLitsz; + base += bestLitsz; + } + base += bestCnt; + putInt(bestCnt, &zDelta); + *(zDelta++) = '@'; + putInt(bestOfst, &zDelta); + *(zDelta++) = ','; + if (bestOfst + bestCnt - 1 > lastRead) + { + lastRead = bestOfst + bestCnt - 1; + } + bestCnt = 0; + break; + } + + /* If we reach this point, it means no match is found so far */ + if (base + i + NHASH >= lenOut) + { + /* We have reached the end of the file and have not found any + ** matches. Do an "insert" for everything that does not match */ + putInt(lenOut - base, &zDelta); + *(zDelta++) = ':'; + memcpy(zDelta, &zOut[base], lenOut - base); + zDelta += lenOut - base; + base = lenOut; + break; + } + + /* Advance the hash by one character. 
Keep looking for a match */ + hash_next(&h, zOut[base + i + NHASH]); + i++; + } + } + /* Output a final "insert" record to get all the text at the end of + ** the file that does not match anything in the source file. + */ + if (base < lenOut) + { + putInt(lenOut - base, &zDelta); + *(zDelta++) = ':'; + memcpy(zDelta, &zOut[base], lenOut - base); + zDelta += lenOut - base; + } + /* Output the final checksum record. */ + putInt(checksum(zOut, lenOut), &zDelta); + *(zDelta++) = ';'; + sqlite3_free(collide); + return (int)(zDelta - zOrigDelta); +} + +/* +** End of code copied from fossil. +**************************************************************************/ + +static void strPrintfArray( + sqlite3_str *pStr, /* String object to append to */ + const char *zSep, /* Separator string */ + const char *zFmt, /* Format for each entry */ + char **az, int n /* Array of strings & its size (or -1) */ +) +{ + int i; + for (i = 0; az[i] && (i < n || n < 0); i++) + { + if (i != 0) + sqlite3_str_appendf(pStr, "%s", zSep); + sqlite3_str_appendf(pStr, zFmt, az[i], az[i], az[i]); + } +} + +static void getRbudiffQuery( + const char *zTab, + char **azCol, + int nPK, + int bOtaRowid, + sqlite3_str *pSql) +{ + int i; + + /* First the newly inserted rows: **/ + sqlite3_str_appendf(pSql, "SELECT "); + strPrintfArray(pSql, ", ", "%s", azCol, -1); + sqlite3_str_appendf(pSql, ", 0, "); /* Set ota_control to 0 for an insert */ + strPrintfArray(pSql, ", ", "NULL", azCol, -1); + sqlite3_str_appendf(pSql, " FROM aux.%Q AS n WHERE NOT EXISTS (\n", zTab); + sqlite3_str_appendf(pSql, " SELECT 1 FROM ", zTab); + sqlite3_str_appendf(pSql, " main.%Q AS o WHERE ", zTab); + strPrintfArray(pSql, " AND ", "(n.%Q = o.%Q)", azCol, nPK); + sqlite3_str_appendf(pSql, "\n) AND "); + strPrintfArray(pSql, " AND ", "(n.%Q IS NOT NULL)", azCol, nPK); + + /* Deleted rows: */ + sqlite3_str_appendf(pSql, "\nUNION ALL\nSELECT "); + strPrintfArray(pSql, ", ", "%s", azCol, nPK); + if (azCol[nPK]) + { + 
sqlite3_str_appendf(pSql, ", "); + strPrintfArray(pSql, ", ", "NULL", &azCol[nPK], -1); + } + sqlite3_str_appendf(pSql, ", 1, "); /* Set ota_control to 1 for a delete */ + strPrintfArray(pSql, ", ", "NULL", azCol, -1); + sqlite3_str_appendf(pSql, " FROM main.%Q AS n WHERE NOT EXISTS (\n", zTab); + sqlite3_str_appendf(pSql, " SELECT 1 FROM ", zTab); + sqlite3_str_appendf(pSql, " aux.%Q AS o WHERE ", zTab); + strPrintfArray(pSql, " AND ", "(n.%Q = o.%Q)", azCol, nPK); + sqlite3_str_appendf(pSql, "\n) AND "); + strPrintfArray(pSql, " AND ", "(n.%Q IS NOT NULL)", azCol, nPK); + + /* Updated rows. If all table columns are part of the primary key, there + ** can be no updates. In this case this part of the compound SELECT can + ** be omitted altogether. */ + if (azCol[nPK]) + { + sqlite3_str_appendf(pSql, "\nUNION ALL\nSELECT "); + strPrintfArray(pSql, ", ", "n.%s", azCol, nPK); + sqlite3_str_appendf(pSql, ",\n"); + strPrintfArray(pSql, " ,\n", + " CASE WHEN n.%s IS o.%s THEN NULL ELSE n.%s END", &azCol[nPK], -1); + + if (bOtaRowid == 0) + { + sqlite3_str_appendf(pSql, ", '"); + strPrintfArray(pSql, "", ".", azCol, nPK); + sqlite3_str_appendf(pSql, "' ||\n"); + } + else + { + sqlite3_str_appendf(pSql, ",\n"); + } + strPrintfArray(pSql, " ||\n", + " CASE WHEN n.%s IS o.%s THEN '.' ELSE 'x' END", &azCol[nPK], -1); + sqlite3_str_appendf(pSql, "\nAS ota_control, "); + strPrintfArray(pSql, ", ", "NULL", azCol, nPK); + sqlite3_str_appendf(pSql, ",\n"); + strPrintfArray(pSql, " ,\n", + " CASE WHEN n.%s IS o.%s THEN NULL ELSE o.%s END", &azCol[nPK], -1); + + sqlite3_str_appendf(pSql, "\nFROM main.%Q AS o, aux.%Q AS n\nWHERE ", + zTab, zTab); + strPrintfArray(pSql, " AND ", "(n.%Q = o.%Q)", azCol, nPK); + sqlite3_str_appendf(pSql, " AND ota_control LIKE '%%x%%'"); + } + + /* Now add an ORDER BY clause to sort everything by PK. */ + sqlite3_str_appendf(pSql, "\nORDER BY "); + for (i = 1; i <= nPK; i++) + sqlite3_str_appendf(pSql, "%s%d", ((i > 1) ? 
", " : ""), i); +} + +static void rbudiff_one_table(const char *zTab, FILE *out) +{ + int bOtaRowid; /* True to use an ota_rowid column */ + int nPK; /* Number of primary key columns in table */ + char **azCol; /* NULL terminated array of col names */ + int i; + int nCol; + sqlite3_str *pCt; /* The "CREATE TABLE data_xxx" statement */ + sqlite3_str *pSql; /* Query to find differences */ + sqlite3_str *pInsert; /* First part of output INSERT statement */ + sqlite3_stmt *pStmt = 0; + int nRow = 0; /* Total rows in data_xxx table */ + + /* --rbu mode must use real primary keys. */ + g.bSchemaPK = 1; + pCt = sqlite3_str_new(0); + pSql = sqlite3_str_new(0); + pInsert = sqlite3_str_new(0); + + /* Check that the schemas of the two tables match. Exit early otherwise. */ + checkSchemasMatch(zTab); + + /* Grab the column names and PK details for the table(s). If no usable PK + ** columns are found, bail out early. */ + azCol = columnNames("main", zTab, &nPK, &bOtaRowid); + if (azCol == 0) + { + runtimeError("table %s has no usable PK columns", zTab); + } + for (nCol = 0; azCol[nCol]; nCol++) + ; + + /* Build and output the CREATE TABLE statement for the data_xxx table */ + sqlite3_str_appendf(pCt, "CREATE TABLE IF NOT EXISTS 'data_%q'(", zTab); + if (bOtaRowid) + sqlite3_str_appendf(pCt, "rbu_rowid, "); + strPrintfArray(pCt, ", ", "%s", &azCol[bOtaRowid], -1); + sqlite3_str_appendf(pCt, ", rbu_control);"); + + /* Get the SQL for the query to retrieve data from the two databases */ + getRbudiffQuery(zTab, azCol, nPK, bOtaRowid, pSql); + + /* Build the first part of the INSERT statement output for each row + ** in the data_xxx table. 
*/ + sqlite3_str_appendf(pInsert, "INSERT INTO 'data_%q' (", zTab); + if (bOtaRowid) + sqlite3_str_appendf(pInsert, "rbu_rowid, "); + strPrintfArray(pInsert, ", ", "%s", &azCol[bOtaRowid], -1); + sqlite3_str_appendf(pInsert, ", rbu_control) VALUES("); + + pStmt = db_prepare("%s", sqlite3_str_value(pSql)); + + while (sqlite3_step(pStmt) == SQLITE_ROW) + { + + /* If this is the first row output, print out the CREATE TABLE + ** statement first. And reset pCt so that it will not be + ** printed again. */ + if (sqlite3_str_length(pCt)) + { + sqlite3_fprintf(out, "%s\n", sqlite3_str_value(pCt)); + sqlite3_str_reset(pCt); + } + + /* Output the first part of the INSERT statement */ + sqlite3_fprintf(out, "%s", sqlite3_str_value(pInsert)); + nRow++; + + if (sqlite3_column_type(pStmt, nCol) == SQLITE_INTEGER) + { + for (i = 0; i <= nCol; i++) + { + if (i > 0) + sqlite3_fprintf(out, ", "); + printQuoted(out, sqlite3_column_value(pStmt, i)); + } + } + else + { + char *zOtaControl; + int nOtaControl = sqlite3_column_bytes(pStmt, nCol); + + zOtaControl = (char *)sqlite3_malloc(nOtaControl + 1); + memcpy(zOtaControl, sqlite3_column_text(pStmt, nCol), nOtaControl + 1); + + for (i = 0; i < nCol; i++) + { + int bDone = 0; + if (i >= nPK && sqlite3_column_type(pStmt, i) == SQLITE_BLOB && sqlite3_column_type(pStmt, nCol + 1 + i) == SQLITE_BLOB) + { + const char *aSrc = sqlite3_column_blob(pStmt, nCol + 1 + i); + int nSrc = sqlite3_column_bytes(pStmt, nCol + 1 + i); + const char *aFinal = sqlite3_column_blob(pStmt, i); + int nFinal = sqlite3_column_bytes(pStmt, i); + char *aDelta; + int nDelta; + + aDelta = sqlite3_malloc(nFinal + 60); + nDelta = rbuDeltaCreate(aSrc, nSrc, aFinal, nFinal, aDelta); + if (nDelta < nFinal) + { + int j; + sqlite3_fprintf(out, "x'"); + for (j = 0; j < nDelta; j++) + sqlite3_fprintf(out, "%02x", (u8)aDelta[j]); + sqlite3_fprintf(out, "'"); + zOtaControl[i - bOtaRowid] = 'f'; + bDone = 1; + } + sqlite3_free(aDelta); + } + + if (bDone == 0) + { + 
printQuoted(out, sqlite3_column_value(pStmt, i)); + } + sqlite3_fprintf(out, ", "); + } + sqlite3_fprintf(out, "'%s'", zOtaControl); + sqlite3_free(zOtaControl); + } + + /* And the closing bracket of the insert statement */ + sqlite3_fprintf(out, ");\n"); + } + + sqlite3_finalize(pStmt); + if (nRow > 0) + { + sqlite3_str *pCnt = sqlite3_str_new(0); + sqlite3_str_appendf(pCnt, + "INSERT INTO rbu_count VALUES('data_%q', %d);", zTab, nRow); + sqlite3_fprintf(out, "%s\n", sqlite3_str_value(pCnt)); + strFree(pCnt); + } + + strFree(pCt); + strFree(pSql); + strFree(pInsert); +} + +/* +** Display a summary of differences between two versions of the same +** table table. +** +** * Number of rows changed +** * Number of rows added +** * Number of rows deleted +** * Number of identical rows +*/ +static void summarize_one_table(const char *zTab, FILE *out) +{ + char *zId = safeId(zTab); /* Name of table (translated for us in SQL) */ + char **az = 0; /* Columns in main */ + char **az2 = 0; /* Columns in aux */ + int nPk; /* Primary key columns in main */ + int nPk2; /* Primary key columns in aux */ + int n = 0; /* Number of columns in main */ + int n2; /* Number of columns in aux */ + int i; /* Loop counter */ + const char *zSep; /* Separator string */ + sqlite3_str *pSql; /* Comparison query */ + sqlite3_stmt *pStmt; /* Query statement to do the diff */ + sqlite3_int64 nUpdate; /* Number of updated rows */ + sqlite3_int64 nUnchanged; /* Number of unmodified rows */ + sqlite3_int64 nDelete; /* Number of deleted rows */ + sqlite3_int64 nInsert; /* Number of inserted rows */ + + pSql = sqlite3_str_new(0); + if (sqlite3_table_column_metadata(g.db, "aux", zTab, 0, 0, 0, 0, 0, 0)) + { + if (!sqlite3_table_column_metadata(g.db, "main", zTab, 0, 0, 0, 0, 0, 0)) + { + /* Table missing from second database. 
*/ + sqlite3_fprintf(out, "%s: missing from second database\n", zTab); + } + goto end_summarize_one_table; + } + + if (sqlite3_table_column_metadata(g.db, "main", zTab, 0, 0, 0, 0, 0, 0)) + { + /* Table missing from source */ + sqlite3_fprintf(out, "%s: missing from first database\n", zTab); + goto end_summarize_one_table; + } + + az = columnNames("main", zTab, &nPk, 0); + az2 = columnNames("aux", zTab, &nPk2, 0); + if (az && az2) + { + for (n = 0; az[n]; n++) + { + if (sqlite3_stricmp(az[n], az2[n]) != 0) + break; + } + } + if (az == 0 || az2 == 0 || nPk != nPk2 || az[n]) + { + /* Schema mismatch */ + sqlite3_fprintf(out, "%s: incompatible schema\n", zTab); + goto end_summarize_one_table; + } + + /* Build the comparison query */ + for (n2 = n; az[n2]; n2++) + { + } + sqlite3_str_appendf(pSql, "SELECT 1, count(*)"); + if (n2 == nPk2) + { + sqlite3_str_appendf(pSql, ", 0\n"); + } + else + { + zSep = ", sum("; + for (i = nPk; az[i]; i++) + { + sqlite3_str_appendf(pSql, "%sA.%s IS NOT B.%s", zSep, az[i], az[i]); + zSep = " OR "; + } + sqlite3_str_appendf(pSql, ")\n"); + } + sqlite3_str_appendf(pSql, " FROM main.%s A, aux.%s B\n", zId, zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", zSep, az[i], az[i]); + zSep = " AND"; + } + sqlite3_str_appendf(pSql, " UNION ALL\n"); + sqlite3_str_appendf(pSql, "SELECT 2, count(*), 0\n"); + sqlite3_str_appendf(pSql, " FROM main.%s A\n", zId); + sqlite3_str_appendf(pSql, " WHERE NOT EXISTS(SELECT 1 FROM aux.%s B ", zId); + zSep = "WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", zSep, az[i], az[i]); + zSep = " AND"; + } + sqlite3_str_appendf(pSql, ")\n"); + sqlite3_str_appendf(pSql, " UNION ALL\n"); + sqlite3_str_appendf(pSql, "SELECT 3, count(*), 0\n"); + sqlite3_str_appendf(pSql, " FROM aux.%s B\n", zId); + sqlite3_str_appendf(pSql, " WHERE NOT EXISTS(SELECT 1 FROM main.%s A ", zId); + zSep = "WHERE"; + for (i = 0; i < nPk; i++) + { + 
/*
** Write a 64-bit unsigned integer to out as a changeset-format varint.
** Values that fit in 56 bits use one to eight bytes of seven payload
** bits each, with the high bit of every byte except the last set as a
** continuation flag.  Larger values use eight continuation bytes
** followed by one full low-order byte: nine bytes total.
**
** FIX: the nine-byte branch assembled p[0..8] (nine bytes) but wrote
** only eight with fwrite(p, 8, 1, out), silently dropping the final
** byte and corrupting the changeset for any value >= 2^56.
*/
static void putsVarint(FILE *out, sqlite3_uint64 v)
{
  int i, n;
  unsigned char p[12];
  if (v & (((sqlite3_uint64)0xff000000) << 32))
  {
    /* Bits 56..63 are set: the nine-byte form is required. */
    p[8] = (unsigned char)v;
    v >>= 8;
    for (i = 7; i >= 0; i--)
    {
      p[i] = (unsigned char)((v & 0x7f) | 0x80);
      v >>= 7;
    }
    fwrite(p, 9, 1, out);
  }
  else
  {
    /* Fill p[] back-to-front, then clear the continuation bit on the
    ** final (lowest-order) byte. */
    n = 9;
    do
    {
      p[n--] = (unsigned char)((v & 0x7f) | 0x80);
      v >>= 7;
    } while (v != 0);
    p[9] &= 0x7f;
    fwrite(p + n + 1, 9 - n, 1, out);
  }
}
+*/ +static void putValue(FILE *out, sqlite3_stmt *pStmt, int k) +{ + int iDType = sqlite3_column_type(pStmt, k); + sqlite3_int64 iX; + double rX; + sqlite3_uint64 uX; + int j; + + putc(iDType, out); + switch (iDType) + { + case SQLITE_INTEGER: + iX = sqlite3_column_int64(pStmt, k); + memcpy(&uX, &iX, 8); + for (j = 56; j >= 0; j -= 8) + putc((uX >> j) & 0xff, out); + break; + case SQLITE_FLOAT: + rX = sqlite3_column_double(pStmt, k); + memcpy(&uX, &rX, 8); + for (j = 56; j >= 0; j -= 8) + putc((uX >> j) & 0xff, out); + break; + case SQLITE_TEXT: + iX = sqlite3_column_bytes(pStmt, k); + putsVarint(out, (sqlite3_uint64)iX); + fwrite(sqlite3_column_text(pStmt, k), 1, (size_t)iX, out); + break; + case SQLITE_BLOB: + iX = sqlite3_column_bytes(pStmt, k); + putsVarint(out, (sqlite3_uint64)iX); + fwrite(sqlite3_column_blob(pStmt, k), 1, (size_t)iX, out); + break; + case SQLITE_NULL: + break; + } +} + +/* +** Generate a CHANGESET for all differences from main.zTab to aux.zTab. +*/ +static void changeset_one_table(const char *zTab, FILE *out) +{ + sqlite3_stmt *pStmt; /* SQL statment */ + char *zId = safeId(zTab); /* Escaped name of the table */ + char **azCol = 0; /* List of escaped column names */ + int nCol = 0; /* Number of columns */ + int *aiFlg = 0; /* 0 if column is not part of PK */ + int *aiPk = 0; /* Column numbers for each PK column */ + int nPk = 0; /* Number of PRIMARY KEY columns */ + sqlite3_str *pSql; /* SQL for the diff query */ + int i, k; /* Loop counters */ + const char *zSep; /* List separator */ + + /* Check that the schemas of the two tables match. Exit early otherwise. 
*/ + checkSchemasMatch(zTab); + pSql = sqlite3_str_new(0); + + pStmt = db_prepare("PRAGMA main.table_info=%Q", zTab); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + nCol++; + azCol = sqlite3_realloc(azCol, sizeof(char *) * nCol); + if (azCol == 0) + runtimeError("out of memory"); + aiFlg = sqlite3_realloc(aiFlg, sizeof(int) * nCol); + if (aiFlg == 0) + runtimeError("out of memory"); + azCol[nCol - 1] = safeId((const char *)sqlite3_column_text(pStmt, 1)); + aiFlg[nCol - 1] = i = sqlite3_column_int(pStmt, 5); + if (i > 0) + { + if (i > nPk) + { + nPk = i; + aiPk = sqlite3_realloc(aiPk, sizeof(int) * nPk); + if (aiPk == 0) + runtimeError("out of memory"); + } + aiPk[i - 1] = nCol - 1; + } + } + sqlite3_finalize(pStmt); + if (nPk == 0) + goto end_changeset_one_table; + if (nCol > nPk) + { + sqlite3_str_appendf(pSql, "SELECT %d", SQLITE_UPDATE); + for (i = 0; i < nCol; i++) + { + if (aiFlg[i]) + { + sqlite3_str_appendf(pSql, ",\n A.%s", azCol[i]); + } + else + { + sqlite3_str_appendf(pSql, ",\n A.%s IS NOT B.%s, A.%s, B.%s", + azCol[i], azCol[i], azCol[i], azCol[i]); + } + } + sqlite3_str_appendf(pSql, "\n FROM main.%s A, aux.%s B\n", zId, zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", + zSep, azCol[aiPk[i]], azCol[aiPk[i]]); + zSep = " AND"; + } + zSep = "\n AND ("; + for (i = 0; i < nCol; i++) + { + if (aiFlg[i]) + continue; + sqlite3_str_appendf(pSql, "%sA.%s IS NOT B.%s", zSep, azCol[i], azCol[i]); + zSep = " OR\n "; + } + sqlite3_str_appendf(pSql, ")\n UNION ALL\n"); + } + sqlite3_str_appendf(pSql, "SELECT %d", SQLITE_DELETE); + for (i = 0; i < nCol; i++) + { + if (aiFlg[i]) + { + sqlite3_str_appendf(pSql, ",\n A.%s", azCol[i]); + } + else + { + sqlite3_str_appendf(pSql, ",\n 1, A.%s, NULL", azCol[i]); + } + } + sqlite3_str_appendf(pSql, "\n FROM main.%s A\n", zId); + sqlite3_str_appendf(pSql, " WHERE NOT EXISTS(SELECT 1 FROM aux.%s B\n", zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + 
sqlite3_str_appendf(pSql, "%s A.%s=B.%s", + zSep, azCol[aiPk[i]], azCol[aiPk[i]]); + zSep = " AND"; + } + sqlite3_str_appendf(pSql, ")\n UNION ALL\n"); + sqlite3_str_appendf(pSql, "SELECT %d", SQLITE_INSERT); + for (i = 0; i < nCol; i++) + { + if (aiFlg[i]) + { + sqlite3_str_appendf(pSql, ",\n B.%s", azCol[i]); + } + else + { + sqlite3_str_appendf(pSql, ",\n 1, NULL, B.%s", azCol[i]); + } + } + sqlite3_str_appendf(pSql, "\n FROM aux.%s B\n", zId); + sqlite3_str_appendf(pSql, " WHERE NOT EXISTS(SELECT 1 FROM main.%s A\n", zId); + zSep = " WHERE"; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s A.%s=B.%s", + zSep, azCol[aiPk[i]], azCol[aiPk[i]]); + zSep = " AND"; + } + sqlite3_str_appendf(pSql, ")\n"); + sqlite3_str_appendf(pSql, " ORDER BY"); + zSep = " "; + for (i = 0; i < nPk; i++) + { + sqlite3_str_appendf(pSql, "%s %d", zSep, aiPk[i] + 2); + zSep = ","; + } + sqlite3_str_appendf(pSql, ";\n"); + + if (g.fDebug & DEBUG_DIFF_SQL) + { + sqlite3_fprintf(stdout, "SQL for %s:\n%s\n", zId, sqlite3_str_value(pSql)); + goto end_changeset_one_table; + } + + putc('T', out); + putsVarint(out, (sqlite3_uint64)nCol); + for (i = 0; i < nCol; i++) + putc(aiFlg[i], out); + fwrite(zTab, 1, strlen(zTab), out); + putc(0, out); + + pStmt = db_prepare("%s", sqlite3_str_value(pSql)); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + int iType = sqlite3_column_int(pStmt, 0); + putc(iType, out); + putc(0, out); + switch (sqlite3_column_int(pStmt, 0)) + { + case SQLITE_UPDATE: + { + for (k = 1, i = 0; i < nCol; i++) + { + if (aiFlg[i]) + { + putValue(out, pStmt, k); + k++; + } + else if (sqlite3_column_int(pStmt, k)) + { + putValue(out, pStmt, k + 1); + k += 3; + } + else + { + putc(0, out); + k += 3; + } + } + for (k = 1, i = 0; i < nCol; i++) + { + if (aiFlg[i]) + { + putc(0, out); + k++; + } + else if (sqlite3_column_int(pStmt, k)) + { + putValue(out, pStmt, k + 2); + k += 3; + } + else + { + putc(0, out); + k += 3; + } + } + break; + } + case SQLITE_INSERT: + { + for 
/*
** Return true if the ASCII character x is a whitespace character
** (space, tab, newline, or carriage return).  Otherwise return false.
*/
static int is_whitespace(char x)
{
  switch (x)
  {
  case ' ':
  case '\t':
  case '\n':
  case '\r':
    return 1;
  default:
    return 0;
  }
}

/*
** Extract the next SQL keyword or quoted identifier from buffer zIn and
** copy it (or a prefix of it, if it does not fit) into zBuf, which is
** nBuf bytes in size.  Doubled closing quotes inside a quoted token are
** unescaped to a single quote.  Return a pointer into zIn immediately
** after the extracted token, or NULL if zIn is NULL.
*/
static const char *gobble_token(const char *zIn, char *zBuf, int nBuf)
{
  const char *zCsr = zIn;
  char *zTo = zBuf;
  char *zToEnd = &zBuf[nBuf - 1];
  char cQuote = 0; /* Closing quote character, or 0 for unquoted */

  if (zCsr == 0)
    return 0;
  while (is_whitespace(*zCsr))
    zCsr++;

  /* Recognize the four SQL quoting styles: "..." '...' `...` [...] */
  if (*zCsr == '"' || *zCsr == '\'' || *zCsr == '`')
  {
    cQuote = *zCsr;
  }
  else if (*zCsr == '[')
  {
    cQuote = ']';
  }

  if (cQuote)
  {
    zCsr++; /* Step over the opening quote */
    while (*zCsr && zTo < zToEnd)
    {
      if (*zCsr == cQuote)
      {
        zCsr++;
        /* A doubled quote is an escaped literal quote character; a
        ** single one terminates the token. */
        if (*zCsr != cQuote)
          break;
      }
      if (zTo < zToEnd)
        *zTo++ = *zCsr;
      zCsr++;
    }
  }
  else
  {
    /* Unquoted: stop at whitespace or an opening parenthesis. */
    while (*zCsr && !is_whitespace(*zCsr) && *zCsr != '(')
    {
      if (zTo < zToEnd)
        *zTo++ = *zCsr;
      zCsr++;
    }
  }

  *zTo = '\0';
  return zCsr;
}
+*/ +static void module_name_func( + sqlite3_context *pCtx, + int nVal, sqlite3_value **apVal) +{ + const char *zSql; + char zToken[32]; + + assert(nVal == 1); + zSql = (const char *)sqlite3_value_text(apVal[0]); + + zSql = gobble_token(zSql, zToken, sizeof(zToken)); + if (zSql == 0 || sqlite3_stricmp(zToken, "create")) + return; + zSql = gobble_token(zSql, zToken, sizeof(zToken)); + if (zSql == 0 || sqlite3_stricmp(zToken, "virtual")) + return; + zSql = gobble_token(zSql, zToken, sizeof(zToken)); + if (zSql == 0 || sqlite3_stricmp(zToken, "table")) + return; + zSql = gobble_token(zSql, zToken, sizeof(zToken)); + if (zSql == 0) + return; + zSql = gobble_token(zSql, zToken, sizeof(zToken)); + if (zSql == 0 || sqlite3_stricmp(zToken, "using")) + return; + zSql = gobble_token(zSql, zToken, sizeof(zToken)); + + sqlite3_result_text(pCtx, zToken, -1, SQLITE_TRANSIENT); +} + +/* +** Return the text of an SQL statement that itself returns the list of +** tables to process within the database. +*/ +const char *all_tables_sql() +{ + if (g.bHandleVtab) + { + int rc; + + rc = sqlite3_exec(g.db, + "CREATE TEMP TABLE tblmap(module COLLATE nocase, postfix);" + "INSERT INTO temp.tblmap VALUES" + "('fts3', '_content'), ('fts3', '_segments'), ('fts3', '_segdir')," + + "('fts4', '_content'), ('fts4', '_segments'), ('fts4', '_segdir')," + "('fts4', '_docsize'), ('fts4', '_stat')," + + "('fts5', '_data'), ('fts5', '_idx'), ('fts5', '_content')," + "('fts5', '_docsize'), ('fts5', '_config')," + + "('rtree', '_node'), ('rtree', '_rowid'), ('rtree', '_parent');", + 0, 0, 0); + assert(rc == SQLITE_OK); + + rc = sqlite3_create_function( + g.db, "module_name", 1, SQLITE_UTF8, 0, module_name_func, 0, 0); + assert(rc == SQLITE_OK); + + return "SELECT name FROM main.sqlite_schema\n" + " WHERE type='table' AND (\n" + " module_name(sql) IS NULL OR \n" + " module_name(sql) IN (SELECT module FROM temp.tblmap)\n" + " ) AND name NOT IN (\n" + " SELECT a.name || b.postfix \n" + "FROM 
main.sqlite_schema AS a, temp.tblmap AS b \n" + "WHERE module_name(a.sql) = b.module\n" + " )\n" + "UNION \n" + "SELECT name FROM aux.sqlite_schema\n" + " WHERE type='table' AND (\n" + " module_name(sql) IS NULL OR \n" + " module_name(sql) IN (SELECT module FROM temp.tblmap)\n" + " ) AND name NOT IN (\n" + " SELECT a.name || b.postfix \n" + "FROM aux.sqlite_schema AS a, temp.tblmap AS b \n" + "WHERE module_name(a.sql) = b.module\n" + " )\n" + " ORDER BY name"; + } + else + { + return "SELECT name FROM main.sqlite_schema\n" + " WHERE type='table' AND sql NOT LIKE 'CREATE VIRTUAL%%'\n" + " UNION\n" + "SELECT name FROM aux.sqlite_schema\n" + " WHERE type='table' AND sql NOT LIKE 'CREATE VIRTUAL%%'\n" + " ORDER BY name"; + } +} + +/* +** Print sketchy documentation for this utility program +*/ +static void showHelp(void) +{ + sqlite3_fprintf(stdout, "Usage: %s [options] DB1 DB2\n", g.zArgv0); + sqlite3_fprintf(stdout, + "Output SQL text that would transform DB1 into DB2.\n" + "Options:\n" + " --changeset FILE Write a CHANGESET into FILE\n" + " -L|--lib LIBRARY Load an SQLite extension library\n" + " --primarykey Use schema-defined PRIMARY KEYs\n" + " --rbu Output SQL to create/populate RBU table(s)\n" + " --schema Show only differences in the schema\n" + " --summary Show only a summary of the differences\n" + " --table TAB Show only differences in table TAB\n" + " --transaction Show SQL output inside a transaction\n" + " --vtab Handle fts3, fts4, fts5 and rtree tables\n" + "See https://sqlite.org/sqldiff.html for detailed explanation.\n"); +} + +int main(int argc, char **argv) +{ + const char *zDb1 = 0; + const char *zDb2 = 0; + int i; + int rc; + char *zErrMsg = 0; + char *zSql; + sqlite3_stmt *pStmt; + char *zTab = 0; + FILE *out = stdout; + void (*xDiff)(const char *, FILE *) = diff_one_table; +#ifndef SQLITE_OMIT_LOAD_EXTENSION + int nExt = 0; + char **azExt = 0; +#endif + int useTransaction = 0; + int neverUseTransaction = 0; + + g.zArgv0 = argv[0]; + 
sqlite3_config(SQLITE_CONFIG_SINGLETHREAD); + for (i = 1; i < argc; i++) + { + const char *z = argv[i]; + if (z[0] == '-') + { + z++; + if (z[0] == '-') + z++; + if (strcmp(z, "changeset") == 0) + { + if (i == argc - 1) + cmdlineError("missing argument to %s", argv[i]); + out = sqlite3_fopen(argv[++i], "wb"); + if (out == 0) + cmdlineError("cannot open: %s", argv[i]); + xDiff = changeset_one_table; + neverUseTransaction = 1; + } + else if (strcmp(z, "debug") == 0) + { + if (i == argc - 1) + cmdlineError("missing argument to %s", argv[i]); + g.fDebug = strtol(argv[++i], 0, 0); + } + else if (strcmp(z, "help") == 0) + { + showHelp(); + return 0; + } + else +#ifndef SQLITE_OMIT_LOAD_EXTENSION + if (strcmp(z, "lib") == 0 || strcmp(z, "L") == 0) + { + if (i == argc - 1) + cmdlineError("missing argument to %s", argv[i]); + azExt = realloc(azExt, sizeof(azExt[0]) * (nExt + 1)); + if (azExt == 0) + cmdlineError("out of memory"); + azExt[nExt++] = argv[++i]; + } + else +#endif + if (strcmp(z, "primarykey") == 0) + { + g.bSchemaPK = 1; + } + else if (strcmp(z, "rbu") == 0) + { + xDiff = rbudiff_one_table; + } + else if (strcmp(z, "schema") == 0) + { + g.bSchemaOnly = 1; + } + else if (strcmp(z, "summary") == 0) + { + xDiff = summarize_one_table; + } + else if (strcmp(z, "table") == 0) + { + if (i == argc - 1) + cmdlineError("missing argument to %s", argv[i]); + zTab = argv[++i]; + g.bSchemaCompare = + sqlite3_stricmp(zTab, "sqlite_schema") == 0 || sqlite3_stricmp(zTab, "sqlite_master") == 0; + } + else if (strcmp(z, "transaction") == 0) + { + useTransaction = 1; + } + else if (strcmp(z, "vtab") == 0) + { + g.bHandleVtab = 1; + } + else + { + cmdlineError("unknown option: %s", argv[i]); + } + } + else if (zDb1 == 0) + { + zDb1 = argv[i]; + } + else if (zDb2 == 0) + { + zDb2 = argv[i]; + } + else + { + cmdlineError("unknown argument: %s", argv[i]); + } + } + if (zDb2 == 0) + { + cmdlineError("two database arguments required"); + } + if (g.bSchemaOnly && g.bSchemaCompare) + { + 
cmdlineError("The --schema option is useless with --table %s .", zTab); + } + rc = sqlite3_open_v2(zDb1, &g.db, SQLITE_OPEN_READONLY, 0); + if (rc) + { + cmdlineError("cannot open database file \"%s\"", zDb1); + } + rc = sqlite3_exec(g.db, "SELECT * FROM sqlite_schema", 0, 0, &zErrMsg); + if (rc || zErrMsg) + { + cmdlineError("\"%s\" does not appear to be a valid SQLite database", zDb1); + } + { + sqlite3 *db2 = 0; + if (sqlite3_open_v2(zDb2, &db2, SQLITE_OPEN_READONLY, 0)) + { + cmdlineError("cannot open database file \"%s\"", zDb2); + } + sqlite3_close(db2); + } +#ifndef SQLITE_OMIT_LOAD_EXTENSION + sqlite3_enable_load_extension(g.db, 1); + for (i = 0; i < nExt; i++) + { + rc = sqlite3_load_extension(g.db, azExt[i], 0, &zErrMsg); + if (rc || zErrMsg) + { + cmdlineError("error loading %s: %s", azExt[i], zErrMsg); + } + } + free(azExt); +#endif + zSql = sqlite3_mprintf("ATTACH %Q as aux;", zDb2); + rc = sqlite3_exec(g.db, zSql, 0, 0, &zErrMsg); + sqlite3_free(zSql); + zSql = 0; + if (rc || zErrMsg) + { + cmdlineError("cannot attach database \"%s\"", zDb2); + } + rc = sqlite3_exec(g.db, "SELECT * FROM aux.sqlite_schema", 0, 0, &zErrMsg); + if (rc || zErrMsg) + { + cmdlineError("\"%s\" does not appear to be a valid SQLite database", zDb2); + } + + if (neverUseTransaction) + useTransaction = 0; + if (useTransaction) + sqlite3_fprintf(out, "BEGIN TRANSACTION;\n"); + if (xDiff == rbudiff_one_table) + { + sqlite3_fprintf(out, "CREATE TABLE IF NOT EXISTS rbu_count" + "(tbl TEXT PRIMARY KEY COLLATE NOCASE, cnt INTEGER) " + "WITHOUT ROWID;\n"); + } + if (zTab) + { + xDiff(zTab, out); + } + else + { + /* Handle tables one by one */ + pStmt = db_prepare("%s", all_tables_sql()); + while (SQLITE_ROW == sqlite3_step(pStmt)) + { + xDiff((const char *)sqlite3_column_text(pStmt, 0), out); + } + sqlite3_finalize(pStmt); + } + if (useTransaction) + sqlite3_fprintf(stdout, "COMMIT;\n"); + + /* TBD: Handle trigger differences */ + /* TBD: Handle view differences */ + 
sqlite3_close(g.db); + return 0; +} diff --git a/bridge/sqldiff_wrapper.c b/bridge/sqldiff_wrapper.c new file mode 100644 index 0000000..2f1e9b6 --- /dev/null +++ b/bridge/sqldiff_wrapper.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include "sqldiff_wrapper.h" +#include "sqlite3.h" + +// Simplified sqldiff wrapper +// TODO: Integrate with full sqldiff.c once sqlite3_stdio.h is available + +int sqldiff_run(const char *db1, const char *db2, char **result, char **error) { + sqlite3 *pDb1 = NULL; + sqlite3 *pDb2 = NULL; + int rc; + + *result = NULL; + *error = NULL; + + // Open both databases + rc = sqlite3_open_v2(db1, &pDb1, SQLITE_OPEN_READONLY, NULL); + if (rc != SQLITE_OK) { + if (pDb1) { + *error = strdup(sqlite3_errmsg(pDb1)); + sqlite3_close(pDb1); + } else { + *error = strdup("Failed to open first database"); + } + return rc; + } + + rc = sqlite3_open_v2(db2, &pDb2, SQLITE_OPEN_READONLY, NULL); + if (rc != SQLITE_OK) { + if (pDb2) { + *error = strdup(sqlite3_errmsg(pDb2)); + sqlite3_close(pDb2); + } else { + *error = strdup("Failed to open second database"); + } + sqlite3_close(pDb1); + return rc; + } + + // For now, generate a placeholder diff + // In the future, this will call the actual sqldiff logic + char buffer[1024]; + snprintf(buffer, sizeof(buffer), + "-- SQLDiff placeholder for %s vs %s\n" + "-- TODO: Implement full diff using sqldiff.c\n" + "-- Once sqlite3_stdio.h is available\n", + db1, db2); + + *result = strdup(buffer); + + // Close databases + sqlite3_close(pDb1); + sqlite3_close(pDb2); + + return 0; +} diff --git a/bridge/sqldiff_wrapper.h b/bridge/sqldiff_wrapper.h new file mode 100644 index 0000000..8875578 --- /dev/null +++ b/bridge/sqldiff_wrapper.h @@ -0,0 +1,20 @@ +#ifndef SQLDIFF_WRAPPER_H +#define SQLDIFF_WRAPPER_H + +#ifdef __cplusplus +extern "C" { +#endif + +// sqldiff_run compares two databases and returns SQL to transform db1 to db2 +// db1: path to first database +// db2: path to second database +// result: pointer to 
receive SQL diff output (caller must free) +// error: pointer to receive error message if any (caller must free) +// returns: 0 on success, non-zero on error +int sqldiff_run(const char *db1, const char *db2, char **result, char **error); + +#ifdef __cplusplus +} +#endif + +#endif // SQLDIFF_WRAPPER_H diff --git a/client/go.mod b/client/go.mod index d5ea0b9..6998bd1 100644 --- a/client/go.mod +++ b/client/go.mod @@ -5,8 +5,9 @@ go 1.24.5 require ( github.com/BurntSushi/toml v1.5.0 github.com/fatih/color v1.18.0 + github.com/fsnotify/fsnotify v1.9.0 github.com/google/uuid v1.6.0 - github.com/gorilla/websocket v1.5.0 + github.com/gorilla/websocket v1.5.3 github.com/spf13/cobra v1.10.1 github.com/sqlrsync/sqlrsync.com/bridge v0.0.0-00010101000000-000000000000 go.uber.org/zap v1.27.0 diff --git a/client/go.sum b/client/go.sum index 5aadc11..85f893d 100644 --- a/client/go.sum +++ b/client/go.sum @@ -5,10 +5,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 
h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= diff --git a/client/main.go b/client/main.go index c3b6c6e..2236700 100644 --- a/client/main.go +++ b/client/main.go @@ -29,6 +29,10 @@ var ( replicaID string logger *zap.Logger showVersion bool + waitIdle string + maxInterval string + minInterval string + autoMerge bool ) var MAX_MESSAGE_SIZE = 4096 @@ -40,14 +44,28 @@ var rootCmd = &cobra.Command{ A web-enabled rsync-like utility for SQLite databases with subscription support. Usage modes: -1. Push to server: sqlrsync LOCAL [REMOTE] [OPTIONS] -2. Pull from server: sqlrsync REMOTE [LOCAL] [OPTIONS] -3. Pull with subscription: sqlrsync REMOTE [LOCAL] --subscribe [OPTIONS] -4. Local to local sync: sqlrsync LOCAL1 LOCAL2 [OPTIONS] +1. PUSH to server: sqlrsync LOCAL [REMOTE] [OPTIONS] +2. PUSH when LOCAL changes: sqlrsync LOCAL [REMOTE] --subscribe [OPTIONS] +3. PULL from server: sqlrsync REMOTE [LOCAL] [OPTIONS] +4. PULL when server gets new version: sqlrsync REMOTE [LOCAL] --subscribe [OPTIONS] +5. LOCAL to local sync: sqlrsync LOCAL1 LOCAL2 [OPTIONS] Where: -- REMOTE is a path like namespace/db.sqlite (remote server) -- LOCAL is a local file path like ./db.sqlite or db.sqlite (local file) +- REMOTE is a path like oregon/elections.db (remote server) +- LOCAL is a local file path like ./forex.db or history.sqlite (local files) + +Public and Unlisted Replicas do not require authentication to PULL. + +Private Replicas will interactively prompt for an authentication token on the +first synchronization. After successfully synchronizing, the server creates +new keys to enable pulls (and pushes if the operation was PUSH) so that future +operations do not require interactive authentication. The pull key is +stored adjascent to the local database file in a "-sqlrsync" file. 
The push key +is stored in ~/.config/sqlrsync/. Keys can be rotated and deleted from the +https://sqlrsync.com website. + +In other words: 'sqlrsync LOCAL' will redo the last REMOTE operation on that LOCAL +and will not require any further authentication. Limitations: - Pushing to the server requires page size of 4096 (default for SQLite). @@ -171,6 +189,10 @@ func runSync(cmd *cobra.Command, args []string) error { Verbose: verbose, WsID: wsID, // Add websocket ID ClientVersion: VERSION, + WaitIdle: waitIdle, + MaxInterval: maxInterval, + MinInterval: minInterval, + AutoMerge: autoMerge, }) // Execute the operation @@ -188,12 +210,15 @@ func determineOperation(args []string) (sync.Operation, string, string, error) { path := args[0] if isLocal(path) { // LOCAL -> push to default remote + if subscribing { + return sync.OperationPushSubscribe, path, "", nil + } return sync.OperationPush, path, "", nil } else { // REMOTE -> pull to default local dbname := filepath.Base(path) if subscribing { - return sync.OperationSubscribe, dbname, path, nil + return sync.OperationPullSubscribe, dbname, path, nil } return sync.OperationPull, dbname, path, nil } @@ -205,12 +230,15 @@ func determineOperation(args []string) (sync.Operation, string, string, error) { replicaLocal := isLocal(replica) if originLocal && !replicaLocal { - // LOCAL REMOTE -> push + // LOCAL REMOTE -> push (or push subscribe if --subscribe is used) + if subscribing { + return sync.OperationPushSubscribe, origin, replica, nil + } return sync.OperationPush, origin, replica, nil } else if !originLocal && replicaLocal { // REMOTE LOCAL -> pull (or subscribe) if subscribing { - return sync.OperationSubscribe, replica, origin, nil + return sync.OperationPullSubscribe, replica, origin, nil } return sync.OperationPull, replica, origin, nil } else if originLocal && replicaLocal { @@ -279,12 +307,16 @@ func init() { rootCmd.Flags().StringVarP(&commitMessageParam, "message", "m", "", "Commit message for the PUSH 
operation") rootCmd.Flags().StringVar(&replicaID, "replicaID", "", "Replica ID for the remote database") rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for remote operations") - rootCmd.Flags().BoolVar(&subscribing, "subscribe", false, "Enable subscription to PULL changes") + rootCmd.Flags().BoolVar(&subscribing, "subscribe", false, "Long-running automated PUSH and PULL based on local activity and server notifications") rootCmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose logging") rootCmd.Flags().BoolVar(&SetUnlisted, "unlisted", false, "Enable unlisted access to the replica (initial PUSH only)") rootCmd.Flags().BoolVar(&SetPublic, "public", false, "Enable public access to the replica (initial PUSH only)") rootCmd.Flags().BoolVar(&dryRun, "dry", false, "Perform a dry run without making changes") rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show version information") + rootCmd.Flags().StringVar(&waitIdle, "waitIdle", "", "Time to wait for idleness before pushing (e.g., 10m, 1h30m, 3d)") + rootCmd.Flags().StringVar(&maxInterval, "maxInterval", "", "Maximum time between pushes regardless of activity (e.g., 24h, 1w)") + rootCmd.Flags().StringVar(&minInterval, "minInterval", "", "Minimum time between subsequent pushes (defaults to 1/2 maxInterval)") + rootCmd.Flags().BoolVar(&autoMerge, "merge", false, "Automatically merge changes when server has newer version") } diff --git a/client/remote/client.go b/client/remote/client.go index fbcb38b..c1ee4a6 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -17,11 +17,14 @@ import ( "go.uber.org/zap" ) +const FEATURE_PULL_CONFLICTDETECTION = false + const ( SQLRSYNC_CONFIG = 0x51 // Send to keys and replicaID SQLRSYNC_NEWREPLICAVERSION = 0x52 // New version available SQLRSYNC_KEYREQUEST = 0x53 // request keys SQLRSYNC_COMMITMESSAGE = 0x54 // commit message + SQLRSYNC_CHANGED = 0x55 // Write detected notification with duration ) // 
ProgressPhase represents the current phase of the sync operation @@ -404,6 +407,7 @@ type Config struct { AuthKey string ClientVersion string // version of the client software SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a key + AuthToken string SendConfigCmd bool // we don't have the version number or remote path LocalHostname string @@ -451,11 +455,17 @@ type Client struct { NewPullKey string NewPushKey string ReplicaID string - Version string + Version string // needs to stay string because it can be `latest` ReplicaPath string SetVisibility int + KeyType string // "PUSH" or "PULL" - indicates the type of key being used newVersionChan chan struct{} + // Version conflict tracking + versionConflict bool + latestVersion string + versionMu sync.RWMutex + // Progress tracking progress *SyncProgress progressMu sync.RWMutex @@ -688,6 +698,10 @@ func (c *Client) Connect() error { headers.Set("Authorization", c.config.AuthKey) + // Set X-ClientID to the wsID from defaults config + wsID := c.config.WsID + headers.Set("X-ClientID", wsID) + headers.Set("X-ClientVersion", c.config.ClientVersion) if c.config.WsID != "" { @@ -1427,7 +1441,7 @@ func (c *Client) writeLoop() { // Inspect raw WebSocket outbound traffic c.inspector.LogWebSocketTraffic(data, "OUT (Client → Server)", c.config.EnableTrafficInspection) - // For a PULL with no existing data we need to send this before writes, however I imagine this needs + // For a PULL with no existing data we need to send this before writes, however I imagine this needs // to go after ORIGIN_BEGIN for a PUSH with existing data. 
if c.config.SendConfigCmd { conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_CONFIG}) @@ -1452,7 +1466,6 @@ func (c *Client) writeLoop() { return } - c.logger.Debug("Sent message to remote", zap.Int("bytes", len(data))) } } @@ -1466,15 +1479,66 @@ func (c *Client) GetNewPushKey() string { return c.NewPushKey } +// SendChangedNotification sends SQLRSYNC_CHANGED message with duration until push +func (c *Client) SendChangedNotification(durationSeconds uint32) error { + // Message format: [0x57][duration: 4 bytes as seconds] + msg := make([]byte, 5) + msg[0] = SQLRSYNC_CHANGED + + // Duration in seconds (4 bytes, little-endian) + msg[1] = byte(durationSeconds) + msg[2] = byte(durationSeconds >> 8) + msg[3] = byte(durationSeconds >> 16) + msg[4] = byte(durationSeconds >> 24) + + // Send via write queue + select { + case c.writeQueue <- msg: + c.logger.Debug("Sent SQLRSYNC_CHANGED notification", + zap.Uint32("durationSeconds", durationSeconds)) + return nil + default: + return fmt.Errorf("write queue full") + } +} + func (c *Client) GetReplicaID() string { return c.ReplicaID } func (c *Client) GetReplicaPath() string { return c.ReplicaPath } -func (c *Client) GetVersion() string { +func (c *Client) GetLatestCommitVersion() string { return c.Version } +func (c *Client) GetKeyType() string { + return c.KeyType +} + +// HasVersionConflict returns true if a version conflict was detected +func (c *Client) HasVersionConflict() bool { + if FEATURE_PULL_CONFLICTDETECTION != true { + return false + } + c.versionMu.RLock() + defer c.versionMu.RUnlock() + return c.versionConflict +} + +// GetLatestVersion returns the latest version from server (if version conflict occurred) +func (c *Client) GetLatestVersion() string { + c.versionMu.RLock() + defer c.versionMu.RUnlock() + return c.latestVersion +} + +// ResetVersionConflict clears the version conflict flag +func (c *Client) ResetVersionConflict() { + c.versionMu.Lock() + defer c.versionMu.Unlock() + c.versionConflict = 
false + c.latestVersion = "" +} // WaitForNewVersion blocks until a new version notification is received (0x52) // Returns nil when a new version is available, or an error if the connection is lost diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 5c3139b..7f75f03 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -231,20 +231,6 @@ func (m *Manager) doConnect() error { } } - // Connect to remote server - if strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found") { - if m.config.AccessKey == "" { - key, err := PromptForKey(m.config.ServerURL, m.config.ReplicaPath, "PULL") - if err != nil { - return fmt.Errorf("manager failed to get key interactively: %w", err) - } - m.config.AccessKey = key - return m.doConnect() - } else { - return fmt.Errorf("manager failed to connect to server: %w", err) - } - } - // Create a clean error message var errorMsg strings.Builder errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) @@ -282,6 +268,15 @@ func (m *Manager) doConnect() error { errorMsg.WriteString(fmt.Sprintf("\nOriginal error: %v", err)) + if strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found") { + key, err := PromptForKey(m.config.ServerURL, m.config.ReplicaPath, "PULL") + if err != nil || key == "" { + return fmt.Errorf("manager failed to get key interactively: %w", err) + } + m.config.AccessKey = key + return m.doConnect() + } + return fmt.Errorf("%s", errorMsg.String()) } diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index 4e11bb0..7deedbb 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -17,6 +17,7 @@ import ( "github.com/sqlrsync/sqlrsync.com/bridge" "github.com/sqlrsync/sqlrsync.com/remote" "github.com/sqlrsync/sqlrsync.com/subscription" + "github.com/sqlrsync/sqlrsync.com/watcher" ) // Operation represents a sync 
operation type @@ -25,7 +26,8 @@ type Operation int const ( OperationPull Operation = iota OperationPush - OperationSubscribe + OperationPullSubscribe + OperationPushSubscribe OperationLocalSync ) @@ -48,16 +50,25 @@ type CoordinatorConfig struct { Verbose bool WsID string // Websocket ID for client identification ClientVersion string // version of the client software + Subscribing bool + WaitIdle string + MaxInterval string + MinInterval string + AutoMerge bool } // Coordinator manages sync operations and subscriptions type Coordinator struct { - config *CoordinatorConfig - logger *zap.Logger - authResolver *auth.Resolver - subManager *subscription.Manager - ctx context.Context - cancel context.CancelFunc + config *CoordinatorConfig + logger *zap.Logger + authResolver *auth.Resolver + subManager *subscription.Manager + remoteClient *remote.Client // For PUSH subscription mode + ctx context.Context + cancel context.CancelFunc + sessionStarted bool // Flag to track if we've sent initial SQLRSYNC_CHANGED for this session + lastChangedSent time.Time // Track when we last sent SQLRSYNC_CHANGED + resendTimer *time.Timer // Timer for resending SQLRSYNC_CHANGED 10s before expiration } // NewCoordinator creates a new sync coordinator @@ -96,8 +107,10 @@ func (c *Coordinator) Execute() error { return c.executePull(false) case OperationPush: return c.executePush() - case OperationSubscribe: - return c.executeSubscribe() + case OperationPullSubscribe: + return c.executePullSubscribe() + case OperationPushSubscribe: + return c.executePushSubscribe() case OperationLocalSync: return c.executeLocalSync() default: @@ -208,8 +221,9 @@ func (c *Coordinator) resolveAuth(operation string) (*auth.ResolveResult, error) return result, nil } -// executeSubscribe runs pull sync with subscription for new versions -func (c *Coordinator) executeSubscribe() error { +// executePullSubscribe runs pull sync with subscription for new versions +func (c *Coordinator) executePullSubscribe() error { + 
fmt.Println("📡 PULL Subscribe mode enabled - will watch for new versions...") fmt.Println("📡 Subscribe mode enabled - will watch for new versions...") fmt.Println(" Press Ctrl+C to stop watching...") @@ -274,7 +288,7 @@ func (c *Coordinator) executeSubscribe() error { return fmt.Errorf("sync #%d failed: %w", syncCount, err) } - fmt.Printf("✅ Sync complete. Waiting for new version...\n") + fmt.Printf("✅ Sync completed at %s. Waiting for new version...\n", time.Now().Format(time.RFC3339)) // Wait for new version or shutdown select { @@ -284,41 +298,19 @@ func (c *Coordinator) executeSubscribe() error { default: } - // Wait for new version notification - var version string + // PULL (read only) Subscribe: Wait for new version notification for { - version, err = c.subManager.WaitForNewVersionMsg() - if err != nil { - // Check if this is a cancellation (graceful shutdown) - if strings.Contains(err.Error(), "cancelled") { - fmt.Println("Subscription stopped by user.") - return nil - } - - // Check if this is a permanent reconnection failure - if strings.Contains(err.Error(), "reconnection failed") { - fmt.Printf("❌ Failed to maintain connection to subscription service: %v\n", err) - fmt.Println(" Please check your network connection and try again later.") - return fmt.Errorf("subscription connection lost: %w", err) + if err := c.waitForVersionAndPull(false); err != nil { + // Check if this is a cancellation or reconnection failure (both are terminal) + if strings.Contains(err.Error(), "cancelled") || strings.Contains(err.Error(), "reconnection failed") { + return err } - - c.logger.Error("Subscription error", zap.Error(err)) - return fmt.Errorf("subscription error: %w", err) - } - if c.config.Version == version { - fmt.Printf("â„šī¸ Already at version %s, waiting for next update...\n", version) + // For other errors, continue the loop to try again continue - } else { - break } + // Successfully got and processed a new version, break out of version wait loop + break } - 
- fmt.Printf("🔄 New version %s announced!\n", version) - // Update version for next sync - if version != "latest" { - c.config.Version = version - } - } } @@ -398,7 +390,10 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Connect to remote server if err := remoteClient.Connect(); err != nil { - if (strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found")) && authResult.AccessKey == "" { + // I removed the check if the authkey is empty because they + // might have provided the wrong key and let's give them a + // chance to fix that. + if strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found") { key, err := c.authResolver.PromptForKey(c.config.ServerURL, c.config.RemotePath, "PULL") if err != nil { return fmt.Errorf("coordinator failed to get key interactively: %w", err) @@ -431,7 +426,7 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Check database integrity after pull localClient.CheckIntegrity() - c.config.Version = remoteClient.GetVersion() + c.config.Version = remoteClient.GetLatestCommitVersion() // Save pull result if needed if remoteClient.GetNewPullKey() != "" && c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath) { if err := c.authResolver.SavePullResult( @@ -474,7 +469,7 @@ func (c *Coordinator) executePush() error { return fmt.Errorf("failed to stat local file: %w", err) } fileSize := fileInfo.Size() - if( fileSize > 100*1024*1024 ) { + if fileSize > 100*1024*1024 { fmt.Printf("âš ī¸ Warning: The database file is large (%.2f MB) for SQLRsync.com and so the behavior and performance is untested.\n", float64(fileSize)/(1024*1024)) } @@ -546,14 +541,43 @@ func (c *Coordinator) executePush() error { // Connect to remote server if err := remoteClient.Connect(); err != nil { + if strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found") { + key, err 
:= subscription.PromptForKey(serverURL, remotePath, "PULL") + if err != nil || key == "" { + return fmt.Errorf("manager failed to get key interactively: %w", err) + } + authResult.AccessKey = key + return c.executePush() + } return fmt.Errorf("failed to connect to server: %w", err) } // Perform the sync if err := c.performPushSync(localClient, remoteClient); err != nil { - return fmt.Errorf("push synchronization failed: %w", err) + + // Check if this was a version conflict and auto-merge is enabled + if remoteClient.HasVersionConflict() && c.config.AutoMerge { + latestVersion := remoteClient.GetLatestVersion() + fmt.Printf("🔄 Auto-merge enabled - attempting to merge with server version %s...\n", latestVersion) + + if mergeErr := c.executeAutoMerge(localClient, remoteClient, latestVersion); mergeErr != nil { + return fmt.Errorf("auto-merge failed: %w", mergeErr) + } + + // Auto-merge succeeded, retry the push + fmt.Println("✅ Auto-merge successful - retrying PUSH...") + remoteClient.ResetVersionConflict() + + if err := c.performPushSync(localClient, remoteClient); err != nil { + return fmt.Errorf("push after auto-merge failed: %w", err) + } + } else { + return fmt.Errorf("push synchronization failed: %w", err) + } } + c.config.Version = remoteClient.GetLatestCommitVersion() + // Save push result if we got new keys if remoteClient.GetNewPushKey() != "" { if err := c.authResolver.SavePushResult( @@ -682,6 +706,733 @@ func (c *Coordinator) executeLocalSync() error { return nil } +// executePushSubscribe performs an initial push then watches for file changes to auto-push +func (c *Coordinator) executePushSubscribe() error { + fmt.Println("📡 PUSH Subscribe mode enabled - will push on file changes...") + fmt.Println(" Press Ctrl+C to stop watching...") + + // Parse duration strings + var waitIdleDuration, maxIntervalDuration, minIntervalDuration time.Duration + var err error + + if c.config.WaitIdle == "" { + return fmt.Errorf("--waitIdle is required for PUSH subscription 
mode") + } + + waitIdleDuration, err = ParseDuration(c.config.WaitIdle) + if err != nil { + return fmt.Errorf("invalid --waitIdle: %w", err) + } + + if err := ValidateWaitIdle(waitIdleDuration); err != nil { + return fmt.Errorf("invalid --waitIdle: %w", err) + } + + if c.config.MaxInterval != "" { + maxIntervalDuration, err = ParseDuration(c.config.MaxInterval) + if err != nil { + return fmt.Errorf("invalid --maxInterval: %w", err) + } + } + + if c.config.MinInterval != "" { + minIntervalDuration, err = ParseDuration(c.config.MinInterval) + if err != nil { + return fmt.Errorf("invalid --minInterval: %w", err) + } + } else if c.config.MaxInterval != "" { + // Default minInterval to half of maxInterval + minIntervalDuration = maxIntervalDuration / 2 + } + + // Perform initial PUSH + fmt.Println("🔄 Performing initial PUSH...") + if err := c.executePush(); err != nil { + return fmt.Errorf("initial PUSH failed: %w", err) + } + + lastPushTime := time.Now() + fmt.Println("✅ Initial PUSH complete") + + // Check if we should continue with PUSH subscribe based on key type + // The server will tell us via CONFIG what type of key we're using + + // Set up persistent remote client connection for sending CHANGED messages + fmt.Println("📡 Establishing persistent connection for change notifications...") + + // Resolve authentication (same as in executePush) + authResult, err := c.resolveAuth("push") + if err != nil { + return fmt.Errorf("authentication failed: %w", err) + } + + serverURL := authResult.ServerURL + if c.config.ServerURL != "" && c.config.ServerURL != "wss://sqlrsync.com" { + serverURL = c.config.ServerURL + } + + remotePath := authResult.RemotePath + if c.config.RemotePath != "" { + remotePath = c.config.RemotePath + } + + c.remoteClient, err = remote.New(&remote.Config{ + ServerURL: serverURL + "/sapi/push/" + remotePath, + PingPong: true, + Timeout: 60000, + AuthKey: authResult.AccessKey, + ClientVersion: c.config.ClientVersion, + Logger: 
c.logger.Named("remote-notifications"), + EnableTrafficInspection: c.config.Verbose, + WsID: c.config.WsID, // Add websocket ID + }) + if err != nil { + return fmt.Errorf("failed to create remote client: %w", err) + } + defer c.remoteClient.Close() + + if err := c.remoteClient.Connect(); err != nil { + if strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found") { + key, err := c.authResolver.PromptForKey(c.config.ServerURL, c.config.RemotePath, "PUSH") + if err != nil { + return fmt.Errorf("coordinator failed to get key interactively: %w", err) + } + c.config.ProvidedAuthKey = key + + // We need to SendConfigCmd in the PUSH to get and store keys + return c.executePushSubscribe() + } + return fmt.Errorf("failed to connect to server: %w", err) + } + + // Wait briefly for CONFIG message to arrive with key type + time.Sleep(500 * time.Millisecond) + + // Check key type - if it's a PULL key, we shouldn't be doing PUSH subscribe + keyType := c.remoteClient.GetKeyType() + if keyType == "PULL" { + fmt.Println("âš ī¸ Warning: Using a PULL key - ignoring --waitIdle/--maxInterval/--minInterval settings") + fmt.Println(" Switching to PULL subscription mode instead...") + return c.executePullSubscribe() + } + + // Create file watcher + fileWatcher, err := watcher.NewWatcher(&watcher.Config{ + DatabasePath: c.config.LocalPath, + Logger: c.logger.Named("watcher"), + WriteNotificationFunc: c.sendWriteNotification, + }) + if err != nil { + return fmt.Errorf("failed to create file watcher: %w", err) + } + defer fileWatcher.Close() + + if err := fileWatcher.Start(); err != nil { + return fmt.Errorf("failed to start file watcher: %w", err) + } + + fmt.Printf("👀 Watching %s for changes...\n", c.config.LocalPath) + fmt.Printf(" Wait idle: %v\n", waitIdleDuration) + if maxIntervalDuration > 0 { + fmt.Printf(" Max interval: %v\n", maxIntervalDuration) + } + if minIntervalDuration > 0 { + fmt.Printf(" Min interval: %v\n", 
minIntervalDuration) + } + + // Also set up subscription to listen for remote updates (bidirectional sync) + fmt.Println("📡 Setting up subscription for remote updates...") + c.subManager = subscription.NewManager(&subscription.ManagerConfig{ + ServerURL: serverURL, + ReplicaPath: remotePath, + AccessKey: authResult.AccessKey, + ReplicaID: authResult.ReplicaID, + WsID: c.config.WsID, + ClientVersion: c.config.ClientVersion, + Logger: c.logger.Named("subscription"), + MaxReconnectAttempts: 20, // Infinite reconnect attempts + InitialReconnectDelay: 5 * time.Second, // Start with 5 seconds delay + MaxReconnectDelay: 5 * time.Minute, // Cap at 5 minutes + }) + + // Connect to subscription service for pull notifications + if err := c.subManager.Connect(); err != nil { + c.logger.Warn("Failed to connect to subscription service for pull notifications", zap.Error(err)) + fmt.Println("âš ī¸ Warning: Could not set up pull subscription - will only push changes") + } else { + defer c.subManager.Close() + fmt.Println("✅ Subscribed to remote updates - will pull new versions automatically") + + // Start goroutine to handle pull notifications + go c.handlePullNotifications() + } + + var waitTimer *time.Timer + resetCount := 0 + + for { + select { + case <-c.ctx.Done(): + fmt.Println("Subscription stopped by user.") + return nil + + default: + } + + // Check for maxInterval timeout even without file changes + if maxIntervalDuration > 0 && time.Since(lastPushTime) >= maxIntervalDuration { + fmt.Printf("⏰ Max interval (%v) reached - pushing...\n", maxIntervalDuration) + + if waitTimer != nil { + waitTimer.Stop() + waitTimer = nil + } + + if err := c.executePushWithRetry(); err != nil { + c.logger.Error("PUSH failed", zap.Error(err)) + } else { + lastPushTime = time.Now() + resetCount = 0 + } + continue + } + + // Wait for file change with timeout + changeTime, err := fileWatcher.WaitForChange() + if err != nil { + if strings.Contains(err.Error(), "cancelled") { + 
fmt.Println("Subscription stopped by user.") + return nil + } + return fmt.Errorf("file watcher error: %w", err) + } + + timeSinceLastPush := time.Since(lastPushTime) + + c.logger.Debug("File change detected", + zap.Time("changeTime", changeTime), + zap.Duration("timeSinceLastPush", timeSinceLastPush)) + + // Check if we should push immediately due to maxInterval + if maxIntervalDuration > 0 && timeSinceLastPush >= maxIntervalDuration { + fmt.Printf("⏰ Max interval (%v) reached - pushing immediately...\n", maxIntervalDuration) + + if waitTimer != nil { + waitTimer.Stop() + waitTimer = nil + } + + if err := c.executePushWithRetry(); err != nil { + c.logger.Error("PUSH failed", zap.Error(err)) + // Continue watching despite error + } else { + lastPushTime = time.Now() + resetCount = 0 + } + continue + } + + // Calculate timer duration: MAX(minInterval - timeSinceLastPush, waitIdle) + timerDuration := waitIdleDuration + if minIntervalDuration > 0 { + remainingMinInterval := minIntervalDuration - timeSinceLastPush + if remainingMinInterval > timerDuration { + timerDuration = remainingMinInterval + } + } + + c.logger.Debug("Calculated timer duration", + zap.Duration("timerDuration", timerDuration), + zap.Duration("waitIdle", waitIdleDuration), + zap.Duration("minInterval", minIntervalDuration), + zap.Duration("timeSinceLastPush", timeSinceLastPush)) + + // Reset or start the wait timer + if waitTimer != nil { + waitTimer.Stop() + resetCount++ + } else { + resetCount = 0 + } + + c.logger.Debug("Starting wait timer", zap.Duration("duration", timerDuration)) + waitTimer = time.AfterFunc(timerDuration, func() { + fmt.Printf("⏰ Timer expired after %v - pushing changes...\n", timerDuration) + + if err := c.executePushWithRetry(); err != nil { + c.logger.Error("PUSH failed", zap.Error(err)) + } else { + lastPushTime = time.Now() + resetCount = 0 + } + }) + } +} + +// executePushWithRetry executes a push with exponential backoff on failure +func (c *Coordinator) 
executePushWithRetry() error {
	// Backoff schedule: 5s, 10s, 20s, 40s between attempts, i.e. ~75s of
	// cumulative sleep across the 5 attempts.
	const maxRetries = 5
	delay := 5 * time.Second

	var lastErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			c.logger.Info("Retrying PUSH", zap.Int("attempt", attempt+1), zap.Duration("delay", delay))
			time.Sleep(delay)
			delay *= 2 // Exponential backoff
		}

		if err := c.executePush(); err != nil {
			lastErr = err
			c.logger.Warn("PUSH attempt failed", zap.Error(err), zap.Int("attempt", attempt+1))

			// On the final attempt, surface the persistent failure to the
			// server. NOTE(review): the backoff above totals ~75s, not the
			// "5 minutes of failures" an earlier comment claimed here.
			if attempt == maxRetries-1 {
				c.reportErrorToServer(err)
			}
			continue
		}

		// PUSH succeeded - reset session flag for next batch of changes,
		// and cancel any pending SQLRSYNC_CHANGED resend since the server
		// now has the latest data.
		c.sessionStarted = false
		if c.resendTimer != nil {
			c.resendTimer.Stop()
			c.resendTimer = nil
		}

		return nil
	}

	return fmt.Errorf("PUSH failed after %d attempts: %w", maxRetries, lastErr)
}

// reportErrorToServer sends error information to the server via HTTPS POST.
// Currently a stub: it only logs the error locally.
func (c *Coordinator) reportErrorToServer(err error) {
	// TODO: Implement HTTPS POST to $server/sapi/$replicaID with error message
	c.logger.Error("Reporting error to server (not yet implemented)", zap.Error(err))
}

// sendWriteNotification sends a write detection notification to the server.
// On first write: sends SQLRSYNC_CHANGED with waitIdle duration and sets sessionStarted=true.
// On subsequent writes: if resend timer isn't running, starts a timer that
// re-announces the change ~10s before the previously announced window expires.
// path is currently unused beyond identifying the caller's file event.
func (c *Coordinator) sendWriteNotification(path string, timestamp time.Time) error {
	if c.remoteClient == nil {
		return nil // No remote client available yet
	}

	// WaitIdle is a user-configured string like "30s" or "1h30m"; parsing
	// failures propagate so misconfiguration is visible to the caller.
	waitIdleDuration, err := ParseDuration(c.config.WaitIdle)
	if err != nil {
		return err
	}

	// If this is the first write of a session
	if !c.sessionStarted {
		// Send SQLRSYNC_CHANGED with waitIdle duration (whole seconds).
		if err := c.remoteClient.SendChangedNotification(uint32(waitIdleDuration.Seconds())); err != nil {
			return fmt.Errorf("failed to 
send SQLRSYNC_CHANGED: %w", err)
		}

		c.sessionStarted = true
		c.lastChangedSent = timestamp

		c.logger.Debug("Sent initial SQLRSYNC_CHANGED",
			zap.Duration("waitIdle", waitIdleDuration))

		// NOTE(review): the leading emoji literal below appears double-encoded
		// (mojibake) — verify the source file's encoding.
		fmt.Printf("âœī¸ Detected local change at %s - will PUSH when idle for %v seconds.\n", timestamp.Format(time.RFC3339), waitIdleDuration.Seconds())

		return nil
	}

	// Subsequent write - schedule resend 10 seconds before expiration if not already scheduled.
	// NOTE(review): c.resendTimer is read/written here, in executePushWithRetry,
	// and from the AfterFunc goroutine in onResendTimerFired without any
	// synchronization — confirm all callers run on one goroutine or guard it.
	if c.resendTimer == nil {
		// Calculate when we need to resend: 10 seconds before the original would expire
		timeSinceLastSent := time.Since(c.lastChangedSent)
		timeUntilExpiration := waitIdleDuration - timeSinceLastSent
		resendDelay := timeUntilExpiration - (10 * time.Second)

		if resendDelay < 0 {
			resendDelay = 0 // Send immediately if we're past the 10s mark
		}

		c.resendTimer = time.AfterFunc(resendDelay, func() {
			c.onResendTimerFired(waitIdleDuration)
		})

		c.logger.Debug("Scheduled SQLRSYNC_CHANGED resend",
			zap.Duration("delay", resendDelay))
	}

	return nil
}

// onResendTimerFired is called when we need to resend SQLRSYNC_CHANGED 10s
// before the previously announced idle window expires. It runs on the timer's
// goroutine (time.AfterFunc) and clears c.resendTimer when done so the next
// write can schedule a fresh resend.
func (c *Coordinator) onResendTimerFired(waitIdleDuration time.Duration) {
	// Calculate remaining time until push
	timeSinceLastSent := time.Since(c.lastChangedSent)
	remainingTime := waitIdleDuration - timeSinceLastSent

	if remainingTime < 0 {
		remainingTime = 0
	}

	// Send SQLRSYNC_CHANGED with remaining duration
	if err := c.remoteClient.SendChangedNotification(uint32(remainingTime.Seconds())); err != nil {
		c.logger.Error("Failed to resend SQLRSYNC_CHANGED", zap.Error(err))
		c.resendTimer = nil
		return
	}

	c.lastChangedSent = time.Now()
	c.resendTimer = nil

	c.logger.Debug("Resent SQLRSYNC_CHANGED with updated duration",
		zap.Duration("remainingTime", remainingTime))
}

// executeAutoMerge handles automatic merging when server has newer version
func (c *Coordinator) executeAutoMerge(localClient 
*bridge.BridgeClient, remoteClient *remote.Client, latestVersion string) error {
	// Step 1: Create temp file for merge. The file is created then closed
	// immediately — only the unique name is needed; RunDirectSync writes it.
	tempFile, err := os.CreateTemp("", "sqlrsync-merge-local-*.sqlite")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()
	tempFile.Close()
	defer os.Remove(tempPath)

	fmt.Printf("📋 Step 1/5: Copying local database to temp file %s...\n", tempPath)

	// Step 2: Use LOCAL mode to copy current database to temp
	if err := localClient.RunDirectSync(tempPath); err != nil {
		return fmt.Errorf("failed to copy local database to temp: %w", err)
	}

	fmt.Printf("đŸ“Ĩ Step 2/5: Pulling latest version %s from server over temp file...\n", latestVersion)

	// Step 3: PULL latest version over the temp file
	authResult, err := c.resolveAuth("pull")
	if err != nil {
		return fmt.Errorf("authentication failed for merge pull: %w", err)
	}

	// Prefer an explicitly configured server over the one returned by auth,
	// except when the config still holds the default endpoint.
	// NOTE(review): assumes "wss://sqlrsync.com" is the default — confirm.
	serverURL := authResult.ServerURL
	if c.config.ServerURL != "" && c.config.ServerURL != "wss://sqlrsync.com" {
		serverURL = c.config.ServerURL
	}

	remotePath := authResult.RemotePath
	if c.config.RemotePath != "" {
		remotePath = c.config.RemotePath
	}

	// Create new remote client for pulling latest version
	pullClient, err := remote.New(&remote.Config{
		ServerURL:               serverURL + "/sapi/pull/" + remotePath,
		AuthKey:                 authResult.AccessKey,
		ReplicaID:               authResult.ReplicaID,
		Timeout:                 8000,
		PingPong:                false,
		Logger:                  c.logger.Named("merge-pull"),
		Version:                 latestVersion,
		ClientVersion:           c.config.ClientVersion,
		WsID:                    c.config.WsID,
		SendConfigCmd:           false,
		SendKeyRequest:          false,
		EnableTrafficInspection: c.config.Verbose,
	})
	if err != nil {
		return fmt.Errorf("failed to create pull client for merge: %w", err)
	}
	defer pullClient.Close()

	if err := pullClient.Connect(); err != nil {
		return fmt.Errorf("failed to connect for merge pull: %w", err)
	}

	// Create bridge client for temp file
	tempBridge, err := bridge.New(&bridge.BridgeConfig{
		DatabasePath:             tempPath,
		Logger:                   c.logger.Named("merge-temp"),
		EnableSQLiteRsyncLogging: c.config.Verbose,
	})
	if err != nil {
		return fmt.Errorf("failed to create bridge for temp file: %w", err)
	}
	defer tempBridge.Close()

	// Pull latest version over temp file, adapting the remote client's
	// Read/Write to the bridge's callback interface.
	readFunc := func(buffer []byte) (int, error) {
		return pullClient.Read(buffer)
	}
	writeFunc := func(data []byte) error {
		return pullClient.Write(data)
	}

	if err := tempBridge.RunPullSync(readFunc, writeFunc); err != nil {
		return fmt.Errorf("failed to pull latest version for merge: %w", err)
	}

	fmt.Println("🔍 Step 3/5: Generating diff between temp (latest) and local (your changes)...")

	// Step 4: Generate diff between temp (has latest) and local (has our changes)
	// tempPath has server's latest version
	// c.config.LocalPath has our local changes
	// We want to find what changed from temp to local (our changes on top of latest)
	diffResult, err := bridge.RunSQLDiff(tempPath, c.config.LocalPath)
	if err != nil {
		return fmt.Errorf("failed to generate diff: %w", err)
	}

	if !diffResult.HasChanges {
		fmt.Println("✅ Step 4/5: No changes detected - databases are identical")
		return nil // Nothing to merge
	}

	c.logger.Debug("Diff generated",
		zap.Int("operations", len(diffResult.Operations)),
		zap.Int("conflicts", len(diffResult.Conflicts)))

	// Step 5: Check for conflicts
	if len(diffResult.Conflicts) > 0 {
		fmt.Printf("❌ Step 4/5: Detected %d primary key conflict(s)\n", len(diffResult.Conflicts))

		// Send conflict notification to server
		return c.sendMergeConflictNotification(serverURL, remotePath, latestVersion, []byte(diffResult.SQL))
	}

	fmt.Println("✅ Step 4/5: No conflicts detected")
	fmt.Printf("📝 Step 5/5: Applying %d change(s) to local database...\n", len(diffResult.Operations))

	// Step 6: Apply the diff to the temp file (which has latest)
	// Then copy temp to local
	if err := 
bridge.ApplyDiff(tempPath, diffResult); err != nil {
		// If apply fails, fall back to simple copy
		c.logger.Warn("Failed to apply diff, using direct copy", zap.Error(err))
		if err := tempBridge.RunDirectSync(c.config.LocalPath); err != nil {
			return fmt.Errorf("failed to apply merge: %w", err)
		}
	} else {
		// Copy merged temp file to local
		// NOTE(review): both branches execute the identical
		// tempBridge.RunDirectSync(c.config.LocalPath) call and differ only in
		// the error message — consider collapsing once ApplyDiff is implemented.
		if err := tempBridge.RunDirectSync(c.config.LocalPath); err != nil {
			return fmt.Errorf("failed to copy merged result: %w", err)
		}
	}

	fmt.Println("✅ Merge completed successfully")
	return nil
}

// sendMergeConflictNotification sends a merge conflict notification to the server.
// Currently a stub: it prints the conflict details locally and always returns
// an error so the caller treats the merge as blocked pending manual resolution.
func (c *Coordinator) sendMergeConflictNotification(serverURL, replicaName, version string, diffData []byte) error {
	// Get hostname and wsID for notification
	hostname, _ := os.Hostname()
	//wsID, _ := auth.GetWsID()

	// TODO: Implement HTTP POST to serverURL/sapi/notification/account/replicaName/
	// Message body: { type: "merge-conflict", diff: base64(diffData), versions: [...], hostname: hostname, wsID: wsID }

	fmt.Printf("❌ Merge conflict detected - server blocking until manual resolution\n")
	fmt.Printf(" Server: %s\n", serverURL)
	fmt.Printf(" Replica: %s\n", replicaName)
	fmt.Printf(" Version: %s\n", version)
	fmt.Printf(" Hostname: %s\n", hostname)
	//fmt.Printf(" wsID: %s\n", wsID)
	fmt.Printf(" Diff: %s\n", diffData)

	return fmt.Errorf("merge conflict requires manual resolution")
}

// vacuumDatabase creates a vacuumed copy of the database in /tmp
// Returns the path to the temp file and a cleanup function.
// NOTE(review): the VACUUM step itself is not implemented yet (see the
// logger.Fatal below) — this function currently cannot complete successfully.
func (c *Coordinator) vacuumDatabase(dbPath string) (string, func(), error) {
	c.logger.Info("Creating vacuumed copy of database", zap.String("source", dbPath))

	// Create temp file in /tmp with pattern sqlrsync-vacuum-*
	tempFile, err := os.CreateTemp("/tmp", "sqlrsync-vacuum-*.sqlite")
	if err != nil {
		return "", nil, fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := 
tempFile.Name()
	tempFile.Close()

	// Remove the temp file so VACUUM INTO can create it
	os.Remove(tempPath)

	// cleanup removes the temp copy; returned to the caller so it can defer it.
	cleanup := func() {
		if err := os.Remove(tempPath); err != nil {
			c.logger.Warn("Failed to remove temp vacuum file", zap.String("path", tempPath), zap.Error(err))
		}
	}

	// Open database using RunDirectSync to copy and vacuum
	// First, we'll use a direct file copy, then vacuum in place
	// Actually, we'll use bridge to copy the file directly, then vacuum the copy
	localClient, err := bridge.New(&bridge.BridgeConfig{
		DatabasePath:             dbPath,
		Logger:                   c.logger.Named("vacuum-copy"),
		EnableSQLiteRsyncLogging: c.config.Verbose,
	})
	if err != nil {
		cleanup()
		return "", nil, fmt.Errorf("failed to create bridge client: %w", err)
	}
	defer localClient.Close()

	// Copy database to temp location using RunDirectSync
	if err := localClient.RunDirectSync(tempPath); err != nil {
		cleanup()
		return "", nil, fmt.Errorf("failed to copy database: %w", err)
	}

	c.logger.Info("Database copied, running VACUUM", zap.String("tempFile", tempPath))

	// Now run VACUUM on the temp file using the bridge's ExecuteVacuum function
	//if err := bridge.ExecuteVacuum(tempPath); err != nil {
	//cleanup()
	//return "", nil, fmt.Errorf("VACUUM failed: %w", err)
	//}
	// NOTE(review): logger.Fatal terminates the whole process — prefer
	// returning an error (like the commented-out block above) once
	// ExecuteVacuum exists, so callers can recover.
	c.logger.Fatal("VACUUM not implemented")
	c.logger.Info("VACUUM complete", zap.String("tempFile", tempPath))

	// Get file sizes for reporting. Stat errors are deliberately ignored;
	// the report is skipped if either stat failed.
	// NOTE(review): a zero-byte source file would divide by zero below.
	sourceInfo, _ := os.Stat(dbPath)
	tempInfo, _ := os.Stat(tempPath)
	if sourceInfo != nil && tempInfo != nil {
		reduction := float64(sourceInfo.Size()-tempInfo.Size()) / float64(sourceInfo.Size()) * 100
		fmt.Printf("đŸ“Ļ VACUUM complete: %s → %s (%.1f%% reduction)\n",
			formatBytes(sourceInfo.Size()),
			formatBytes(tempInfo.Size()),
			reduction)
	}

	return tempPath, cleanup, nil
}

// formatBytes formats bytes as human-readable string using binary (1024)
// units, e.g. 1536 -> "1.5 KiB".
func formatBytes(bytes int64) string {
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %ciB", float64(bytes)/float64(div), "KMGTPE"[exp])
}

// handlePullNotifications runs in a goroutine to listen for remote updates and pull them.
// It exits on context cancellation or a permanent subscription failure.
// NOTE(review): on repeated transient errors this loop retries immediately
// with no backoff — confirm WaitForNewVersionMsg blocks, otherwise this spins.
func (c *Coordinator) handlePullNotifications() {
	c.logger.Info("Starting pull notification handler")

	for {
		select {
		case <-c.ctx.Done():
			c.logger.Info("Pull notification handler stopped by context cancellation")
			return
		default:
		}

		// Wait for new version and pull if needed
		if err := c.waitForVersionAndPull(true); err != nil {
			// Error classification is string-based (see waitForVersionAndPull);
			// these two substrings mark terminal conditions.
			if strings.Contains(err.Error(), "cancelled") || strings.Contains(err.Error(), "reconnection failed") {
				return
			}
			c.logger.Warn("Pull subscription error", zap.Error(err))
			continue
		}
	}
}

// waitForVersionAndPull waits for a new version notification and pulls it if different from current version.
// isBackground indicates if this is called from a background goroutine (affects logging and error handling).
// NOTE(review): errors are classified by substring matching on err.Error() —
// consider sentinel errors with errors.Is instead.
func (c *Coordinator) waitForVersionAndPull(isBackground bool) error {
	// Wait for new version notification
	version, err := c.subManager.WaitForNewVersionMsg()
	if err != nil {
		// Check if this is a cancellation (graceful shutdown)
		if strings.Contains(err.Error(), "cancelled") {
			if isBackground {
				c.logger.Info("Pull notification handler stopped by user")
			} else {
				fmt.Println("Subscription stopped by user.")
			}
			return err
		}

		// Check if this is a permanent reconnection failure
		if strings.Contains(err.Error(), "reconnection failed") {
			if isBackground {
				c.logger.Error("Pull subscription connection lost permanently", zap.Error(err))
				fmt.Printf("❌ Lost connection to pull subscription service: %v\n", err)
			} else {
				fmt.Printf("❌ Failed to maintain connection to subscription service: %v\n", err)
				fmt.Println(" Please check your network connection and try again later.")
			}
			return err
		}

		if isBackground {
			c.logger.Warn("Pull subscription error", zap.Error(err))
		} else {
			c.logger.Error("Subscription error", zap.Error(err))
		}
		return err
	}

	// Already at the announced version — nothing to do.
	if c.config.Version == version {
		if isBackground {
			c.logger.Debug("Already at version, skipping pull", zap.String("version", version))
		} else {
			fmt.Printf("â„šī¸ Already at version %s, waiting for next update...\n", version)
		}
		return nil // Not an error, just skip
	}

	if isBackground {
		fmt.Printf("đŸ“Ĩ New remote version %s detected - pulling update...\n", version)
	} else {
		fmt.Printf("🔄 New version %s announced at %s!\n", version, time.Now().Format(time.RFC3339))
	}

	// Update version for the pull ("latest" is a symbolic name and is not
	// stored as a concrete version).
	oldVersion := c.config.Version
	if version != "latest" {
		c.config.Version = version
	}

	// Perform the pull
	if err := c.executePull(true); err != nil {
		if isBackground {
			c.logger.Error("Auto-pull failed", zap.Error(err), zap.String("version", version))
			fmt.Printf("❌ Failed to pull version %s: %v\n", version, err)
			// Restore old version on failure
			// NOTE(review): when isBackground is false the old version is NOT
			// restored on failure — confirm that asymmetry is intentional.
			c.config.Version = oldVersion
		}
		return err
	}

	if isBackground {
		fmt.Printf("✅ Successfully pulled version %s at %s\n", version, time.Now().Format(time.RFC3339))
	}
	return nil
}

// formatBytes formats bytes as human-readable string.
// NOTE(review): unused duplicate of formatBytes (defined earlier in this
// file) — safe to delete once confirmed nothing references it.
func _formatBytes(bytes int64) string {
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %ciB", float64(bytes)/float64(div), "KMGTPE"[exp])
}
 // Close cleanly shuts down the coordinator
 func (c *Coordinator) Close() error {
 	c.cancel()
diff --git a/client/sync/duration.go b/client/sync/duration.go
new file mode 100644
index 0000000..f1a7bf8
--- /dev/null
+++ b/client/sync/duration.go
@@ -0,0 +1,78 @@
package sync

import (
	"fmt"
	"regexp"
	"strconv"
	"time"
)

// ParseDuration 
// parses duration strings with support for days and weeks.
// Supports formats like: 3s, 10m, 1h30m, 2h, 1h45m, 3d, 1w, 4w.
// Does not support months as they have variable length.
//
// The entire input must be a sequence of number+unit pairs; anything else
// (e.g. "1.5h", "10s extra") is rejected rather than partially parsed.

// durationComponent extracts one number+unit pair from a validated string.
var durationComponent = regexp.MustCompile(`(\d+)([smhdw])`)

// durationWellFormed checks the whole string is number+unit pairs only.
// Without this anchored check, the scanner silently ignored non-matching
// text, so "1.5h" parsed as 5h and "10s garbage" as 10s.
var durationWellFormed = regexp.MustCompile(`^(\d+[smhdw])+$`)

func ParseDuration(s string) (time.Duration, error) {
	if s == "" {
		return 0, fmt.Errorf("empty duration string")
	}

	// Reject inputs containing anything besides number+unit pairs.
	if !durationWellFormed.MatchString(s) {
		return 0, fmt.Errorf("invalid duration format: %s", s)
	}

	matches := durationComponent.FindAllStringSubmatch(s, -1)
	if len(matches) == 0 {
		return 0, fmt.Errorf("invalid duration format: %s", s)
	}

	var total time.Duration
	for _, match := range matches {
		value, err := strconv.Atoi(match[1])
		if err != nil {
			// Atoi only fails here on overflow-sized digit runs.
			return 0, fmt.Errorf("invalid number in duration: %s", match[1])
		}

		switch match[2] {
		case "s":
			total += time.Duration(value) * time.Second
		case "m":
			total += time.Duration(value) * time.Minute
		case "h":
			total += time.Duration(value) * time.Hour
		case "d":
			total += time.Duration(value) * 24 * time.Hour
		case "w":
			total += time.Duration(value) * 7 * 24 * time.Hour
		default:
			// Unreachable: the character class above only admits s/m/h/d/w.
			return 0, fmt.Errorf("unsupported duration unit: %s", match[2])
		}
	}

	if total == 0 {
		return 0, fmt.Errorf("duration must be greater than 0")
	}

	return total, nil
}

// ValidateWaitIdle validates that waitIdle is within acceptable range.
// Min: 10 seconds, Max: 24 days (the ceiling exists because the value is
// forwarded to SQLite's busy_timeout, per the error message below).
func ValidateWaitIdle(d time.Duration) error {
	const maxWaitIdle = 24 * 24 * time.Hour // 24 days
	const minWaitIdle = 10 * time.Second

	if d < minWaitIdle {
		return fmt.Errorf("waitIdle must be at least %v", minWaitIdle)
	}

	if d > maxWaitIdle {
		return fmt.Errorf("waitIdle cannot exceed %v (24 days) due to SQLite busy_timeout limitations", maxWaitIdle)
	}

	return nil
}

// diff --git a/client/watcher/watcher.go b/client/watcher/watcher.go
// new file mode 100644
// index
0000000..2406d3b
--- /dev/null
+++ b/client/watcher/watcher.go
@@ -0,0 +1,146 @@
package watcher

import (
	"context"
	"fmt"
	"path/filepath"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"go.uber.org/zap"
)

// WriteNotificationFunc is called when a write is detected (for analytics)
type WriteNotificationFunc func(path string, timestamp time.Time) error

// Config holds file watcher configuration
type Config struct {
	DatabasePath          string                // SQLite file to watch; its "-wal" sibling is watched too
	Logger                *zap.Logger           // required
	WriteNotificationFunc WriteNotificationFunc // Optional: called on write detection
}

// Watcher monitors file changes for SQLite database and WAL files
type Watcher struct {
	config  *Config
	logger  *zap.Logger
	watcher *fsnotify.Watcher
	ctx     context.Context
	cancel  context.CancelFunc
	// NOTE(review): mu is never used by any method in this file — remove it
	// or document what it is meant to guard.
	mu sync.RWMutex
	// changeCh carries change timestamps to WaitForChange; buffered so the
	// event loop can coalesce bursts without blocking (see eventLoop).
	changeCh chan time.Time
}

// NewWatcher creates a new file watcher.
// It does not start watching; call Start. DatabasePath is not validated here.
func NewWatcher(config *Config) (*Watcher, error) {
	if config == nil {
		return nil, fmt.Errorf("config cannot be nil")
	}

	if config.Logger == nil {
		return nil, fmt.Errorf("logger cannot be nil")
	}

	fsWatcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, fmt.Errorf("failed to create file watcher: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	w := &Watcher{
		config:   config,
		logger:   config.Logger,
		watcher:  fsWatcher,
		ctx:      ctx,
		cancel:   cancel,
		changeCh: make(chan time.Time, 10),
	}

	return w, nil
}

// Start begins watching for file changes and launches the event loop goroutine.
// The goroutine exits when Close is called (context cancellation or channel close).
func (w *Watcher) Start() error {
	// Watch the database file
	if err := w.watcher.Add(w.config.DatabasePath); err != nil {
		return fmt.Errorf("failed to watch database file: %w", err)
	}

	// Watch the WAL file if it exists. Best-effort: absence is not an error.
	// NOTE(review): a WAL file created (or removed and recreated by a
	// checkpoint) after Start is never (re-)added here; fsnotify watches by
	// path, so consider watching filepath.Dir(DatabasePath) instead — the
	// eventLoop already filters events by base name.
	walPath := w.config.DatabasePath + "-wal"
	if err := w.watcher.Add(walPath); err != nil {
		w.logger.Debug("WAL file not found or cannot be watched", zap.String("path", walPath))
	}

	// Start the event processing loop
	go w.eventLoop()

	w.logger.Info("File watcher started", zap.String("path", w.config.DatabasePath))
	return nil
}

// eventLoop processes file system events until the context is cancelled or
// the fsnotify channels are closed.
func (w *Watcher) eventLoop() {
	for {
		select {
		case <-w.ctx.Done():
			return
		case event, ok := <-w.watcher.Events:
			if !ok {
				return
			}

			// Only care about Write and Create events
			if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create {
				// Check if it's the database or WAL file
				baseName := filepath.Base(event.Name)
				dbBaseName := filepath.Base(w.config.DatabasePath)
				walBaseName := dbBaseName + "-wal"

				if baseName == dbBaseName || baseName == walBaseName {
					changeTime := time.Now()
					w.logger.Debug("File change detected", zap.String("file", event.Name), zap.String("op", event.Op.String()))

					// Send write notification to server for analytics (optional, non-blocking)
					if w.config.WriteNotificationFunc != nil {
						go func() {
							if err := w.config.WriteNotificationFunc(event.Name, changeTime); err != nil {
								w.logger.Debug("Failed to send write notification to server", zap.Error(err))
							}
						}()
					}

					// Send change notification (non-blocking). Dropping when the
					// buffer is full coalesces bursts of writes into one pending
					// notification for WaitForChange.
					select {
					case w.changeCh <- changeTime:
					default:
						// Channel full, change already pending
					}
				}
			}
		case err, ok := <-w.watcher.Errors:
			if !ok {
				return
			}
			w.logger.Error("File watcher error", zap.Error(err))
		}
	}
}

// WaitForChange blocks until a file change is detected or context is cancelled.
// The returned error message contains "cancelled" on shutdown; callers match
// on that substring.
func (w *Watcher) WaitForChange() (time.Time, error) {
	select {
	case <-w.ctx.Done():
		return time.Time{}, fmt.Errorf("watcher cancelled")
	case changeTime := <-w.changeCh:
		return changeTime, nil
	}
}

// Close stops the file watcher and terminates the event loop goroutine.
func (w *Watcher) Close() error {
	w.cancel()
	return w.watcher.Close()
}
diff --git a/examples/earthquakes/gov.usgs.earthquakes.sh b/examples/earthquakes/gov.usgs.earthquakes.sh
index f34a27c..cb1529a 100755
--- a/examples/earthquakes/gov.usgs.earthquakes.sh
+++ 
b/examples/earthquakes/gov.usgs.earthquakes.sh @@ -6,10 +6,12 @@ # Configuration FILE=earthquakes.db TABLE=earthquakes -UPDATES=5m +UPDATES=65s URL=https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/4.5_month.csv URL=https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.csv -SQLRSYNC_PATH=usgs.gov/earthquakes.db + +# Disabled so we can use file watching +#SQLRSYNC_PATH=usgs.gov/earthquakes.db PRIMARY_KEY=id MODE="INSERT OR REPLACE INTO" SCHEMA="time TEXT, latitude REAL, longitude REAL, depth REAL, mag REAL, magType TEXT, nst INTEGER, gap REAL, dmin REAL, rms REAL, net TEXT, id TEXT PRIMARY KEY, updated TEXT, place TEXT, type TEXT, horizontalError REAL, depthError REAL, magError REAL, magNst INTEGER, status TEXT, locationSource TEXT, magSource TEXT" @@ -43,7 +45,7 @@ init_database() { # Download and import data with in-memory staging sync_data() { - echo "$(date): Downloading earthquake data from USGS..." + echo "$(date -u +"%Y-%m-%dT%H:%M:%SZ"): Downloading earthquake data from USGS..." # Download CSV data local temp_file=$(mktemp) @@ -64,16 +66,15 @@ ATTACH DATABASE "$FILE" AS c; CREATE TABLE IF NOT EXISTS c.$TABLE ($SCHEMA); -INSERT OR REPLACE INTO c.$TABLE SELECT * FROM incoming; +INSERT OR IGNORE INTO c.$TABLE SELECT * FROM incoming; -SELECT CHANGES() AS CHANGED; DETACH DATABASE c; EOF if [ $? -eq 0 ]; then local count_after=$(sqlite3 "$FILE" "SELECT COUNT(*) FROM $TABLE;") local new_records=$((count_after - count_before)) - echo "Data imported successfully. Total records: $count_after (added/updated: $new_records)" + echo "Data imported successfully. Total records: $count_after (added $new_records)" # Show some recent earthquake info echo "Recent earthquakes:" @@ -126,6 +127,7 @@ main() { while true; do echo "Sleeping for $UPDATES ($sleep_seconds seconds)..." sleep "$sleep_seconds" + echo "" sync_data done }