Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
jobs:
build:
name: Build for ${{ matrix.os }}
runs-on: ${{ matrix.runs-on }}
strategy:
matrix:
include:
Expand Down
242 changes: 121 additions & 121 deletions bridge/cgo_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
"go.uber.org/zap"
)

const DEBUG_FILE = "sqlite_rsync_debug.log"

var (
handleCounter int64
handleMap = make(map[int64]cgo.Handle)
handleMutex sync.RWMutex
)

const DEBUG_FILE = "sqlrsync-debug"

// GetDatabaseInfo wraps the C function to get database info
func GetDatabaseInfo(dbPath string) (*DatabaseInfo, error) {
return cgoGetDatabaseInfo(dbPath)
Expand Down Expand Up @@ -67,12 +67,12 @@ func cgoGetDatabaseInfo(dbPath string) (*DatabaseInfo, error) {
}

// RunOriginSync wraps the C function to run origin synchronization
func RunOriginSync(dbPath string, dryRun bool, client *Client) error {
func RunOriginSync(dbPath string, dryRun bool, client *BridgeClient) error {
return cgoRunOriginSync(dbPath, dryRun, client)
}

// cgoRunOriginSync wraps the C function to run origin synchronization
func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error {
func cgoRunOriginSync(dbPath string, dryRun bool, client *BridgeClient) error {
// Prepare C configuration
cDbPath := C.CString(dbPath)
defer C.free(unsafe.Pointer(cDbPath))
Expand All @@ -86,9 +86,9 @@ func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error {
wal_only: 0,
error_file: nil,
}
// Set debug_file if DEBUG_FILE is defined (non-empty)
if DEBUG_FILE != "" {
cDebugFile := C.CString(DEBUG_FILE + ".origin.log")
// Set debugFile if debugFile is defined (non-empty)
if client.Config.EnableSQLiteRsyncLogging {
cDebugFile := C.CString(DEBUG_FILE + "-push.log")
defer C.free(unsafe.Pointer(cDebugFile))
config.error_file = cDebugFile
}
Expand All @@ -100,11 +100,11 @@ func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error {
// Create cgo handle for the client and store it in map
handle := cgo.NewHandle(client)
handleID := atomic.AddInt64(&handleCounter, 1)

handleMutex.Lock()
handleMap[handleID] = handle
handleMutex.Unlock()

defer func() {
handleMutex.Lock()
delete(handleMap, handleID)
Expand All @@ -117,7 +117,7 @@ func cgoRunOriginSync(dbPath string, dryRun bool, client *Client) error {
cHandleID := C.malloc(C.sizeof_long)
defer C.free(cHandleID)
*(*C.long)(cHandleID) = C.long(handleID)

ioCtx := C.sqlite_rsync_io_context_t{
user_data: cHandleID,
read_func: C.read_callback_t(C.go_local_read_callback),
Expand Down Expand Up @@ -150,16 +150,16 @@ func cgoCleanup() {
func go_local_read_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.int) C.int {

handleID := int64(*(*C.long)(userData))

handleMutex.RLock()
handle, ok := handleMap[handleID]
handleMutex.RUnlock()

if !ok {
return -1
}
client := handle.Value().(*Client)

client := handle.Value().(*BridgeClient)

if client.ReadFunc == nil {
client.Logger.Error("Read function not set")
Expand All @@ -186,16 +186,16 @@ func go_local_read_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.i
//export go_local_write_callback
func go_local_write_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.int) C.int {
handleID := int64(*(*C.long)(userData))

handleMutex.RLock()
handle, ok := handleMap[handleID]
handleMutex.RUnlock()

if !ok {
return -1
}
client := handle.Value().(*Client)

client := handle.Value().(*BridgeClient)

if client.WriteFunc == nil {
client.Logger.Error("Write function not set")
Expand All @@ -216,117 +216,117 @@ func go_local_write_callback(userData unsafe.Pointer, buffer *C.uint8_t, size C.
}

// RunReplicaSync wraps the C function to run replica synchronization
func RunReplicaSync(originDbPath, replicaDbPath string, client *Client) error {
return cgoRunReplicaSync(originDbPath, replicaDbPath, client)
func RunReplicaSync(originDbPath, replicaDbPath string, client *BridgeClient) error {
return cgoRunReplicaSync(originDbPath, replicaDbPath, client)
}

// RunDirectSync wraps the C function to run direct local synchronization
func RunDirectSync(originDbPath, replicaDbPath string, dryRun bool, verbose int) error {
return cgoRunDirectSync(originDbPath, replicaDbPath, dryRun, verbose)
return cgoRunDirectSync(originDbPath, replicaDbPath, dryRun, verbose)
}

// cgoRunReplicaSync wraps the C function to run replica synchronization
func cgoRunReplicaSync(originDbPath, replicaDbPath string, client *Client) error {
// Prepare C configuration
cOriginPath := C.CString(originDbPath)
defer C.free(unsafe.Pointer(cOriginPath))
cReplicaPath := C.CString(replicaDbPath)
defer C.free(unsafe.Pointer(cReplicaPath))

config := C.sqlite_rsync_config_t{
origin_path: cOriginPath,
replica_path: cReplicaPath,
protocol_version: C.PROTOCOL_VERSION,
verbose_level: 0,
dry_run: 0,
wal_only: 0,
error_file: nil,
}

// Set debug_file if DEBUG_FILE is defined
if DEBUG_FILE != "" {
cDebugFile := C.CString(DEBUG_FILE + "replica.log")
defer C.free(unsafe.Pointer(cDebugFile))
config.error_file = cDebugFile
}

// Create cgo handle for the client
handle := cgo.NewHandle(client)
handleID := atomic.AddInt64(&handleCounter, 1)
handleMutex.Lock()
handleMap[handleID] = handle
handleMutex.Unlock()
defer func() {
handleMutex.Lock()
delete(handleMap, handleID)
handleMutex.Unlock()
handle.Delete()
}()

// Setup I/O context with Go callbacks
cHandleID := C.malloc(C.sizeof_long)
defer C.free(cHandleID)
*(*C.long)(cHandleID) = C.long(handleID)
ioCtx := C.sqlite_rsync_io_context_t{
user_data: cHandleID,
read_func: C.read_callback_t(C.go_local_read_callback),
write_func: C.write_callback_t(C.go_local_write_callback),
}

// Run the replica synchronization
result := C.sqlite_rsync_run_replica(&config, &ioCtx)
if result != 0 {
return &SQLiteRsyncError{
Code: int(result),
Message: "replica sync failed",
}
}

return nil
func cgoRunReplicaSync(originDbPath, replicaDbPath string, client *BridgeClient) error {
// Prepare C configuration
cOriginPath := C.CString(originDbPath)
defer C.free(unsafe.Pointer(cOriginPath))

cReplicaPath := C.CString(replicaDbPath)
defer C.free(unsafe.Pointer(cReplicaPath))

config := C.sqlite_rsync_config_t{
origin_path: cOriginPath,
replica_path: cReplicaPath,
protocol_version: C.PROTOCOL_VERSION,
verbose_level: 0,
dry_run: 0,
wal_only: 0,
error_file: nil,
}

// Set debugFile if debugFile is defined
if client.Config.EnableSQLiteRsyncLogging {
cDebugFile := C.CString(DEBUG_FILE + "-push.log")
defer C.free(unsafe.Pointer(cDebugFile))
config.error_file = cDebugFile
}

// Create cgo handle for the client
handle := cgo.NewHandle(client)
handleID := atomic.AddInt64(&handleCounter, 1)

handleMutex.Lock()
handleMap[handleID] = handle
handleMutex.Unlock()

defer func() {
handleMutex.Lock()
delete(handleMap, handleID)
handleMutex.Unlock()
handle.Delete()
}()

// Setup I/O context with Go callbacks
cHandleID := C.malloc(C.sizeof_long)
defer C.free(cHandleID)
*(*C.long)(cHandleID) = C.long(handleID)

ioCtx := C.sqlite_rsync_io_context_t{
user_data: cHandleID,
read_func: C.read_callback_t(C.go_local_read_callback),
write_func: C.write_callback_t(C.go_local_write_callback),
}

// Run the replica synchronization
result := C.sqlite_rsync_run_replica(&config, &ioCtx)
if result != 0 {
return &SQLiteRsyncError{
Code: int(result),
Message: "replica sync failed",
}
}

return nil
}

// cgoRunDirectSync wraps the C function to run direct local synchronization
func cgoRunDirectSync(originDbPath, replicaDbPath string, dryRun bool, verbose int) error {
// Prepare C configuration
cOriginPath := C.CString(originDbPath)
defer C.free(unsafe.Pointer(cOriginPath))
cReplicaPath := C.CString(replicaDbPath)
defer C.free(unsafe.Pointer(cReplicaPath))

config := C.sqlite_rsync_config_t{
origin_path: cOriginPath,
replica_path: cReplicaPath,
protocol_version: C.PROTOCOL_VERSION,
verbose_level: C.int(verbose),
dry_run: 0,
wal_only: 0,
error_file: nil,
}

// Set debug_file if DEBUG_FILE is defined
if DEBUG_FILE != "" {
cDebugFile := C.CString(DEBUG_FILE + ".direct.log")
defer C.free(unsafe.Pointer(cDebugFile))
config.error_file = cDebugFile
}

if dryRun {
config.dry_run = 1
}

// Run direct sync without WebSocket I/O - use the original C implementation
result := C.sqlite_rsync_run_direct(&config)
if result != 0 {
return &SQLiteRsyncError{
Code: int(result),
Message: "direct sync failed",
}
}

return nil
}
// Prepare C configuration
cOriginPath := C.CString(originDbPath)
defer C.free(unsafe.Pointer(cOriginPath))

cReplicaPath := C.CString(replicaDbPath)
defer C.free(unsafe.Pointer(cReplicaPath))

config := C.sqlite_rsync_config_t{
origin_path: cOriginPath,
replica_path: cReplicaPath,
protocol_version: C.PROTOCOL_VERSION,
verbose_level: C.int(verbose),
dry_run: 0,
wal_only: 0,
error_file: nil,
}

// Set debugFile if debugFile is defined
if verbose > 0 {
cDebugFile := C.CString(DEBUG_FILE + "-push.log")
defer C.free(unsafe.Pointer(cDebugFile))
config.error_file = cDebugFile
}

if dryRun {
config.dry_run = 1
}

// Run direct sync without WebSocket I/O - use the original C implementation
result := C.sqlite_rsync_run_direct(&config)
if result != 0 {
return &SQLiteRsyncError{
Code: int(result),
Message: "direct sync failed",
}
}

return nil
}
Loading
Loading