Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions crdt/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package crdt

// counterState holds the grow-only increment and decrement maps for a PN-Counter.
type counterState struct {
Inc map[string]int64 `json:"inc"`
Dec map[string]int64 `json:"dec"`
}

// Counter is a Positive-Negative Counter CRDT. Each node maintains independent
// increment and decrement totals; the observed value is sum(Inc) - sum(Dec).
type Counter struct {
inner *CRDT[counterState]
}

// NewCounter creates a new Counter for the given nodeID.
func NewCounter(nodeID string) *Counter {
return &Counter{
inner: NewCRDT(counterState{
Inc: make(map[string]int64),
Dec: make(map[string]int64),
}, nodeID),
}
}

// NodeID returns the node identifier for this Counter.
func (c *Counter) NodeID() string { return c.inner.NodeID() }

// Increment adds delta to this node's increment total. Ignored if delta <= 0.
func (c *Counter) Increment(delta int64) {
if delta <= 0 {
return
}
nodeID := c.inner.NodeID()
c.inner.Edit(func(s *counterState) {
s.Inc[nodeID] += delta
})
}

// Decrement adds delta to this node's decrement total. Ignored if delta <= 0.
func (c *Counter) Decrement(delta int64) {
if delta <= 0 {
return
}
nodeID := c.inner.NodeID()
c.inner.Edit(func(s *counterState) {
s.Dec[nodeID] += delta
})
}

// Value returns the current counter value: sum(Inc) - sum(Dec).
func (c *Counter) Value() int64 {
s := c.inner.View()
var total int64
for _, v := range s.Inc {
total += v
}
for _, v := range s.Dec {
total -= v
}
return total
}

// Merge merges the state of other into this Counter. Returns true if any
// changes were applied.
func (c *Counter) Merge(other *Counter) bool {
return c.inner.Merge(other.inner)
}
134 changes: 134 additions & 0 deletions crdt/counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package crdt

import (
"sync"
"testing"
)

func TestCounter_BasicIncrement(t *testing.T) {
c := NewCounter("node-a")
c.Increment(1)
c.Increment(1)
c.Increment(1)
if got := c.Value(); got != 3 {
t.Errorf("expected 3, got %d", got)
}
}

func TestCounter_BasicDecrement(t *testing.T) {
c := NewCounter("node-a")
c.Increment(5)
c.Decrement(2)
if got := c.Value(); got != 3 {
t.Errorf("expected 3, got %d", got)
}
}

func TestCounter_ZeroAndNegativeDeltaIgnored(t *testing.T) {
c := NewCounter("node-a")
c.Increment(0)
c.Increment(-1)
c.Decrement(0)
if got := c.Value(); got != 0 {
t.Errorf("expected 0, got %d", got)
}
}

func TestCounter_MergeIndependentNodes(t *testing.T) {
a := NewCounter("node-a")
b := NewCounter("node-b")

a.Increment(3)
b.Increment(2)

a.Merge(b)
b.Merge(a)

if got := a.Value(); got != 5 {
t.Errorf("node-a: expected 5, got %d", got)
}
if got := b.Value(); got != 5 {
t.Errorf("node-b: expected 5, got %d", got)
}
}

func TestCounter_ConcurrentIncrements(t *testing.T) {
a := NewCounter("node-a")
b := NewCounter("node-b")

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
a.Increment(1)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
b.Increment(1)
}
}()
wg.Wait()

a.Merge(b)
b.Merge(a)

if got := a.Value(); got != 20 {
t.Errorf("node-a: expected 20, got %d", got)
}
if got := b.Value(); got != 20 {
t.Errorf("node-b: expected 20, got %d", got)
}
}

func TestCounter_MergeIdempotent(t *testing.T) {
a := NewCounter("node-a")
b := NewCounter("node-b")

a.Increment(4)
b.Increment(6)

a.Merge(b)
first := a.Value()

a.Merge(b)
second := a.Value()

if first != second {
t.Errorf("merge not idempotent: first=%d second=%d", first, second)
}
if first != 10 {
t.Errorf("expected 10, got %d", first)
}
}

func TestCounter_Commutativity(t *testing.T) {
a := NewCounter("node-a")
b := NewCounter("node-b")

a.Increment(7)
b.Increment(3)

// merge(A into B)
bCopy := NewCounter("node-b")
bCopy.Increment(3)
bCopy.Merge(a)

// merge(B into A)
a.Merge(b)

if a.Value() != bCopy.Value() {
t.Errorf("merge not commutative: merge(B,A)=%d merge(A,B)=%d", a.Value(), bCopy.Value())
}
}

func TestCounter_NegativeValue(t *testing.T) {
c := NewCounter("node-a")
c.Increment(2)
c.Decrement(5)
if got := c.Value(); got != -3 {
t.Errorf("expected -3, got %d", got)
}
}
71 changes: 71 additions & 0 deletions crdt/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package crdt

// Map is a distributed LWW key-value map CRDT built on top of [CRDT].
//
// Concurrent writes to the same key are resolved by Last-Write-Wins: the write
// with the strictly higher HLC timestamp wins. Deletions remove the key from
// the map and record a tombstone timestamp, so a delete with a newer timestamp
// wins over an older set, and a set with a newer timestamp wins over an older
// delete.
type Map[K comparable, V any] struct {
inner *CRDT[map[K]V]
}

// NewMap returns an empty Map CRDT bound to the given node ID.
func NewMap[K comparable, V any](nodeID string) *Map[K, V] {
return &Map[K, V]{
inner: NewCRDT(make(map[K]V), nodeID),
}
}

// NodeID returns the unique identifier for this Map instance.
func (m *Map[K, V]) NodeID() string { return m.inner.NodeID() }

// Set sets key to value.
func (m *Map[K, V]) Set(key K, value V) {
m.inner.Edit(func(mp *map[K]V) {
(*mp)[key] = value
})
}

// Delete removes key from the map. It is a no-op if the key does not exist.
func (m *Map[K, V]) Delete(key K) {
m.inner.Edit(func(mp *map[K]V) {
delete(*mp, key)
})
}

// Get returns the value for key and true if the key exists.
// It returns the zero value and false otherwise.
func (m *Map[K, V]) Get(key K) (V, bool) {
state := m.inner.View()
v, ok := state[key]
return v, ok
}

// Contains reports whether key exists in the map.
func (m *Map[K, V]) Contains(key K) bool {
_, ok := m.Get(key)
return ok
}

// Keys returns a slice of all live keys. The order is non-deterministic.
func (m *Map[K, V]) Keys() []K {
state := m.inner.View()
keys := make([]K, 0, len(state))
for k := range state {
keys = append(keys, k)
}
return keys
}

// Len returns the number of entries in the map.
func (m *Map[K, V]) Len() int {
return len(m.inner.View())
}

// Merge performs a full state-based LWW merge with another Map node.
// Returns true if the local state changed.
func (m *Map[K, V]) Merge(other *Map[K, V]) bool {
return m.inner.Merge(other.inner)
}
Loading
Loading