feat: Global logs exporter#1273

Open
Azorlogh wants to merge 23 commits into main from feat/global-logs-exporter

Conversation

@Azorlogh
Contributor

Usage:

  • Enable the global exporter: --experimental-global-exporter "stdout:{}"
  • Reset the exporter state: --global-exporter-reset

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@Azorlogh Azorlogh requested a review from a team as a code owner on February 27, 2026 17:18
@coderabbitai
Contributor

coderabbitai bot commented Feb 27, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

This PR introduces a global logs exporter feature that orchestrates cross-ledger log export. It adds configuration parsing, driver instantiation from JSON configs, a stateful runner managing periodic log polling and batch pushing with retry logic, state persistence, database schema migrations, and comprehensive end-to-end tests.

Changes

  • Configuration & Command Integration (cmd/config.go, cmd/serve.go): added ExperimentalGlobalExporter and GlobalExporterReset config fields with a parser; wired the global exporter module into the command with lifecycle hooks and new public flags.
  • Driver Registry (internal/replication/drivers/registry.go): added a CreateFromConfig method to instantiate drivers from in-memory JSON configurations without store interaction.
  • Global Exporter Core (internal/replication/global_exporter.go): new module defining the GlobalExporterStateStore interface, GlobalExporterRunnerConfig, and a GlobalExporterRunner with Run/Shutdown methods, per-ledger log export orchestration, batch pushing with retry logic, and state caching.
  • State Management (internal/controller/system/store.go, internal/storage/system/store.go): added interface methods and storage implementations for UpdateGlobalExporterState, ListGlobalExporterStates, and DeleteAllGlobalExporterStates.
  • Database Schema (internal/storage/system/migrations.go): new migration creating the _system.global_exporter_state table with ledger (primary key) and last_log_id columns.
  • Testing (pkg/testserver/server.go, test/e2e/api_global_exporter_test.go): added ExperimentalGlobalExporterInstrumentation and GlobalExporterResetInstrumentation helpers, plus an E2E test suite validating export initialization, catch-up behavior, multi-server scenarios, and reset flows.

Sequence Diagram

sequenceDiagram
    participant Config as Configuration
    participant Cmd as Command/Serve
    participant Runner as GlobalExporterRunner
    participant Store as State Store
    participant Fetcher as Log Fetcher
    participant Driver as Export Driver

    Config->>Cmd: Parse ExperimentalGlobalExporter config
    Cmd->>Runner: Initialize with config & driver
    
    loop Run Loop (Poll Interval)
        Runner->>Store: ListGlobalExporterStates()
        Store-->>Runner: map[ledger]lastLogID
        
        par Per-Ledger Export
            Runner->>Fetcher: Request logs from lastLogID
            Fetcher-->>Runner: Paginated log results
            
            loop Batch Push with Retry
                Runner->>Driver: Push batch logs
                alt Success
                    Driver-->>Runner: OK
                    Runner->>Store: UpdateGlobalExporterState(ledger, newLogID)
                else Failure
                    Driver-->>Runner: Error
                    Runner->>Runner: Wait PushRetryPeriod
                    Runner->>Driver: Retry push
                end
            end
        end
        
        Runner->>Runner: Check stop signal
    end
    
    Cmd->>Runner: Shutdown()
    Runner->>Driver: Close driver
    Runner-->>Cmd: Graceful shutdown complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 Hop, hop, logs now flow across the warren wide,
Each ledger's tales exported with periodic pride,
State persists through restarts, no logs are lost or missed,
A global exporter's dance—resilient, fast, and blessed!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 7.69%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Title check ✅ Passed: the PR title 'feat: Global logs exporter' directly and clearly describes the main feature being added, matching the implementation across multiple files.
  • Description check ✅ Passed: the PR description provides usage examples for the new global exporter feature, which aligns with the changeset's introduction of global exporter functionality.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (4)
internal/storage/system/store.go (1)

420-429: Consider using WhereAllWithDeleted() instead of Where("TRUE").

While WHERE TRUE works, bun provides WhereAllWithDeleted() as the idiomatic way to express "delete all rows". Since this table has no soft-delete, WhereAllWithDeleted() is equivalent to deleting everything and states the intent more clearly than a literal TRUE predicate.

♻️ Optional: Cleaner alternative
 func (d *DefaultStore) DeleteAllGlobalExporterStates(ctx context.Context) error {
 	_, err := d.db.NewDelete().
 		Model((*globalExporterState)(nil)).
-		Where("TRUE").
+		WhereAllWithDeleted().
 		Exec(ctx)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/storage/system/store.go` around lines 420 - 429, The
DeleteAllGlobalExporterStates method currently uses Where("TRUE") to delete all
rows for model globalExporterState; replace that with bun's cleaner API by
calling WhereAllWithDeleted() on the query (or simply remove the Where clause if
globalExporterState has no soft-delete) in the
d.db.NewDelete().Model((*globalExporterState)(nil)) chain inside
DefaultStore.DeleteAllGlobalExporterStates so the intention to delete all rows
is expressed idiomatically.
internal/replication/global_exporter.go (3)

201-235: Potential goroutine leak if context is cancelled during driver.Accept.

The goroutine calling driver.Accept writes to errChan after the call completes. If the stop signal is received (line 227-231), the function returns true but the goroutine may still be running. While cancel() is called, the goroutine will eventually complete and write to errChan, which is buffered (size 1), so it won't block. However, if driver.Accept doesn't respect context cancellation, this could delay shutdown.

This is acceptable if driver.Accept respects context cancellation. If it doesn't, consider adding a timeout or documenting this behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/replication/global_exporter.go` around lines 201 - 235, The
goroutine calling driver.Accept can outlive the function if driver.Accept
ignores context cancellation; modify the push loop to avoid a potential
goroutine/shutdown hang by ensuring the goroutine's result send to errChan is
non-blocking or bounded by the stop signal: create errChan as buffered (already
1) but also select when sending the error (e.g., select { case errChan <- err:
case <-r.stopChannel: } ), or wrap exportCtx with a timeout via
context.WithTimeout before calling driver.Accept so Accept is forced to return;
reference the goroutine that invokes driver.Accept, the errChan send,
exportCtx/cancel, and r.stopChannel/r.config.PushRetryPeriod when making the
change.

76-159: Consider adding observability for catch-up progress.

The Run method processes ledgers and logs but doesn't emit metrics or structured logs indicating progress (e.g., number of ledgers processed, logs exported). For operational visibility during catch-up of large backlogs, consider adding basic metrics or periodic progress logging.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/replication/global_exporter.go` around lines 76 - 159, Add simple
observability around ledger catch-up inside GlobalExporterRunner.Run: count
ledgers processed in the inner ledger loop and count logs exported inside
exportLedgerLogs (or have exportLedgerLogs return the number exported) and emit
periodic structured logs or metric increments (e.g., r.logger.Infof or
r.metrics.Inc if a metrics field exists) reporting "ledgers_processed" and
"logs_exported" and remaining backlog status; update Run to aggregate
per-iteration totals (use the cursor loop and variables like states, cursor, and
nextQuery) and log or emit metrics after each pagination batch or when
exportLedgerLogs signals HasMore so operators can see progress. Ensure you
update exportLedgerLogs signature/return to provide count information if needed
and reference facade, l.Name and lastLogID when emitting context in logs.

79-84: Reset failure silently continues—consider whether this should be fatal.

If resetting the global exporter state fails, the runner logs an error but continues. This could lead to unexpected behavior where logs are not re-exported as intended. Consider whether a reset failure should prevent startup or at least be more prominently surfaced.

♻️ Option: Make reset failure fatal
 	if r.config.Reset {
 		r.logger.Infof("Resetting global exporter state — all logs will be re-exported")
 		if err := r.store.DeleteAllGlobalExporterStates(ctx); err != nil {
-			r.logger.Errorf("Failed to reset global exporter state: %v", err)
+			r.logger.Errorf("Failed to reset global exporter state: %v — aborting", err)
+			return
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/replication/global_exporter.go` around lines 79 - 84, The current
reset block logs a failure from r.store.DeleteAllGlobalExporterStates(ctx) but
continues startup; change this so a reset error is propagated or treated as
fatal: inside the r.config.Reset condition, if DeleteAllGlobalExporterStates
returns an error, return that error (or call a controlled shutdown/exit) instead
of just r.logger.Errorf, so the caller of the function handling startup receives
the failure; update any surrounding function signatures to return error if
necessary and ensure callers handle the propagated error appropriately
(references: r.config.Reset, r.store.DeleteAllGlobalExporterStates,
r.logger.Infof, r.logger.Errorf).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/config.go`:
- Around line 45-53: The parseGlobalExporter function currently accepts inputs
like ":{}" or "stdout:" and defers failure; update parseGlobalExporter to
validate that the driverName (value[:idx]) and configStr (value[idx+1:]) are
non-empty and return immediate errors for empty driver or empty config (e.g.,
"missing driver" / "missing config"); additionally validate that configStr is
valid JSON (use json.Valid([]byte(configStr)) or attempt json.Unmarshal into
json.RawMessage) and return a clear error if JSON is malformed, otherwise return
driverName and json.RawMessage(configStr) as before.

In `@cmd/serve.go`:
- Around line 165-171: The reset flag is only handled inside the
ExperimentalGlobalExporter branch so "--global-exporter-reset" is a no-op when
no exporter is configured; before or separate from the
cfg.ExperimentalGlobalExporter check, detect cfg.GlobalExporterReset being true
and append a reset-only module to options (e.g. call globalExporterModule with
an empty driverName/rawConfig or via a dedicated reset path) so reset behavior
runs even when parseGlobalExporter is not invoked; keep parseGlobalExporter
usage unchanged for the case where ExperimentalGlobalExporter != "" and still
pass cfg.GlobalExporterReset through to globalExporterModule.

In `@internal/replication/drivers/registry.go`:
- Around line 104-128: CreateFromConfig applies defaults but never runs the
config validation step; after calling SetDefaults on driverConfig (the
config.Defaulter branch) invoke the existing ValidateConfig function (or the
package's validation helper) with driverConfig and return any validation error
before calling the driver constructor. Ensure you validate the same config
instance produced by extractConfigType (i.e., the driverConfig variable) and
short-circuit with an error if validation fails so invalid configs don't reach
reflect.ValueOf(driverConstructor).Call.

In `@internal/replication/global_exporter.go`:
- Around line 237-243: The code only persists state when lastLog.ID != nil which
can silently drop progress; change the logic in the batch-persist section
(referencing lastLog, logs.Data and r.store.UpdateGlobalExporterState) to always
persist state using a deterministic value even if lastLog.ID is nil — for
example persist 0 or derive an offset based on the batch (or enforce non-nil by
casting/validating DB results) and call r.store.UpdateGlobalExporterState(ctx,
ledgerName, persistedID) unconditionally; ensure the chosen persistedID is a
uint64 (matching UpdateGlobalExporterState) and document the behavior so
restarts won’t reprocess the same logs.

In `@internal/storage/system/migrations.go`:
- Around line 301-304: The new table _system.global_exporter_state defines
ledger as an unconstrained varchar primary key which allows stale exporter state
if a ledger row is deleted/recreated; change the schema to add a foreign key
constraint on ledger referencing the system ledger table (e.g.
_system.ledger(name) or the actual ledger PK) and add ON DELETE CASCADE (or the
appropriate cascade behavior) so exporter state rows are removed when the
corresponding ledger is deleted; ensure the ledger column type matches the
referenced PK and keep last_log_id as NOT NULL.

In `@test/e2e/api_global_exporter_test.go`:
- Around line 192-195: The test uses the shared ctx when stopping the server
which can cause unbounded waits; change the shutdown call to use the per-spec
context by replacing srv.Stop(ctx) with srv.Stop(specContext) in the block where
firstServer.Wait(specContext) returns (i.e., the code referencing srv and
firstServer.Wait/specContext) so the server shutdown honors the spec timeout.


ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5a97716 and fc56c7f.

📒 Files selected for processing (10)
  • cmd/config.go
  • cmd/root.go
  • cmd/serve.go
  • internal/controller/system/store.go
  • internal/replication/drivers/registry.go
  • internal/replication/global_exporter.go
  • internal/storage/system/migrations.go
  • internal/storage/system/store.go
  • pkg/testserver/server.go
  • test/e2e/api_global_exporter_test.go

@codecov

codecov bot commented Feb 27, 2026

Codecov Report

❌ Patch coverage is 73.13433% with 90 lines in your changes missing coverage. Please review.
✅ Project coverage is 78.10%. Comparing base (5a97716) to head (9896413).
⚠️ Report is 17 commits behind head on main.

Files with missing lines Patch % Lines
internal/replication/global_exporter.go 88.63% 11 Missing and 4 partials ⚠️
internal/storage/system/store.go 54.83% 7 Missing and 7 partials ⚠️
internal/replication/drivers/registry.go 33.33% 5 Missing and 7 partials ⚠️
cmd/worker.go 76.19% 5 Missing and 5 partials ⚠️
internal/replication/manager.go 73.52% 2 Missing and 7 partials ⚠️
internal/replication/module.go 73.52% 4 Missing and 5 partials ⚠️
cmd/config.go 14.28% 2 Missing and 4 partials ⚠️
internal/replication/drivers/log.go 50.00% 2 Missing and 3 partials ⚠️
cmd/serve.go 20.00% 1 Missing and 3 partials ⚠️
internal/replication/pipeline.go 66.66% 2 Missing and 1 partial ⚠️
... and 2 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1273      +/-   ##
==========================================
- Coverage   80.78%   78.10%   -2.68%     
==========================================
  Files         205      206       +1     
  Lines       10929    11301     +372     
==========================================
- Hits         8829     8827       -2     
- Misses       1524     1640     +116     
- Partials      576      834     +258     


@Azorlogh Azorlogh force-pushed the feat/global-logs-exporter branch from 9fb59bb to 8cf2603 on February 27, 2026 18:52
Contributor

@gfyrag gfyrag left a comment


Code Review — Global Logs Exporter


Critical Issues

1. openLedger called every cycle for every ledger — N SQL queries/second for nothing

internal/replication/global_exporter.go:170

On every poll tick (1s by default), openLedger is called for every ledger, even when there are no new logs. This triggers sysDriver.OpenLedger which runs a GetLedger SQL query + creates a new store each time.

With 100 ledgers, that's 100 queries/second at steady state with nothing to do.

Suggestion: cache LogFetcher instances per ledger name.


2. Reset error is silently swallowed

internal/replication/global_exporter.go:79-84

if err := r.store.DeleteAllGlobalExporterStates(ctx); err != nil {
    r.logger.Errorf("Failed to reset global exporter state: %v", err)
    // continues as if nothing happened...
}

The operator explicitly requested --global-exporter-reset. If the reset fails, the runner continues with stale state, potentially exporting partial logs. This failure should be fatal (return error / refuse to start).


3. ListGlobalExporterStates executed every second with no cache

internal/replication/global_exporter.go:121

The full ledger → lastLogID map is reloaded from the DB on every poll cycle, even when nothing has changed. A local cache invalidated only after a successful export would suffice.


Important Issues

4. Double Shutdown blocks forever

internal/replication/global_exporter.go:266-281

stopChannel has a buffer of 1. If Shutdown is called twice (e.g. FX lifecycle redundancy), the second call sends into the channel, but Run has already terminated and will never consume this second message. The second call hangs indefinitely.

Suggestion: guard with sync.Once.


5. UpdateGlobalExporterState error is silently swallowed

internal/replication/global_exporter.go:240-242

If persisting the state fails after a successful push, the same logs will be re-exported on the next cycle. This implicitly assumes the driver is idempotent, which is neither documented nor guaranteed. With a persistent DB error → infinite re-export loop.


6. Misleading comment

internal/replication/global_exporter.go:118-119

// Reset to default poll interval; exportLedgerLogs may set it to 0 if HasMore

exportLedgerLogs never modifies nextInterval (it's a local variable in Run). The comment is wrong.


7. No timeout on facade.Stop in the defer

internal/replication/global_exporter.go:91

defer func() {
    if err := facade.Stop(ctx); err != nil { ... }
}()

ctx here is a context.WithoutCancel, so it has no deadline. If the driver is slow to stop, this call blocks forever. Compare with the Manager which receives the ctx from FX's OnStop (which has a built-in timeout).


Improvements

8. Flags declared at the wrong level

cmd/root.go:42-43

--experimental-global-exporter and --global-exporter-reset are on the root's PersistentFlags() (so they're available for version, migrate, buckets...). They only apply to serve.


9. No unit tests

There is no global_exporter_test.go. Only e2e tests exist. Both PipelineHandler and Manager have unit tests. A critical autonomous component like this (poll loop, retry, concurrency, state management) needs unit tests covering:

  • Shutdown during different phases (before ready, during export, during retry)
  • Retry behavior
  • Multi-ledger pagination
  • Reset

10. E2E tests: no multi-ledger scenario

The e2e test only uses a single ledger "default". The whole point of a "global" exporter is to cover all ledgers. A scenario with 2+ ledgers is missing.


11. Duplicated fetch-push-persist pattern with PipelineHandler

Both components follow the exact same pattern (poll → fetch logs id > last → push to driver with retry → persist state → stop channel). The common logic could be extracted into a shared component.


12. Sequential processing of ledgers

Logs for each ledger are exported sequentially in for _, l := range cursor.Data. If one ledger has a large backlog, it blocks all subsequent ones. A worker pool would scale better.


13. FX wiring duplication

cmd/serve.go:270-291

The OpenLedger → LogFetcher conversion is done inline, duplicating the same pattern as storageAdapter used by the Manager. Reusing the existing adapter would avoid duplication.


14. Migration: no FK, no size constraint

internal/storage/system/migrations.go:297-308

The ledger column has no FK to _system.ledgers(name) and no size constraint (vs varchar(63) for ledgers). If a ledger is deleted, its entry in global_exporter_state remains orphaned.


15. time.After in a loop

internal/replication/global_exporter.go:115

time.After creates a timer that isn't freed until it fires. A time.NewTimer with Reset would be cleaner (same pattern exists in pipeline.go, but worth fixing here).


🤖 Generated with Claude Code

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/replication/global_exporter.go (1)

130-131: Update the stale nextInterval comment.

Line 130 says exportLedgerLogs may set nextInterval to 0, but that function cannot mutate it. This is misleading for maintainers—please update/remove the comment.

Inline comment

internal/replication/global_exporter.go, lines 91-95: when r.config.Reset is true and r.store.DeleteAllGlobalExporterStates(ctx) fails, the error is only logged and startup continues. Fail fast instead: return the error (or otherwise abort startup) so the operator's reset intent is enforced rather than silently dropped.



📥 Commits

Reviewing files that changed from the base of the PR and between bdb37c7 and 3e7f50b.

📒 Files selected for processing (2)
  • cmd/serve.go
  • internal/replication/global_exporter.go

@flemzord
Member

flemzord commented Mar 4, 2026

Additional Architecture Review Findings

After a collaborative review (Claude + Codex), we found 3 issues that haven't been raised in existing reviews:


1. [HIGH] Watermark skip — failed ledger permanently lost

internal/replication/global_exporter.go:210-247

When iterating over discovered ledgers, if getLogFetcher fails for a ledger but subsequent ledgers succeed, lastSeenLedgerID advances past the failed one. Since the next poll queries id > lastSeenLedgerID, the failed ledger is never retried.

Example with ledgers [id=1 "foo"], [id=2 "bar"], [id=3 "baz"]:

1. getLogFetcher("foo") fails → continue
2. getLogFetcher("bar") succeeds → handler created, lastSeenLedgerID = 2
3. getLogFetcher("baz") succeeds → handler created, lastSeenLedgerID = 3

Next poll: WHERE id > 3 → ledger "foo" (id=1) is permanently skipped

Suggested fix: Don't advance the watermark past failed ledgers. Either:

  • Only advance lastSeenLedgerID to the minimum contiguous successfully-started ID
  • Use a startedLedgers set and do a full scan periodically
  • Maintain an explicit retry queue for failed ledgers
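
The first option can be sketched as a pure helper (hypothetical name and signature; the real code tracks this inline in the discovery loop):

```go
package main

import "fmt"

// advanceWatermark returns the new watermark given the previous one and the
// discovered ledgers (ids ascending) with their start result. It only moves
// past a contiguous prefix of successfully started ledgers, so a ledger
// whose handler failed to start is re-discovered on the next poll
// (id > watermark).
func advanceWatermark(prev int64, ids []int64, started map[int64]bool) int64 {
	w := prev
	for _, id := range ids {
		if !started[id] {
			break // stop at the first failure; it must be retried
		}
		w = id
	}
	return w
}

func main() {
	// Ledger id=1 ("foo") failed to start; 2 ("bar") and 3 ("baz") succeeded.
	// The watermark stays at 0, so "foo" is seen again next poll.
	fmt.Println(advanceWatermark(0, []int64{1, 2, 3}, map[int64]bool{2: true, 3: true})) // → 0

	// All succeeded: the watermark advances normally.
	fmt.Println(advanceWatermark(0, []int64{1, 2, 3}, map[int64]bool{1: true, 2: true, 3: true})) // → 3
}
```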

2. [MEDIUM] Global exporter bypasses the batching driver wrapper

internal/replication/module.go:73-76 vs module.go:99-104

The Manager pipeline decorates the driver Factory with NewWithBatchingDriverFactory (via fx.Decorate), so all per-ledger pipeline drivers benefit from batching. However, NewFXGlobalExporterModule creates the driver directly via registry.CreateFromConfig, which skips the batching wrapper entirely.

This means the global exporter and per-ledger pipelines have inconsistent behavior for the same driver type — batching config in the driver JSON is silently ignored by the global exporter.

Suggested fix: wrap the driver created in NewFXGlobalExporterModule with drivers.NewWithBatchingDriver(), or route it through the same decorated Factory.
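
To make the behavioral difference concrete, here is a toy decorator in the same shape (the Driver interface and batching semantics are simplified stand-ins, not the drivers package API):

```go
package main

import "fmt"

// Driver is a minimal stand-in for the replication driver interface.
type Driver interface {
	Push(logs []string) error
}

// countingDriver records how many Push calls reach the underlying driver.
type countingDriver struct{ pushes int }

func (d *countingDriver) Push(logs []string) error {
	d.pushes++
	return nil
}

// batching decorates a Driver so logs are forwarded in fixed-size batches.
// The point of the finding: only code constructed through the decorated
// factory gets this wrapper; building the driver directly skips it, so the
// same driver config behaves differently in the two code paths.
type batching struct {
	inner Driver
	size  int
	buf   []string
}

func (b *batching) Push(logs []string) error {
	b.buf = append(b.buf, logs...)
	for len(b.buf) >= b.size {
		batch := b.buf[:b.size]
		b.buf = b.buf[b.size:]
		if err := b.inner.Push(batch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	raw := &countingDriver{}
	wrapped := &batching{inner: raw, size: 10}
	for i := 0; i < 25; i++ {
		wrapped.Push([]string{fmt.Sprint(i)})
	}
	// 25 single-log pushes reach the driver as 2 batches (5 logs buffered).
	fmt.Println(raw.pushes) // → 2
}
```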


3. [MEDIUM] No multi-instance protection — duplicate exports

internal/storage/system/store.go:390-403 + global_exporter.go:155-158

The global_exporter_state table uses ledger as PK with ON CONFLICT DO UPDATE for state persistence. If two worker instances run with --worker-global-exporter against the same database, they will:

  1. Read the same initial state
  2. Export the same logs in parallel (duplicates sent to the driver)
  3. Race on the UPSERT, with last-write-wins semantics

There is no advisory lock, leader election, or instance-scoping mechanism. The per-ledger Manager pipelines don't have this issue because pipeline state is scoped by pipeline ID.

Suggested fix: Either:

  • Document that only one instance should run with global exporter enabled
  • Add a pg_advisory_lock at startup to ensure single-writer
  • Scope the state table with an instance identifier

@Azorlogh
Contributor Author

Azorlogh commented Mar 5, 2026

  3. [MEDIUM] No multi-instance protection — duplicate exports

This one is out of scope for this PR; we already assume only one worker is launched.
