From d3de1cfa09734c9a622f4c75197bf68427ef6af9 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Wed, 1 Oct 2025 20:08:20 -0700 Subject: [PATCH 1/4] Oops - readd runs-on --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 332cc23..ff72442 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -11,6 +11,7 @@ on: jobs: build: name: Build for ${{ matrix.os }} + runs-on: ${{ matrix.runs-on }} strategy: matrix: include: From 818dbb8ef5fbf8494102129d0b86de162237b224 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Wed, 1 Oct 2025 21:29:22 -0700 Subject: [PATCH 2/4] enable cDebugFile in verbose mode --- bridge/cgo_bridge.go | 242 ++++++++++++++++++------------------- bridge/client.go | 31 ++--- client/auth/resolver.go | 23 ++-- client/main.go | 2 +- client/remote/client.go | 31 +++-- client/sync/coordinator.go | 74 ++++++------ 6 files changed, 207 insertions(+), 196 deletions(-) diff --git a/bridge/cgo_bridge.go b/bridge/cgo_bridge.go index ca2357f..c8d9782 100644 --- a/bridge/cgo_bridge.go +++ b/bridge/cgo_bridge.go @@ -30,14 +30,14 @@ import ( "go.uber.org/zap" ) -const DEBUG_FILE = "sqlite_rsync_debug.log" - var ( handleCounter int64 handleMap = make(map[int64]cgo.Handle) handleMutex sync.RWMutex ) +const DEBUG_FILE = "sqlrsync-debug" + // GetDatabaseInfo wraps the C function to get database info func GetDatabaseInfo(dbPath string) (*DatabaseInfo, error) { return cgoGetDatabaseInfo(dbPath) @@ -67,12 +67,12 @@ func cgoGetDatabaseInfo(dbPath string) (*DatabaseInfo, error) { } // RunOriginSync wraps the C function to run origin synchronization -func RunOriginSync(dbPath string, dryRun bool, client *Client) error { +func RunOriginSync(dbPath string, dryRun bool, client *BridgeClient) error { return cgoRunOriginSync(dbPath, dryRun, client) } // cgoRunOriginSync wraps the C function to run origin synchronization -func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error { +func cgoRunOriginSync(dbPath string, dryRun bool, client *BridgeClient) error { // Prepare C configuration cDbPath := C.CString(dbPath) defer C.free(unsafe.Pointer(cDbPath)) @@ -86,9 +86,9 @@ func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error { wal_only: 0, error_file: nil, } - // Set debug_file if DEBUG_FILE is defined (non-empty) - if DEBUG_FILE != "" { - cDebugFile := C.CString(DEBUG_FILE + ".origin.log") + // Set debugFile if debugFile is defined (non-empty) + if client.Config.EnableSQLiteRsyncLogging { + cDebugFile := C.CString(DEBUG_FILE + "-push.log") defer C.free(unsafe.Pointer(cDebugFile)) config.error_file = cDebugFile } @@ -100,11 +100,11 @@ func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error { // Create cgo handle for the client and store it in map handle := cgo.NewHandle(client) handleID := atomic.AddInt64(&handleCounter, 1) - + handleMutex.Lock() handleMap[handleID] = handle handleMutex.Unlock() - + defer func() { handleMutex.Lock() delete(handleMap, handleID) @@ -117,7 +117,7 @@ func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error { cHandleID := C.malloc(C.sizeof_long) defer C.free(cHandleID) *(*C.long)(cHandleID) = C.long(handleID) - + ioCtx := C.sqlite_rsync_io_context_t{ user_data: cHandleID, read_func: C.read_callback_t(C.go_local_read_callback), @@ -150,16 +150,16 @@ func cgoCleanup() { func go_local_read_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.int) C.int { handleID := int64(*(*C.long)(userData)) - + handleMutex.RLock() handle, ok := handleMap[handleID] handleMutex.RUnlock() - + if !ok { return -1 } - - client := handle.Value().(*Client) + + client := handle.Value().(*BridgeClient) if client.ReadFunc == nil { client.Logger.Error("Read function not set") @@ -186,16 +186,16 @@ func go_local_read_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.i //export go_local_write_callback func go_local_write_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.int) C.int { handleID := int64(*(*C.long)(userData)) - + handleMutex.RLock() handle, ok := handleMap[handleID] handleMutex.RUnlock() - + if !ok { return -1 } - - client := handle.Value().(*Client) + + client := handle.Value().(*BridgeClient) if client.WriteFunc == nil { client.Logger.Error("Write function not set") @@ -216,117 +216,117 @@ func go_local_write_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C. } // RunReplicaSync wraps the C function to run replica synchronization -func RunReplicaSync(originDbPath, replicaDbPath string, client *Client) error { - return cgoRunReplicaSync(originDbPath, replicaDbPath, client) +func RunReplicaSync(originDbPath, replicaDbPath string, client *BridgeClient) error { + return cgoRunReplicaSync(originDbPath, replicaDbPath, client) } // RunDirectSync wraps the C function to run direct local synchronization func RunDirectSync(originDbPath, replicaDbPath string, dryRun bool, verbose int) error { - return cgoRunDirectSync(originDbPath, replicaDbPath, dryRun, verbose) + return cgoRunDirectSync(originDbPath, replicaDbPath, dryRun, verbose) } // cgoRunReplicaSync wraps the C function to run replica synchronization -func cgoRunReplicaSync(originDbPath, replicaDbPath string, client *Client) error { - // Prepare C configuration - cOriginPath := C.CString(originDbPath) - defer C.free(unsafe.Pointer(cOriginPath)) - - cReplicaPath := C.CString(replicaDbPath) - defer C.free(unsafe.Pointer(cReplicaPath)) - - config := C.sqlite_rsync_config_t{ - origin_path: cOriginPath, - replica_path: cReplicaPath, - protocol_version: C.PROTOCOL_VERSION, - verbose_level: 0, - dry_run: 0, - wal_only: 0, - error_file: nil, - } - - // Set debug_file if DEBUG_FILE is defined - if DEBUG_FILE != "" { - cDebugFile := C.CString(DEBUG_FILE + "replica.log") - defer C.free(unsafe.Pointer(cDebugFile)) - config.error_file = cDebugFile - } - - // Create cgo handle for the client - handle := cgo.NewHandle(client) - handleID := atomic.AddInt64(&handleCounter, 1) - - handleMutex.Lock() - handleMap[handleID] = handle - handleMutex.Unlock() - - defer func() { - handleMutex.Lock() - delete(handleMap, handleID) - handleMutex.Unlock() - handle.Delete() - }() - - // Setup I/O context with Go callbacks - cHandleID := C.malloc(C.sizeof_long) - defer C.free(cHandleID) - *(*C.long)(cHandleID) = C.long(handleID) - - ioCtx := C.sqlite_rsync_io_context_t{ - user_data: cHandleID, - read_func: C.read_callback_t(C.go_local_read_callback), - write_func: C.write_callback_t(C.go_local_write_callback), - } - - // Run the replica synchronization - result := C.sqlite_rsync_run_replica(&config, &ioCtx) - if result != 0 { - return &SQLiteRsyncError{ - Code: int(result), - Message: "replica sync failed", - } - } - - return nil +func cgoRunReplicaSync(originDbPath, replicaDbPath string, client *BridgeClient) error { + // Prepare C configuration + cOriginPath := C.CString(originDbPath) + defer C.free(unsafe.Pointer(cOriginPath)) + + cReplicaPath := C.CString(replicaDbPath) + defer C.free(unsafe.Pointer(cReplicaPath)) + + config := C.sqlite_rsync_config_t{ + origin_path: cOriginPath, + replica_path: cReplicaPath, + protocol_version: C.PROTOCOL_VERSION, + verbose_level: 0, + dry_run: 0, + wal_only: 0, + error_file: nil, + } + + // Set debugFile if debugFile is defined + if client.Config.EnableSQLiteRsyncLogging { + cDebugFile := C.CString(DEBUG_FILE + "-push.log") + defer C.free(unsafe.Pointer(cDebugFile)) + config.error_file = cDebugFile + } + + // Create cgo handle for the client + handle := cgo.NewHandle(client) + handleID := atomic.AddInt64(&handleCounter, 1) + + handleMutex.Lock() + handleMap[handleID] = handle + handleMutex.Unlock() + + defer func() { + handleMutex.Lock() + delete(handleMap, handleID) + handleMutex.Unlock() + handle.Delete() + }() + + // Setup I/O context with Go callbacks + cHandleID := C.malloc(C.sizeof_long) + defer C.free(cHandleID) + *(*C.long)(cHandleID) = C.long(handleID) + + ioCtx := C.sqlite_rsync_io_context_t{ + user_data: cHandleID, + read_func: C.read_callback_t(C.go_local_read_callback), + write_func: C.write_callback_t(C.go_local_write_callback), + } + + // Run the replica synchronization + result := C.sqlite_rsync_run_replica(&config, &ioCtx) + if result != 0 { + return &SQLiteRsyncError{ + Code: int(result), + Message: "replica sync failed", + } + } + + return nil } // cgoRunDirectSync wraps the C function to run direct local synchronization func cgoRunDirectSync(originDbPath, replicaDbPath string, dryRun bool, verbose int) error { - // Prepare C configuration - cOriginPath := C.CString(originDbPath) - defer C.free(unsafe.Pointer(cOriginPath)) - - cReplicaPath := C.CString(replicaDbPath) - defer C.free(unsafe.Pointer(cReplicaPath)) - - config := C.sqlite_rsync_config_t{ - origin_path: cOriginPath, - replica_path: cReplicaPath, - protocol_version: C.PROTOCOL_VERSION, - verbose_level: C.int(verbose), - dry_run: 0, - wal_only: 0, - error_file: nil, - } - - // Set debug_file if DEBUG_FILE is defined - if DEBUG_FILE != "" { - cDebugFile := C.CString(DEBUG_FILE + ".direct.log") - defer C.free(unsafe.Pointer(cDebugFile)) - config.error_file = cDebugFile - } - - if dryRun { - config.dry_run = 1 - } - - // Run direct sync without WebSocket I/O - use the original C implementation - result := C.sqlite_rsync_run_direct(&config) - if result != 0 { - return &SQLiteRsyncError{ - Code: int(result), - Message: "direct sync failed", - } - } - - return nil -} \ No newline at end of file + // Prepare C configuration + cOriginPath := C.CString(originDbPath) + defer C.free(unsafe.Pointer(cOriginPath)) + + cReplicaPath := C.CString(replicaDbPath) + defer C.free(unsafe.Pointer(cReplicaPath)) + + config := C.sqlite_rsync_config_t{ + origin_path: cOriginPath, + replica_path: cReplicaPath, + protocol_version: C.PROTOCOL_VERSION, + verbose_level: C.int(verbose), + dry_run: 0, + wal_only: 0, + error_file: nil, + } + + // Set debugFile if debugFile is defined + if verbose > 0 { + cDebugFile := C.CString(DEBUG_FILE + "-push.log") + defer C.free(unsafe.Pointer(cDebugFile)) + config.error_file = cDebugFile + } + + if dryRun { + config.dry_run = 1 + } + + // Run direct sync without WebSocket I/O - use the original C implementation + result := C.sqlite_rsync_run_direct(&config) + if result != 0 { + return &SQLiteRsyncError{ + Code: int(result), + Message: "direct sync failed", + } + } + + return nil +} diff --git a/bridge/client.go b/bridge/client.go index 981b65a..43ccb5e 100644 --- a/bridge/client.go +++ b/bridge/client.go @@ -6,11 +6,12 @@ import ( "go.uber.org/zap" ) -// Config holds the configuration for the local SQLite client -type Config struct { - DatabasePath string - DryRun bool - Logger *zap.Logger +// BridgeConfig holds the configuration for the local SQLite client +type BridgeConfig struct { + DatabasePath string + DryRun bool + Logger *zap.Logger + EnableSQLiteRsyncLogging bool } // DatabaseInfo holds metadata about the SQLite database @@ -26,16 +27,16 @@ type ReadFunc func(buffer []byte) (int, error) // WriteFunc defines the function signature for writing data to remote type WriteFunc func(data []byte) error -// Client handles local SQLite operations and CGO interactions -type Client struct { - Config *Config +// BridgeClient handles local SQLite operations and CGO interactions +type BridgeClient struct { + Config *BridgeConfig Logger *zap.Logger ReadFunc ReadFunc WriteFunc WriteFunc } // New creates a new local SQLite client -func New(config *Config) (*Client, error) { +func New(config *BridgeConfig) (*BridgeClient, error) { if config == nil { return nil, fmt.Errorf("config cannot be nil") } @@ -44,7 +45,7 @@ func New(config *Config) (*Client, error) { return nil, fmt.Errorf("logger cannot be nil") } - client := &Client{ + client := &BridgeClient{ Config: config, Logger: config.Logger, } @@ -53,7 +54,7 @@ func New(config *Config) (*Client, error) { } // GetDatabaseInfo retrieves metadata about the SQLite database -func (c *Client) GetDatabaseInfo() (*DatabaseInfo, error) { +func (c *BridgeClient) GetDatabaseInfo() (*DatabaseInfo, error) { c.Logger.Debug("Getting database info", zap.String("path", c.Config.DatabasePath)) info, err := cgoGetDatabaseInfo(c.Config.DatabasePath) @@ -70,7 +71,7 @@ func (c *Client) GetDatabaseInfo() (*DatabaseInfo, error) { } // RunPushSync runs the origin-side synchronization with provided I/O functions -func (c *Client) RunPushSync(readFunc ReadFunc, writeFunc WriteFunc) error { +func (c *BridgeClient) RunPushSync(readFunc ReadFunc, writeFunc WriteFunc) error { c.Logger.Info("Starting origin sync", zap.String("database", c.Config.DatabasePath)) if c.Config.DryRun { @@ -95,7 +96,7 @@ func (c *Client) RunPushSync(readFunc ReadFunc, writeFunc WriteFunc) error { } // RunPullSync runs the replica-side synchronization with provided I/O functions -func (c *Client) RunPullSync(readFunc ReadFunc, writeFunc WriteFunc) error { +func (c *BridgeClient) RunPullSync(readFunc ReadFunc, writeFunc WriteFunc) error { c.Logger.Info("Starting replica sync", zap.String("database", c.Config.DatabasePath)) // Store I/O functions for callbacks @@ -121,7 +122,7 @@ func (c *Client) RunPullSync(readFunc ReadFunc, writeFunc WriteFunc) error { } // RunDirectSync runs direct local synchronization between two SQLite files -func (c *Client) RunDirectSync(replicaPath string) error { +func (c *BridgeClient) RunDirectSync(replicaPath string) error { c.Logger.Info("Starting direct local sync", zap.String("origin", c.Config.DatabasePath), zap.String("replica", replicaPath)) @@ -149,7 +150,7 @@ func (c *Client) RunDirectSync(replicaPath string) error { } // Close cleans up resources -func (c *Client) Close() { +func (c *BridgeClient) Close() { c.Logger.Debug("Cleaning up local client resources") cgoCleanup() } diff --git a/client/auth/resolver.go b/client/auth/resolver.go index d807e2f..96bf8f0 100644 --- a/client/auth/resolver.go +++ b/client/auth/resolver.go @@ -12,7 +12,7 @@ import ( // ResolveResult contains the resolved authentication information type ResolveResult struct { - AuthToken string + AccessToken string ReplicaID string ServerURL string RemotePath string @@ -55,7 +55,7 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { // 1. Try environment variable first if token := os.Getenv("SQLRSYNC_AUTH_TOKEN"); token != "" { r.logger.Debug("Using SQLRSYNC_AUTH_TOKEN from environment") - result.AuthToken = token + result.AccessToken = token result.ReplicaID = req.ProvidedReplicaID return result, nil } @@ -63,14 +63,14 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { // 2. Try explicitly provided keys if req.ProvidedPullKey != "" { r.logger.Debug("Using provided pull key") - result.AuthToken = req.ProvidedPullKey + result.AccessToken = req.ProvidedPullKey result.ReplicaID = req.ProvidedReplicaID return result, nil } if req.ProvidedPushKey != "" { r.logger.Debug("Using provided push key") - result.AuthToken = req.ProvidedPushKey + result.AccessToken = req.ProvidedPushKey result.ReplicaID = req.ProvidedReplicaID return result, nil } @@ -114,7 +114,7 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { if req.Operation == "push" { if os.Getenv("SQLRSYNC_ADMIN_KEY") != "" { r.logger.Debug("Using SQLRSYNC_ADMIN_KEY from environment") - result.AuthToken = os.Getenv("SQLRSYNC_ADMIN_KEY") + result.AccessToken = os.Getenv("SQLRSYNC_ADMIN_KEY") result.ShouldPrompt = false return result, nil } @@ -124,7 +124,14 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { return result, nil } - // 5. No authentication found + // 5. If it's a pull, maybe no key needed + if req.Operation == "pull" || req.Operation == "subscribe" { + result.AccessToken = "" + result.ShouldPrompt = false + return result, nil + } + + // 6. No authentication found return nil, fmt.Errorf("no authentication credentials found") } @@ -162,7 +169,7 @@ func (r *Resolver) resolveFromLocalSecrets(absLocalPath, serverURL string, resul } r.logger.Debug("Found authentication in local secrets config") - result.AuthToken = dbConfig.PushKey + result.AccessToken = dbConfig.PushKey result.ReplicaID = dbConfig.ReplicaID result.RemotePath = dbConfig.RemotePath result.ServerURL = dbConfig.Server @@ -186,7 +193,7 @@ func (r *Resolver) resolveFromDashFile(localPath string, result *ResolveResult) } r.logger.Debug("Found authentication in -sqlrsync file") - result.AuthToken = dashSQLRsync.PullKey + result.AccessToken = dashSQLRsync.PullKey result.ReplicaID = dashSQLRsync.ReplicaID result.RemotePath = dashSQLRsync.RemotePath result.ServerURL = dashSQLRsync.Server diff --git a/client/main.go b/client/main.go index 283a925..e19f376 100644 --- a/client/main.go +++ b/client/main.go @@ -100,7 +100,7 @@ func runSync(cmd *cobra.Command, args []string) error { } // Create sync coordinator - coordinator := sync.NewCoordinator(&sync.Config{ + coordinator := sync.NewCoordinator(&sync.CoordinatorConfig{ ServerURL: serverURL, ProvidedAuthToken: getAuthToken(), ProvidedPullKey: pullKey, diff --git a/client/remote/client.go b/client/remote/client.go index d94db75..a0ca76e 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -389,18 +389,18 @@ func (t *TrafficInspector) parseMessageType(data []byte) string { // Config holds the configuration for the remote WebSocket client type Config struct { - ServerURL string - Version string - ReplicaID string - Subscribe bool - SetVisibility int // for PUSH - Timeout int // in milliseconds - Logger *zap.Logger - EnableTrafficInspection bool // Enable detailed traffic logging - InspectionDepth int // How many bytes to inspect (default: 32) - PingPong bool - AuthToken string - SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token + ServerURL string + Version string + ReplicaID string + Subscribe bool + SetVisibility int // for PUSH + Timeout int // in milliseconds + Logger *zap.Logger + EnableTrafficInspection bool // Enable detailed traffic logging + InspectionDepth int // How many bytes to inspect (default: 32) + PingPong bool + AuthToken string + SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token SendConfigCmd bool // we don't have the version number or remote path LocalHostname string @@ -680,11 +680,8 @@ func (c *Client) Connect() error { defer connectCancel() headers := http.Header{} - if c.config.AuthToken == "" || len(c.config.AuthToken) <= 20 { - return fmt.Errorf("invalid authtoken: %s", c.config.AuthToken) - } else { - headers.Set("Authorization", c.config.AuthToken) - } + + headers.Set("Authorization", c.config.AuthToken) if c.config.LocalHostname != "" { headers.Set("X-LocalHostname", c.config.LocalHostname) diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index faa75cd..a558f40 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -29,8 +29,8 @@ const ( OperationLocalSync ) -// Config holds sync coordinator configuration -type Config struct { +// CoordinatorConfig holds sync coordinator configuration +type CoordinatorConfig struct { ServerURL string ProvidedAuthToken string // Explicitly provided auth token ProvidedPullKey string // Explicitly provided pull key @@ -49,7 +49,7 @@ type Config struct { // Coordinator manages sync operations and subscriptions type Coordinator struct { - config *Config + config *CoordinatorConfig logger *zap.Logger authResolver *auth.Resolver subManager *subscription.Manager @@ -58,7 +58,7 @@ type Coordinator struct { } // NewCoordinator creates a new sync coordinator -func NewCoordinator(config *Config) *Coordinator { +func NewCoordinator(config *CoordinatorConfig) *Coordinator { ctx, cancel := context.WithCancel(context.Background()) return &Coordinator{ @@ -130,8 +130,11 @@ func (c *Coordinator) displayDryRunInfo(operation string, authResult *auth.Resol if operation != "local" { fmt.Printf(" - Server: %s\n", color.GreenString(serverURL)) - - fmt.Printf(" - Auth Token: %s\n", color.GreenString(authResult.AuthToken)) + if authResult.AccessToken != "" { + fmt.Printf(" - Access Token: %s\n", color.GreenString(authResult.AccessToken)) + } else { + fmt.Printf(" - Access Token: %s\n", color.YellowString("(none)")) + } if operation == "push" { switch c.config.SetVisibility { @@ -176,11 +179,11 @@ func (c *Coordinator) resolveAuth(operation string) (*auth.ResolveResult, error) // Try explicit auth token first if c.config.ProvidedAuthToken != "" { return &auth.ResolveResult{ - AuthToken: c.config.ProvidedAuthToken, - ReplicaID: c.config.ProvidedReplicaID, - ServerURL: c.config.ServerURL, - RemotePath: c.config.RemotePath, - LocalPath: c.config.LocalPath, + AccessToken: c.config.ProvidedAuthToken, + ReplicaID: c.config.ProvidedReplicaID, + ServerURL: c.config.ServerURL, + RemotePath: c.config.RemotePath, + LocalPath: c.config.LocalPath, }, nil } @@ -195,7 +198,7 @@ func (c *Coordinator) resolveAuth(operation string) (*auth.ResolveResult, error) if err != nil { return nil, err } - result.AuthToken = token + result.AccessToken = token result.ShouldPrompt = false } @@ -239,7 +242,7 @@ func (c *Coordinator) executeSubscribe() error { c.subManager = subscription.NewManager(&subscription.Config{ ServerURL: authResult.ServerURL, ReplicaPath: authResult.RemotePath, - AuthToken: authResult.AuthToken, + AuthToken: authResult.AccessToken, ReplicaID: authResult.ReplicaID, Logger: c.logger.Named("subscription"), MaxReconnectAttempts: 20, // Infinite reconnect attempts @@ -357,18 +360,18 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Create remote client for WebSocket transport remoteClient, err := remote.New(&remote.Config{ - ServerURL: serverURL + "/sapi/pull/" + remotePath, - AuthToken: authResult.AuthToken, - ReplicaID: authResult.ReplicaID, - Timeout: 8000, - PingPong: false, // No ping/pong needed for single sync - Logger: c.logger.Named("remote"), - Subscribe: false, // Subscription handled separately - EnableTrafficInspection: c.config.Verbose, - InspectionDepth: 5, - Version: version, - SendConfigCmd: true, - SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), + ServerURL: serverURL + "/sapi/pull/" + remotePath, + AuthToken: authResult.AccessToken, + ReplicaID: authResult.ReplicaID, + Timeout: 8000, + PingPong: false, // No ping/pong needed for single sync + Logger: c.logger.Named("remote"), + Subscribe: false, // Subscription handled separately + EnableTrafficInspection: c.config.Verbose, + InspectionDepth: 5, + Version: version, + SendConfigCmd: true, + SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), //ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple), ProgressCallback: nil, ProgressConfig: &remote.ProgressConfig{ @@ -392,10 +395,11 @@ func (c *Coordinator) executePull(isSubscription bool) error { } // Create local client for SQLite operations - localClient, err := bridge.New(&bridge.Config{ + localClient, err := bridge.New(&bridge.BridgeConfig{ DatabasePath: c.config.LocalPath, DryRun: c.config.DryRun, Logger: c.logger.Named("local"), + EnableSQLiteRsyncLogging: c.config.Verbose, }) if err != nil { return fmt.Errorf("failed to create local client: %w", err) @@ -453,10 +457,11 @@ func (c *Coordinator) executePush() error { } // Create local client for SQLite operations - localClient, err := bridge.New(&bridge.Config{ + localClient, err := bridge.New(&bridge.BridgeConfig{ DatabasePath: c.config.LocalPath, DryRun: c.config.DryRun, Logger: c.logger.Named("local"), + EnableSQLiteRsyncLogging: c.config.Verbose, }) if err != nil { return fmt.Errorf("failed to create local client: %w", err) @@ -483,7 +488,7 @@ func (c *Coordinator) executePush() error { ServerURL: serverURL + "/sapi/push/" + remotePath, PingPong: true, Timeout: 15000, - AuthToken: authResult.AuthToken, + AuthToken: authResult.AccessToken, Logger: c.logger.Named("remote"), EnableTrafficInspection: c.config.Verbose, LocalHostname: localHostname, @@ -557,7 +562,7 @@ func (c *Coordinator) executePush() error { } // performPullSync executes the pull synchronization -func (c *Coordinator) performPullSync(localClient *bridge.Client, remoteClient *remote.Client) error { +func (c *Coordinator) performPullSync(localClient *bridge.BridgeClient, remoteClient *remote.Client) error { // Create I/O bridge between remote and local clients readFunc := func(buffer []byte) (int, error) { return remoteClient.Read(buffer) @@ -579,7 +584,7 @@ func (c *Coordinator) performPullSync(localClient *bridge.Client, remoteClient * } // performPushSync executes the push synchronization -func (c *Coordinator) performPushSync(localClient *bridge.Client, remoteClient *remote.Client) error { +func (c *Coordinator) performPushSync(localClient *bridge.BridgeClient, remoteClient *remote.Client) error { // Create I/O bridge between local and remote clients readFunc := func(buffer []byte) (int, error) { return remoteClient.Read(buffer) @@ -625,10 +630,11 @@ func (c *Coordinator) executeLocalSync() error { fmt.Printf("Syncing LOCAL to LOCAL: %s → %s\n", absOriginPath, absReplicaPath) // Create local client for SQLite operations - localClient, err := bridge.New(&bridge.Config{ - DatabasePath: absOriginPath, - DryRun: c.config.DryRun, - Logger: c.logger.Named("local"), + localClient, err := bridge.New(&bridge.BridgeConfig{ + DatabasePath: absOriginPath, + DryRun: c.config.DryRun, + Logger: c.logger.Named("local"), + EnableSQLiteRsyncLogging: c.config.Verbose, }) if err != nil { return fmt.Errorf("failed to create local client: %w", err) From d7d53691017af65bc06bdfab19c38b759d265c6d Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Wed, 1 Oct 2025 23:52:34 -0700 Subject: [PATCH 3/4] Add commit message --- client/main.go | 83 ++++++++++++++++++++++++++++++-------- client/remote/client.go | 69 +++++++++++++++++++------------ client/sync/coordinator.go | 38 ++++++++--------- 3 files changed, 130 insertions(+), 60 deletions(-) diff --git a/client/main.go b/client/main.go index e19f376..df2deb9 100644 --- a/client/main.go +++ b/client/main.go @@ -17,19 +17,23 @@ import ( var VERSION = "0.0.2" var ( - serverURL string - verbose bool - dryRun bool - SetPublic bool - SetUnlisted bool - subscribing bool - pullKey string - pushKey string - replicaID string - logger *zap.Logger - showVersion bool + serverURL string + verbose bool + dryRun bool + SetPublic bool + SetUnlisted bool + subscribing bool + pullKey string + pushKey string + commitMessageParam string + replicaID string + logger *zap.Logger + showVersion bool ) +var NoCommitMessage = "" +var MAX_MESSAGE_SIZE = 4096 + var rootCmd = &cobra.Command{ Use: "sqlrsync [ORIGIN] [REPLICA] or [LOCAL] or [REMOTE]", Short: "SQLRsync v" + VERSION, @@ -70,6 +74,19 @@ func runSync(cmd *cobra.Command, args []string) error { return cmd.Help() } + var commitMessage []byte + + if commitMessageParam == NoCommitMessage { + commitMessage = nil + } else if len(commitMessageParam) == 0 { + + } else { + if len(commitMessageParam) > MAX_MESSAGE_SIZE { + return fmt.Errorf("commit message too long (max %d characters)", MAX_MESSAGE_SIZE) + } + commitMessage = []byte(commitMessageParam) + } + // Preprocess variables serverURL = strings.TrimRight(serverURL, "/") @@ -81,13 +98,45 @@ func runSync(cmd *cobra.Command, args []string) error { versionRaw := strings.SplitN(remotePath, "@", 2) version := "latest" + + // permitted version formats: + // # + // @ # just the at sign + // @latest + // @1 + // @30 + // @v1 + // @v30 + // @latest-1 + // @latest-20 + // + + // NOT permitted: + // @latest1 + // @latest+1 + // Therefore this is a good regexp for this https://regex101.com/r/LooJFS/1 /^(latest-\d+)|(latest)|v?(\d+)$/ if len(versionRaw) == 2 { - version = strings.TrimPrefix(strings.ToLower(versionRaw[1]), "v") + verStr := strings.ToLower(strings.TrimPrefix(versionRaw[1], "v")) remotePath = versionRaw[0] - versionCheck, _ := strconv.Atoi(version) - if strings.HasPrefix(version, "latest") && versionCheck <= 0 { - return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-`, or `` where the number is greater than 0)", version) + + if !strings.HasPrefix(verStr, "latest") && !strings.HasPrefix(verStr, "latest-") { + // Accept plain numbers + if _, err := strconv.Atoi(verStr); err != nil { + return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-`, or ``)", verStr) + } + } else { + // Accept latest or latest-N + if !strings.HasPrefix(verStr, "latest") { + return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-`, or ``)", verStr) + } + if strings.HasPrefix(verStr, "latest-") { + numStr := strings.TrimPrefix(verStr, "latest-") + if n, err := strconv.Atoi(numStr); err != nil || n <= 0 { + return fmt.Errorf("invalid version specified: %s (must be `latest`, `latest-`, or `` where number > 0)", verStr) + } + } } + version = verStr } visibility := 0 @@ -111,6 +160,7 @@ func runSync(cmd *cobra.Command, args []string) error { ReplicaPath: remotePath, // For LOCAL TO LOCAL, remotePath is actually the replica path Version: version, // Could be extended to parse @version syntax Operation: operation, + CommitMessage: commitMessage, SetVisibility: visibility, DryRun: dryRun, Logger: logger, @@ -220,8 +270,9 @@ func setupLogger() { func init() { rootCmd.Flags().StringVar(&pullKey, "pullKey", "", "Authentication key for PULL operations") rootCmd.Flags().StringVar(&pushKey, "pushKey", "", "Authentication key for PUSH operations") + rootCmd.Flags().StringVarP(&commitMessageParam, "message", "m", NoCommitMessage, "Commit message for the PUSH operation") rootCmd.Flags().StringVar(&replicaID, "replicaID", "", "Replica ID for the remote database") - rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for operations") + rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for remote operations") rootCmd.Flags().BoolVar(&subscribing, "subscribe", false, "Enable subscription to PULL changes") rootCmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose logging") rootCmd.Flags().BoolVar(&SetUnlisted, "unlisted", false, "Enable unlisted access to the replica (initial PUSH only)") diff --git a/client/remote/client.go b/client/remote/client.go index a0ca76e..8a5f86b 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -21,6 +21,7 @@ const ( SQLRSYNC_CONFIG = 0x51 // Send to keys and replicaID SQLRSYNC_NEWREPLICAVERSION = 0x52 // New version available SQLRSYNC_KEYREQUEST = 0x53 // request keys + SQLRSYNC_COMMITMESSAGE = 0x54 // commit message ) // ProgressPhase represents the current phase of the sync operation @@ -132,9 +133,9 @@ func NewTrafficInspector(logger *zap.Logger, depth int) *TrafficInspector { } // InspectOutbound logs outbound traffic (Go → Remote) and returns true if it's ORIGIN_END -func (t *TrafficInspector) InspectOutbound(data []byte, enableLogging bool) bool { +func (t *TrafficInspector) InspectOutbound(data []byte, enableLogging bool) string { if len(data) == 0 { - return false + return "" } msgType := t.parseMessageType(data) @@ -153,7 +154,7 @@ func (t *TrafficInspector) InspectOutbound(data []byte, enableLogging bool) bool } // Return whether this is an ORIGIN_END message - return msgType == "ORIGIN_END" + return msgType } // InspectInbound logs inbound traffic (Remote → Go) and returns true if it's ORIGIN_END @@ -389,18 +390,19 @@ func (t *TrafficInspector) parseMessageType(data []byte) string { // Config holds the configuration for the remote WebSocket client type Config struct { - ServerURL string - Version string - ReplicaID string - Subscribe bool - SetVisibility int // for PUSH - Timeout int // in milliseconds - Logger *zap.Logger - EnableTrafficInspection bool // Enable detailed traffic logging - InspectionDepth int // How many bytes to inspect (default: 32) - PingPong bool - AuthToken string - SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token + ServerURL string + Version string + ReplicaID string + Subscribe bool + SetVisibility int // for PUSH + CommitMessage []byte + Timeout int // in milliseconds + Logger *zap.Logger + EnableTrafficInspection bool // Enable detailed traffic logging + InspectionDepth int // How many bytes to inspect (default: 32) + PingPong bool + AuthToken string + SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token SendConfigCmd bool // we don't have the version number or remote path LocalHostname string @@ -863,11 +865,23 @@ func (c *Client) isSyncCompleted() bool { // handleOutboundTraffic inspects outbound data and handles sync completion detection func (c *Client) handleOutboundTraffic(data []byte) { // Always inspect for protocol messages (sync completion detection) - isOriginEnd := c.inspector.InspectOutbound(data, c.config.EnableTrafficInspection) - if isOriginEnd { + outboundCommand := c.inspector.InspectOutbound(data, c.config.EnableTrafficInspection) + if outboundCommand == "ORIGIN_END" { c.logger.Info("ORIGIN_END detected - sync completing") c.setSyncCompleted(true) } + if outboundCommand == "ORIGIN_BEGIN" { + if len(c.config.CommitMessage) > 0 { + length := len(c.config.CommitMessage) + // Encode length as 2 bytes (big-endian), 2 bytes is ~65k max + lenBytes := []byte{ + byte(length >> 8), + byte(length), + } + c.writeQueue <- append([]byte{SQLRSYNC_COMMITMESSAGE}, append(lenBytes, c.config.CommitMessage...)...) + c.config.CommitMessage = nil + } + } // Handle progress tracking if c.config.ProgressCallback != nil { @@ -1357,15 +1371,6 @@ func (c *Client) writeLoop() { // Set write deadline conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) - if c.config.SendConfigCmd { - conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_CONFIG}) - c.config.SendConfigCmd = false - } - if c.config.SendKeyRequest { - conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_KEYREQUEST}) - c.config.SendKeyRequest = false - } - // Inspect raw WebSocket outbound traffic c.inspector.LogWebSocketTraffic(data, "OUT (Client → Server)", c.config.EnableTrafficInspection) @@ -1383,6 +1388,18 @@ func (c *Client) writeLoop() { return } + // consider moving this to InspectorOutbound + + // Do this here so ORIGIN_BEGIN sends first + if c.config.SendConfigCmd { + conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_CONFIG}) + c.config.SendConfigCmd = false + } + if c.config.SendKeyRequest { + conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_KEYREQUEST}) + c.config.SendKeyRequest = false + } + c.logger.Debug("Sent message to remote", zap.Int("bytes", len(data))) } } diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index a558f40..18aa1df 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -42,6 +42,7 @@ type CoordinatorConfig struct { Version string Operation Operation SetVisibility int + CommitMessage []byte DryRun bool Logger *zap.Logger Verbose bool @@ -360,18 +361,18 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Create remote client for WebSocket transport remoteClient, err := remote.New(&remote.Config{ - ServerURL: serverURL + "/sapi/pull/" + remotePath, - AuthToken: authResult.AccessToken, - ReplicaID: authResult.ReplicaID, - Timeout: 8000, - PingPong: false, // No ping/pong needed for single sync - Logger: c.logger.Named("remote"), - Subscribe: false, // Subscription handled separately - EnableTrafficInspection: c.config.Verbose, - InspectionDepth: 5, - Version: version, - SendConfigCmd: true, - SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), + ServerURL: serverURL + "/sapi/pull/" + remotePath, + AuthToken: authResult.AccessToken, + ReplicaID: authResult.ReplicaID, + Timeout: 8000, + PingPong: false, // No ping/pong needed for single sync + Logger: c.logger.Named("remote"), + Subscribe: false, // Subscription handled separately + EnableTrafficInspection: c.config.Verbose, + InspectionDepth: 5, + Version: version, + SendConfigCmd: true, + SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), //ProgressCallback: remote.DefaultProgressCallback(remote.FormatSimple), ProgressCallback: nil, ProgressConfig: &remote.ProgressConfig{ @@ -396,9 +397,9 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Create local client for SQLite operations localClient, err := bridge.New(&bridge.BridgeConfig{ - DatabasePath: c.config.LocalPath, - DryRun: c.config.DryRun, - Logger: c.logger.Named("local"), + DatabasePath: c.config.LocalPath, + DryRun: c.config.DryRun, + Logger: c.logger.Named("local"), EnableSQLiteRsyncLogging: c.config.Verbose, }) if err != nil { @@ -458,9 +459,9 @@ func (c *Coordinator) executePush() error { // Create local client for SQLite operations localClient, err := bridge.New(&bridge.BridgeConfig{ - DatabasePath: c.config.LocalPath, - DryRun: c.config.DryRun, - Logger: c.logger.Named("local"), + DatabasePath: c.config.LocalPath, + DryRun: c.config.DryRun, + Logger: c.logger.Named("local"), EnableSQLiteRsyncLogging: c.config.Verbose, }) if err != nil { @@ -497,6 +498,7 @@ func (c *Coordinator) executePush() error { SendKeyRequest: c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath), SendConfigCmd: true, SetVisibility: c.config.SetVisibility, + CommitMessage: c.config.CommitMessage, ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), ProgressConfig: &remote.ProgressConfig{ Enabled: true, From 3893a0cff501e6602fe55a97aa27f6f796c27695 Mon Sep 17 00:00:00 2001 From: pnwmatt <180812017+pnwmatt@users.noreply.github.com> Date: Wed, 1 Oct 2025 23:53:26 -0700 Subject: [PATCH 4/4] simplify commit message --- client/main.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/client/main.go b/client/main.go index df2deb9..799237d 100644 --- a/client/main.go +++ b/client/main.go @@ -31,7 +31,6 @@ var ( showVersion bool ) -var NoCommitMessage = "" var MAX_MESSAGE_SIZE = 4096 var rootCmd = &cobra.Command{ @@ -76,10 +75,8 @@ func runSync(cmd *cobra.Command, args []string) error { var commitMessage []byte - if commitMessageParam == NoCommitMessage { + if len(commitMessageParam) == 0 { commitMessage = nil - } else if len(commitMessageParam) == 0 { - } else { if len(commitMessageParam) > MAX_MESSAGE_SIZE { return fmt.Errorf("commit message too long (max %d characters)", MAX_MESSAGE_SIZE) @@ -270,7 +267,7 @@ func setupLogger() { func init() { rootCmd.Flags().StringVar(&pullKey, "pullKey", "", "Authentication key for PULL operations") rootCmd.Flags().StringVar(&pushKey, "pushKey", "", "Authentication key for PUSH operations") - rootCmd.Flags().StringVarP(&commitMessageParam, "message", "m", NoCommitMessage, "Commit message for the PUSH operation") + rootCmd.Flags().StringVarP(&commitMessageParam, "message", "m", "", "Commit message for the PUSH operation") rootCmd.Flags().StringVar(&replicaID, "replicaID", "", "Replica ID for the remote database") rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for remote operations") rootCmd.Flags().BoolVar(&subscribing, "subscribe", false, "Enable subscription to PULL changes")