Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ jobs:
run: go build -v ./...

- name: Test
run: go test -cover -coverprofile=coverage.txt ./...
run: go test -race -cover -coverprofile=coverage.txt ./...

- name: Vet
run: go vet ./...

- name: Archive code coverage results
uses: actions/upload-artifact@v4
Expand Down
39 changes: 39 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Contributing to rebuf

Thanks for your interest in contributing! Here's how to get started.

## Development Setup

1. Clone the repository:
```bash
git clone https://github.com/stym06/rebuf.git
cd rebuf
```

2. Make sure you have Go 1.22+ installed.

## Running Tests

```bash
make test
```

To run tests with the race detector:

```bash
go test -race ./...
```

## Code Style

- Follow standard Go conventions (`gofmt`, `go vet`).
- Add godoc comments to all exported types and functions.
- Keep the project dependency-free (standard library only).

## Pull Requests

1. Create a feature branch from `main`.
2. Make your changes with clear commit messages.
3. Add or update tests for any new functionality.
4. Ensure all tests pass and the race detector is clean.
5. Open a PR with a description of your changes.
110 changes: 71 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,104 @@

[![Go](https://github.com/stym06/rebuf/actions/workflows/go.yml/badge.svg)](https://github.com/stym06/rebuf/actions/workflows/go.yml)

`rebuf` is a Golang implementation of WAL (Write Ahead||After Logging) which can also be used to log data bytes during a downstream service issue which can later be replayed on-demand
`rebuf` is a lightweight Go implementation of a Write-Ahead Log (WAL) that persists data to segmented log files and supports on-demand replay. It can be used as a durable buffer during downstream service outages — log data bytes while the service is down, then replay them when it recovers.

## Features

- Create and replay log data on any filesystem.
- Lightweight and easy to use.
- Efficient storage and retrieval of log data.
- **Segmented WAL** — automatic segment rotation and retention with configurable limits.
- **CRC32 checksums** — every entry is integrity-checked on replay.
- **Concurrent-safe** — all operations are protected by a mutex.
- **Configurable sync strategies** — choose between sync-every-write, periodic sync, or manual sync.
- **Selective replay** — replay all entries or start from a specific segment.
- **Batch writes** — write multiple entries in a single operation to amortize sync cost.
- **Purge API** — clean up segments after successful replay.
- **Functional options** — sensible defaults with optional configuration.
- **Zero external dependencies** — uses only the Go standard library.

## Installation

1. Clone the repository: `git clone https://github.com/stym06/rebuf.git`
2. Navigate to the project directory: `cd rebuf`
3. Install the necessary dependencies by running: `go mod download`
```bash
go get github.com/stym06/rebuf
```

## Usage

```
func writeToStdout(data []byte) error {
slog.Info(string(data))
return nil
}
```go
package main

func main() {
import (
"context"
"log/slog"
"os"
"time"

"github.com/stym06/rebuf/rebuf"
)

func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

//Init the RebufOptions
rebufOptions := &rebuf.RebufOptions{
LogDir: "/Users/satyamraj/personal/rebuf/data",
FsyncTime: 5 * time.Second,
MaxLogSize: 50,
MaxSegments: 5,
Logger: logger,
r, err := rebuf.New(
context.Background(),
"./data",
rebuf.WithMaxLogSize(1024*1024), // 1 MB per segment
rebuf.WithMaxSegments(5), // keep at most 5 segments
rebuf.WithFsyncTime(5*time.Second), // periodic sync interval
rebuf.WithSyncStrategy(rebuf.SyncEveryWrite),
rebuf.WithLogger(logger),
)
if err != nil {
logger.Error("failed to create rebuf", "error", err)
os.Exit(1)
}
defer r.Close()

//Init Rebuf
rebuf, err := rebuf.Init(rebufOptions)
if err != nil {
logger.Info("Error during Rebuf creation: " + err.Error())
// Write entries.
for i := 0; i < 100; i++ {
if err := r.Write([]byte("Hello world")); err != nil {
logger.Error("write failed", "error", err)
}
}

defer rebuf.Close()
// Replay all entries.
r.Replay(func(data []byte) error {
slog.Info(string(data))
return nil
})
}
```

// Write Bytes
for i := 0; i < 30; i++ {
logger.Info("Writing data: ", "iter", i)
go rebuf.Write([]byte("Hello world"))
time.Sleep(300 * time.Millisecond)
}
## API Overview

//Replay and write to stdout
rebuf.Replay(writeToStdout)
| Method | Description |
|--------|-------------|
| `New(ctx, logDir, opts...)` | Create a new Rebuf instance |
| `Write(data)` | Write a single entry |
| `WriteBatch(entries)` | Write multiple entries in one operation |
| `Replay(callback)` | Replay all entries from all segments |
| `ReplayFrom(segmentId, callback)` | Replay entries starting from a segment |
| `Purge()` | Delete all segments and reset |
| `PurgeThrough(segmentId)` | Delete segments up through the given ID |
| `Sync()` | Manually flush and fsync to disk |
| `Close()` | Flush, sync, and shut down |
| `SegmentCount()` | Number of completed segments |
| `CurrentLogSize()` | Bytes written to the active segment |
| `Dir()` | Log directory path |

if err != nil {
logger.Info(err.Error())
}
## On-Disk Format

time.Sleep(30 * time.Second)
Each entry is stored as:

}
```
[8-byte size (big-endian uint64)][4-byte CRC32 (IEEE)][data bytes]
```

Segment files are named `rebuf-0`, `rebuf-1`, etc. The active file is always `rebuf.tmp`.

## License

This project is licensed under the MIT License. See the `LICENSE` file for more information.

## Contact Information
## Contact

If you have any questions or concerns, please feel free to reach out to the author on GitHub: [@stym06](https://github.com/stym06).
176 changes: 176 additions & 0 deletions ROADMAP.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# rebuf Roadmap

This document outlines the planned improvements and features for rebuf, organized by priority.

---

## Phase 1: Correctness & Reliability

These are foundational issues that should be addressed before adding new features.

### 1.1 Fix concurrency in `Write()`
The current `Write()` method only acquires the mutex for the final `Sync()` call, but the entire write path — including segment rotation, seeking, and buffered writes — is unprotected. Concurrent goroutine writes (as shown in `example.go`) can corrupt data or cause races.

**Action:** Protect the entire `Write()` method with the mutex, not just the sync portion.

### 1.2 Graceful shutdown of `syncPeriodically` goroutine
The background goroutine started in `Init()` runs forever with no shutdown signal. While `Close()` stops the ticker, the goroutine itself is never terminated, leaking it.

**Action:** Use a `context.Context` or a done channel to cleanly stop the goroutine in `Close()`.

### 1.3 Improve error handling in `Init()`
- `os.Mkdir` errors are silently ignored — a permission failure would go unnoticed.
- The temp file is opened twice (once in `Init`, once in `openExistingOrCreateNew`).

**Action:** Propagate `os.Mkdir` errors; remove the duplicate file open.

### 1.4 Data integrity via checksums
There is currently no mechanism to detect corrupted log entries. A single flipped bit in the 8-byte size prefix could cause the reader to consume arbitrary data.

**Action:** Add a CRC32 checksum per entry (e.g., `[8-byte size][4-byte crc32][data]`) and validate on read/replay.

---

## Phase 2: Testing

### 2.1 Core package unit tests
The `rebuf` package has zero test coverage. The following test cases are needed:

- `Init` with a fresh directory
- `Init` recovering from an existing directory with segments
- `Write` a single entry, then `Replay` it back
- `Write` enough data to trigger segment rotation
- `Write` enough data to trigger oldest-segment deletion (`maxSegments`)
- `Close` flushes and syncs correctly
- Round-trip: arbitrary `[]byte` data survives write + replay

### 2.2 Concurrency tests
- Multiple goroutines calling `Write` concurrently (use `-race` detector)
- `Write` and `Replay` interleaving

### 2.3 Edge case tests
- Zero-length data write
- Very large single write exceeding `MaxLogSize`
- `Replay` on an empty directory
- `Replay` on a directory with only a `.tmp` file
- Corrupt segment file handling

### 2.4 Benchmarks
- `BenchmarkWrite` — single-entry write throughput
- `BenchmarkWriteParallel` — concurrent write throughput
- `BenchmarkReplay` — replay throughput over N entries

### 2.5 Expand utils tests
- `TestGetLatestSegmentId` (currently a stub)
- `TestGetNumSegments`
- `TestGetOldestSegmentFile`
- `TestFileSize`

---

## Phase 3: API Improvements

### 3.1 Follow Go naming conventions
Rename `Init` to `New` or `Open` to follow standard Go constructor patterns. The current `Init` name implies it modifies global state.

### 3.2 Add `context.Context` support
- `Init` / `New` should accept a context for cancellation.
- `Write` and `Replay` should accept a context so callers can cancel long-running operations.

### 3.3 Expose read-only state
Add methods to query internal state without breaking encapsulation:
- `SegmentCount() int`
- `CurrentLogSize() int64`
- `LogDir() string`

### 3.4 Functional options pattern
Replace `RebufOptions` struct with functional options (`WithMaxLogSize(int64)`, `WithFsyncTime(time.Duration)`, etc.) to allow sensible defaults and optional configuration.

### 3.5 Configurable sync strategy
Currently every `Write` calls both `Flush()` and `Sync()`, which negates the benefit of buffered I/O. Offer configurable strategies:
- **Sync every write** (current behavior, safest)
- **Periodic sync only** (higher throughput, small durability window)
- **Manual sync** (caller controls when to sync)

---

## Phase 4: Features

### 4.1 Selective replay
Allow replaying from a specific segment ID or offset, rather than always replaying all segments from the beginning.

```go
func (r *Rebuf) ReplayFrom(segmentId int, callbackFn func([]byte) error) error
```

### 4.2 Truncate / Purge after replay
After a successful replay, callers often want to delete the replayed segments. Provide an API for this:

```go
func (r *Rebuf) Purge() error // delete all segments
func (r *Rebuf) PurgeThrough(segmentId int) // delete segments up to ID
```

### 4.3 Compression
Optional per-segment or per-entry compression (e.g., snappy or zstd) to reduce disk usage for large payloads.

### 4.4 Batch writes
Allow writing multiple entries in a single call to amortize the cost of flushing and syncing:

```go
func (r *Rebuf) WriteBatch(entries [][]byte) error
```

### 4.5 Metrics / observability hooks
Expose counters and hooks for monitoring:
- Entries written / replayed
- Bytes written
- Segment rotations
- Segment deletions
- Sync latency

This could be a callback interface or integration with Go's `expvar` / OpenTelemetry.

### 4.6 Entry metadata
Support optional per-entry metadata (timestamp, sequence number) to enable ordered replay and deduplication in distributed scenarios.

---

## Phase 5: Documentation & Project Health

### 5.1 Godoc comments
Add doc comments to all exported types and functions (`Rebuf`, `RebufOptions`, `Init`, `Write`, `Replay`, `Close`).

### 5.2 Architecture document
Write a short design doc explaining the on-disk format, segment lifecycle, and durability guarantees.

### 5.3 Improved README
- Remove hardcoded paths from the usage example.
- Add a "go get" installation command (`go get github.com/stym06/rebuf`).
- Add a section on when to use rebuf vs. alternatives.
- Add badges for Go reference docs.

### 5.4 Contributing guide
Add `CONTRIBUTING.md` covering how to run tests, coding conventions, and the PR process.

### 5.5 Versioning and releases
- Tag releases with semver (start with `v0.1.0`).
- Add a GitHub Actions workflow for creating releases on tag push.
- Add a `CHANGELOG.md`.

### 5.6 Linting in CI
Add `golangci-lint` to the CI pipeline to catch issues early.

---

## Summary

| Phase | Focus | Complexity |
|-------|-------|------------|
| 1 | Correctness & Reliability | Low–Medium |
| 2 | Testing | Medium |
| 3 | API Improvements | Medium |
| 4 | Features | Medium–High |
| 5 | Documentation & Project Health | Low |

Phases 1 and 2 should be prioritized — correctness and testing are prerequisites for everything else. Phase 3 involves breaking API changes, so it makes sense to batch those together before a `v1.0` release. Phase 4 features can be added incrementally. Phase 5 items can be done in parallel with any phase.
Loading
Loading