Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
bcac5d1
treedb: expose vlog generation maintenance bytes in stats
snissn Mar 28, 2026
9ce5339
treedb: add maintenance and vacuum skip counters
snissn Mar 28, 2026
bf11ec9
treedb: coalesce maintenance retries under load
snissn Mar 28, 2026
c83e2d6
treedb: skip hot periodic maintenance preflight
snissn Mar 28, 2026
31fbb0a
treedb: add live vacuum/rewrite economics instrumentation
snissn Mar 28, 2026
7cc50d6
treedb: stop maintenance retry collision amplification
snissn Mar 28, 2026
0035557
treedb: add stage-gate and rewrite segment counters
snissn Mar 28, 2026
0dd0b18
treedb: speed staged rewrite debt progression
snissn Mar 28, 2026
17a907e
treedb: add rewrite no-reclaim diagnostics
snissn Mar 28, 2026
b8d9186
treedb: export gc blocker classification stats
snissn Mar 28, 2026
29ec371
worklog: record gc blocker readout from run_celestia
snissn Mar 28, 2026
aebe803
treedb: split gc protected blockers by class
snissn Mar 28, 2026
bcfc7ad
treedb: instrument retained prune scheduling and force preemption
snissn Mar 28, 2026
da497ab
treedb: retry forced retained-prune scan without write gate
snissn Mar 28, 2026
c200756
treedb: force retained prune when vlog hard cap is exceeded
snissn Mar 28, 2026
3b30044
treedb: add retained-prune reason counters
snissn Mar 28, 2026
8c101d6
treedb: instrument rewrite-plan empty reasons
snissn Mar 28, 2026
0466222
treedb: add env overrides for rewrite budget and triggers
snissn Mar 28, 2026
6ee065b
treedb: split rewrite cancel metrics by fresh vs queued debt
snissn Mar 28, 2026
fa183f0
treedb: bound fresh-plan rewrite exec to avoid foreground preemption
snissn Mar 28, 2026
d7dca39
caching: add rewrite source-segment outcome observability
snissn Mar 28, 2026
e6f54aa
caching: probe rewrite source segments through gc protection buckets
snissn Mar 28, 2026
0813e22
caching: trace rewrite-observed retained prune outcomes
snissn Mar 28, 2026
d0898f7
caching: replay observed-source gc after retained prune
snissn Mar 28, 2026
60c1639
caching: instrument observed-source replay gc queue
snissn Mar 28, 2026
c07a887
caching: keep bypass-quiet gc alive under foreground resume
snissn Mar 28, 2026
7693898
tools: add live vlog maintenance capacity analyzer
snissn Mar 28, 2026
431d323
worklog: record high-budget live rewrite run
snissn Mar 28, 2026
f587805
caching: expose cumulative observed-source gc totals
snissn Mar 28, 2026
4a959bb
tools: include retained-prune outcomes in capacity report
snissn Mar 28, 2026
e7dd2a3
caching: accelerate observed-source retained prune pacing
snissn Mar 28, 2026
e7ef338
caching: add observed-prune and zombie lifecycle diagnostics
snissn Mar 28, 2026
a9e6fc3
caching: honor configured stale-ratio threshold in generic rewrite
snissn Mar 28, 2026
8e9a018
caching: add pre-checkpoint rewrite override for WAL-off runs
snissn Mar 28, 2026
8806b4e
analyzer: surface rewrite plan-empty reasons
snissn Mar 28, 2026
db46ff7
vlog: add observed-source protection mix counters
snissn Mar 28, 2026
6cc124c
worklog: capture protection-mix validation run
snissn Mar 28, 2026
71be1df
vlog: add observed-source retry budget and celestia a/b harness
snissn Mar 28, 2026
4915591
bench: export env file vars in run_celestia a/b harness
snissn Mar 28, 2026
27e7fe3
bench: avoid login-shell startup in celestia a/b harness
snissn Mar 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,046 changes: 1,864 additions & 182 deletions TreeDB/caching/db.go

Large diffs are not rendered by default.

313 changes: 312 additions & 1 deletion TreeDB/caching/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,57 @@ func TestCachingDB_PrunesRetainedValueLog(t *testing.T) {
}
}

// TestOpen_InitializesRetainedClosedBytesFromExistingSegments checks that the
// retained-closed-bytes gauge survives a close/reopen cycle: after writing a
// value-log-backed entry and rotating the segment, the gauge is positive, and
// a second Open over the same directory re-derives a positive value from the
// segments already on disk instead of starting from zero.
func TestOpen_InitializesRetainedClosedBytesFromExistingSegments(t *testing.T) {
	dir := t.TempDir()

	opts := Options{
		DisableWAL:               true,
		RelaxedSync:              true,
		AllowUnsafe:              true,
		FlushThreshold:           1 << 20,
		ValueLogPointerThreshold: 1,
	}

	// First lifetime: write one value large enough to spill to the value
	// log, then rotate so the segment is closed and counted.
	primary, err := db.Open(db.Options{Dir: dir, ChunkSize: 64 * 1024})
	if err != nil {
		t.Fatalf("backend1 open: %v", err)
	}
	first, err := Open(dir, primary, opts)
	if err != nil {
		_ = primary.Close()
		t.Fatalf("cache1 open: %v", err)
	}

	payload := bytes.Repeat([]byte("x"), page.DefaultInlineThreshold+256)
	if err := first.Set([]byte("k"), payload); err != nil {
		t.Fatalf("Set: %v", err)
	}
	first.flushAll(false)
	if err := first.rotateValueLogLocked(&first.lanes[0]); err != nil {
		t.Fatalf("rotateValueLogLocked: %v", err)
	}
	if got := first.valueLogRetainedClosedBytes.Load(); got <= 0 {
		t.Fatalf("pre-close retained closed bytes=%d want >0", got)
	}
	if err := first.Close(); err != nil {
		t.Fatalf("cache1 close: %v", err)
	}

	// Second lifetime: a fresh Open over the same directory must rebuild a
	// positive retained-closed-bytes figure from the existing segments.
	reopenedBackend, err := db.Open(db.Options{Dir: dir, ChunkSize: 64 * 1024})
	if err != nil {
		t.Fatalf("backend2 open: %v", err)
	}
	second, err := Open(dir, reopenedBackend, opts)
	if err != nil {
		_ = reopenedBackend.Close()
		t.Fatalf("cache2 open: %v", err)
	}
	defer second.Close()

	if got := second.valueLogRetainedClosedBytes.Load(); got <= 0 {
		t.Fatalf("reopen retained closed bytes=%d want >0", got)
	}
}

func TestPruneRetainedValueLogs_SkipsLiveScanWhenAllRetainedPathsInUse(t *testing.T) {
dir := t.TempDir()
backend := NewMockBackend()
Expand All @@ -1976,7 +2027,7 @@ func TestPruneRetainedValueLogs_SkipsLiveScanWhenAllRetainedPathsInUse(t *testin
}
cache.markValueLogRetain(retained)

cache.pruneRetainedValueLogs()
pruneStats := cache.pruneRetainedValueLogs(false)

backend.mu.RLock()
iteratorCalls := backend.iteratorCalls
Expand All @@ -1987,6 +2038,12 @@ func TestPruneRetainedValueLogs_SkipsLiveScanWhenAllRetainedPathsInUse(t *testin
if !cache.valueLogRetained(retained) {
t.Fatalf("expected in-use retained path to remain retained")
}
if pruneStats.InUseSkippedSegments != 1 {
t.Fatalf("InUseSkippedSegments=%d want 1", pruneStats.InUseSkippedSegments)
}
if pruneStats.CandidateSegments != 0 {
t.Fatalf("CandidateSegments=%d want 0", pruneStats.CandidateSegments)
}
}

func seedRetainedPrunePressure(cache *DB, retainedPath string, size int64) {
Expand Down Expand Up @@ -2228,6 +2285,121 @@ func TestRetainedValueLogPrune_AbortsWhenForegroundWritesResume(t *testing.T) {
}
}

// TestRetainedValueLogPruneForce_RetriesAfterForegroundWritesResume verifies
// that a forced retained prune whose live scan overlaps resumed foreground
// writes retries past the write gate instead of aborting, still reclaims the
// retained segment, and records the retry in the stats counters.
func TestRetainedValueLogPruneForce_RetriesAfterForegroundWritesResume(t *testing.T) {
	dir := t.TempDir()
	backend := NewMockBackend()
	// The mock signals on iteratorStartedCh when the prune's live scan
	// begins and blocks until iteratorBlockCh is closed, letting the test
	// interleave foreground writes with an in-flight scan.
	backend.iteratorStartedCh = make(chan struct{})
	backend.iteratorBlockCh = make(chan struct{})

	cache, err := Open(dir, backend, Options{
		DisableWAL:               true,
		RelaxedSync:              true,
		AllowUnsafe:              true,
		FlushThreshold:           1 << 20,
		ValueLogPointerThreshold: 1,
	})
	if err != nil {
		t.Fatalf("cache open: %v", err)
	}
	defer cache.Close()

	// Hand-build one closed value-log segment on disk and mark it retained
	// so the prune has something concrete to reclaim.
	fileID, err := valuelog.EncodeFileID(0, 212)
	if err != nil {
		t.Fatalf("EncodeFileID: %v", err)
	}
	retainedPath := filepath.Join(dir, "wal", "value-l0-000212.log")
	if err := os.MkdirAll(filepath.Dir(retainedPath), 0o755); err != nil {
		t.Fatalf("MkdirAll: %v", err)
	}
	w, err := valuelog.NewWriter(retainedPath, fileID)
	if err != nil {
		t.Fatalf("NewWriter: %v", err)
	}
	if _, err := w.Append(0, nil, 1, bytes.Repeat([]byte("r"), 128)); err != nil {
		_ = w.Close()
		t.Fatalf("Append: %v", err)
	}
	if err := w.Close(); err != nil {
		t.Fatalf("Close writer: %v", err)
	}
	cache.markValueLogRetain(retainedPath)
	// 2<<30 (2 GiB) of seeded pressure — presumably far above any pressure
	// threshold; confirm against seedRetainedPrunePressure's callers.
	seedRetainedPrunePressure(cache, retainedPath, 2<<30)
	// Backdate the last foreground write so the quiet window is already
	// satisfied when the forced prune is scheduled.
	cache.lastForegroundWriteUnixNano.Store(time.Now().Add(-2 * retainedPruneQuietWindow).UnixNano())

	cache.scheduleRetainedValueLogPruneForce()

	// Wait for the prune's live scan to start; it then blocks in the mock.
	select {
	case <-backend.iteratorStartedCh:
	case <-time.After(2 * time.Second):
		t.Fatalf("forced prune did not start")
	}

	// Simulate foreground writes resuming while the scan is in flight, and
	// keep writing until the cache observes the resumed activity.
	lastWrite := cache.lastForegroundWriteUnixNano.Load()
	deadline := time.Now().Add(2 * time.Second)
	for !cache.foregroundWritesResumedSince(lastWrite) {
		if time.Now().After(deadline) {
			t.Fatalf("foreground write timestamp did not advance")
		}
		cache.noteWrite()
		time.Sleep(time.Millisecond)
	}
	// Unblock the scan and wait for the prune (including any retry) to end.
	close(backend.iteratorBlockCh)
	cache.waitForRetainedValueLogPrune()

	// The forced prune must reclaim the segment despite the write overlap.
	if cache.valueLogRetained(retainedPath) {
		t.Fatalf("retained path still marked after forced retry prune")
	}
	// Stats must show one forced run that retried the write gate and
	// succeeded, with no foreground-abort recorded.
	stats := cache.Stats()
	if got := stats["treedb.cache.vlog_retained_prune.forced_runs"]; got != "1" {
		t.Fatalf("forced_runs=%q want 1", got)
	}
	if got := stats["treedb.cache.vlog_retained_prune.foreground_abort_runs"]; got != "0" {
		t.Fatalf("foreground_abort_runs=%q want 0", got)
	}
	if got := stats["treedb.cache.vlog_retained_prune.write_gate_retries"]; got != "1" {
		t.Fatalf("write_gate_retries=%q want 1", got)
	}
	if got := stats["treedb.cache.vlog_retained_prune.write_gate_retry_successes"]; got != "1" {
		t.Fatalf("write_gate_retry_successes=%q want 1", got)
	}
}

// TestAllowValueLogPointers_HardCapRequestsForcedRetainedPrune exercises the
// hard-cap gate on value-log pointers: each crossing of the cap denies
// pointers and requests exactly one forced retained prune; remaining over the
// cap must not re-request, and dropping below the cap re-arms the trigger so
// a later crossing requests again.
func TestAllowValueLogPointers_HardCapRequestsForcedRetainedPrune(t *testing.T) {
	d := &DB{}
	// Skip real prune work; only the scheduling counter is under test.
	d.testSkipRetainedPrune = true
	d.maxValueLogRetainedBytesHard = 1024
	d.valueLogRetainedClosedBytes.Store(2048)

	// First crossing: pointers refused, one forced prune requested.
	if ok := d.allowValueLogPointers(); ok {
		t.Fatalf("allowValueLogPointers=true, want false when hard cap exceeded")
	}
	if got := d.retainedValueLogPruneScheduleForcedRequests.Load(); got != 1 {
		t.Fatalf("schedule_forced_requests=%d want 1 after first hard-cap crossing", got)
	}

	// Staying over the cap must not amplify into repeated schedule requests.
	if ok := d.allowValueLogPointers(); ok {
		t.Fatalf("allowValueLogPointers=true on repeated over-cap check, want false")
	}
	if got := d.retainedValueLogPruneScheduleForcedRequests.Load(); got != 1 {
		t.Fatalf("schedule_forced_requests=%d want 1 after repeated over-cap check", got)
	}

	// Dropping below the cap re-enables pointers and re-arms the trigger.
	d.valueLogRetainedClosedBytes.Store(0)
	if ok := d.allowValueLogPointers(); !ok {
		t.Fatalf("allowValueLogPointers=false, want true after dropping below hard cap")
	}

	// A fresh crossing requests exactly one more forced prune.
	d.valueLogRetainedClosedBytes.Store(4096)
	if ok := d.allowValueLogPointers(); ok {
		t.Fatalf("allowValueLogPointers=true after second hard-cap crossing, want false")
	}
	if got := d.retainedValueLogPruneScheduleForcedRequests.Load(); got != 2 {
		t.Fatalf("schedule_forced_requests=%d want 2 after second hard-cap crossing", got)
	}
}

func TestCheckpoint_RateLimitsRetainedValueLogPrune(t *testing.T) {
dir := t.TempDir()
backend := NewMockBackend()
Expand Down Expand Up @@ -2361,6 +2533,145 @@ func TestCheckpoint_SkipsRetainedValueLogPruneBelowPressureThreshold(t *testing.
if cache.retainedPruneActive() {
cache.waitForRetainedValueLogPrune()
}
stats := cache.Stats()
if got := stats["treedb.cache.vlog_retained_prune.schedule_requests"]; got != "1" {
t.Fatalf("schedule_requests=%q want 1", got)
}
if got := stats["treedb.cache.vlog_retained_prune.schedule_forced_requests"]; got != "0" {
t.Fatalf("schedule_forced_requests=%q want 0", got)
}
if got := stats["treedb.cache.vlog_retained_prune.schedule_skip.below_pressure"]; got != "1" {
t.Fatalf("schedule_skip.below_pressure=%q want 1", got)
}
if got := stats["treedb.cache.vlog_retained_prune.closed_bytes"]; got != "128" {
t.Fatalf("closed_bytes=%q want 128", got)
}
}

// TestRetainedValueLogPruneForce_BypassesPressureThreshold verifies that a
// forced retained prune starts even when the seeded pressure (128 bytes) is
// well under the configured MaxValueLogRetainedBytes (1 MiB) — i.e. force
// bypasses the normal pressure threshold — and is counted as a forced run.
func TestRetainedValueLogPruneForce_BypassesPressureThreshold(t *testing.T) {
	dir := t.TempDir()
	backend := NewMockBackend()
	// The mock signals on iteratorStartedCh when the prune's live scan
	// begins and blocks until iteratorBlockCh is closed.
	backend.iteratorStartedCh = make(chan struct{})
	backend.iteratorBlockCh = make(chan struct{})

	cache, err := Open(dir, backend, Options{
		DisableWAL:               true,
		RelaxedSync:              true,
		AllowUnsafe:              true,
		FlushThreshold:           1 << 20,
		MaxValueLogRetainedBytes: 1 << 20,
		ValueLogPointerThreshold: 1,
	})
	if err != nil {
		t.Fatalf("cache open: %v", err)
	}
	defer cache.Close()

	// Hand-build one closed value-log segment on disk and mark it retained.
	fileID, err := valuelog.EncodeFileID(0, 245)
	if err != nil {
		t.Fatalf("EncodeFileID: %v", err)
	}
	retainedPath := filepath.Join(dir, "wal", "value-l0-000245.log")
	if err := os.MkdirAll(filepath.Dir(retainedPath), 0o755); err != nil {
		t.Fatalf("MkdirAll: %v", err)
	}
	w, err := valuelog.NewWriter(retainedPath, fileID)
	if err != nil {
		t.Fatalf("NewWriter: %v", err)
	}
	if _, err := w.Append(0, nil, 1, bytes.Repeat([]byte("t"), 128)); err != nil {
		_ = w.Close()
		t.Fatalf("Append: %v", err)
	}
	if err := w.Close(); err != nil {
		t.Fatalf("Close writer: %v", err)
	}
	cache.markValueLogRetain(retainedPath)
	// Only 128 bytes of pressure — deliberately below the 1 MiB cap so an
	// unforced prune would be skipped by the pressure check.
	seedRetainedPrunePressure(cache, retainedPath, 128)
	// Backdate the last foreground write so the quiet window is satisfied.
	cache.lastForegroundWriteUnixNano.Store(time.Now().Add(-2 * retainedPruneQuietWindow).UnixNano())

	cache.scheduleRetainedValueLogPruneForce()

	// Despite the low pressure, the forced prune must start its live scan.
	select {
	case <-backend.iteratorStartedCh:
	case <-time.After(2 * time.Second):
		t.Fatalf("forced retained prune did not start below pressure threshold")
	}
	close(backend.iteratorBlockCh)
	cache.waitForRetainedValueLogPrune()
	// The run must be attributed to the forced-request path in stats.
	stats := cache.Stats()
	if got := stats["treedb.cache.vlog_retained_prune.schedule_forced_requests"]; got != "1" {
		t.Fatalf("schedule_forced_requests=%q want 1", got)
	}
	if got := stats["treedb.cache.vlog_retained_prune.forced_runs"]; got != "1" {
		t.Fatalf("forced_runs=%q want 1", got)
	}
}

// TestRetainedValueLogPruneForce_PreemptsQuietWait verifies that while an
// ordinary scheduled prune is still waiting out the foreground-quiet window,
// a forced schedule request preempts that wait and starts the prune
// immediately, recording it as a forced run.
func TestRetainedValueLogPruneForce_PreemptsQuietWait(t *testing.T) {
	dir := t.TempDir()
	backend := NewMockBackend()
	// The mock signals on iteratorStartedCh when the prune's live scan
	// begins and blocks until iteratorBlockCh is closed.
	backend.iteratorStartedCh = make(chan struct{})
	backend.iteratorBlockCh = make(chan struct{})

	cache, err := Open(dir, backend, Options{
		DisableWAL:               true,
		RelaxedSync:              true,
		AllowUnsafe:              true,
		FlushThreshold:           1 << 20,
		ValueLogPointerThreshold: 1,
	})
	if err != nil {
		t.Fatalf("cache open: %v", err)
	}
	defer cache.Close()

	// Hand-build one closed value-log segment on disk and mark it retained.
	fileID, err := valuelog.EncodeFileID(0, 246)
	if err != nil {
		t.Fatalf("EncodeFileID: %v", err)
	}
	retainedPath := filepath.Join(dir, "wal", "value-l0-000246.log")
	if err := os.MkdirAll(filepath.Dir(retainedPath), 0o755); err != nil {
		t.Fatalf("MkdirAll: %v", err)
	}
	w, err := valuelog.NewWriter(retainedPath, fileID)
	if err != nil {
		t.Fatalf("NewWriter: %v", err)
	}
	if _, err := w.Append(0, nil, 1, bytes.Repeat([]byte("u"), 128)); err != nil {
		_ = w.Close()
		t.Fatalf("Append: %v", err)
	}
	if err := w.Close(); err != nil {
		t.Fatalf("Close writer: %v", err)
	}
	cache.markValueLogRetain(retainedPath)
	seedRetainedPrunePressure(cache, retainedPath, 2<<30)
	// Record a foreground write "now" so the quiet window has NOT elapsed
	// and an ordinary scheduled prune must wait.
	cache.lastForegroundWriteUnixNano.Store(time.Now().UnixNano())

	// Negative assertion: the ordinary prune must not start its live scan
	// within retainedPruneNegativeAssertWait while writes are recent.
	cache.scheduleRetainedValueLogPrune()
	select {
	case <-backend.iteratorStartedCh:
		t.Fatalf("retained prune started before quiet window elapsed")
	case <-time.After(retainedPruneNegativeAssertWait):
	}

	cache.scheduleRetainedValueLogPruneForce()

	// The forced request must cut short the quiet-window wait.
	select {
	case <-backend.iteratorStartedCh:
	case <-time.After(2 * time.Second):
		t.Fatalf("forced retained prune did not preempt quiet-window wait")
	}
	close(backend.iteratorBlockCh)
	cache.waitForRetainedValueLogPrune()
	// The run must be attributed to the forced-request path in stats.
	stats := cache.Stats()
	if got := stats["treedb.cache.vlog_retained_prune.schedule_forced_requests"]; got != "1" {
		t.Fatalf("schedule_forced_requests=%q want 1", got)
	}
	if got := stats["treedb.cache.vlog_retained_prune.forced_runs"]; got != "1" {
		t.Fatalf("forced_runs=%q want 1", got)
	}
}

func TestCheckpoint_DoesNotWaitForPriorRetainedValueLogPrune(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions TreeDB/caching/expvar_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func selectTreeDBExpvarStats(stats map[string]string) map[string]any {
strings.HasPrefix(k, "treedb.cache.vlog_payload_split.") ||
strings.HasPrefix(k, "treedb.cache.vlog_auto.") ||
strings.HasPrefix(k, "treedb.cache.vlog_dict.") ||
strings.HasPrefix(k, "treedb.cache.vlog_generation.") ||
strings.HasPrefix(k, "treedb.cache.vlog_retained_prune.") ||
Comment on lines +142 to +143
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include vlog_zombie keys in expvar whitelist

The commit adds treedb.cache.vlog_zombie.* counters in DB.Stats() (used by the new maintenance analyzer), but selectTreeDBExpvarStats still whitelists only vlog_generation and vlog_retained_prune families here. In environments that consume diagnostics via this expvar filter (the default run_celestia path), zombie metrics are dropped and the analyzer’s zombie inventory fields read as zero, masking pinned-zombie buildup and skewing reclaim diagnostics.

Useful? React with 👍 / 👎.

strings.HasPrefix(k, "treedb.cache.vlog_payload_kind.") ||
Comment on lines 141 to 144
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The selection comment says expvar exports only process-wide metric families plus a few cache/backend families (mmap/decode/batch-arena), but this function now also exports the full treedb.cache.vlog_generation.* family. Please update the comment to reflect the expanded scope so future readers don’t miss that maintenance stats are intentionally exposed via expvar.

Copilot uses AI. Check for mistakes.
Comment on lines 139 to 144
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inline comment above this prefix list says only certain cache/backend families are exported via expvar, but treedb.cache.vlog_generation.* is now included. Please update the comment to mention vlog_generation (or reword it so it stays accurate as this allowlist evolves).

Copilot uses AI. Check for mistakes.
strings.HasPrefix(k, "treedb.cache.vlog_outer_leaf_codec.") ||
strings.HasPrefix(k, "treedb.cache.batch_arena.") {
Comment on lines 139 to 146
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expvar selector now exports treedb.cache.vlog_generation.* and treedb.cache.vlog_retained_prune.*, but the capacity analyzer/runbook also relies on treedb.cache.vlog_zombie.* keys. Since that prefix is not allowlisted here, *.debug_vars.json snapshots won't include zombie inventory metrics and the report will silently show zeros. Add strings.HasPrefix(k, "treedb.cache.vlog_zombie.") to the allowlist (and consider a small selector test for one zombie key).

Copilot uses AI. Check for mistakes.
Expand Down
8 changes: 8 additions & 0 deletions TreeDB/caching/expvar_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func TestSelectTreeDBExpvarStatsFiltersAndCoerces(t *testing.T) {
"treedb.cache.vlog_dict.current_k": "32",
"treedb.cache.vlog_payload_kind.raw_bytes.single_value": "2048",
"treedb.cache.vlog_outer_leaf_codec.raw_bytes.lz4": "512",
"treedb.cache.vlog_generation.rewrite.reclaimed_bytes": "1234",
"treedb.cache.vlog_retained_prune.runs": "3",
"treedb.process.memory.heap_inuse_bytes": "4096",
"treedb.process.memory.pool_pressure_level": "critical",
"treedb.cache.batch_arena.pool_bytes_estimate": "65536",
Expand Down Expand Up @@ -80,6 +82,12 @@ func TestSelectTreeDBExpvarStatsFiltersAndCoerces(t *testing.T) {
if v, ok := got["treedb.cache.vlog_outer_leaf_codec.raw_bytes.lz4"].(int64); !ok || v != 512 {
t.Fatalf("vlog_outer_leaf_codec.raw_bytes.lz4=%T(%v) want int64(512)", got["treedb.cache.vlog_outer_leaf_codec.raw_bytes.lz4"], got["treedb.cache.vlog_outer_leaf_codec.raw_bytes.lz4"])
}
if v, ok := got["treedb.cache.vlog_generation.rewrite.reclaimed_bytes"].(int64); !ok || v != 1234 {
t.Fatalf("vlog_generation.rewrite.reclaimed_bytes=%T(%v) want int64(1234)", got["treedb.cache.vlog_generation.rewrite.reclaimed_bytes"], got["treedb.cache.vlog_generation.rewrite.reclaimed_bytes"])
}
if v, ok := got["treedb.cache.vlog_retained_prune.runs"].(int64); !ok || v != 3 {
t.Fatalf("vlog_retained_prune.runs=%T(%v) want int64(3)", got["treedb.cache.vlog_retained_prune.runs"], got["treedb.cache.vlog_retained_prune.runs"])
}
if v, ok := got["treedb.process.memory.heap_inuse_bytes"].(int64); !ok || v != 4096 {
t.Fatalf("heap_inuse_bytes=%T(%v) want int64(4096)", got["treedb.process.memory.heap_inuse_bytes"], got["treedb.process.memory.heap_inuse_bytes"])
}
Expand Down
Loading
Loading