diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..8e8323f --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "gomod" # See documentation for possible values + directory: "/client" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7b05c4f..93b3d0c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,16 +1,25 @@ name: Multi-Platform Build - on: - push: - branches: [main, develop] pull_request: - branches: [main] + branches: + - main + - "v*" release: types: [published] jobs: + securityIntention: + name: Security Intention + runs-on: ubuntu-latest + steps: + - name: Security Intention + run: | + echo "This workflow is intended to build the project in a secure manner:" + echo " - Only installs absolutely essential and trusted dependencies. (steps \"Install *\")" + echo " - Uses HTTPS for direct package downloads" + echo " - Only uses official Github Actions \"actions/*\"" build: - name: Build for ${{ matrix.os }} + name: Build for ${{ matrix.os }}-${{matrix.arch}} runs-on: ${{ matrix.runs-on }} strategy: matrix: @@ -24,15 +33,7 @@ jobs: - os: linux runs-on: ubuntu-latest arch: x86_64 - steps: - - name: Security Intention - run: | - echo "This workflow is intended to build the project in a secure manner:" - echo " - Only installs absolutely essential and trusted dependencies. (steps \"Install *\")" - echo " - Uses HTTPS for direct package downloads" - echo " - Only uses official Github Actions \"actions/*\"" - - name: Checkout code uses: actions/checkout@v4 @@ -133,6 +134,66 @@ jobs: # Use make with MSYS2/MinGW bash -c "make build" + - name: Test sqlrsync --version + run: | + echo "Testing sqlrsync --version..." + ./client/bin/sqlrsync --version + + - name: Test sqlrsync help + run: | + echo "Testing sqlrsync help..." + ./client/bin/sqlrsync || true + + - name: Test sqlrsync with usgs.gov/earthquakes.db + run: | + echo "Testing sqlrsync usgs.gov/earthquakes.db..." + ./client/bin/sqlrsync usgs.gov/earthquakes.db + + - name: Test sqlrsync with subscribe for 10 seconds (Linux) + if: matrix.os == 'linux' + run: | + echo "Testing sqlrsync usgs.gov/earthquakes.db --subscribe for 10 seconds..." + timeout 10s ./client/bin/sqlrsync usgs.gov/earthquakes.db --subscribe > subscribe_output.log 2>&1 || true + + - name: Test sqlrsync with subscribe for 10 seconds (macOS) + if: matrix.os == 'darwin' + run: | + echo "Testing sqlrsync usgs.gov/earthquakes.db --subscribe for 10 seconds..." + # macOS doesn't have timeout, use gtimeout or alternative + if command -v gtimeout &> /dev/null; then + gtimeout 10s ./client/bin/sqlrsync usgs.gov/earthquakes.db --subscribe > subscribe_output.log 2>&1 || true + else + # Fallback: run in background and kill after 10 seconds + ./client/bin/sqlrsync usgs.gov/earthquakes.db --subscribe > subscribe_output.log 2>&1 & + PID=$! + sleep 10 + kill $PID 2>/dev/null || true + wait $PID 2>/dev/null || true + fi + + - name: Test sqlrsync with subscribe for 10 seconds (Windows) + if: matrix.os == 'windows' + run: | + echo "Testing sqlrsync usgs.gov/earthquakes.db --subscribe for 10 seconds..." + # Windows doesn't have timeout, use PowerShell equivalent + $job = Start-Job { ./client/bin/sqlrsync.exe usgs.gov/earthquakes.db --subscribe } + Wait-Job $job -Timeout 10 + Stop-Job $job + Receive-Job $job > subscribe_output.log 2>&1 || $true + + - name: Verify subscribe output (Unix) + if: matrix.os != 'windows' + run: | + echo "Checking for 'Sync complete' in output..." + cat subscribe_output.log + if grep -q "Sync complete" subscribe_output.log; then + echo "✅ SUCCESS: Found 'Sync complete' in output" + else + echo "❌ FAILURE: 'Sync complete' not found in output" + echo "Full output:" + cat subscribe_output.log + exit 1 + fi - name: Create release directory run: | mkdir -p release @@ -162,7 +223,7 @@ jobs: release: if: github.ref == 'refs/heads/main' && github.event_name == 'push' needs: build - permissions: + permissions: contents: write packages: write issues: write @@ -170,39 +231,39 @@ jobs: actions: write runs-on: ubuntu-latest steps: - - uses: actions/checkout@v5 - - - name: Extract version from main.go - id: extract-version - run: | - VERSION=$(grep 'var VERSION = ' client/main.go | sed 's/var VERSION = "\(.*\)"/\1/') - echo "version=$VERSION" >> $GITHUB_OUTPUT - echo "Extracted version: $VERSION" - - - name: Check if tag exists - id: tag-check - run: | - VERSION=${{ steps.extract-version.outputs.version }} - if git rev-parse "v$VERSION" >/dev/null 2>&1; then - echo "Tag v$VERSION already exists" - echo "tag-created=false" >> $GITHUB_OUTPUT - else - echo "Tag v$VERSION does not exist, will create" - echo "tag-created=true" >> $GITHUB_OUTPUT - fi - - - name: Download all release artifacts - if: steps.tag-check.outputs.tag-created == 'true' - uses: actions/download-artifact@v5 - - - name: Create tag and GitHub Release, attach artifact - env: - GH_TOKEN: ${{ github.token }} - run: | - TAG=v${{ steps.extract-version.outputs.version }} - git config user.name "${{ github.actor }}" - git config user.email "${{ github.actor }}@users.noreply.github.com" - git tag -a $TAG -m "Release $TAG" - git push origin $TAG - # create the release and attach the artifact (gh CLI) - gh release create $TAG --generate-notes sqlrsync-*/sqlrsync-* \ No newline at end of file + - uses: actions/checkout@v5 + + - name: Extract version from main.go + id: extract-version + run: | + VERSION=$(grep 'var VERSION = ' client/main.go | sed 's/var VERSION = "\(.*\)"/\1/') + echo "version=$VERSION" >> $GITHUB_OUTPUT + echo "Extracted version: $VERSION" + + - name: Check if tag exists + id: tag-check + run: | + VERSION=${{ steps.extract-version.outputs.version }} + if git rev-parse "v$VERSION" >/dev/null 2>&1; then + echo "Tag v$VERSION already exists" + echo "tag-created=false" >> $GITHUB_OUTPUT + else + echo "Tag v$VERSION does not exist, will create" + echo "tag-created=true" >> $GITHUB_OUTPUT + fi + + - name: Download all release artifacts + if: steps.tag-check.outputs.tag-created == 'true' + uses: actions/download-artifact@v5 + + - name: Create tag and GitHub Release, attach artifact + env: + GH_TOKEN: ${{ github.token }} + run: | + TAG=v${{ steps.extract-version.outputs.version }} + git config user.name "${{ github.actor }}" + git config user.email "${{ github.actor }}@users.noreply.github.com" + git tag -a $TAG -m "Release $TAG" + git push origin $TAG + # create the release and attach the artifact (gh CLI) + gh release create $TAG --generate-notes sqlrsync-*/sqlrsync-* diff --git a/.gitignore b/.gitignore index 248f074..f81f746 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ tmp/ client/sqlrsync client/sqlrsync client/sqlrsync_simple +asciinema/ +examples/earthquakes/nohup.out diff --git a/bridge/cgo_bridge.go b/bridge/cgo_bridge.go index c8d9782..ab6970d 100644 --- a/bridge/cgo_bridge.go +++ b/bridge/cgo_bridge.go @@ -65,6 +65,36 @@ func cgoGetDatabaseInfo(dbPath string) (*DatabaseInfo, error) { return info, nil } +// SQLRSYNC +// CheckIntegrity checks the database integrity using PRAGMA integrity_check +func CheckIntegrity(dbPath string) (bool, string, error) { + return cgoCheckIntegrity(dbPath) +} + +// SQLRSYNC +// cgoCheckIntegrity checks the database integrity using PRAGMA integrity_check +func cgoCheckIntegrity(dbPath string) (bool, string, error) { + cDbPath := C.CString(dbPath) + defer C.free(unsafe.Pointer(cDbPath)) + + const errorMsgSize = 1024 + errorMsg := make([]byte, errorMsgSize) + cErrorMsg := (*C.char)(unsafe.Pointer(&errorMsg[0])) + + result := C.sqlite_rsync_check_integrity(cDbPath, cErrorMsg, C.int(errorMsgSize)) + + switch result { + case 0: + return true, "", nil // Database is OK + case 1: + return false, C.GoString(cErrorMsg), nil // Database is corrupted + default: + return false, C.GoString(cErrorMsg), &SQLiteRsyncError{ + Code: int(result), + Message: "failed to check database integrity", + } + } +} // RunOriginSync wraps the C function to run origin synchronization func RunOriginSync(dbPath string, dryRun bool, client *BridgeClient) error { diff --git a/bridge/client.go b/bridge/client.go index 43ccb5e..1bbc925 100644 --- a/bridge/client.go +++ b/bridge/client.go @@ -2,6 +2,7 @@ package bridge import ( "fmt" + "os" "go.uber.org/zap" ) @@ -70,6 +71,28 @@ func (c *BridgeClient) GetDatabaseInfo() (*DatabaseInfo, error) { return info, nil } +// CheckIntegrity checks the database integrity using PRAGMA integrity_check +func (c *BridgeClient) CheckIntegrity() { + c.Logger.Debug("Checking database integrity", zap.String("path", c.Config.DatabasePath)) + + if _, err := os.Stat(c.Config.DatabasePath); os.IsNotExist(err) { + c.Logger.Fatal("database file does not exist", zap.String("path", c.Config.DatabasePath)) + } + + isOk, errorMsg, err := CheckIntegrity(c.Config.DatabasePath) + if err != nil { + c.Logger.Fatal("fatal error while checking integrity", zap.Error(err)) + } + + if !isOk { + c.Logger.Fatal("database integrity check failed", + zap.String("database", c.Config.DatabasePath), + zap.String("error", errorMsg)) + } + + c.Logger.Debug("Database integrity check passed", zap.String("database", c.Config.DatabasePath)) +} + // RunPushSync runs the origin-side synchronization with provided I/O functions func (c *BridgeClient) RunPushSync(readFunc ReadFunc, writeFunc WriteFunc) error { c.Logger.Info("Starting origin sync", zap.String("database", c.Config.DatabasePath)) @@ -117,7 +140,7 @@ func (c *BridgeClient) RunPullSync(readFunc ReadFunc, writeFunc WriteFunc) error return err } - c.Logger.Info("Replica sync completed successfully") + c.Logger.Info("Replica sync completed") return nil } diff --git a/bridge/sqlite_rsync_wrapper.c b/bridge/sqlite_rsync_wrapper.c index eeba020..771e4c5 100644 --- a/bridge/sqlite_rsync_wrapper.c +++ b/bridge/sqlite_rsync_wrapper.c @@ -132,6 +132,63 @@ int sqlite_rsync_get_db_info(const char *db_path, sqlite_db_info_t *info) sqlite3_close(db); return 0; } +// SQLRSYNC +// Check database integrity using PRAGMA integrity_check +int sqlite_rsync_check_integrity(const char *db_path, char *error_msg, int error_msg_size) +{ + if (!db_path || !error_msg) + { + return -1; + } + + // Initialize error message + error_msg[0] = '\0'; + + sqlite3 *db; + int rc = sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, NULL); + if (rc != SQLITE_OK) + { + if (error_msg_size > 0) + { + snprintf(error_msg, error_msg_size, "Cannot open database: %s", sqlite3_errmsg(db)); + } + if (db) + sqlite3_close(db); + return -1; + } + + sqlite3_stmt *stmt; + rc = sqlite3_prepare_v2(db, "PRAGMA integrity_check", -1, &stmt, NULL); + if (rc != SQLITE_OK) + { + if (error_msg_size > 0) + { + snprintf(error_msg, error_msg_size, "Cannot prepare integrity check: %s", sqlite3_errmsg(db)); + } + sqlite3_close(db); + return -1; + } + + int result = 0; // Assume OK + while (sqlite3_step(stmt) == SQLITE_ROW) + { + const char *result_text = (const char *)sqlite3_column_text(stmt, 0); + if (result_text && strcmp(result_text, "ok") != 0) + { + // Database is corrupted + result = 1; + if (error_msg_size > 0) + { + snprintf(error_msg, error_msg_size, "Integrity check failed: %s", result_text); + } + break; + } + } + + sqlite3_finalize(stmt); + sqlite3_close(db); + return result; +} // Cleanup resources void sqlite_rsync_cleanup(void) diff --git a/bridge/sqlite_rsync_wrapper.h b/bridge/sqlite_rsync_wrapper.h index 431eb7c..d6f874f 100644 --- a/bridge/sqlite_rsync_wrapper.h +++ b/bridge/sqlite_rsync_wrapper.h @@ -78,6 +78,10 @@ extern "C" int sqlite_rsync_get_db_info(const char *db_path, sqlite_db_info_t *info); + // SQLRSYNC: Check database integrity using PRAGMA integrity_check + // Returns 0 if OK, 1 if corrupted, -1 on error + int sqlite_rsync_check_integrity(const char *db_path, char *error_msg, int error_msg_size); + // Cleanup resources void sqlite_rsync_cleanup(void); diff --git a/client/Makefile b/client/Makefile index ec5744f..26981ad 100644 --- a/client/Makefile +++ b/client/Makefile @@ -1,5 +1,5 @@ # SQLite Rsync Go Client Makefile -.PHONY: all build clean test deps check-deps install-deps run help +.PHONY: all build clean test deps check-deps install-deps install run help # Build configuration BINARY_NAME := sqlrsync @@ -55,6 +55,12 @@ build: $(SQLITE_RSYNC_LIB) CGO_ENABLED=$(CGO_ENABLED) CGO_LDFLAGS="-L$(BRIDGE_LIB_DIR) -lsqlite_rsync" go build $(GOFLAGS) -ldflags="$(LDFLAGS)" -o $(BUILD_DIR)/$(BINARY_NAME) $(MAIN_FILE) @echo "✓ Build complete: $(BUILD_DIR)/$(BINARY_NAME)" +# Install the binary to system path +install: build + @echo "Installing $(BINARY_NAME) to /usr/local/bin/..." + cp $(BUILD_DIR)/$(BINARY_NAME) /usr/local/bin/$(BINARY_NAME) + @echo "✓ Install complete: /usr/local/bin/$(BINARY_NAME)" + # Build with debug symbols build-debug: check-deps @echo "Building $(BINARY_NAME) with debug symbols..." @@ -104,6 +110,7 @@ help: @echo " all - Check dependencies and build (default)" @echo " build - Build the binary" @echo " build-debug - Build with debug symbols" + @echo " install - Build and install binary to /usr/local/bin" @echo " clean - Remove build artifacts" @echo " deps - Download Go dependencies" @echo " check-deps - Check system dependencies" @@ -118,5 +125,6 @@ help: @echo "Usage examples:" @echo " make build" @echo " make run" + @echo " make install" @echo " make run-dry" @echo " make test" \ No newline at end of file diff --git a/client/auth/config.go b/client/auth/config.go index bea5925..b526b2a 100644 --- a/client/auth/config.go +++ b/client/auth/config.go @@ -166,17 +166,12 @@ func SaveLocalSecretsConfig(config *LocalSecretsConfig) error { return fmt.Errorf("failed to create directory %s: %w", dir, err) } - file, err := os.Create(path) + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { return fmt.Errorf("failed to create local-secrets config file %s: %w", path, err) } defer file.Close() - // Set file permissions to 0600 (read/write for owner only) - if err := file.Chmod(0600); err != nil { - return fmt.Errorf("failed to set permissions on local-secrets config file: %w", err) - } - encoder := toml.NewEncoder(file) if err := encoder.Encode(config); err != nil { return fmt.Errorf("failed to write local-secrets config: %w", err) diff --git a/client/auth/resolver.go b/client/auth/resolver.go index 96bf8f0..66a24ad 100644 --- a/client/auth/resolver.go +++ b/client/auth/resolver.go @@ -12,7 +12,7 @@ import ( // ResolveResult contains the resolved authentication information type ResolveResult struct { - AccessToken string + AccessKey string ReplicaID string ServerURL string RemotePath string @@ -22,14 +22,14 @@ type ResolveResult struct { // ResolveRequest contains the parameters for authentication resolution type ResolveRequest struct { - LocalPath string - RemotePath string - ServerURL string - ProvidedPullKey string - ProvidedPushKey string + LocalPath string + RemotePath string + ServerURL string + ProvidedPullKey string + ProvidedPushKey string ProvidedReplicaID string - Operation string // "pull", "push", "subscribe" - Logger *zap.Logger + Operation string // "pull", "push", "subscribe" + Logger *zap.Logger } // Resolver handles authentication and configuration resolution @@ -53,9 +53,9 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { } // 1. Try environment variable first - if token := os.Getenv("SQLRSYNC_AUTH_TOKEN"); token != "" { - r.logger.Debug("Using SQLRSYNC_AUTH_TOKEN from environment") - result.AccessToken = token + if key := os.Getenv("SQLRSYNC_AUTH_KEY"); key != "" { + r.logger.Debug("Using SQLRSYNC_AUTH_KEY from environment") + result.AccessKey = key result.ReplicaID = req.ProvidedReplicaID return result, nil } @@ -63,14 +63,14 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { // 2. Try explicitly provided keys if req.ProvidedPullKey != "" { r.logger.Debug("Using provided pull key") - result.AccessToken = req.ProvidedPullKey + result.AccessKey = req.ProvidedPullKey result.ReplicaID = req.ProvidedReplicaID return result, nil } if req.ProvidedPushKey != "" { r.logger.Debug("Using provided push key") - result.AccessToken = req.ProvidedPushKey + result.AccessKey = req.ProvidedPushKey result.ReplicaID = req.ProvidedReplicaID return result, nil } @@ -87,7 +87,7 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { if req.ServerURL == "wss://sqlrsync.com" { if localSecretsConfig, err := LoadLocalSecretsConfig(); err == nil { if dbConfig := localSecretsConfig.FindDatabaseByPath(absLocalPath); dbConfig != nil { - r.logger.Debug("Using server URL from local secrets config", + r.logger.Debug("Using server URL from local secrets config", zap.String("configuredServer", dbConfig.Server), zap.String("defaultServer", req.ServerURL)) result.ServerURL = dbConfig.Server @@ -114,7 +114,7 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { if req.Operation == "push" { if os.Getenv("SQLRSYNC_ADMIN_KEY") != "" { r.logger.Debug("Using SQLRSYNC_ADMIN_KEY from environment") - result.AccessToken = os.Getenv("SQLRSYNC_ADMIN_KEY") + result.AccessKey = os.Getenv("SQLRSYNC_ADMIN_KEY") result.ShouldPrompt = false return result, nil } @@ -126,7 +126,7 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { // 5. If it's a pull, maybe no key needed if req.Operation == "pull" || req.Operation == "subscribe" { - result.AccessToken = "" + result.AccessKey = "" result.ShouldPrompt = false return result, nil } @@ -138,7 +138,7 @@ func (r *Resolver) Resolve(req *ResolveRequest) (*ResolveResult, error) { // resolveFromLocalSecrets attempts to resolve auth from local-secrets.toml func (r *Resolver) resolveFromLocalSecrets(absLocalPath, serverURL string, result *ResolveResult) (*ResolveResult, error) { r.logger.Debug("Attempting to resolve from local secrets", zap.String("absLocalPath", absLocalPath), zap.String("serverURL", serverURL)) - + localSecretsConfig, err := LoadLocalSecretsConfig() if err != nil { r.logger.Debug("Failed to load local secrets config", zap.Error(err)) @@ -162,14 +162,14 @@ func (r *Resolver) resolveFromLocalSecrets(absLocalPath, serverURL string, resul } if dbConfig.Server != serverURL { - r.logger.Debug("Server URL mismatch", - zap.String("configured", dbConfig.Server), + r.logger.Debug("Server URL mismatch", + zap.String("configured", dbConfig.Server), zap.String("requested", serverURL)) return nil, fmt.Errorf("server URL mismatch: configured=%s, requested=%s", dbConfig.Server, serverURL) } r.logger.Debug("Found authentication in local secrets config") - result.AccessToken = dbConfig.PushKey + result.AccessKey = dbConfig.PushKey result.ReplicaID = dbConfig.ReplicaID result.RemotePath = dbConfig.RemotePath result.ServerURL = dbConfig.Server @@ -193,7 +193,7 @@ func (r *Resolver) resolveFromDashFile(localPath string, result *ResolveResult) } r.logger.Debug("Found authentication in -sqlrsync file") - result.AccessToken = dashSQLRsync.PullKey + result.AccessKey = dashSQLRsync.PullKey result.ReplicaID = dashSQLRsync.ReplicaID result.RemotePath = dashSQLRsync.RemotePath result.ServerURL = dashSQLRsync.Server @@ -201,24 +201,25 @@ func (r *Resolver) resolveFromDashFile(localPath string, result *ResolveResult) return result, nil } -// PromptForAdminKey prompts the user for an admin key -func (r *Resolver) PromptForAdminKey(serverURL string) (string, error) { +// PromptForKey prompts the user for an key +func (r *Resolver) PromptForKey(serverURL string, remotePath string, keyType string) (string, error) { httpServer := strings.Replace(serverURL, "ws", "http", 1) - fmt.Println("No Key provided. Creating a new Replica? Get a key at " + httpServer + "/namespaces") - fmt.Print(" Enter an Account Admin Key to create a new Replica: ") + fmt.Println("Replica not found when using unauthenticated access. Try again using a key or check your spelling.") + fmt.Println(" Get a key at " + httpServer + "/namespaces or " + httpServer + "/" + remotePath) + fmt.Print(" Provide a key to " + keyType + ": ") reader := bufio.NewReader(os.Stdin) - token, err := reader.ReadString('\n') + key, err := reader.ReadString('\n') if err != nil { - return "", fmt.Errorf("failed to read admin key: %w", err) + return "", fmt.Errorf("failed to read key: %w", err) } - token = strings.TrimSpace(token) - if token == "" { - return "", fmt.Errorf("admin key cannot be empty") + key = strings.TrimSpace(key) + if key == "" { + return "", fmt.Errorf("key cannot be empty") } - return token, nil + return key, nil } // SavePushResult saves the result of a successful push operation @@ -279,4 +280,4 @@ func (r *Resolver) CheckNeedsDashFile(localPath, remotePath string) bool { } return dashSQLRsync.RemotePath != remotePath -} \ No newline at end of file +} diff --git a/client/main.go b/client/main.go index 45e972a..c3b6c6e 100644 --- a/client/main.go +++ b/client/main.go @@ -15,7 +15,7 @@ import ( "github.com/sqlrsync/sqlrsync.com/sync" ) -var VERSION = "0.0.5" +var VERSION = "0.0.6" var ( serverURL string verbose bool @@ -155,7 +155,7 @@ func runSync(cmd *cobra.Command, args []string) error { // Create sync coordinator coordinator := sync.NewCoordinator(&sync.CoordinatorConfig{ ServerURL: serverURL, - ProvidedAuthToken: getAuthToken(), + ProvidedAuthKey: getAuthKey(), ProvidedPullKey: pullKey, ProvidedPushKey: pushKey, ProvidedReplicaID: replicaID, @@ -224,10 +224,10 @@ func determineOperation(args []string) (sync.Operation, string, string, error) { return sync.Operation(0), "", "", fmt.Errorf("invalid arguments") } -func getAuthToken() string { +func getAuthKey() string { // Try environment variable first - if token := os.Getenv("SQLRSYNC_AUTH_TOKEN"); token != "" { - return token + if key := os.Getenv("SQLRSYNC_AUTH_KEY"); key != "" { + return key } // Try pull/push keys diff --git a/client/remote/client.go b/client/remote/client.go index 0b00cd2..4eaa070 100644 --- a/client/remote/client.go +++ b/client/remote/client.go @@ -401,9 +401,9 @@ type Config struct { EnableTrafficInspection bool // Enable detailed traffic logging InspectionDepth int // How many bytes to inspect (default: 32) PingPong bool - AuthToken string + AuthKey string ClientVersion string // version of the client software - SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a token + SendKeyRequest bool // the -sqlrsync file doesn't exist, so make a key SendConfigCmd bool // we don't have the version number or remote path LocalHostname string @@ -685,9 +685,9 @@ func (c *Client) Connect() error { headers := http.Header{} - headers.Set("Authorization", c.config.AuthToken) + headers.Set("Authorization", c.config.AuthKey) - headers.Set("X-ClientVersion", c.config.ClientVersion); + headers.Set("X-ClientVersion", c.config.ClientVersion) if c.config.WsID != "" { headers.Set("X-ClientID", c.config.WsID) @@ -882,9 +882,9 @@ func (c *Client) Read(buffer []byte) (int, error) { if c.config.Subscribe { return 1 * time.Hour } - // Use a longer timeout if sync is completed to allow final transaction processing + // Use a shorter timeout if sync is completed to allow final transaction processing if c.isSyncCompleted() { - return 2 * time.Second + return 1 * time.Second } return 30 * time.Second }()): @@ -1012,7 +1012,7 @@ func (c *Client) Close() { if c.conn != nil { // Send close message closeMessage := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - err := c.conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(5*time.Second)) + err := c.conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(3*time.Second)) if err != nil { c.logger.Debug("Error sending close message", zap.Error(err)) } else { diff --git a/client/subscription/manager.go b/client/subscription/manager.go index 74e45c2..5c3139b 100644 --- a/client/subscription/manager.go +++ b/client/subscription/manager.go @@ -1,12 +1,14 @@ package subscription import ( + "bufio" "context" "encoding/json" "fmt" "io" "net/http" "net/url" + "os" "strings" "sync" "time" @@ -38,7 +40,7 @@ type Message struct { type ManagerConfig struct { ServerURL string ReplicaPath string - AuthToken string + AccessKey string ReplicaID string WsID string // websocket ID for client identification ClientVersion string // version of the client software @@ -199,7 +201,7 @@ func (m *Manager) doConnect() error { u.Path = strings.TrimSuffix(u.Path, "/") + "/sapi/subscribe/" + m.config.ReplicaPath headers := http.Header{} - headers.Set("Authorization", m.config.AuthToken) + headers.Set("Authorization", m.config.AccessKey) if m.config.ReplicaID != "" { headers.Set("X-ReplicaID", m.config.ReplicaID) } @@ -229,6 +231,20 @@ func (m *Manager) doConnect() error { } } + // Connect to remote server + if strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found") { + if m.config.AccessKey == "" { + key, err := PromptForKey(m.config.ServerURL, m.config.ReplicaPath, "PULL") + if err != nil { + return fmt.Errorf("manager failed to get key interactively: %w", err) + } + m.config.AccessKey = key + return m.doConnect() + } else { + return fmt.Errorf("manager failed to connect to server: %w", err) + } + } + // Create a clean error message var errorMsg strings.Builder errorMsg.WriteString(fmt.Sprintf("HTTP %d (%s)", statusCode, statusText)) @@ -574,3 +590,24 @@ func (m *Manager) pingLoop() { } } } + +// PromptForKey prompts the user for an admin key +func PromptForKey(serverURL string, remotePath string, keyType string) (string, error) { + httpServer := strings.Replace(serverURL, "ws", "http", 1) + fmt.Println("Replica not found when using unauthenticated access. Try again using a key or check your spelling.") + fmt.Println(" Get a key at " + httpServer + "/namespaces or " + httpServer + "/" + remotePath) + fmt.Print(" Provide a key to " + keyType + ": ") + + reader := bufio.NewReader(os.Stdin) + key, err := reader.ReadString('\n') + if err != nil { + return "", fmt.Errorf("failed to read admin key: %w", err) + } + + key = strings.TrimSpace(key) + if key == "" { + return "", fmt.Errorf("admin key cannot be empty") + } + + return key, nil +} diff --git a/client/sync/coordinator.go b/client/sync/coordinator.go index 4da1af6..0af5335 100644 --- a/client/sync/coordinator.go +++ b/client/sync/coordinator.go @@ -32,7 +32,7 @@ const ( // CoordinatorConfig holds sync coordinator configuration type CoordinatorConfig struct { ServerURL string - ProvidedAuthToken string // Explicitly provided auth token + ProvidedAuthKey string // Explicitly provided auth key ProvidedPullKey string // Explicitly provided pull key ProvidedPushKey string // Explicitly provided push key ProvidedReplicaID string // Explicitly provided replica ID @@ -133,10 +133,10 @@ func (c *Coordinator) displayDryRunInfo(operation string, authResult *auth.Resol if operation != "local" { fmt.Printf(" - Server: %s\n", color.GreenString(serverURL)) - if authResult.AccessToken != "" { - fmt.Printf(" - Access Token: %s\n", color.GreenString(authResult.AccessToken)) + if authResult.AccessKey != "" { + fmt.Printf(" - Access Key: %s\n", color.GreenString(authResult.AccessKey)) } else { - fmt.Printf(" - Access Token: %s\n", color.YellowString("(none)")) + fmt.Printf(" - Access Key: %s\n", color.YellowString("(none)")) } if operation == "push" { @@ -179,14 +179,14 @@ func (c *Coordinator) resolveAuth(operation string) (*auth.ResolveResult, error) Logger: c.logger, } - // Try explicit auth token first - if c.config.ProvidedAuthToken != "" { + // Try explicit auth key first + if c.config.ProvidedAuthKey != "" { return &auth.ResolveResult{ - AccessToken: c.config.ProvidedAuthToken, - ReplicaID: c.config.ProvidedReplicaID, - ServerURL: c.config.ServerURL, - RemotePath: c.config.RemotePath, - LocalPath: c.config.LocalPath, + AccessKey: c.config.ProvidedAuthKey, + ReplicaID: c.config.ProvidedReplicaID, + ServerURL: c.config.ServerURL, + RemotePath: c.config.RemotePath, + LocalPath: c.config.LocalPath, }, nil } @@ -196,12 +196,12 @@ func (c *Coordinator) resolveAuth(operation string) (*auth.ResolveResult, error) } // If prompting is needed for push operations - if result.ShouldPrompt && operation == "push" { - token, err := c.authResolver.PromptForAdminKey(c.config.ServerURL) + if result.ShouldPrompt || (operation == "push" && result.AccessKey == "") { + key, err := c.authResolver.PromptForKey(c.config.ServerURL, c.config.RemotePath, "PUSH") if err != nil { return nil, err } - result.AccessToken = token + result.AccessKey = key result.ShouldPrompt = false } @@ -245,7 +245,7 @@ func (c *Coordinator) executeSubscribe() error { c.subManager = subscription.NewManager(&subscription.ManagerConfig{ ServerURL: authResult.ServerURL, ReplicaPath: authResult.RemotePath, - AuthToken: authResult.AccessToken, + AccessKey: authResult.AccessKey, ReplicaID: authResult.ReplicaID, WsID: c.config.WsID, ClientVersion: c.config.ClientVersion, @@ -366,7 +366,7 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Create remote client for WebSocket transport remoteClient, err := remote.New(&remote.Config{ ServerURL: serverURL + "/sapi/pull/" + remotePath, - AuthToken: authResult.AccessToken, + AuthKey: authResult.AccessKey, ReplicaID: authResult.ReplicaID, Timeout: 8000, PingPong: false, // No ping/pong needed for single sync @@ -398,7 +398,17 @@ func (c *Coordinator) executePull(isSubscription bool) error { // Connect to remote server if err := remoteClient.Connect(); err != nil { - return fmt.Errorf("failed to connect to server: %w", err) + if (strings.Contains(err.Error(), "key is not authorized") || strings.Contains(err.Error(), "404 Path not found")) && authResult.AccessKey == "" { + key, err := c.authResolver.PromptForKey(c.config.ServerURL, c.config.RemotePath, "PULL") + if err != nil { + return fmt.Errorf("coordinator failed to get key interactively: %w", err) + } + c.config.ProvidedAuthKey = key + return c.executePull(isSubscription) + } else { + + return fmt.Errorf("coordinator failed to connect to server: %w", err) + } } // Create local client for SQLite operations @@ -417,6 +427,10 @@ func (c *Coordinator) executePull(isSubscription bool) error { if err := c.performPullSync(localClient, remoteClient); err != nil { return fmt.Errorf("pull synchronization failed: %w", err) } + + // Check database integrity after pull + localClient.CheckIntegrity() + c.config.Version = remoteClient.GetVersion() // Save pull result if needed if remoteClient.GetNewPullKey() != "" && c.authResolver.CheckNeedsDashFile(c.config.LocalPath, remotePath) { @@ -440,10 +454,21 @@ func (c *Coordinator) executePull(isSubscription bool) error { // executePush performs a push sync operation func (c *Coordinator) executePush() error { - // Validate that database file exists - if _, err := os.Stat(c.config.LocalPath); os.IsNotExist(err) { - return fmt.Errorf("database file does not exist: %s", c.config.LocalPath) + // Create local client for SQLite operations + localClient, err := bridge.New(&bridge.BridgeConfig{ + DatabasePath: c.config.LocalPath, + DryRun: c.config.DryRun, + Logger: c.logger.Named("local"), + EnableSQLiteRsyncLogging: c.config.Verbose, + }) + if err != nil { + return fmt.Errorf("failed to create local client: %w", err) } + defer localClient.Close() + + // Check database integrity before pushing + localClient.CheckIntegrity() + // Resolve authentication authResult, err := c.resolveAuth("push") @@ -463,18 +488,6 @@ func (c *Coordinator) executePush() error { remotePath = c.config.RemotePath } - // Create local client for SQLite operations - localClient, err := bridge.New(&bridge.BridgeConfig{ - DatabasePath: c.config.LocalPath, - DryRun: c.config.DryRun, - Logger: c.logger.Named("local"), - EnableSQLiteRsyncLogging: c.config.Verbose, - }) - if err != nil { - return fmt.Errorf("failed to create local client: %w", err) - } - defer localClient.Close() - // Get absolute path for the local database absLocalPath, err := filepath.Abs(c.config.LocalPath) if err != nil { @@ -495,7 +508,7 @@ func (c *Coordinator) executePush() error { ServerURL: serverURL + "/sapi/push/" + remotePath, PingPong: true, Timeout: 15000, - AuthToken: authResult.AccessToken, + AuthKey: authResult.AccessKey, Logger: c.logger.Named("remote"), EnableTrafficInspection: c.config.Verbose, LocalHostname: localHostname, @@ -507,7 +520,7 @@ func (c *Coordinator) executePush() error { CommitMessage: c.config.CommitMessage, WsID: c.config.WsID, // Add websocket ID ClientVersion: c.config.ClientVersion, - ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), + ProgressCallback: nil, //remote.DefaultProgressCallback(remote.FormatSimple), ProgressConfig: &remote.ProgressConfig{ Enabled: true, Format: remote.FormatSimple, diff --git a/examples/earthquakes/gov.usgs.earthquakes.sh b/examples/earthquakes/gov.usgs.earthquakes.sh new file mode 100755 index 0000000..f34a27c --- /dev/null +++ b/examples/earthquakes/gov.usgs.earthquakes.sh @@ -0,0 +1,137 @@ +#!/bin/bash + +# USGS Earthquake Data Synchronization Script +# Downloads earthquake data every 50 minutes and syncs to SQLRsync + +# Configuration +FILE=earthquakes.db +TABLE=earthquakes +UPDATES=5m +URL=https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/4.5_month.csv +URL=https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.csv +SQLRSYNC_PATH=usgs.gov/earthquakes.db +PRIMARY_KEY=id +MODE="INSERT OR REPLACE INTO" +SCHEMA="time TEXT, latitude REAL, longitude REAL, depth REAL, mag REAL, magType TEXT, nst INTEGER, gap REAL, dmin REAL, rms REAL, net TEXT, id TEXT PRIMARY KEY, updated TEXT, place TEXT, type TEXT, horizontalError REAL, depthError REAL, magError REAL, magNst INTEGER, status TEXT, locationSource TEXT, magSource TEXT" + +# Convert time interval to seconds for sleep +convert_to_seconds() { + local time_str="$1" + local num="${time_str%[a-zA-Z]*}" + local unit="${time_str#$num}" + + case "$unit" in + s|sec) echo "$num" ;; + m|min) echo $((num * 60)) ;; + h|hour) echo $((num * 3600)) ;; + d|day) echo $((num * 86400)) ;; + *) echo 3000 ;; # default to 50 minutes + esac +} + +# Initialize database and table +init_database() { + echo "Initializing database: $FILE" + sqlite3 "$FILE" "CREATE TABLE IF NOT EXISTS $TABLE ($SCHEMA);" + if [ $? -eq 0 ]; then + echo "Database initialized successfully" + else + echo "Error: Failed to initialize database" + exit 1 + fi +} + +# Download and import data with in-memory staging +sync_data() { + echo "$(date): Downloading earthquake data from USGS..." + + # Download CSV data + local temp_file=$(mktemp) + if curl -s -f "$URL" -o "$temp_file"; then + echo "Data downloaded successfully" + + # Get record count before import + local count_before=$(sqlite3 "$FILE" "SELECT COUNT(*) FROM $TABLE;" 2>/dev/null || echo "0") + + # Use in-memory database for staging to avoid bloating main database + sqlite3 ":memory:" </dev/null | head -3 + + # Sync to SQLRsync server if path is configured + if [ -n "$SQLRSYNC_PATH" ] && command -v sqlrsync >/dev/null 2>&1; then + echo "Syncing to SQLRsync server: $SQLRSYNC_PATH" + sqlrsync "$FILE" "$SQLRSYNC_PATH" -m "Added $new_records records, others updated" + if [ $? -eq 0 ]; then + echo "Successfully synced to server" + else + echo "Warning: Failed to sync to server" + fi + fi + else + echo "Error: Failed to import data" + fi + else + echo "Error: Failed to download data from $URL" + fi + + rm -f "$temp_file" +} + +# Main execution +main() { + echo "Fetch CSV to SQLite Data Sync Starting..." + echo "Configuration:" + echo " Database: $FILE" + echo " Table: $TABLE" + echo " Update interval: $UPDATES" + echo " Data source: $URL" + echo " SQLRsync path: $SQLRSYNC_PATH" + echo "" + + # Initialize database + init_database + + # Convert update interval to seconds + local sleep_seconds=$(convert_to_seconds "$UPDATES") + echo "Update interval: $sleep_seconds seconds" + echo "" + + # Initial sync + sync_data + + # Continuous sync loop + echo "Starting continuous sync (Ctrl+C to stop)..." + while true; do + echo "Sleeping for $UPDATES ($sleep_seconds seconds)..." + sleep "$sleep_seconds" + sync_data + done +} + +# Run main function if script is executed directly +if [ "${BASH_SOURCE[0]}" == "${0}" ]; then + main "$@" +fi +