diff --git a/.gitignore b/.gitignore index 4292076..7fc3c42 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,6 @@ **/.claude/ CLAUDE.md -**/CLAUDE.md \ No newline at end of file +**/CLAUDE.md + +tmp/ diff --git a/bridge/cgo_bridge.go b/bridge/cgo_bridge.go index e40d383..ca2357f 100644 --- a/bridge/cgo_bridge.go +++ b/bridge/cgo_bridge.go @@ -174,7 +174,9 @@ func go_local_read_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.i if err.Error() != "connection lost" && err.Error() != "sync completed" { client.Logger.Error("Connection to server had a failure. Are you online? Read callback error", zap.Error(err)) } - return -1 + // For sync completion errors, return 0 to signal EOF gracefully + // This allows sqlite_rsync to finish processing any buffered data + return 0 } client.Logger.Debug("Read callback", zap.Int("bytesRead", bytesRead)) diff --git a/bridge/sqlite_rsync_wrapper.c b/bridge/sqlite_rsync_wrapper.c index 6bf7aa6..eeba020 100644 --- a/bridge/sqlite_rsync_wrapper.c +++ b/bridge/sqlite_rsync_wrapper.c @@ -277,12 +277,13 @@ static void *websocket_read_thread(void *arg) } else if (bytes_read == 0) { - // No more data, close write end to signal EOF + // 0 bytes indicates EOF from the Go callback (sync completed) + // Close write end to signal EOF to sqlite_rsync break; } else { - // Error occurred + // Negative value indicates a real error break; } } diff --git a/client/config.go b/client/config.go index cae4dd6..5b20ebe 100644 --- a/client/config.go +++ b/client/config.go @@ -11,26 +11,26 @@ import ( "github.com/BurntSushi/toml" ) +// .config/sqlrsync/defaults.toml type DefaultsConfig struct { Defaults struct { Server string `toml:"server"` } `toml:"defaults"` } +// .config/sqlrsync/local-secrets.toml type LocalSecretsConfig struct { - Local struct { - Hostname string `toml:"hostname"` - DefaultClientSideEncryptionKey string `toml:"defaultClientSideEncryptionKey"` - } `toml:"local"` SQLRsyncDatabases []SQLRsyncDatabase `toml:"sqlrsync-databases"` } type SQLRsyncDatabase struct { - Path string `toml:"path"` - ReplicaID string `toml:"replicaID,omitempty"` - ClientSideEncryptionKey string `toml:"clientSideEncryptionKey,omitempty"` - LastUpdated time.Time `toml:"lastUpdated,omitempty"` - Server string `toml:"server,omitempty"` + LocalPath string `toml:"path,omitempty"` + Server string `toml:"server"` + CustomerSuppliedEncryptionKey string `toml:"customerSuppliedEncryptionKey,omitempty"` + ReplicaID string `toml:"replicaID"` + RemotePath string `toml:"remotePath,omitempty"` + PushKey string `toml:"pushKey,omitempty"` + LastPush time.Time `toml:"lastPush,omitempty"` } // DashSQLRsync manages the -sqlrsync file for a database @@ -38,6 +38,8 @@ type DashSQLRsync struct { DatabasePath string RemotePath string PullKey string + Server string + ReplicaID string } func GetConfigDir() (string, error) { @@ -176,7 +178,7 @@ func SaveLocalSecretsConfig(config *LocalSecretsConfig) error { func (c *LocalSecretsConfig) FindDatabaseByPath(path string) *SQLRsyncDatabase { for i := range c.SQLRsyncDatabases { - if c.SQLRsyncDatabases[i].Path == path { + if c.SQLRsyncDatabases[i].LocalPath == path { return &c.SQLRsyncDatabases[i] } } @@ -185,7 +187,7 @@ func (c *LocalSecretsConfig) FindDatabaseByPath(path string) *SQLRsyncDatabase { func (c *LocalSecretsConfig) UpdateOrAddDatabase(db SQLRsyncDatabase) { for i := range c.SQLRsyncDatabases { - if c.SQLRsyncDatabases[i].Path == db.Path { + if c.SQLRsyncDatabases[i].LocalPath == db.LocalPath { // Update existing database c.SQLRsyncDatabases[i] = db return @@ -197,7 +199,7 @@ func (c *LocalSecretsConfig) UpdateOrAddDatabase(db SQLRsyncDatabase) { func (c *LocalSecretsConfig) RemoveDatabase(path string) { for i, db := range c.SQLRsyncDatabases { - if db.Path == path { + if db.LocalPath == path { // Remove database from slice c.SQLRsyncDatabases = append(c.SQLRsyncDatabases[:i], c.SQLRsyncDatabases[i+1:]...) return @@ -205,16 +207,12 @@ func (c *LocalSecretsConfig) RemoveDatabase(path string) { } } -func (c *LocalSecretsConfig) SetHostname(hostname string) { - c.Local.Hostname = hostname -} - -func (c *LocalSecretsConfig) SetDefaultEncryptionKey(key string) { - c.Local.DefaultClientSideEncryptionKey = key -} - // NewDashSQLRsync creates a new DashSQLRsync instance for the given database path func NewDashSQLRsync(databasePath string) *DashSQLRsync { + if(strings.Contains(databasePath, "@")) { + databasePath = strings.Split(databasePath, "@")[0] + } + return &DashSQLRsync{ DatabasePath: databasePath, } @@ -246,21 +244,27 @@ func (d *DashSQLRsync) Read() error { scanner := bufio.NewScanner(file) for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) - + if strings.HasPrefix(line, "#") || line == "" { continue } - + if strings.HasPrefix(line, "sqlrsync ") { parts := strings.Fields(line) if len(parts) >= 2 { d.RemotePath = parts[1] } - + for _, part := range parts { if strings.HasPrefix(part, "--pullKey=") { d.PullKey = strings.TrimPrefix(part, "--pullKey=") } + if strings.HasPrefix(part, "--replicaID=") { + d.ReplicaID = strings.TrimPrefix(part, "--replicaID=") + } + if strings.HasPrefix(part, "--server=") { + d.Server = strings.TrimPrefix(part, "--server=") + } } break } @@ -270,14 +274,17 @@ func (d *DashSQLRsync) Read() error { } // Write writes the -sqlrsync file with the given remote path and pull key -func (d *DashSQLRsync) Write(remotePath string, pullKey string) error { +func (d *DashSQLRsync) Write(remotePath string, localName string, replicaID string, pullKey string, serverURL string) error { d.RemotePath = remotePath d.PullKey = pullKey + localNameTree := strings.Split(localName, "/") + localName = localNameTree[len(localNameTree)-1] + content := fmt.Sprintf(`#!/bin/bash -# https://sqlrsync.com/docs/pullfile -sqlrsync %s --pullKey=%s -`, remotePath, pullKey) +# https://sqlrsync.com/docs/-sqlrsync +sqlrsync %s %s --replicaID=%s --pullKey=%s --server=%s +`, remotePath, localName, replicaID, pullKey, serverURL) if err := os.WriteFile(d.FilePath(), []byte(content), 0755); err != nil { return fmt.Errorf("failed to write -sqlrsync file: %w", err) @@ -292,4 +299,4 @@ func (d *DashSQLRsync) Remove() error { return nil } return os.Remove(d.FilePath()) -} \ No newline at end of file +} diff --git a/client/main.go b/client/main.go index ae7ce4c..08c34bb 100644 --- a/client/main.go +++ b/client/main.go @@ -23,12 +23,15 @@ var ( serverURL string verbose bool dryRun bool + setPublic bool timeout int logger *zap.Logger inspectTraffic bool inspectionDepth int newReadToken bool - authToken string + pullKey string + pushKey string + replicaID string ) var rootCmd = &cobra.Command{ @@ -175,22 +178,24 @@ func runSync(cmd *cobra.Command, args []string) error { } } else if len(args) == 1 { // One argument: either ORIGIN (push/pull depends on ~.config & -sqlrsync) or REPLICA (for pull) - path := args[0] + path := args[0] if isLocal(path) { // IF ORIGIN:LOCAL (no REPLICA) - varies localSecretsConfig, err := LoadLocalSecretsConfig() if err != nil { return fmt.Errorf("failed to load local secrets config: %w", err) } - - // If we have a push key for this database, use it to push - pushedDBInfo := localSecretsConfig.FindDatabaseByPath(path) - if pushedDBInfo != nil && pushedDBInfo.ReplicaID != "" { - - return runPushSync(path, pushedDBInfo.ReplicaID) + // Get absolute path for the local database + absPath, err := filepath.Abs(path) + if err == nil { + // If we have a push key for this database, use it to push + pushedDBInfo := localSecretsConfig.FindDatabaseByPath(absPath) + if pushedDBInfo != nil && pushedDBInfo.PushKey != "" && pushedDBInfo.Server == serverURL { + pushKey = pushedDBInfo.PushKey + return runPushSync(absPath, pushedDBInfo.RemotePath) + } } - // else if there is a -sqlrsync file, do a pull instead dashSQLRsync := NewDashSQLRsync(path) if dashSQLRsync.Exists() { @@ -200,7 +205,16 @@ func runSync(cmd *cobra.Command, args []string) error { if dashSQLRsync.RemotePath == "" { return fmt.Errorf("invalid -sqlrsync file: missing remote path") } - return runPullSync(dashSQLRsync.RemotePath, path) + if dashSQLRsync.Server == serverURL { + localPath := "" + version := "latest" + localPath, version, _ = strings.Cut(path, "@") + + pullKey = dashSQLRsync.PullKey + replicaID = dashSQLRsync.ReplicaID + serverURL = dashSQLRsync.Server + return runPullSync(dashSQLRsync.RemotePath+"@"+version, localPath) + } } // else push this file up @@ -268,12 +282,6 @@ func runPushSync(localPath string, remotePath string) error { return fmt.Errorf("database file does not exist: %s", localPath) } - // Load defaults config - defaultsConfig, err := LoadDefaultsConfig() - if err != nil { - return fmt.Errorf("failed to load defaults config: %w", err) - } - // Load local secrets config localSecretsConfig, err := LoadLocalSecretsConfig() if err != nil { @@ -291,7 +299,18 @@ func runPushSync(localPath string, remotePath string) error { if dbConfig == nil { // Create new database entry dbConfig = &SQLRsyncDatabase{ - Path: absLocalPath, + LocalPath: absLocalPath, + Server: serverURL, + } + } else { + if serverURL == "" { + serverURL = dbConfig.Server + } + if pushKey == "" { + pushKey = dbConfig.PushKey + } + if remotePath == "" { + remotePath = dbConfig.RemotePath } } @@ -307,9 +326,10 @@ func runPushSync(localPath string, remotePath string) error { } // Check if we have a push key for this database - if os.Getenv("SQLRSYNC_TOKEN") == "" && authToken == "" { - fmt.Println("No Key provided. Creating a new Replica? Get a key at https://sqlrsync.com/namespaces") - fmt.Print("Enter an Account Admin Key to create a new Replica: ") + if os.Getenv("SQLRSYNC_ADMIN_KEY") == "" && pushKey == "" { + httpServer := strings.Replace(serverURL, "ws", "http", 1) + fmt.Println("No Key provided. Creating a new Replica? Get a key at " + httpServer + "/namespaces") + fmt.Print(" Enter an Account Admin Key to create a new Replica: ") reader := bufio.NewReader(os.Stdin) token, err := reader.ReadString('\n') if err != nil { @@ -320,20 +340,8 @@ func runPushSync(localPath string, remotePath string) error { if token == "" { return fmt.Errorf("push key cannot be empty") } - authToken = token - - // account admin tokens are 24 and are stashed for the session - if len(token) == 24 { - os.Setenv("SQLRSYNC_TOKEN", token) - } - } - - // Use server from database config, or defaults if not set - if dbConfig.Server == "" { - dbConfig.Server = defaultsConfig.Defaults.Server - } - if serverURL == "" { - serverURL = dbConfig.Server + pushKey = token + fmt.Println() } logger.Info("Starting push synchronization to sqlrsync.com", @@ -342,6 +350,8 @@ func runPushSync(localPath string, remotePath string) error { zap.String("server", serverURL), zap.Bool("dryRun", dryRun)) + fmt.Println("PUSHing up to " + serverURL + " ...") + // Create local client for SQLite operations localClient, err := bridge.New(&bridge.Config{ DatabasePath: localPath, @@ -354,20 +364,20 @@ func runPushSync(localPath string, remotePath string) error { defer localClient.Close() localHostname, _ := os.Hostname() - fmt.Println("Using hostname", localHostname, "and abs path", absLocalPath) // Create remote client for WebSocket transport remoteClient, err := remote.New(&remote.Config{ ServerURL: serverURL + "/sapi/push/" + remotePath, PingPong: false, Timeout: timeout, - AuthToken: authToken, + AuthToken: pushKey, Logger: logger.Named("remote"), EnableTrafficInspection: inspectTraffic, LocalHostname: localHostname, LocalAbsolutePath: absLocalPath, InspectionDepth: inspectionDepth, - RequestReadToken: needsReadToken(localPath), + SendConfigCmd: needsToBuildDashSQLRSyncFile(localPath, remotePath), + SetPublic: setPublic, }) if err != nil { @@ -396,21 +406,34 @@ func runPushSync(localPath string, remotePath string) error { return fmt.Errorf("push synchronization failed: %w", err) } - // Update database config with latest info - dbConfig.LastUpdated = time.Now() - localSecretsConfig.UpdateOrAddDatabase(*dbConfig) + logger.Info("Push synchronization completed successfully") + dbConfig.LastPush = time.Now() + if remoteClient.GetNewPushKey() != "" { + fmt.Println("🔑 This database is now PUSH-enabled on this system.") + fmt.Println(" A new, replica-specific PUSH key has been stored at ~/.config/sqlrsync/local-secrets.toml") + dbConfig.ReplicaID = remoteClient.GetReplicaID() + dbConfig.RemotePath = remoteClient.GetReplicaPath() + dbConfig.PushKey = remoteClient.GetNewPushKey() + } + localSecretsConfig.UpdateOrAddDatabase(*dbConfig) // Save the updated config if err := SaveLocalSecretsConfig(localSecretsConfig); err != nil { logger.Warn("Failed to save local secrets config", zap.Error(err)) } - logger.Info("Push synchronization completed successfully") - if needsReadToken(localPath) { - token := remoteClient.GetNewReadToken() + if setPublic { + fmt.Println("🌐 This replica is now publicly accessible.") + fmt.Println(" Share this database with sqlrsync.com/" + remoteClient.GetReplicaPath()) + } + + if needsToBuildDashSQLRSyncFile(localPath, remotePath) { + token := remoteClient.GetNewPullKey() + replicaID := remoteClient.GetReplicaID() + replicaPath := remoteClient.GetReplicaPath() dashSQLRsync := NewDashSQLRsync(localPath) - if err := dashSQLRsync.Write(remotePath, token); err != nil { + if err := dashSQLRsync.Write(replicaPath, localPath, replicaID, token, serverURL); err != nil { return fmt.Errorf("failed to create shareable config file: %w", err) } fmt.Println("🔑 Shareable config file created:", dashSQLRsync.FilePath()) @@ -441,13 +464,15 @@ func isValidVersion(version string) bool { return false } -func needsReadToken(path string) bool { +func needsToBuildDashSQLRSyncFile(filepath string, remotePath string) bool { if !newReadToken { return false } + + dashSQLRsync := NewDashSQLRsync(filepath) + dashSQLRsync.Read() // check if the {path}-sqlrsync file exists - dashSQLRsync := NewDashSQLRsync(path) - return !dashSQLRsync.Exists() + return !(dashSQLRsync.Exists() && dashSQLRsync.RemotePath == remotePath) } func runPullSync(remotePath string, localPath string) error { @@ -458,10 +483,12 @@ func runPullSync(remotePath string, localPath string) error { zap.Bool("dryRun", dryRun)) version := "latest" - // if remotePath has an @, then we want to pass that version through if strings.Contains(remotePath, "@") { remotePath, version, _ = strings.Cut(remotePath, "@") + if version == "" { + version = "latest" + } // if version is not a number, `latest`, or `latest-` then error if !isValidVersion(version) { @@ -469,51 +496,20 @@ func runPullSync(remotePath string, localPath string) error { } } - // Load defaults config - defaultsConfig, err := LoadDefaultsConfig() - if err != nil { - return fmt.Errorf("failed to load defaults config: %w", err) - } - - // Load local secrets config - localSecretsConfig, err := LoadLocalSecretsConfig() - if err != nil { - return fmt.Errorf("failed to load local secrets config: %w", err) - } - - // Get absolute path for the local database - absLocalPath, err := filepath.Abs(localPath) - if err != nil { - return fmt.Errorf("failed to get absolute path: %w", err) - } - - // Find or create database entry - dbConfig := localSecretsConfig.FindDatabaseByPath(absLocalPath) - if dbConfig == nil { - // Create new database entry - dbConfig = &SQLRsyncDatabase{ - Path: absLocalPath, - } - } - - // Use server from database config, or defaults if not set - if dbConfig.Server == "" { - dbConfig.Server = defaultsConfig.Defaults.Server - } - if serverURL == "" { - serverURL = dbConfig.Server - } + fmt.Println("PULLing down from " + serverURL + "/" + remotePath + "@" + version + " ...") // Create remote client for WebSocket transport remoteClient, err := remote.New(&remote.Config{ ServerURL: serverURL + "/sapi/pull/" + remotePath, + AuthToken: pullKey, + ReplicaID: replicaID, Timeout: timeout, PingPong: false, Logger: logger.Named("remote"), EnableTrafficInspection: inspectTraffic, InspectionDepth: inspectionDepth, Version: version, - RequestReadToken: needsReadToken(localPath), + SendConfigCmd: needsToBuildDashSQLRSyncFile(localPath, remotePath), }) if err != nil { return fmt.Errorf("failed to create remote client: %w", err) @@ -522,7 +518,7 @@ func runPullSync(remotePath string, localPath string) error { // Connect to remote server if err := remoteClient.Connect(); err != nil { - return fmt.Errorf("failed to connect to pull from server: %w", err) + return fmt.Errorf("%w", err) } // Create local client for SQLite operations @@ -541,24 +537,15 @@ func runPullSync(remotePath string, localPath string) error { return fmt.Errorf("pull synchronization failed: %w", err) } - if needsReadToken(localPath) { - token := remoteClient.GetNewReadToken() - + if needsToBuildDashSQLRSyncFile(localPath, remotePath) { + token := remoteClient.GetNewPullKey() dashSQLRsync := NewDashSQLRsync(localPath) - if err := dashSQLRsync.Write(remotePath, token); err != nil { + replicaID := remoteClient.GetReplicaID() + if err := dashSQLRsync.Write(remotePath, localPath, replicaID, token, serverURL); err != nil { return fmt.Errorf("failed to create shareable config file: %w", err) } } - // Update database config with latest info - dbConfig.LastUpdated = time.Now() - localSecretsConfig.UpdateOrAddDatabase(*dbConfig) - - // Save the updated config - if err := SaveLocalSecretsConfig(localSecretsConfig); err != nil { - logger.Warn("Failed to save local secrets config", zap.Error(err)) - } - logger.Info("Pull synchronization completed successfully") return nil } @@ -608,9 +595,12 @@ func Execute() error { } func init() { - rootCmd.Flags().StringVar(&authToken, "authKey", "", "Authentication key for push/pull operations") - rootCmd.Flags().StringVarP(&serverURL, "server", "s", "", "Server URL for push/pull operations (defaults to value in config)") + rootCmd.Flags().StringVar(&pullKey, "pullKey", "", "Authentication key for pull operations") + rootCmd.Flags().StringVar(&pushKey, "pushKey", "", "Authentication key for push operations") + rootCmd.Flags().StringVar(&replicaID, "replicaID", "", "Replica ID for the remote database (overwrites the REMOTE path)") + rootCmd.Flags().StringVarP(&serverURL, "server", "s", "wss://sqlrsync.com", "Server URL for push/pull operations") rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging") + rootCmd.Flags().BoolVar(&setPublic, "public", false, "Enable public access to the replica (only for push operations)") rootCmd.Flags().BoolVar(&newReadToken, "storeNewReadToken", true, "After syncing, the server creates a new read-only token that is stored in the -sqlrsync file adjacent to the local database") rootCmd.Flags().BoolVar(&dryRun, "dry", false, "Perform a dry run without making changes") rootCmd.Flags().IntVarP(&timeout, "timeout", "t", 8000, "Connection timeout in milliseconds (Max 10 seconds)") diff --git a/client/remote/client.go b/client/remote/client.go index 1671e81..26aca7d 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -15,7 +15,7 @@ import ( ) const ( - SQLRSYNC_NEEDREADKEY = 0x51 // Send to request a new read key + SQLRSYNC_CONFIG = 0x51 // Send to keys and replicaID ) // TrafficInspector provides traffic inspection and protocol message detection @@ -143,7 +143,7 @@ func (t *TrafficInspector) parseMessageType(data []byte) string { case 0x67: // REPLICA_CONFIG return "REPLICA_CONFIG" case 0x51: - return "SQLRSYNC_NEEDREADKEY" + return "SQLRSYNC_CONFIG" default: // For unknown messages, classify by first byte if firstByte >= 32 && firstByte <= 126 { @@ -157,13 +157,15 @@ func (t *TrafficInspector) parseMessageType(data []byte) string { type Config struct { ServerURL string Version string + ReplicaID string + SetPublic bool // for PUSH Timeout int // in milliseconds Logger *zap.Logger EnableTrafficInspection bool // Enable detailed traffic logging InspectionDepth int // How many bytes to inspect (default: 32) PingPong bool AuthToken string - RequestReadToken bool // the -sqlrsync file doesn't exist, so make a token + SendConfigCmd bool // the -sqlrsync file doesn't exist, so make a token LocalHostname string LocalAbsolutePath string } @@ -201,7 +203,11 @@ type Client struct { syncMu sync.RWMutex // sqlrsync specific - NewReadToken string + NewPullKey string + NewPushKey string + ReplicaID string + ReplicaPath string + SetPublic bool } // New creates a new remote WebSocket client @@ -267,10 +273,19 @@ func (c *Client) Connect() error { if c.config.LocalAbsolutePath != "" { headers.Set("X-LocalAbsolutePath", c.config.LocalAbsolutePath) } + if c.config.Version != "" { + headers.Set("X-ReplicaVersion", strings.Replace(c.config.Version, "latest", "", 1)) + } + if c.config.ReplicaID != "" { + headers.Set("X-ReplicaID", c.config.ReplicaID) + } + + if c.config.SetPublic { + headers.Set("X-SetPublic", fmt.Sprintf("%t", c.config.SetPublic)) + } conn, response, err := dialer.DialContext(connectCtx, u.String(), headers) if err != nil { - fmt.Println("Failed to connect:", err) respStr, _ := io.ReadAll(response.Body) return fmt.Errorf("%s", respStr) } @@ -335,12 +350,6 @@ func (c *Client) Read(buffer []byte) (int, error) { return 0, fmt.Errorf("connection error: %w", lastErr) } - // If sync is completed and connection is not active, exit immediately - if c.isSyncCompleted() && !c.isConnected() { - c.logger.Debug("Sync completed and connection not active - exiting immediately") - return 0, nil - } - select { case <-c.ctx.Done(): return 0, fmt.Errorf("client context cancelled") @@ -635,13 +644,13 @@ func (c *Client) readLoop() { conn := c.conn c.mu.RUnlock() - if conn == nil { + if conn == nil || c.isSyncCompleted() { c.setConnected(false) return } // Set read deadline - conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + conn.SetReadDeadline(time.Now().Add(9 * time.Second)) messageType, data, err := conn.ReadMessage() if err != nil { @@ -682,10 +691,29 @@ func (c *Client) readLoop() { } if messageType == websocket.TextMessage { - readKeyCmd := "NEWREADKEY=" + accessKeyLength := 22 + replicaIDLength := 18 + readPullKeyResp := "NEWPULLKEY=" + readPushKeyResp := "NEWPUSHKEY=" + replicaIDResp := "REPLICAID=" + replicaPathResp := "REPLICAPATH=" + // Handle text messages for NEWPULLKEY, NEWPUSHKEY, REPLICAID + // Example: "NEWPULLKEY=xxxxxxxxxxxxxxxxxxxxxx" strData := string(data) - if (len(data) >= len(readKeyCmd)+22) && strings.HasPrefix(strData, readKeyCmd) { - c.NewReadToken = strData[len(readKeyCmd) : len(readKeyCmd)+22] + if (len(data) >= len(readPullKeyResp)+accessKeyLength) && strings.HasPrefix(strData, readPullKeyResp) { + c.NewPullKey = strData[len(readPullKeyResp):] + c.logger.Debug("📥 Received new Pull Key:", zap.String("key", c.NewPullKey)) + } else if (len(data) >= len(readPushKeyResp)+accessKeyLength) && strings.HasPrefix(strData, readPushKeyResp) { + + c.NewPushKey = strData[len(readPushKeyResp):] + c.logger.Debug("📥 Received new Push Key:", zap.String("key", c.NewPushKey)) + } else if (len(data) >= len(replicaIDResp)+replicaIDLength) && strings.HasPrefix(strData, replicaIDResp) { + c.ReplicaID = strData[len(replicaIDResp):] + c.logger.Debug("📥 Received Replica ID:", zap.String("id", c.ReplicaID)) + } else if (len(data) >= len(replicaPathResp)) && strings.HasPrefix(strData, replicaPathResp) { + + c.ReplicaPath = strData[len(replicaPathResp):] + c.logger.Debug("📥 Received new Replica Path:", zap.String("path", c.ReplicaPath)) } continue } @@ -768,10 +796,10 @@ func (c *Client) writeLoop() { return } - if c.config.RequestReadToken { - conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_NEEDREADKEY}) - c.config.RequestReadToken = false - c.logger.Debug("🔑 Also asked for a new read token.") + if c.config.SendConfigCmd { + conn.WriteMessage(websocket.BinaryMessage, []byte{SQLRSYNC_CONFIG}) + c.config.SendConfigCmd = false + c.logger.Debug("🔑 Also asked for keys and replicaID.") } c.logger.Debug("Sent message to remote", zap.Int("bytes", len(data))) @@ -779,6 +807,17 @@ func (c *Client) writeLoop() { } } -func (c *Client) GetNewReadToken() string { - return c.NewReadToken +func (c *Client) GetNewPullKey() string { + return c.NewPullKey +} + +func (c *Client) GetNewPushKey() string { + return c.NewPushKey +} + +func (c *Client) GetReplicaID() string { + return c.ReplicaID +} +func (c *Client) GetReplicaPath() string { + return c.ReplicaPath }