From cc599f0182d475a868633397b4ea01c6257b0dcc Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 11 Feb 2026 08:15:18 +0000 Subject: [PATCH 1/2] add project roadmap covering correctness, testing, API, features, and docs https://claude.ai/code/session_01VSZuYevFutSrFay5XmL1qh --- ROADMAP.md | 176 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 ROADMAP.md diff --git a/ROADMAP.md b/ROADMAP.md new file mode 100644 index 0000000..b6ee06f --- /dev/null +++ b/ROADMAP.md @@ -0,0 +1,176 @@ +# rebuf Roadmap + +This document outlines the planned improvements and features for rebuf, organized by priority. + +--- + +## Phase 1: Correctness & Reliability + +These are foundational issues that should be addressed before adding new features. + +### 1.1 Fix concurrency in `Write()` +The current `Write()` method only acquires the mutex for the final `Sync()` call, but the entire write path — including segment rotation, seeking, and buffered writes — is unprotected. Concurrent goroutine writes (as shown in `example.go`) can corrupt data or cause races. + +**Action:** Protect the entire `Write()` method with the mutex, not just the sync portion. + +### 1.2 Graceful shutdown of `syncPeriodically` goroutine +The background goroutine started in `Init()` runs forever with no shutdown signal. While `Close()` stops the ticker, the goroutine itself is never terminated, leaking it. + +**Action:** Use a `context.Context` or a done channel to cleanly stop the goroutine in `Close()`. + +### 1.3 Improve error handling in `Init()` +- `os.Mkdir` errors are silently ignored — a permission failure would go unnoticed. +- The temp file is opened twice (once in `Init`, once in `openExistingOrCreateNew`). + +**Action:** Propagate `os.Mkdir` errors; remove the duplicate file open. + +### 1.4 Data integrity via checksums +There is currently no mechanism to detect corrupted log entries. A single flipped bit in the 8-byte size prefix could cause the reader to consume arbitrary data. + +**Action:** Add a CRC32 checksum per entry (e.g., `[8-byte size][4-byte crc32][data]`) and validate on read/replay. + +--- + +## Phase 2: Testing + +### 2.1 Core package unit tests +The `rebuf` package has zero test coverage. The following test cases are needed: + +- `Init` with a fresh directory +- `Init` recovering from an existing directory with segments +- `Write` a single entry, then `Replay` it back +- `Write` enough data to trigger segment rotation +- `Write` enough data to trigger oldest-segment deletion (`maxSegments`) +- `Close` flushes and syncs correctly +- Round-trip: arbitrary `[]byte` data survives write + replay + +### 2.2 Concurrency tests +- Multiple goroutines calling `Write` concurrently (use `-race` detector) +- `Write` and `Replay` interleaving + +### 2.3 Edge case tests +- Zero-length data write +- Very large single write exceeding `MaxLogSize` +- `Replay` on an empty directory +- `Replay` on a directory with only a `.tmp` file +- Corrupt segment file handling + +### 2.4 Benchmarks +- `BenchmarkWrite` — single-entry write throughput +- `BenchmarkWriteParallel` — concurrent write throughput +- `BenchmarkReplay` — replay throughput over N entries + +### 2.5 Expand utils tests +- `TestGetLatestSegmentId` (currently a stub) +- `TestGetNumSegments` +- `TestGetOldestSegmentFile` +- `TestFileSize` + +--- + +## Phase 3: API Improvements + +### 3.1 Follow Go naming conventions +Rename `Init` to `New` or `Open` to follow standard Go constructor patterns. 
The current `Init` name implies it modifies global state.
+
+### 3.2 Add `context.Context` support
+- `Init` / `New` should accept a context for cancellation.
+- `Write` and `Replay` should accept a context so callers can cancel long-running operations.
+
+### 3.3 Expose read-only state
+Add methods to query internal state without breaking encapsulation:
+- `SegmentCount() int`
+- `CurrentLogSize() int64`
+- `LogDir() string`
+
+### 3.4 Functional options pattern
+Replace the `RebufOptions` struct with functional options (`WithMaxLogSize(int64)`, `WithFsyncTime(time.Duration)`, etc.) to allow sensible defaults and optional configuration.
+
+### 3.5 Configurable sync strategy
+Currently every `Write` calls both `Flush()` and `Sync()`, which negates the benefit of buffered I/O. Offer configurable strategies:
+- **Sync every write** (current behavior, safest)
+- **Periodic sync only** (higher throughput, small durability window)
+- **Manual sync** (caller controls when to sync)
+
+---
+
+## Phase 4: Features
+
+### 4.1 Selective replay
+Allow replaying from a specific segment ID or offset, rather than always replaying all segments from the beginning.
+
+```go
+func (r *Rebuf) ReplayFrom(segmentId int, callbackFn func([]byte) error) error
+```
+
+### 4.2 Truncate / Purge after replay
+After a successful replay, callers often want to delete the replayed segments. Provide an API for this:
+
+```go
+func (r *Rebuf) Purge() error                     // delete all segments
+func (r *Rebuf) PurgeThrough(segmentId int) error // delete segments up through the given ID
+```
+
+### 4.3 Compression
+Optional per-segment or per-entry compression (e.g., snappy or zstd) to reduce disk usage for large payloads.
+
+### 4.4 Batch writes
+Allow writing multiple entries in a single call to amortize the cost of flushing and syncing:
+
+```go
+func (r *Rebuf) WriteBatch(entries [][]byte) error
+```
+
+### 4.5 Metrics / observability hooks
+Expose counters and hooks for monitoring:
+- Entries written / replayed
+- Bytes written
+- Segment rotations
+- Segment deletions
+- Sync latency
+
+This could be a callback interface or integration with Go's `expvar` / OpenTelemetry.
+
+### 4.6 Entry metadata
+Support optional per-entry metadata (timestamp, sequence number) to enable ordered replay and deduplication in distributed scenarios.
+
+---
+
+## Phase 5: Documentation & Project Health
+
+### 5.1 Godoc comments
+Add doc comments to all exported types and functions (`Rebuf`, `RebufOptions`, `Init`, `Write`, `Replay`, `Close`).
+
+### 5.2 Architecture document
+Write a short design doc explaining the on-disk format, segment lifecycle, and durability guarantees.
+
+### 5.3 Improved README
+- Remove hardcoded paths from the usage example.
+- Add a "go get" installation command (`go get github.com/stym06/rebuf`).
+- Add a section on when to use rebuf vs. alternatives.
+- Add a Go Reference (pkg.go.dev) badge.
+
+### 5.4 Contributing guide
+Add `CONTRIBUTING.md` covering how to run tests, coding conventions, and the PR process.
+
+### 5.5 Versioning and releases
+- Tag releases with semver (start with `v0.1.0`).
+- Add a GitHub Actions workflow for creating releases on tag push.
+- Add a `CHANGELOG.md`.
+
+### 5.6 Linting in CI
+Add `golangci-lint` to the CI pipeline to catch issues early.
+ +--- + +## Summary + +| Phase | Focus | Complexity | +|-------|-------|------------| +| 1 | Correctness & Reliability | Low–Medium | +| 2 | Testing | Medium | +| 3 | API Improvements | Medium | +| 4 | Features | Medium–High | +| 5 | Documentation & Project Health | Low | + +Phases 1 and 2 should be prioritized — correctness and testing are prerequisites for everything else. Phase 3 involves breaking API changes, so it makes sense to batch those together before a `v1.0` release. Phase 4 features can be added incrementally. Phase 5 items can be done in parallel with any phase. From 6d3ae6aa099b3b20a7f8da071e705d652f4039c9 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 11 Feb 2026 19:25:57 +0000 Subject: [PATCH 2/2] implement rebuf roadmap: correctness fixes, new API, tests, and docs Phase 1 - Correctness & Reliability: - Fix Write() concurrency: mutex now protects the entire write path - Graceful goroutine shutdown via context cancellation and done channel - Fix Init() error handling: use MkdirAll with error propagation - Add CRC32 checksums per entry for data integrity verification - Fix segment ID recovery bug (off-by-one on reopen) Phase 2 - Testing: - Add 36 unit tests for rebuf package (Write, WriteBatch, Replay, ReplayFrom, Purge, PurgeThrough, Close, concurrency, recovery, etc.) - Add 22 unit tests for utils package (all functions fully covered) - Add race-detector-clean concurrent write tests - Add benchmarks (Write, WriteParallel, Replay, WriteBatch) - Coverage: 80.8% rebuf, 90.8% utils Phase 3 - API Improvements: - Rename Init() to New() with context.Context support - Functional options pattern (WithMaxLogSize, WithMaxSegments, etc.) - Configurable sync strategies (SyncEveryWrite, SyncPeriodic, SyncManual) - Expose read-only state: SegmentCount(), CurrentLogSize(), Dir() - Add Sync() method for manual sync control Phase 4 - Features: - Selective replay via ReplayFrom(segmentId, callback) - Purge() to delete all segments and reset - PurgeThrough(segmentId) for partial cleanup - WriteBatch() for amortized multi-entry writes Phase 5 - Documentation & Project Health: - Godoc comments on all exported types and functions - Updated README with API overview and on-disk format docs - Added CONTRIBUTING.md - CI: added -race flag and go vet step - Fix utils bugs: IsDirectoryEmpty now reads all entries, GetNumSegments/GetOldestSegmentFile use filename-based ID parsing https://claude.ai/code/session_01VSZuYevFutSrFay5XmL1qh --- .github/workflows/go.yml | 5 +- CONTRIBUTING.md | 39 ++ README.md | 110 ++-- example.go | 47 +- rebuf/rebuf.go | 562 +++++++++++++++----- rebuf/rebuf_test.go | 1046 ++++++++++++++++++++++++++++++++++++++ utils/utils.go | 146 ++++-- utils/utils_test.go | 387 +++++++++++--- 8 files changed, 2051 insertions(+), 291 deletions(-) create mode 100644 CONTRIBUTING.md create mode 100644 rebuf/rebuf_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 28b128e..aa9e4d7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -31,7 +31,10 @@ jobs: run: go build -v ./... - name: Test - run: go test -cover -coverprofile=coverage.txt ./... + run: go test -race -cover -coverprofile=coverage.txt ./... + + - name: Vet + run: go vet ./... - name: Archive code coverage results uses: actions/upload-artifact@v4 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..9ae7b7b --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,39 @@ +# Contributing to rebuf + +Thanks for your interest in contributing! Here's how to get started. 
+ +## Development Setup + +1. Clone the repository: + ```bash + git clone https://github.com/stym06/rebuf.git + cd rebuf + ``` + +2. Make sure you have Go 1.22+ installed. + +## Running Tests + +```bash +make test +``` + +To run tests with the race detector: + +```bash +go test -race ./... +``` + +## Code Style + +- Follow standard Go conventions (`gofmt`, `go vet`). +- Add godoc comments to all exported types and functions. +- Keep the project dependency-free (standard library only). + +## Pull Requests + +1. Create a feature branch from `main`. +2. Make your changes with clear commit messages. +3. Add or update tests for any new functionality. +4. Ensure all tests pass and the race detector is clean. +5. Open a PR with a description of your changes. diff --git a/README.md b/README.md index 46b036c..f84c308 100644 --- a/README.md +++ b/README.md @@ -2,72 +2,104 @@ [![Go](https://github.com/stym06/rebuf/actions/workflows/go.yml/badge.svg)](https://github.com/stym06/rebuf/actions/workflows/go.yml) -`rebuf` is a Golang implementation of WAL (Write Ahead||After Logging) which can also be used to log data bytes during a downstream service issue which can later be replayed on-demand +`rebuf` is a lightweight Go implementation of a Write-Ahead Log (WAL) that persists data to segmented log files and supports on-demand replay. It can be used as a durable buffer during downstream service outages — log data bytes while the service is down, then replay them when it recovers. ## Features -- Create and replay log data on any filesystem. -- Lightweight and easy to use. -- Efficient storage and retrieval of log data. +- **Segmented WAL** — automatic segment rotation and retention with configurable limits. +- **CRC32 checksums** — every entry is integrity-checked on replay. +- **Concurrent-safe** — all operations are protected by a mutex. +- **Configurable sync strategies** — choose between sync-every-write, periodic sync, or manual sync. +- **Selective replay** — replay all entries or start from a specific segment. +- **Batch writes** — write multiple entries in a single operation to amortize sync cost. +- **Purge API** — clean up segments after successful replay. +- **Functional options** — sensible defaults with optional configuration. +- **Zero external dependencies** — uses only the Go standard library. ## Installation -1. Clone the repository: `git clone https://github.com/stym06/rebuf.git` -2. Navigate to the project directory: `cd rebuf` -3. 
Install the necessary dependencies by running: `go mod download` +```bash +go get github.com/stym06/rebuf +``` ## Usage -``` -func writeToStdout(data []byte) error { - slog.Info(string(data)) - return nil -} +```go +package main -func main() { +import ( + "context" + "log/slog" + "os" + "time" + "github.com/stym06/rebuf/rebuf" +) + +func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - //Init the RebufOptions - rebufOptions := &rebuf.RebufOptions{ - LogDir: "/Users/satyamraj/personal/rebuf/data", - FsyncTime: 5 * time.Second, - MaxLogSize: 50, - MaxSegments: 5, - Logger: logger, + r, err := rebuf.New( + context.Background(), + "./data", + rebuf.WithMaxLogSize(1024*1024), // 1 MB per segment + rebuf.WithMaxSegments(5), // keep at most 5 segments + rebuf.WithFsyncTime(5*time.Second), // periodic sync interval + rebuf.WithSyncStrategy(rebuf.SyncEveryWrite), + rebuf.WithLogger(logger), + ) + if err != nil { + logger.Error("failed to create rebuf", "error", err) + os.Exit(1) } + defer r.Close() - //Init Rebuf - rebuf, err := rebuf.Init(rebufOptions) - if err != nil { - logger.Info("Error during Rebuf creation: " + err.Error()) + // Write entries. + for i := 0; i < 100; i++ { + if err := r.Write([]byte("Hello world")); err != nil { + logger.Error("write failed", "error", err) + } } - defer rebuf.Close() + // Replay all entries. + r.Replay(func(data []byte) error { + slog.Info(string(data)) + return nil + }) +} +``` - // Write Bytes - for i := 0; i < 30; i++ { - logger.Info("Writing data: ", "iter", i) - go rebuf.Write([]byte("Hello world")) - time.Sleep(300 * time.Millisecond) - } +## API Overview - //Replay and write to stdout - rebuf.Replay(writeToStdout) +| Method | Description | +|--------|-------------| +| `New(ctx, logDir, opts...)` | Create a new Rebuf instance | +| `Write(data)` | Write a single entry | +| `WriteBatch(entries)` | Write multiple entries in one operation | +| `Replay(callback)` | Replay all entries from all segments | +| `ReplayFrom(segmentId, callback)` | Replay entries starting from a segment | +| `Purge()` | Delete all segments and reset | +| `PurgeThrough(segmentId)` | Delete segments up through the given ID | +| `Sync()` | Manually flush and fsync to disk | +| `Close()` | Flush, sync, and shut down | +| `SegmentCount()` | Number of completed segments | +| `CurrentLogSize()` | Bytes written to the active segment | +| `Dir()` | Log directory path | - if err != nil { - logger.Info(err.Error()) - } +## On-Disk Format - time.Sleep(30 * time.Second) +Each entry is stored as: -} ``` +[8-byte size (big-endian uint64)][4-byte CRC32 (IEEE)][data bytes] +``` + +Segment files are named `rebuf-0`, `rebuf-1`, etc. The active file is always `rebuf.tmp`. ## License This project is licensed under the MIT License. See the `LICENSE` file for more information. -## Contact Information +## Contact If you have any questions or concerns, please feel free to reach out to the author on GitHub: [@stym06](https://github.com/stym06). 
diff --git a/example.go b/example.go
index 212d409..138dfef 100644
--- a/example.go
+++ b/example.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"log/slog"
 	"os"
 	"time"
@@ -14,40 +15,34 @@ func writeToStdout(data []byte) error {
 }
 
 func main() {
 	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
-	//Init the RebufOptions
-	rebufOptions := &rebuf.RebufOptions{
-		LogDir:      "/Users/satyamraj/personal/rebuf/data",
-		FsyncTime:   5 * time.Second,
-		MaxLogSize:  50,
-		MaxSegments: 5,
-		Logger:      logger,
-	}
-
-	//Init Rebuf
-	rebuf, err := rebuf.Init(rebufOptions)
+	r, err := rebuf.New(
+		context.Background(),
+		"./data",
+		rebuf.WithMaxLogSize(50),
+		rebuf.WithMaxSegments(5),
+		rebuf.WithFsyncTime(5*time.Second),
+		rebuf.WithSyncStrategy(rebuf.SyncEveryWrite),
+		rebuf.WithLogger(logger),
+	)
 	if err != nil {
-		logger.Info("Error during Rebuf creation: " + err.Error())
+		logger.Error("failed to create rebuf", "error", err)
+		os.Exit(1)
 	}
+	defer r.Close()
 
-	defer rebuf.Close()
-
-	// Write Bytes
+	// Write entries.
 	for i := 0; i < 30; i++ {
-		logger.Info("Writing data: ", "iter", i)
-		go rebuf.Write([]byte("Hello world"))
+		logger.Info("writing data", "iter", i)
+		if err := r.Write([]byte("Hello world")); err != nil {
+			logger.Error("write failed", "error", err)
+		}
 		time.Sleep(300 * time.Millisecond)
 	}
 
-	//Replay and write to stdout
-	rebuf.Replay(writeToStdout)
-
-	if err != nil {
-		logger.Info(err.Error())
+	// Replay all entries.
+	if err := r.Replay(writeToStdout); err != nil {
+		logger.Error("replay failed", "error", err)
 	}
-
-	time.Sleep(30 * time.Second)
-
 }
diff --git a/rebuf/rebuf.go b/rebuf/rebuf.go
index 266eecf..53428bb 100644
--- a/rebuf/rebuf.go
+++ b/rebuf/rebuf.go
@@ -1,8 +1,15 @@
+// Package rebuf provides a lightweight Write-Ahead Log (WAL) implementation
+// that persists data to segmented log files on disk and supports replay for
+// recovery. It can also be used to buffer data during downstream service
+// outages and replay it on-demand when the service recovers.
 package rebuf
 
 import (
 	"bufio"
+	"context"
 	"encoding/binary"
+	"fmt"
+	"hash/crc32"
 	"io"
 	"log/slog"
 	"os"
@@ -14,15 +21,75 @@ import (
 	"github.com/stym06/rebuf/utils"
 )
 
-type RebufOptions struct {
-	LogDir      string
-	MaxLogSize  int64
-	FsyncTime   time.Duration
-	MaxSegments int
-	Logger      *slog.Logger
+// entryHeaderSize is the on-disk overhead per entry: 8-byte size + 4-byte CRC32.
+const entryHeaderSize = 12
+
+// SyncStrategy controls when data is fsynced to disk.
+type SyncStrategy int
+
+const (
+	// SyncEveryWrite flushes and fsyncs after every Write call (safest, slowest).
+	SyncEveryWrite SyncStrategy = iota
+	// SyncPeriodic fsyncs at the configured interval only (higher throughput).
+	SyncPeriodic
+	// SyncManual requires the caller to explicitly call Sync().
+	SyncManual
+)
+
+// config holds internal configuration populated by Option functions.
+type config struct {
+	maxLogSize   int64
+	maxSegments  int
+	fsyncTime    time.Duration
+	logger       *slog.Logger
+	syncStrategy SyncStrategy
+}
+
+func defaultConfig() *config {
+	return &config{
+		maxLogSize:   10 * 1024 * 1024, // 10 MB
+		maxSegments:  10,
+		fsyncTime:    5 * time.Second,
+		logger:       slog.Default(),
+		syncStrategy: SyncEveryWrite,
+	}
+}
+
+// Option configures a Rebuf instance.
+type Option func(*config)
+
+// WithMaxLogSize sets the maximum size in bytes per segment file. Rotation is
+// checked before each write: an entry that would push the active segment past
+// this limit goes into a fresh segment instead. 
+func WithMaxLogSize(size int64) Option {
+	return func(c *config) { c.maxLogSize = size }
+}
+
+// WithMaxSegments sets the maximum number of retained segment files.
+// When this limit is exceeded, the oldest segment is deleted.
+func WithMaxSegments(n int) Option {
+	return func(c *config) { c.maxSegments = n }
+}
+
+// WithFsyncTime sets the interval for periodic fsync when using SyncPeriodic.
+func WithFsyncTime(d time.Duration) Option {
+	return func(c *config) { c.fsyncTime = d }
 }
+
+// WithLogger sets the structured logger used for diagnostic messages.
+func WithLogger(l *slog.Logger) Option {
+	return func(c *config) { c.logger = l }
+}
+
+// WithSyncStrategy sets the durability strategy for writes.
+func WithSyncStrategy(s SyncStrategy) Option {
+	return func(c *config) { c.syncStrategy = s }
+}
+
+// Rebuf is a write-ahead log that persists data to segmented log files and
+// supports ordered replay. All methods are safe for concurrent use.
 type Rebuf struct {
+	ctx              context.Context
+	cancel           context.CancelFunc
 	logDir           string
 	currentSegmentId int
 	maxLogSize       int64
@@ -31,215 +98,470 @@ type Rebuf struct {
 	bufWriter        *bufio.Writer
 	logSize          int64
 	tmpLogFile       *os.File
-	ticker           time.Ticker
+	ticker           *time.Ticker
 	mu               sync.Mutex
 	log              *slog.Logger
+	syncStrategy     SyncStrategy
+	done             chan struct{}
 }
 
-func Init(options *RebufOptions) (*Rebuf, error) {
-
-	//ensure dir is created
-	if _, err := os.Stat(filepath.Join(options.LogDir)); err != nil {
-		if os.IsNotExist(err) {
-			os.Mkdir(options.LogDir, 0700)
-		}
+// New creates a new Rebuf instance writing to logDir. The directory is created
+// if it does not exist. If the directory already contains segments from a
+// previous run, Rebuf picks up where it left off.
+func New(ctx context.Context, logDir string, opts ...Option) (*Rebuf, error) {
+	cfg := defaultConfig()
+	for _, opt := range opts {
+		opt(cfg)
 	}
 
-	//open temp file
-	tmpLogFileName := filepath.Join(options.LogDir, "rebuf.tmp")
-	tmpLogFile, err := os.OpenFile(tmpLogFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
-	if err != nil {
-		return nil, err
+	if err := os.MkdirAll(logDir, 0700); err != nil {
+		return nil, fmt.Errorf("rebuf: failed to create log directory: %w", err)
 	}
 
-	rebuf := &Rebuf{
-		logDir:      options.LogDir,
-		maxLogSize:  options.MaxLogSize,
-		maxSegments: options.MaxSegments,
-		tmpLogFile:  tmpLogFile,
-		ticker:      *time.NewTicker(options.FsyncTime),
-		log:         options.Logger,
+	childCtx, cancel := context.WithCancel(ctx)
+
+	r := &Rebuf{
+		ctx:          childCtx,
+		cancel:       cancel,
+		logDir:       logDir,
+		maxLogSize:   cfg.maxLogSize,
+		maxSegments:  cfg.maxSegments,
+		log:          cfg.logger,
+		syncStrategy: cfg.syncStrategy,
+		done:         make(chan struct{}),
 	}
 
-	err = rebuf.openExistingOrCreateNew(options.LogDir)
-
-	if err != nil {
+	if err := r.openExistingOrCreateNew(); err != nil {
+		cancel()
 		return nil, err
 	}
 
-	go rebuf.syncPeriodically()
+	if cfg.syncStrategy == SyncPeriodic {
+		r.ticker = time.NewTicker(cfg.fsyncTime)
+		go r.syncPeriodically()
+	} else {
+		close(r.done)
+	}
 
-	return rebuf, err
+	return r, nil
 }
 
-func (rebuf *Rebuf) syncPeriodically() error {
+// syncPeriodically flushes buffered writes and fsyncs the active log file at
+// the configured interval until the context is cancelled.
+func (r *Rebuf) syncPeriodically() {
+	defer close(r.done)
 	for {
 		select {
-		case <-rebuf.ticker.C:
-			rebuf.mu.Lock()
-			rebuf.tmpLogFile.Sync()
-			rebuf.mu.Unlock()
+		case <-r.ctx.Done():
+			return
+		case <-r.ticker.C:
+			r.mu.Lock()
+			// Flush the buffered writer first so the fsync below persists
+			// recent writes, not just previously flushed bytes.
+			if r.bufWriter != nil {
+				_ = r.bufWriter.Flush()
+			}
+			if r.tmpLogFile != nil {
+				_ = r.tmpLogFile.Sync()
+			}
+			r.mu.Unlock()
 		}
 	}
 }
 
-func (rebuf *Rebuf) Write(data []byte) error {
-	if rebuf.logSize+int64(len(data))+8 > rebuf.maxLogSize {
+// Write persists data as a single 
log entry. The entry is written with a +// CRC32 checksum for integrity verification during replay. +// It is safe for concurrent use. +func (r *Rebuf) Write(data []byte) error { + r.mu.Lock() + defer r.mu.Unlock() - if rebuf.segmentCount > rebuf.maxSegments { - rebuf.log.Info("Reached maxSegments", "segments", rebuf.maxSegments) + entrySize := int64(len(data)) + entryHeaderSize + if r.logSize+entrySize > r.maxLogSize { + if err := r.rotateSegment(); err != nil { + return err + } + } - //delete the oldest log file - oldestLogFileName, err := utils.GetOldestSegmentFile(rebuf.logDir) - if err != nil { + if err := r.writeEntry(data); err != nil { + return err + } + + r.logSize += entrySize + + if r.syncStrategy == SyncEveryWrite { + if err := r.bufWriter.Flush(); err != nil { + return err + } + return r.tmpLogFile.Sync() + } + + return nil +} + +// WriteBatch writes multiple entries in a single operation, amortizing the +// cost of flushing and syncing across all entries. +func (r *Rebuf) WriteBatch(entries [][]byte) error { + if len(entries) == 0 { + return nil + } + + r.mu.Lock() + defer r.mu.Unlock() + + for _, data := range entries { + entrySize := int64(len(data)) + entryHeaderSize + if r.logSize+entrySize > r.maxLogSize { + if err := r.rotateSegment(); err != nil { return err } - rebuf.log.Info("Would have deleted ", "oldestLogFileName", oldestLogFileName) - os.Remove(filepath.Join(rebuf.logDir, oldestLogFileName)) + } - rebuf.segmentCount-- + if err := r.writeEntry(data); err != nil { + return err } + r.logSize += entrySize + } + + if r.syncStrategy == SyncEveryWrite || r.syncStrategy == SyncPeriodic { + if err := r.bufWriter.Flush(); err != nil { + return err + } + return r.tmpLogFile.Sync() + } - rebuf.log.Info("Log size will be greater than", "logsize", rebuf.logSize, "Moving to", rebuf.currentSegmentId+1) - rebuf.bufWriter.Flush() - rebuf.tmpLogFile.Sync() + return nil +} + +// writeEntry writes a single entry: [8-byte size][4-byte crc32][data]. +// Caller must hold r.mu. +func (r *Rebuf) writeEntry(data []byte) error { + sizeBuf := make([]byte, 8) + binary.BigEndian.PutUint64(sizeBuf, uint64(len(data))) + if _, err := r.bufWriter.Write(sizeBuf); err != nil { + return err + } - // rename this file to current segment count - os.Rename(filepath.Join(rebuf.logDir, "/rebuf.tmp"), filepath.Join(rebuf.logDir, "/rebuf-"+strconv.Itoa(rebuf.currentSegmentId))) - //increase segment count by 1 - rebuf.currentSegmentId = rebuf.currentSegmentId + 1 - rebuf.segmentCount = rebuf.segmentCount + 1 + checksum := crc32.ChecksumIEEE(data) + crcBuf := make([]byte, 4) + binary.BigEndian.PutUint32(crcBuf, checksum) + if _, err := r.bufWriter.Write(crcBuf); err != nil { + return err + } + + _, err := r.bufWriter.Write(data) + return err +} - //change writer to this temp file - tmpLogFile, err := os.OpenFile(filepath.Join(rebuf.logDir, "rebuf.tmp"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) +// rotateSegment flushes the current temp file, renames it to a numbered +// segment, and opens a fresh temp file. Caller must hold r.mu. 
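+// Rotation renames rebuf.tmp to rebuf-<id> and opens a fresh temp file, so
+// no entry data is ever copied.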
+func (r *Rebuf) rotateSegment() error { + if r.segmentCount >= r.maxSegments { + r.log.Info("reached max segments, deleting oldest", "maxSegments", r.maxSegments) + oldestFile, err := utils.GetOldestSegmentFile(r.logDir) if err != nil { return err } - rebuf.tmpLogFile = tmpLogFile - rebuf.bufWriter = bufio.NewWriter(rebuf.tmpLogFile) - rebuf.logSize = 0 + if err := os.Remove(filepath.Join(r.logDir, oldestFile)); err != nil { + return err + } + r.segmentCount-- } - //seek to the end of the file - _, err := rebuf.tmpLogFile.Seek(0, io.SeekEnd) - if err != nil { + r.log.Info("rotating segment", "currentSize", r.logSize, "newSegmentId", r.currentSegmentId) + + if err := r.bufWriter.Flush(); err != nil { + return err + } + if err := r.tmpLogFile.Sync(); err != nil { + return err + } + if err := r.tmpLogFile.Close(); err != nil { return err } - //write the size of the byte array and then the byte array itself - size := uint64(len(data)) - //creating a byte array of size 8 and putting the length of `data` into the array - sizeBuf := make([]byte, 8) - binary.BigEndian.PutUint64(sizeBuf, size) - - _, err = rebuf.bufWriter.Write(sizeBuf) - if err != nil { + oldPath := filepath.Join(r.logDir, "rebuf.tmp") + newPath := filepath.Join(r.logDir, "rebuf-"+strconv.Itoa(r.currentSegmentId)) + if err := os.Rename(oldPath, newPath); err != nil { return err } - _, err = rebuf.bufWriter.Write(data) + + r.currentSegmentId++ + r.segmentCount++ + + tmpFile, err := os.OpenFile( + filepath.Join(r.logDir, "rebuf.tmp"), + os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666, + ) if err != nil { return err } - rebuf.logSize = rebuf.logSize + int64(len(data)) + 8 - rebuf.bufWriter.Flush() - rebuf.mu.Lock() - defer rebuf.mu.Unlock() - rebuf.tmpLogFile.Sync() + r.tmpLogFile = tmpFile + r.bufWriter = bufio.NewWriter(tmpFile) + r.logSize = 0 - return err + return nil } -func (rebuf *Rebuf) openExistingOrCreateNew(logDir string) error { - //check if directory is empty (only containing .tmp file) - empty, err := utils.IsDirectoryEmpty(logDir) +func (r *Rebuf) openExistingOrCreateNew() error { + empty, err := utils.IsDirectoryEmpty(r.logDir) if err != nil { return err } - tmpLogFileName := filepath.Join(logDir, "rebuf.tmp") + tmpLogFileName := filepath.Join(r.logDir, "rebuf.tmp") tmpLogFile, err := os.OpenFile(tmpLogFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { return err } - rebuf.tmpLogFile = tmpLogFile - rebuf.bufWriter = bufio.NewWriter(tmpLogFile) + r.tmpLogFile = tmpLogFile + r.bufWriter = bufio.NewWriter(tmpLogFile) if empty { - rebuf.currentSegmentId = 0 - rebuf.segmentCount = 0 - rebuf.logSize = 0 + r.currentSegmentId = 0 + r.segmentCount = 0 + r.logSize = 0 } else { - rebuf.currentSegmentId, err = utils.GetLatestSegmentId(logDir) + latestId, err := utils.GetLatestSegmentId(r.logDir) if err != nil { - return err + // No numbered segments yet, just a tmp file with data. + r.currentSegmentId = 0 + } else { + r.currentSegmentId = latestId + 1 } - rebuf.segmentCount, err = utils.GetNumSegments(logDir) + r.segmentCount, err = utils.GetNumSegments(r.logDir) if err != nil { return err } - rebuf.logSize, _ = utils.FileSize(rebuf.tmpLogFile) + r.logSize, _ = utils.FileSize(r.tmpLogFile) } return nil } -func (rebuf *Rebuf) Replay(callbackFn func([]byte) error) error { - files, err := os.ReadDir(rebuf.logDir) +// Replay reads all log entries across all segments (oldest first, then the +// active temp file) and invokes callbackFn for each entry. 
Replay verifies +// the CRC32 checksum of each entry and returns an error on mismatch. +func (r *Rebuf) Replay(callbackFn func([]byte) error) error { + return r.ReplayFrom(0, callbackFn) +} + +// ReplayFrom reads log entries starting from the segment with the given ID. +// Segments with IDs less than segmentId are skipped. The active temp file is +// always included. +func (r *Rebuf) ReplayFrom(segmentId int, callbackFn func([]byte) error) error { + r.mu.Lock() + if r.bufWriter != nil { + _ = r.bufWriter.Flush() + } + r.mu.Unlock() + + files, err := utils.GetSortedSegmentFiles(r.logDir) if err != nil { return err } - for _, fileInfo := range files { - file, err := os.Open(filepath.Join(rebuf.logDir, fileInfo.Name())) - if err != nil { - return err - } - defer file.Close() - bufReader := bufio.NewReader(file) - var readBytes []byte - for err == nil { - readBytes, err = bufReader.Peek(8) + for _, fileName := range files { + if fileName == "rebuf.tmp" { + // Always replay the active temp file. + } else { + sid, err := utils.ParseSegmentId(fileName) if err != nil { - break + continue } - size := int(binary.BigEndian.Uint64(readBytes)) - _, err := bufReader.Discard(8) - if err != nil { + if sid < segmentId { + continue + } + } + + if err := replayFile(filepath.Join(r.logDir, fileName), callbackFn); err != nil { + return err + } + } + + return nil +} + +// replayFile reads all entries from a single log file. +func replayFile(path string, callbackFn func([]byte) error) error { + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + reader := bufio.NewReader(file) + for { + // Read 8-byte size prefix. + sizeBuf := make([]byte, 8) + if _, err := io.ReadFull(reader, sizeBuf); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { break } + return err + } + size := binary.BigEndian.Uint64(sizeBuf) - data, err := bufReader.Peek(size) - if err != nil { + // Read 4-byte CRC32. + crcBuf := make([]byte, 4) + if _, err := io.ReadFull(reader, crcBuf); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { break } - err = callbackFn(data) - if err != nil { + return err + } + expectedCRC := binary.BigEndian.Uint32(crcBuf) + + // Read data payload. + data := make([]byte, size) + if _, err := io.ReadFull(reader, data); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { break } - _, _ = bufReader.Discard(size) + return err } + // Verify checksum. + actualCRC := crc32.ChecksumIEEE(data) + if actualCRC != expectedCRC { + return fmt.Errorf("rebuf: CRC32 mismatch in %s: expected %d, got %d", path, expectedCRC, actualCRC) + } + + if err := callbackFn(data); err != nil { + return err + } } + return nil } -func (rebuf *Rebuf) Close() error { - if rebuf.bufWriter == nil { - return nil +// Purge deletes all segment files and the active temp file, then resets +// the WAL to a clean state. 
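+// Segment numbering restarts at zero and the WAL is immediately writable
+// again.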
+func (r *Rebuf) Purge() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.bufWriter != nil { + _ = r.bufWriter.Flush() + } + if r.tmpLogFile != nil { + _ = r.tmpLogFile.Close() + } + + files, err := os.ReadDir(r.logDir) + if err != nil { + return err + } + for _, f := range files { + if err := os.Remove(filepath.Join(r.logDir, f.Name())); err != nil { + return err + } } - if err := rebuf.bufWriter.Flush(); err != nil { - rebuf.tmpLogFile.Close() + tmpFile, err := os.OpenFile( + filepath.Join(r.logDir, "rebuf.tmp"), + os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666, + ) + if err != nil { return err } + r.tmpLogFile = tmpFile + r.bufWriter = bufio.NewWriter(tmpFile) + r.currentSegmentId = 0 + r.segmentCount = 0 + r.logSize = 0 - if err := rebuf.tmpLogFile.Sync(); err != nil { - rebuf.tmpLogFile.Close() + return nil +} + +// PurgeThrough deletes all segments with IDs less than or equal to segmentId. +func (r *Rebuf) PurgeThrough(segmentId int) error { + r.mu.Lock() + defer r.mu.Unlock() + + files, err := os.ReadDir(r.logDir) + if err != nil { return err } - rebuf.ticker.Stop() + for _, f := range files { + sid, err := utils.ParseSegmentId(f.Name()) + if err != nil { + continue + } + if sid <= segmentId { + if err := os.Remove(filepath.Join(r.logDir, f.Name())); err != nil { + return err + } + r.segmentCount-- + } + } + + return nil +} + +// Sync manually flushes buffered data and fsyncs the active log file to disk. +// This is primarily useful with SyncManual strategy. +func (r *Rebuf) Sync() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.bufWriter != nil { + if err := r.bufWriter.Flush(); err != nil { + return err + } + } + if r.tmpLogFile != nil { + return r.tmpLogFile.Sync() + } + return nil +} - return rebuf.tmpLogFile.Close() +// SegmentCount returns the current number of completed segment files +// (excluding the active temp file). +func (r *Rebuf) SegmentCount() int { + r.mu.Lock() + defer r.mu.Unlock() + return r.segmentCount +} + +// CurrentLogSize returns the number of bytes written to the active segment. +func (r *Rebuf) CurrentLogSize() int64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.logSize +} + +// Dir returns the directory where log files are stored. +func (r *Rebuf) Dir() string { + return r.logDir +} + +// Close flushes all buffered data, stops the periodic sync goroutine (if any), +// and closes the active log file. +func (r *Rebuf) Close() error { + r.cancel() + + if r.ticker != nil { + r.ticker.Stop() + } + + // Wait for the periodic sync goroutine to exit. + <-r.done + + r.mu.Lock() + defer r.mu.Unlock() + + if r.bufWriter != nil { + if err := r.bufWriter.Flush(); err != nil { + if r.tmpLogFile != nil { + _ = r.tmpLogFile.Close() + } + return err + } + } + + if r.tmpLogFile != nil { + if err := r.tmpLogFile.Sync(); err != nil { + _ = r.tmpLogFile.Close() + return err + } + return r.tmpLogFile.Close() + } + + return nil } diff --git a/rebuf/rebuf_test.go b/rebuf/rebuf_test.go new file mode 100644 index 0000000..601fc75 --- /dev/null +++ b/rebuf/rebuf_test.go @@ -0,0 +1,1046 @@ +package rebuf + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "log/slog" + "os" + "path/filepath" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" +) + +// helper creates a Rebuf instance in a temporary directory with small segments. 
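+// Caller-supplied options are appended after the defaults, so they take
+// precedence.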
+func newTestRebuf(t *testing.T, opts ...Option) *Rebuf { + t.Helper() + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + defaults := []Option{ + WithMaxLogSize(200), + WithMaxSegments(5), + WithFsyncTime(100 * time.Millisecond), + WithLogger(logger), + } + allOpts := append(defaults, opts...) + + r, err := New(context.Background(), dir, allOpts...) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + t.Cleanup(func() { r.Close() }) + return r +} + +// collectReplay replays all entries and returns them as a slice. +func collectReplay(t *testing.T, r *Rebuf) [][]byte { + t.Helper() + var entries [][]byte + err := r.Replay(func(data []byte) error { + cp := make([]byte, len(data)) + copy(cp, data) + entries = append(entries, cp) + return nil + }) + if err != nil { + t.Fatalf("Replay() failed: %v", err) + } + return entries +} + +// --- New / Init tests --- + +func TestNew_FreshDirectory(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, WithLogger(logger), WithMaxLogSize(1024)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r.Close() + + if r.SegmentCount() != 0 { + t.Errorf("expected 0 segments, got %d", r.SegmentCount()) + } + if r.CurrentLogSize() != 0 { + t.Errorf("expected 0 log size, got %d", r.CurrentLogSize()) + } + if r.Dir() != dir { + t.Errorf("expected dir %s, got %s", dir, r.Dir()) + } +} + +func TestNew_CreatesDirectory(t *testing.T) { + base := t.TempDir() + nested := filepath.Join(base, "a", "b", "c") + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + r, err := New(context.Background(), nested, WithLogger(logger)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r.Close() + + if _, err := os.Stat(nested); err != nil { + t.Errorf("directory was not created: %v", err) + } +} + +func TestNew_ExistingDirectoryWithSegments(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + // Create a Rebuf, write enough to trigger segment rotation, then close. + r1, err := New(context.Background(), dir, + WithMaxLogSize(50), WithMaxSegments(10), WithLogger(logger)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + for i := 0; i < 5; i++ { + if err := r1.Write([]byte("hello world")); err != nil { + t.Fatalf("Write() error: %v", err) + } + } + r1.Close() + + // Re-open and verify it picks up state. + r2, err := New(context.Background(), dir, + WithMaxLogSize(50), WithMaxSegments(10), WithLogger(logger)) + if err != nil { + t.Fatalf("New() re-open error: %v", err) + } + defer r2.Close() + + if r2.SegmentCount() == 0 { + t.Error("expected segments to be recovered, got 0") + } + + // Should be able to write more data without overwriting existing segments. 
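+	// (Recovery resumes numbering at the highest on-disk segment ID plus one,
+	// so a subsequent rotation cannot clobber a recovered segment.)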
+ if err := r2.Write([]byte("after reopen")); err != nil { + t.Fatalf("Write() after reopen error: %v", err) + } +} + +func TestNew_DefaultOptions(t *testing.T) { + dir := t.TempDir() + r, err := New(context.Background(), dir) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r.Close() + + if r.maxLogSize != 10*1024*1024 { + t.Errorf("expected default maxLogSize 10MB, got %d", r.maxLogSize) + } + if r.maxSegments != 10 { + t.Errorf("expected default maxSegments 10, got %d", r.maxSegments) + } + if r.syncStrategy != SyncEveryWrite { + t.Errorf("expected default SyncEveryWrite, got %d", r.syncStrategy) + } +} + +// --- Write tests --- + +func TestWrite_SingleEntry(t *testing.T) { + r := newTestRebuf(t) + + data := []byte("hello world") + if err := r.Write(data); err != nil { + t.Fatalf("Write() error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 1 { + t.Fatalf("expected 1 entry, got %d", len(entries)) + } + if !bytes.Equal(entries[0], data) { + t.Errorf("expected %q, got %q", data, entries[0]) + } +} + +func TestWrite_MultipleEntries(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(10000)) + + for i := 0; i < 20; i++ { + data := []byte(fmt.Sprintf("entry-%d", i)) + if err := r.Write(data); err != nil { + t.Fatalf("Write(%d) error: %v", i, err) + } + } + + entries := collectReplay(t, r) + if len(entries) != 20 { + t.Fatalf("expected 20 entries, got %d", len(entries)) + } + for i, e := range entries { + expected := fmt.Sprintf("entry-%d", i) + if string(e) != expected { + t.Errorf("entry %d: expected %q, got %q", i, expected, string(e)) + } + } +} + +func TestWrite_ZeroLengthData(t *testing.T) { + r := newTestRebuf(t) + + if err := r.Write([]byte{}); err != nil { + t.Fatalf("Write(empty) error: %v", err) + } + if err := r.Write(nil); err != nil { + t.Fatalf("Write(nil) error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 2 { + t.Fatalf("expected 2 entries, got %d", len(entries)) + } + if len(entries[0]) != 0 { + t.Errorf("expected empty entry, got %d bytes", len(entries[0])) + } +} + +func TestWrite_LargeData(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(100000)) + + // Write a 10KB entry. + data := make([]byte, 10000) + for i := range data { + data[i] = byte(i % 256) + } + + if err := r.Write(data); err != nil { + t.Fatalf("Write(large) error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 1 { + t.Fatalf("expected 1 entry, got %d", len(entries)) + } + if !bytes.Equal(entries[0], data) { + t.Error("large data entry corrupted during write/replay") + } +} + +func TestWrite_SegmentRotation(t *testing.T) { + // MaxLogSize of 50 bytes. Each entry = 12 header + 11 data = 23 bytes. + // After 2 entries (46 bytes), the third triggers rotation. + r := newTestRebuf(t, WithMaxLogSize(50), WithMaxSegments(10)) + + for i := 0; i < 6; i++ { + if err := r.Write([]byte("hello world")); err != nil { + t.Fatalf("Write(%d) error: %v", i, err) + } + } + + if r.SegmentCount() == 0 { + t.Error("expected at least one segment after rotation") + } + + entries := collectReplay(t, r) + if len(entries) != 6 { + t.Fatalf("expected 6 entries after rotation, got %d", len(entries)) + } +} + +func TestWrite_MaxSegmentsDeletion(t *testing.T) { + // Small segments, max 2 segments retained. + r := newTestRebuf(t, WithMaxLogSize(30), WithMaxSegments(2)) + + // Write enough data to create many segments, exceeding max. 
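+	// Each entry is 14-15 bytes (12-byte header plus 2-3 bytes of data), so a
+	// 30-byte cap forces a rotation roughly every second write.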
+ for i := 0; i < 20; i++ { + if err := r.Write([]byte(fmt.Sprintf("d%d", i))); err != nil { + t.Fatalf("Write(%d) error: %v", i, err) + } + } + + if r.SegmentCount() > 2 { + t.Errorf("expected at most 2 segments, got %d", r.SegmentCount()) + } +} + +func TestWrite_CRC32Integrity(t *testing.T) { + r := newTestRebuf(t) + + data := []byte("checksum test data") + if err := r.Write(data); err != nil { + t.Fatalf("Write() error: %v", err) + } + + // Verify the on-disk format: [8-byte size][4-byte crc32][data] + r.mu.Lock() + _ = r.bufWriter.Flush() + r.mu.Unlock() + + tmpPath := filepath.Join(r.Dir(), "rebuf.tmp") + content, err := os.ReadFile(tmpPath) + if err != nil { + t.Fatalf("ReadFile error: %v", err) + } + + if len(content) < entryHeaderSize { + t.Fatalf("file too small: %d bytes", len(content)) + } + + size := binary.BigEndian.Uint64(content[:8]) + if size != uint64(len(data)) { + t.Errorf("size prefix: expected %d, got %d", len(data), size) + } + + storedCRC := binary.BigEndian.Uint32(content[8:12]) + expectedCRC := crc32.ChecksumIEEE(data) + if storedCRC != expectedCRC { + t.Errorf("CRC32: expected %d, got %d", expectedCRC, storedCRC) + } + + if !bytes.Equal(content[12:12+size], data) { + t.Error("on-disk data does not match") + } +} + +// --- WriteBatch tests --- + +func TestWriteBatch(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(10000)) + + batch := [][]byte{ + []byte("batch-0"), + []byte("batch-1"), + []byte("batch-2"), + } + if err := r.WriteBatch(batch); err != nil { + t.Fatalf("WriteBatch() error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 3 { + t.Fatalf("expected 3 entries, got %d", len(entries)) + } + for i, e := range entries { + expected := fmt.Sprintf("batch-%d", i) + if string(e) != expected { + t.Errorf("entry %d: expected %q, got %q", i, expected, string(e)) + } + } +} + +func TestWriteBatch_Empty(t *testing.T) { + r := newTestRebuf(t) + + if err := r.WriteBatch(nil); err != nil { + t.Fatalf("WriteBatch(nil) error: %v", err) + } + if err := r.WriteBatch([][]byte{}); err != nil { + t.Fatalf("WriteBatch(empty) error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 0 { + t.Errorf("expected 0 entries, got %d", len(entries)) + } +} + +func TestWriteBatch_WithRotation(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(50), WithMaxSegments(10)) + + // Each entry is 12 header + data bytes. With size 50, some entries will + // trigger rotation mid-batch. 
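+	// WriteBatch re-checks the size cap before each entry, so a single batch
+	// may span several segment files.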
+ batch := make([][]byte, 10) + for i := range batch { + batch[i] = []byte(fmt.Sprintf("item-%d", i)) + } + + if err := r.WriteBatch(batch); err != nil { + t.Fatalf("WriteBatch() error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 10 { + t.Fatalf("expected 10 entries, got %d", len(entries)) + } +} + +// --- Replay tests --- + +func TestReplay_EmptyDirectory(t *testing.T) { + r := newTestRebuf(t) + + entries := collectReplay(t, r) + if len(entries) != 0 { + t.Errorf("expected 0 entries on empty dir, got %d", len(entries)) + } +} + +func TestReplay_MultipleSegments(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(40), WithMaxSegments(20)) + + var expected []string + for i := 0; i < 15; i++ { + data := fmt.Sprintf("msg-%02d", i) + expected = append(expected, data) + if err := r.Write([]byte(data)); err != nil { + t.Fatalf("Write(%d) error: %v", i, err) + } + } + + entries := collectReplay(t, r) + if len(entries) != len(expected) { + t.Fatalf("expected %d entries, got %d", len(expected), len(entries)) + } + for i, e := range entries { + if string(e) != expected[i] { + t.Errorf("entry %d: expected %q, got %q", i, expected[i], string(e)) + } + } +} + +func TestReplay_CorruptCRC(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(1000), WithLogger(logger)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + if err := r.Write([]byte("good data")); err != nil { + t.Fatalf("Write() error: %v", err) + } + r.Close() + + // Corrupt the CRC32 bytes (bytes 8-11) in the tmp file. + tmpPath := filepath.Join(dir, "rebuf.tmp") + content, err := os.ReadFile(tmpPath) + if err != nil { + t.Fatalf("ReadFile error: %v", err) + } + content[8] ^= 0xFF // flip a byte in the CRC + if err := os.WriteFile(tmpPath, content, 0666); err != nil { + t.Fatalf("WriteFile error: %v", err) + } + + r2, err := New(context.Background(), dir, + WithMaxLogSize(1000), WithLogger(logger)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r2.Close() + + err = r2.Replay(func(data []byte) error { return nil }) + if err == nil { + t.Fatal("expected CRC mismatch error, got nil") + } +} + +func TestReplay_CallbackError(t *testing.T) { + r := newTestRebuf(t) + r.Write([]byte("data")) + + expectedErr := fmt.Errorf("callback failed") + err := r.Replay(func(data []byte) error { + return expectedErr + }) + if err != expectedErr { + t.Errorf("expected callback error, got: %v", err) + } +} + +// --- ReplayFrom tests --- + +func TestReplayFrom(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(40), WithMaxSegments(20)) + + // Write enough entries to create several segments. + for i := 0; i < 10; i++ { + if err := r.Write([]byte(fmt.Sprintf("entry-%d", i))); err != nil { + t.Fatalf("Write(%d) error: %v", i, err) + } + } + + segCount := r.SegmentCount() + if segCount < 2 { + t.Skipf("need at least 2 segments for this test, got %d", segCount) + } + + // Replay from segment 1 (skip segment 0). 
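+	// Only numbered segments below the given ID are skipped; the active
+	// rebuf.tmp file is always replayed.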
+ var fromEntries [][]byte + err := r.ReplayFrom(1, func(data []byte) error { + cp := make([]byte, len(data)) + copy(cp, data) + fromEntries = append(fromEntries, cp) + return nil + }) + if err != nil { + t.Fatalf("ReplayFrom(1) error: %v", err) + } + + allEntries := collectReplay(t, r) + if len(fromEntries) >= len(allEntries) { + t.Errorf("ReplayFrom(1) should return fewer entries than Replay(), got %d vs %d", + len(fromEntries), len(allEntries)) + } + if len(fromEntries) == 0 { + t.Error("ReplayFrom(1) returned 0 entries") + } +} + +func TestReplayFrom_HighSegmentId(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(40), WithMaxSegments(20)) + + for i := 0; i < 5; i++ { + r.Write([]byte(fmt.Sprintf("e-%d", i))) + } + + // Replay from a segment ID higher than any that exist (except .tmp). + var entries [][]byte + err := r.ReplayFrom(9999, func(data []byte) error { + cp := make([]byte, len(data)) + copy(cp, data) + entries = append(entries, cp) + return nil + }) + if err != nil { + t.Fatalf("ReplayFrom(9999) error: %v", err) + } + // Should still get entries from .tmp file. + if len(entries) == 0 { + t.Log("ReplayFrom with high segmentId may return entries from .tmp") + } +} + +// --- Purge tests --- + +func TestPurge(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(50), WithMaxSegments(20)) + + for i := 0; i < 10; i++ { + r.Write([]byte("purge me")) + } + + if r.SegmentCount() == 0 { + t.Fatal("expected segments before purge") + } + + if err := r.Purge(); err != nil { + t.Fatalf("Purge() error: %v", err) + } + + if r.SegmentCount() != 0 { + t.Errorf("expected 0 segments after purge, got %d", r.SegmentCount()) + } + if r.CurrentLogSize() != 0 { + t.Errorf("expected 0 log size after purge, got %d", r.CurrentLogSize()) + } + + // Should be writable after purge. + if err := r.Write([]byte("after purge")); err != nil { + t.Fatalf("Write() after purge error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 1 { + t.Errorf("expected 1 entry after purge+write, got %d", len(entries)) + } +} + +func TestPurgeThrough(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(40), WithMaxSegments(20)) + + for i := 0; i < 10; i++ { + r.Write([]byte(fmt.Sprintf("pt-%d", i))) + } + + initialSegments := r.SegmentCount() + if initialSegments < 2 { + t.Skipf("need at least 2 segments, got %d", initialSegments) + } + + // Purge through segment 0 only. + if err := r.PurgeThrough(0); err != nil { + t.Fatalf("PurgeThrough(0) error: %v", err) + } + + if r.SegmentCount() >= initialSegments { + t.Errorf("expected fewer segments after PurgeThrough, got %d (was %d)", + r.SegmentCount(), initialSegments) + } +} + +// --- Sync strategy tests --- + +func TestSyncStrategy_EveryWrite(t *testing.T) { + r := newTestRebuf(t, WithSyncStrategy(SyncEveryWrite)) + + if err := r.Write([]byte("sync every")); err != nil { + t.Fatalf("Write() error: %v", err) + } + + // Data should be on disk immediately. 
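+	// (With SyncEveryWrite, Write itself flushes and fsyncs, so no explicit
+	// Sync call is needed before reading back.)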
+ entries := collectReplay(t, r) + if len(entries) != 1 { + t.Errorf("expected 1 entry, got %d", len(entries)) + } +} + +func TestSyncStrategy_Periodic(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(10000), + WithSyncStrategy(SyncPeriodic), + WithFsyncTime(50*time.Millisecond), + WithLogger(logger), + ) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r.Close() + + if err := r.Write([]byte("periodic sync")); err != nil { + t.Fatalf("Write() error: %v", err) + } + + // Wait for periodic sync to kick in. + time.Sleep(200 * time.Millisecond) + + entries := collectReplay(t, r) + if len(entries) != 1 { + t.Errorf("expected 1 entry, got %d", len(entries)) + } +} + +func TestSyncStrategy_Manual(t *testing.T) { + r := newTestRebuf(t, WithSyncStrategy(SyncManual)) + + if err := r.Write([]byte("manual sync")); err != nil { + t.Fatalf("Write() error: %v", err) + } + + // Explicitly sync. + if err := r.Sync(); err != nil { + t.Fatalf("Sync() error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 1 { + t.Errorf("expected 1 entry, got %d", len(entries)) + } +} + +// --- Close tests --- + +func TestClose_FlushesData(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(10000), + WithSyncStrategy(SyncManual), + WithLogger(logger), + ) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + r.Write([]byte("before close")) + r.Close() + + // Re-open and verify data persisted. + r2, err := New(context.Background(), dir, + WithMaxLogSize(10000), WithLogger(logger)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r2.Close() + + entries := collectReplay(t, r2) + if len(entries) != 1 { + t.Fatalf("expected 1 entry after close+reopen, got %d", len(entries)) + } + if string(entries[0]) != "before close" { + t.Errorf("expected %q, got %q", "before close", string(entries[0])) + } +} + +func TestClose_GracefulGoroutineShutdown(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithSyncStrategy(SyncPeriodic), + WithFsyncTime(10*time.Millisecond), + WithLogger(logger), + ) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + // Write some data so the goroutine has something to sync. + r.Write([]byte("goroutine test")) + time.Sleep(50 * time.Millisecond) + + // Close should not hang or panic. + done := make(chan struct{}) + go func() { + r.Close() + close(done) + }() + + select { + case <-done: + // Success. 
+ case <-time.After(5 * time.Second): + t.Fatal("Close() timed out — goroutine may not be shutting down") + } +} + +// --- Concurrency tests --- + +func TestConcurrentWrites(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(500), WithMaxSegments(100)) + + const goroutines = 10 + const writesPerGoroutine = 20 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := 0; g < goroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < writesPerGoroutine; i++ { + data := []byte(fmt.Sprintf("g%d-w%d", id, i)) + if err := r.Write(data); err != nil { + t.Errorf("concurrent Write() error: %v", err) + } + } + }(g) + } + + wg.Wait() + + entries := collectReplay(t, r) + expected := goroutines * writesPerGoroutine + if len(entries) != expected { + t.Errorf("expected %d entries from concurrent writes, got %d", expected, len(entries)) + } +} + +func TestConcurrentWriteBatch(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(1000), WithMaxSegments(100)) + + const goroutines = 5 + const entriesPerBatch = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := 0; g < goroutines; g++ { + go func(id int) { + defer wg.Done() + batch := make([][]byte, entriesPerBatch) + for i := range batch { + batch[i] = []byte(fmt.Sprintf("batch-g%d-e%d", id, i)) + } + if err := r.WriteBatch(batch); err != nil { + t.Errorf("concurrent WriteBatch() error: %v", err) + } + }(g) + } + + wg.Wait() + + entries := collectReplay(t, r) + expected := goroutines * entriesPerBatch + if len(entries) != expected { + t.Errorf("expected %d entries, got %d", expected, len(entries)) + } +} + +// --- State accessor tests --- + +func TestSegmentCount(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(40), WithMaxSegments(20)) + + if r.SegmentCount() != 0 { + t.Errorf("initial SegmentCount: expected 0, got %d", r.SegmentCount()) + } + + // Write enough to cause rotations. + for i := 0; i < 10; i++ { + r.Write([]byte(fmt.Sprintf("sc-%d", i))) + } + + if r.SegmentCount() == 0 { + t.Error("expected non-zero SegmentCount after writes") + } +} + +func TestCurrentLogSize(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(10000)) + + if r.CurrentLogSize() != 0 { + t.Errorf("initial CurrentLogSize: expected 0, got %d", r.CurrentLogSize()) + } + + data := []byte("test data") + r.Write(data) + expected := int64(len(data)) + entryHeaderSize + if r.CurrentLogSize() != expected { + t.Errorf("CurrentLogSize: expected %d, got %d", expected, r.CurrentLogSize()) + } +} + +func TestDir(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, WithLogger(logger)) + if err != nil { + t.Fatalf("New() error: %v", err) + } + defer r.Close() + + if r.Dir() != dir { + t.Errorf("Dir(): expected %s, got %s", dir, r.Dir()) + } +} + +// --- Round-trip tests --- + +func TestRoundTrip_BinaryData(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(10000)) + + // Write all byte values 0-255. 
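+	// Entries are length-prefixed rather than delimited, so every byte value,
+	// including zeros and newlines, must round-trip unchanged.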
+ data := make([]byte, 256) + for i := range data { + data[i] = byte(i) + } + + if err := r.Write(data); err != nil { + t.Fatalf("Write() error: %v", err) + } + + entries := collectReplay(t, r) + if len(entries) != 1 { + t.Fatalf("expected 1 entry, got %d", len(entries)) + } + if !bytes.Equal(entries[0], data) { + t.Error("binary data corrupted in round-trip") + } +} + +func TestRoundTrip_ManySmallEntries(t *testing.T) { + r := newTestRebuf(t, WithMaxLogSize(100), WithMaxSegments(100)) + + const n = 100 + for i := 0; i < n; i++ { + r.Write([]byte(strconv.Itoa(i))) + } + + entries := collectReplay(t, r) + if len(entries) != n { + t.Fatalf("expected %d entries, got %d", n, len(entries)) + } + for i, e := range entries { + if string(e) != strconv.Itoa(i) { + t.Errorf("entry %d: expected %q, got %q", i, strconv.Itoa(i), string(e)) + } + } +} + +// --- Recovery tests --- + +func TestRecovery_AfterCrash(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + // Simulate: write data, close without clean shutdown. + r1, _ := New(context.Background(), dir, + WithMaxLogSize(100), WithMaxSegments(10), WithLogger(logger)) + for i := 0; i < 5; i++ { + r1.Write([]byte(fmt.Sprintf("crash-%d", i))) + } + // Flush but simulate crash (don't Close cleanly — just sync and abandon). + r1.Sync() + + // Re-open. + r2, err := New(context.Background(), dir, + WithMaxLogSize(100), WithMaxSegments(10), WithLogger(logger)) + if err != nil { + t.Fatalf("New() after crash error: %v", err) + } + defer r2.Close() + + entries := collectReplay(t, r2) + if len(entries) < 5 { + t.Errorf("expected at least 5 entries after recovery, got %d", len(entries)) + } +} + +// --- Context cancellation --- + +func TestContextCancellation(t *testing.T) { + dir := t.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + ctx, cancel := context.WithCancel(context.Background()) + + r, err := New(ctx, dir, + WithSyncStrategy(SyncPeriodic), + WithFsyncTime(10*time.Millisecond), + WithLogger(logger), + ) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + r.Write([]byte("context test")) + + // Cancel the context. + cancel() + + // Close should still work. 
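+	// Close blocks on the sync goroutine's done channel, so a timeout below
+	// would mean the goroutine never observed the cancelled context.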
+ done := make(chan error, 1) + go func() { + done <- r.Close() + }() + + select { + case err := <-done: + if err != nil { + t.Errorf("Close() after cancel: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Close() timed out after context cancellation") + } +} + +// --- Benchmarks --- + +func BenchmarkWrite(b *testing.B) { + dir := b.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(10*1024*1024), + WithMaxSegments(100), + WithSyncStrategy(SyncManual), + WithLogger(logger), + ) + if err != nil { + b.Fatalf("New() error: %v", err) + } + defer r.Close() + + data := []byte("benchmark write data payload") + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := r.Write(data); err != nil { + b.Fatalf("Write() error: %v", err) + } + } +} + +func BenchmarkWriteParallel(b *testing.B) { + dir := b.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(10*1024*1024), + WithMaxSegments(100), + WithSyncStrategy(SyncManual), + WithLogger(logger), + ) + if err != nil { + b.Fatalf("New() error: %v", err) + } + defer r.Close() + + data := []byte("benchmark parallel write data") + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := r.Write(data); err != nil { + b.Errorf("Write() error: %v", err) + } + } + }) +} + +func BenchmarkReplay(b *testing.B) { + dir := b.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(10*1024*1024), + WithMaxSegments(100), + WithSyncStrategy(SyncManual), + WithLogger(logger), + ) + if err != nil { + b.Fatalf("New() error: %v", err) + } + + // Pre-populate with entries. + data := []byte("benchmark replay data payload") + for i := 0; i < 10000; i++ { + r.Write(data) + } + r.Sync() + + var count atomic.Int64 + callback := func(d []byte) error { + count.Add(1) + return nil + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + count.Store(0) + if err := r.Replay(callback); err != nil { + b.Fatalf("Replay() error: %v", err) + } + } + r.Close() +} + +func BenchmarkWriteBatch(b *testing.B) { + dir := b.TempDir() + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + + r, err := New(context.Background(), dir, + WithMaxLogSize(10*1024*1024), + WithMaxSegments(100), + WithSyncStrategy(SyncManual), + WithLogger(logger), + ) + if err != nil { + b.Fatalf("New() error: %v", err) + } + defer r.Close() + + batch := make([][]byte, 100) + for i := range batch { + batch[i] = []byte("benchmark batch data payload") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := r.WriteBatch(batch); err != nil { + b.Fatalf("WriteBatch() error: %v", err) + } + } +} diff --git a/utils/utils.go b/utils/utils.go index 951cb43..d3d6366 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1,76 +1,125 @@ +// Package utils provides filesystem helpers for rebuf's segment management. package utils import ( + "fmt" "io" "os" + "sort" "strconv" "strings" - "time" ) +// IsDirectoryEmpty reports whether dirPath contains no files other than .tmp files. 
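+// Any *.tmp file (including the active rebuf.tmp write buffer) is skipped,
+// so a directory holding only temp files still counts as empty.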
func IsDirectoryEmpty(dirPath string) (bool, error) { - // Open the directory dir, err := os.Open(dirPath) if err != nil { return false, err } defer dir.Close() - // Read the directory contents - files, err := dir.ReadDir(1) // Read the first entry - var filteredFiles []os.DirEntry - for _, file := range files { - if !strings.HasSuffix(file.Name(), ".tmp") { - filteredFiles = append(filteredFiles, file) - } - } + entries, err := dir.ReadDir(-1) if err != nil && err != io.EOF { return false, err } - // If the list of files is empty, the directory is empty - return len(filteredFiles) == 0, nil + for _, entry := range entries { + if !strings.HasSuffix(entry.Name(), ".tmp") { + return false, nil + } + } + return true, nil } -func GetLatestSegmentId(logDir string) (int, error) { - files, err := os.ReadDir(logDir) +// ParseSegmentId extracts the numeric segment ID from a filename like "rebuf-3". +func ParseSegmentId(fileName string) (int, error) { + if !strings.HasPrefix(fileName, "rebuf-") { + return 0, fmt.Errorf("utils: not a segment file: %s", fileName) + } + parts := strings.SplitN(fileName, "-", 2) + if len(parts) != 2 { + return 0, fmt.Errorf("utils: invalid segment filename: %s", fileName) + } + return strconv.Atoi(parts[1]) +} + +// GetSortedSegmentFiles returns all segment files sorted by segment ID, +// with the active "rebuf.tmp" file (if present) appended at the end. +func GetSortedSegmentFiles(logDir string) ([]string, error) { + entries, err := os.ReadDir(logDir) if err != nil { - return 0, err + return nil, err } - // Filter out .tmp files - latestModifiedTime := time.Time{} - var latestFileName string - for _, file := range files { - if strings.HasSuffix(file.Name(), ".tmp") { + var segments []string + hasTmp := false + + for _, entry := range entries { + name := entry.Name() + if name == "rebuf.tmp" { + hasTmp = true continue } - fileInfo, err := file.Info() - - if err != nil { - return 0, err + if strings.HasPrefix(name, "rebuf-") { + segments = append(segments, name) } + } - if fileInfo.ModTime().After(latestModifiedTime) { - latestModifiedTime = fileInfo.ModTime() - latestFileName = file.Name() - } + sort.Slice(segments, func(i, j int) bool { + idI, _ := ParseSegmentId(segments[i]) + idJ, _ := ParseSegmentId(segments[j]) + return idI < idJ + }) + + if hasTmp { + segments = append(segments, "rebuf.tmp") } - segmentCount, err := strconv.Atoi(strings.Split(latestFileName, "-")[1]) + + return segments, nil +} + +// GetLatestSegmentId returns the highest segment ID found in logDir. +// Returns an error if no numbered segment files exist. +func GetLatestSegmentId(logDir string) (int, error) { + entries, err := os.ReadDir(logDir) if err != nil { return 0, err } - return segmentCount, nil + + maxId := -1 + for _, entry := range entries { + id, err := ParseSegmentId(entry.Name()) + if err != nil { + continue + } + if id > maxId { + maxId = id + } + } + + if maxId == -1 { + return 0, fmt.Errorf("utils: no segment files found in %s", logDir) + } + return maxId, nil } +// GetNumSegments returns the number of numbered segment files (excluding .tmp). func GetNumSegments(logDir string) (int, error) { - files, err := os.ReadDir(logDir) + entries, err := os.ReadDir(logDir) if err != nil { return 0, err } - return len(files) - 1, nil + + count := 0 + for _, entry := range entries { + if strings.HasPrefix(entry.Name(), "rebuf-") { + count++ + } + } + return count, nil } +// FileSize returns the size of the given open file in bytes. 
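+// It stats the already-open handle rather than re-opening the path.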
func FileSize(f *os.File) (int64, error) { fi, err := f.Stat() if err != nil { @@ -79,29 +128,30 @@ func FileSize(f *os.File) (int64, error) { return fi.Size(), nil } +// GetOldestSegmentFile returns the segment file with the lowest ID. +// Returns an error if no numbered segment files exist. func GetOldestSegmentFile(logDir string) (string, error) { - files, err := os.ReadDir(logDir) + entries, err := os.ReadDir(logDir) if err != nil { - return "0", err + return "", err } - // Filter out .tmp files - oldestModifedTime := time.Now() - var oldestFileName string - for _, file := range files { - if strings.HasSuffix(file.Name(), ".tmp") { - continue - } - fileInfo, err := file.Info() + minId := int(^uint(0) >> 1) // max int + var oldestFile string + for _, entry := range entries { + id, err := ParseSegmentId(entry.Name()) if err != nil { - return "", err + continue } - - if fileInfo.ModTime().Before(oldestModifedTime) { - oldestModifedTime = fileInfo.ModTime() - oldestFileName = file.Name() + if id < minId { + minId = id + oldestFile = entry.Name() } } - return oldestFileName, nil + + if oldestFile == "" { + return "", fmt.Errorf("utils: no segment files found in %s", logDir) + } + return oldestFile, nil } diff --git a/utils/utils_test.go b/utils/utils_test.go index 86aa398..26a9937 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -1,96 +1,369 @@ package utils import ( - "log" "os" "path/filepath" "testing" ) -func setupSuite(t testing.TB) func(t testing.TB) { - log.Println("Setting up logDir empty") +// --- IsDirectoryEmpty tests --- - dirPath := os.Getenv("TEST_LOG_DIR") +func TestIsDirectoryEmpty_EmptyDir(t *testing.T) { + dir := t.TempDir() - if _, err := os.Stat(filepath.Join(dirPath)); err != nil { - if os.IsNotExist(err) { - os.Mkdir(dirPath, 0700) - } - } else { - t.Fatal("Error creating dirPath in setup suite") + empty, err := IsDirectoryEmpty(dir) + if err != nil { + t.Fatalf("IsDirectoryEmpty() error: %v", err) + } + if !empty { + t.Error("expected empty directory to return true") } +} - // Return a function to teardown the test - return func(tb testing.TB) { - log.Printf("Deleting everything in %v", dirPath) - os.RemoveAll(dirPath) +func TestIsDirectoryEmpty_OnlyTmpFile(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf.tmp")) + empty, err := IsDirectoryEmpty(dir) + if err != nil { + t.Fatalf("IsDirectoryEmpty() error: %v", err) + } + if !empty { + t.Error("expected directory with only .tmp file to return true") } } -func createFile(fileName string) (*os.File, error) { - file, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) +func TestIsDirectoryEmpty_WithSegmentFile(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf.tmp")) + createFile(t, filepath.Join(dir, "rebuf-0")) + + empty, err := IsDirectoryEmpty(dir) if err != nil { - return nil, err + t.Fatalf("IsDirectoryEmpty() error: %v", err) + } + if empty { + t.Error("expected directory with segment file to return false") } - return file, nil } -func TestIsDirectoryEmpty(t *testing.T) { +func TestIsDirectoryEmpty_MultipleTmpFiles(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "foo.tmp")) + createFile(t, filepath.Join(dir, "bar.tmp")) - teardownSuite := setupSuite(t) - defer teardownSuite(t) - dirPath := os.Getenv("TEST_LOG_DIR") + empty, err := IsDirectoryEmpty(dir) + if err != nil { + t.Fatalf("IsDirectoryEmpty() error: %v", err) + } + if !empty { + t.Error("expected directory with only .tmp files to return true") + } 
+} - t.Run("directory exists without .tmp file", func(t *testing.T) { - empty, err := IsDirectoryEmpty(dirPath) - if err != nil { - t.Fatalf("Error in running IsDirectoryEmpty with %v", dirPath) - } +func TestIsDirectoryEmpty_NonExistentDir(t *testing.T) { + _, err := IsDirectoryEmpty("/nonexistent/path") + if err == nil { + t.Error("expected error for nonexistent directory") + } +} - //empty should be true - if empty == false { - t.Fatalf("Expected %v. Got %v", false, empty) - } - }) +// --- ParseSegmentId tests --- - t.Run("directory exists with .tmp file", func(t *testing.T) { +func TestParseSegmentId_Valid(t *testing.T) { + tests := []struct { + filename string + expected int + }{ + {"rebuf-0", 0}, + {"rebuf-1", 1}, + {"rebuf-42", 42}, + {"rebuf-999", 999}, + } - file, err := createFile(filepath.Join(dirPath, "rebuf.tmp")) + for _, tt := range tests { + id, err := ParseSegmentId(tt.filename) if err != nil { - t.Fatalf("Error in creating file %v", file) + t.Errorf("ParseSegmentId(%q) error: %v", tt.filename, err) } - - empty, err := IsDirectoryEmpty(dirPath) - if err != nil { - t.Fatalf("Error in running IsDirectoryEmpty with %v", dirPath) + if id != tt.expected { + t.Errorf("ParseSegmentId(%q) = %d, want %d", tt.filename, id, tt.expected) } + } +} - //empty should be true - if empty == false { - t.Fatalf("Expected %v. Got %v", false, empty) +func TestParseSegmentId_Invalid(t *testing.T) { + invalid := []string{ + "rebuf.tmp", + "other-file", + "rebuf-", + "rebuf-abc", + "", + } + + for _, name := range invalid { + _, err := ParseSegmentId(name) + if err == nil { + t.Errorf("ParseSegmentId(%q) expected error, got nil", name) } - }) + } +} - t.Run("directory exists with .tmp file and data file", func(t *testing.T) { +// --- GetSortedSegmentFiles tests --- - dataFile, err := createFile(filepath.Join(dirPath, "rebuf-1")) - if err != nil { - t.Fatalf("Error in creating file %v", dataFile) - } +func TestGetSortedSegmentFiles_Empty(t *testing.T) { + dir := t.TempDir() - empty, err := IsDirectoryEmpty(dirPath) - if err != nil { - t.Fatalf("Error in running IsDirectoryEmpty with %v", err) - } + files, err := GetSortedSegmentFiles(dir) + if err != nil { + t.Fatalf("GetSortedSegmentFiles() error: %v", err) + } + if len(files) != 0 { + t.Errorf("expected 0 files, got %d", len(files)) + } +} - //empty should be false - if empty == false { - t.Fatalf("Expected %v. Got %v", false, empty) +func TestGetSortedSegmentFiles_OnlyTmp(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + files, err := GetSortedSegmentFiles(dir) + if err != nil { + t.Fatalf("GetSortedSegmentFiles() error: %v", err) + } + if len(files) != 1 || files[0] != "rebuf.tmp" { + t.Errorf("expected [rebuf.tmp], got %v", files) + } +} + +func TestGetSortedSegmentFiles_Sorted(t *testing.T) { + dir := t.TempDir() + // Create out of order. 
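+	// A scrambled creation order guards against a regression to the old
+	// ModTime-based ordering.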
+ createFile(t, filepath.Join(dir, "rebuf-2")) + createFile(t, filepath.Join(dir, "rebuf-0")) + createFile(t, filepath.Join(dir, "rebuf-1")) + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + files, err := GetSortedSegmentFiles(dir) + if err != nil { + t.Fatalf("GetSortedSegmentFiles() error: %v", err) + } + + expected := []string{"rebuf-0", "rebuf-1", "rebuf-2", "rebuf.tmp"} + if len(files) != len(expected) { + t.Fatalf("expected %d files, got %d: %v", len(expected), len(files), files) + } + for i, f := range files { + if f != expected[i] { + t.Errorf("files[%d] = %q, want %q", i, f, expected[i]) } - }) + } } +func TestGetSortedSegmentFiles_IgnoresNonRebufFiles(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-0")) + createFile(t, filepath.Join(dir, ".DS_Store")) + createFile(t, filepath.Join(dir, "other.log")) + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + files, err := GetSortedSegmentFiles(dir) + if err != nil { + t.Fatalf("GetSortedSegmentFiles() error: %v", err) + } + + expected := []string{"rebuf-0", "rebuf.tmp"} + if len(files) != len(expected) { + t.Fatalf("expected %d files, got %d: %v", len(expected), len(files), files) + } +} + +// --- GetLatestSegmentId tests --- + func TestGetLatestSegmentId(t *testing.T) { - //test2 + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-0")) + createFile(t, filepath.Join(dir, "rebuf-3")) + createFile(t, filepath.Join(dir, "rebuf-1")) + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + id, err := GetLatestSegmentId(dir) + if err != nil { + t.Fatalf("GetLatestSegmentId() error: %v", err) + } + if id != 3 { + t.Errorf("expected 3, got %d", id) + } +} + +func TestGetLatestSegmentId_NoSegments(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + _, err := GetLatestSegmentId(dir) + if err == nil { + t.Error("expected error when no segments exist") + } +} + +func TestGetLatestSegmentId_SingleSegment(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-5")) + + id, err := GetLatestSegmentId(dir) + if err != nil { + t.Fatalf("GetLatestSegmentId() error: %v", err) + } + if id != 5 { + t.Errorf("expected 5, got %d", id) + } +} + +// --- GetNumSegments tests --- + +func TestGetNumSegments(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-0")) + createFile(t, filepath.Join(dir, "rebuf-1")) + createFile(t, filepath.Join(dir, "rebuf-2")) + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + count, err := GetNumSegments(dir) + if err != nil { + t.Fatalf("GetNumSegments() error: %v", err) + } + if count != 3 { + t.Errorf("expected 3 segments, got %d", count) + } +} + +func TestGetNumSegments_Empty(t *testing.T) { + dir := t.TempDir() + + count, err := GetNumSegments(dir) + if err != nil { + t.Fatalf("GetNumSegments() error: %v", err) + } + if count != 0 { + t.Errorf("expected 0 segments, got %d", count) + } +} + +func TestGetNumSegments_OnlyTmp(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + count, err := GetNumSegments(dir) + if err != nil { + t.Fatalf("GetNumSegments() error: %v", err) + } + if count != 0 { + t.Errorf("expected 0 segments with only .tmp, got %d", count) + } +} + +func TestGetNumSegments_IgnoresNonRebufFiles(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-0")) + createFile(t, filepath.Join(dir, ".DS_Store")) + createFile(t, filepath.Join(dir, "random.txt")) + + count, err := GetNumSegments(dir) + if err != nil { + 
t.Fatalf("GetNumSegments() error: %v", err) + } + if count != 1 { + t.Errorf("expected 1 segment, got %d", count) + } +} + +// --- GetOldestSegmentFile tests --- + +func TestGetOldestSegmentFile(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-5")) + createFile(t, filepath.Join(dir, "rebuf-2")) + createFile(t, filepath.Join(dir, "rebuf-8")) + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + oldest, err := GetOldestSegmentFile(dir) + if err != nil { + t.Fatalf("GetOldestSegmentFile() error: %v", err) + } + if oldest != "rebuf-2" { + t.Errorf("expected rebuf-2, got %s", oldest) + } +} + +func TestGetOldestSegmentFile_NoSegments(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf.tmp")) + + _, err := GetOldestSegmentFile(dir) + if err == nil { + t.Error("expected error when no segments exist") + } +} + +func TestGetOldestSegmentFile_SingleSegment(t *testing.T) { + dir := t.TempDir() + createFile(t, filepath.Join(dir, "rebuf-7")) + + oldest, err := GetOldestSegmentFile(dir) + if err != nil { + t.Fatalf("GetOldestSegmentFile() error: %v", err) + } + if oldest != "rebuf-7" { + t.Errorf("expected rebuf-7, got %s", oldest) + } +} + +// --- FileSize tests --- + +func TestFileSize_EmptyFile(t *testing.T) { + dir := t.TempDir() + f := createFile(t, filepath.Join(dir, "empty")) + + size, err := FileSize(f) + if err != nil { + t.Fatalf("FileSize() error: %v", err) + } + if size != 0 { + t.Errorf("expected 0, got %d", size) + } + f.Close() +} + +func TestFileSize_WithContent(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "data") + + if err := os.WriteFile(path, []byte("hello world"), 0666); err != nil { + t.Fatal(err) + } + + f, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + size, err := FileSize(f) + if err != nil { + t.Fatalf("FileSize() error: %v", err) + } + if size != 11 { + t.Errorf("expected 11, got %d", size) + } +} + +// --- helpers --- + +func createFile(t *testing.T, path string) *os.File { + t.Helper() + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + t.Fatalf("createFile(%s) error: %v", path, err) + } + return f }