diff --git a/crdt/counter.go b/crdt/counter.go new file mode 100644 index 0000000..e827a91 --- /dev/null +++ b/crdt/counter.go @@ -0,0 +1,67 @@ +package crdt + +// counterState holds the grow-only increment and decrement maps for a PN-Counter. +type counterState struct { + Inc map[string]int64 `json:"inc"` + Dec map[string]int64 `json:"dec"` +} + +// Counter is a Positive-Negative Counter CRDT. Each node maintains independent +// increment and decrement totals; the observed value is sum(Inc) - sum(Dec). +type Counter struct { + inner *CRDT[counterState] +} + +// NewCounter creates a new Counter for the given nodeID. +func NewCounter(nodeID string) *Counter { + return &Counter{ + inner: NewCRDT(counterState{ + Inc: make(map[string]int64), + Dec: make(map[string]int64), + }, nodeID), + } +} + +// NodeID returns the node identifier for this Counter. +func (c *Counter) NodeID() string { return c.inner.NodeID() } + +// Increment adds delta to this node's increment total. Ignored if delta <= 0. +func (c *Counter) Increment(delta int64) { + if delta <= 0 { + return + } + nodeID := c.inner.NodeID() + c.inner.Edit(func(s *counterState) { + s.Inc[nodeID] += delta + }) +} + +// Decrement adds delta to this node's decrement total. Ignored if delta <= 0. +func (c *Counter) Decrement(delta int64) { + if delta <= 0 { + return + } + nodeID := c.inner.NodeID() + c.inner.Edit(func(s *counterState) { + s.Dec[nodeID] += delta + }) +} + +// Value returns the current counter value: sum(Inc) - sum(Dec). +func (c *Counter) Value() int64 { + s := c.inner.View() + var total int64 + for _, v := range s.Inc { + total += v + } + for _, v := range s.Dec { + total -= v + } + return total +} + +// Merge merges the state of other into this Counter. Returns true if any +// changes were applied. +func (c *Counter) Merge(other *Counter) bool { + return c.inner.Merge(other.inner) +} diff --git a/crdt/counter_test.go b/crdt/counter_test.go new file mode 100644 index 0000000..d077185 --- /dev/null +++ b/crdt/counter_test.go @@ -0,0 +1,134 @@ +package crdt + +import ( + "sync" + "testing" +) + +func TestCounter_BasicIncrement(t *testing.T) { + c := NewCounter("node-a") + c.Increment(1) + c.Increment(1) + c.Increment(1) + if got := c.Value(); got != 3 { + t.Errorf("expected 3, got %d", got) + } +} + +func TestCounter_BasicDecrement(t *testing.T) { + c := NewCounter("node-a") + c.Increment(5) + c.Decrement(2) + if got := c.Value(); got != 3 { + t.Errorf("expected 3, got %d", got) + } +} + +func TestCounter_ZeroAndNegativeDeltaIgnored(t *testing.T) { + c := NewCounter("node-a") + c.Increment(0) + c.Increment(-1) + c.Decrement(0) + if got := c.Value(); got != 0 { + t.Errorf("expected 0, got %d", got) + } +} + +func TestCounter_MergeIndependentNodes(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + a.Increment(3) + b.Increment(2) + + a.Merge(b) + b.Merge(a) + + if got := a.Value(); got != 5 { + t.Errorf("node-a: expected 5, got %d", got) + } + if got := b.Value(); got != 5 { + t.Errorf("node-b: expected 5, got %d", got) + } +} + +func TestCounter_ConcurrentIncrements(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + a.Increment(1) + } + }() + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + b.Increment(1) + } + }() + wg.Wait() + + a.Merge(b) + b.Merge(a) + + if got := a.Value(); got != 20 { + t.Errorf("node-a: expected 20, got %d", got) + } + if got := b.Value(); got != 20 { + t.Errorf("node-b: expected 20, got %d", got) + } +} + +func TestCounter_MergeIdempotent(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + a.Increment(4) + b.Increment(6) + + a.Merge(b) + first := a.Value() + + a.Merge(b) + second := a.Value() + + if first != second { + t.Errorf("merge not idempotent: first=%d second=%d", first, second) + } + if first != 10 { + t.Errorf("expected 10, got %d", first) + } +} + +func TestCounter_Commutativity(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + a.Increment(7) + b.Increment(3) + + // merge(A into B) + bCopy := NewCounter("node-b") + bCopy.Increment(3) + bCopy.Merge(a) + + // merge(B into A) + a.Merge(b) + + if a.Value() != bCopy.Value() { + t.Errorf("merge not commutative: merge(B,A)=%d merge(A,B)=%d", a.Value(), bCopy.Value()) + } +} + +func TestCounter_NegativeValue(t *testing.T) { + c := NewCounter("node-a") + c.Increment(2) + c.Decrement(5) + if got := c.Value(); got != -3 { + t.Errorf("expected -3, got %d", got) + } +} diff --git a/crdt/map.go b/crdt/map.go new file mode 100644 index 0000000..b791461 --- /dev/null +++ b/crdt/map.go @@ -0,0 +1,71 @@ +package crdt + +// Map is a distributed LWW key-value map CRDT built on top of [CRDT]. +// +// Concurrent writes to the same key are resolved by Last-Write-Wins: the write +// with the strictly higher HLC timestamp wins. Deletions remove the key from +// the map and record a tombstone timestamp, so a delete with a newer timestamp +// wins over an older set, and a set with a newer timestamp wins over an older +// delete. +type Map[K comparable, V any] struct { + inner *CRDT[map[K]V] +} + +// NewMap returns an empty Map CRDT bound to the given node ID. +func NewMap[K comparable, V any](nodeID string) *Map[K, V] { + return &Map[K, V]{ + inner: NewCRDT(make(map[K]V), nodeID), + } +} + +// NodeID returns the unique identifier for this Map instance. +func (m *Map[K, V]) NodeID() string { return m.inner.NodeID() } + +// Set sets key to value. +func (m *Map[K, V]) Set(key K, value V) { + m.inner.Edit(func(mp *map[K]V) { + (*mp)[key] = value + }) +} + +// Delete removes key from the map. It is a no-op if the key does not exist. +func (m *Map[K, V]) Delete(key K) { + m.inner.Edit(func(mp *map[K]V) { + delete(*mp, key) + }) +} + +// Get returns the value for key and true if the key exists. +// It returns the zero value and false otherwise. +func (m *Map[K, V]) Get(key K) (V, bool) { + state := m.inner.View() + v, ok := state[key] + return v, ok +} + +// Contains reports whether key exists in the map. +func (m *Map[K, V]) Contains(key K) bool { + _, ok := m.Get(key) + return ok +} + +// Keys returns a slice of all live keys. The order is non-deterministic. +func (m *Map[K, V]) Keys() []K { + state := m.inner.View() + keys := make([]K, 0, len(state)) + for k := range state { + keys = append(keys, k) + } + return keys +} + +// Len returns the number of entries in the map. +func (m *Map[K, V]) Len() int { + return len(m.inner.View()) +} + +// Merge performs a full state-based LWW merge with another Map node. +// Returns true if the local state changed. +func (m *Map[K, V]) Merge(other *Map[K, V]) bool { + return m.inner.Merge(other.inner) +} diff --git a/crdt/map_test.go b/crdt/map_test.go new file mode 100644 index 0000000..77c8c51 --- /dev/null +++ b/crdt/map_test.go @@ -0,0 +1,242 @@ +package crdt + +import ( + "testing" + "time" +) + +func TestMap_SetGet(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("a", 1) + + v, ok := m.Get("a") + if !ok || v != 1 { + t.Errorf("expected (1, true), got (%d, %v)", v, ok) + } + + v2, ok2 := m.Get("b") + if ok2 || v2 != 0 { + t.Errorf("expected (0, false) for missing key, got (%d, %v)", v2, ok2) + } +} + +func TestMap_Delete(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("a", 1) + m.Delete("a") + + if m.Contains("a") { + t.Error("expected Contains(\"a\") == false after Delete") + } +} + +func TestMap_DeleteNonExistent(t *testing.T) { + m := NewMap[string, int]("node-a") + // Should not panic. + m.Delete("missing") +} + +func TestMap_Overwrite(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("a", 1) + m.Set("a", 2) + + v, ok := m.Get("a") + if !ok || v != 2 { + t.Errorf("expected (2, true) after overwrite, got (%d, %v)", v, ok) + } +} + +func TestMap_Keys(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("x", 1) + m.Set("y", 2) + m.Set("z", 3) + m.Delete("y") + + keys := m.Keys() + if len(keys) != 2 { + t.Errorf("expected 2 live keys, got %d: %v", len(keys), keys) + } + for _, k := range keys { + if k == "y" { + t.Error("deleted key \"y\" should not appear in Keys()") + } + } +} + +func TestMap_MergeDisjointKeys(t *testing.T) { + a := NewMap[string, int]("node-a") + b := NewMap[string, int]("node-b") + + a.Set("x", 1) + b.Set("y", 2) + + a.Merge(b) + b.Merge(a) + + for _, node := range []*Map[string, int]{a, b} { + v, ok := node.Get("x") + if !ok || v != 1 { + t.Errorf("node %s: expected x=1, got (%d, %v)", node.NodeID(), v, ok) + } + v2, ok2 := node.Get("y") + if !ok2 || v2 != 2 { + t.Errorf("node %s: expected y=2, got (%d, %v)", node.NodeID(), v2, ok2) + } + } +} + +func TestMap_LWW_SameKey(t *testing.T) { + a := NewMap[string, string]("node-a") + b := NewMap[string, string]("node-b") + + a.Set("k", "a") + time.Sleep(2 * time.Millisecond) + b.Set("k", "b") + + a.Merge(b) + + v, ok := a.Get("k") + if !ok || v != "b" { + t.Errorf("LWW: expected \"b\" (newer) to win, got (%q, %v)", v, ok) + } +} + +func TestMap_DeleteWinsOverOlderSet(t *testing.T) { + a := NewMap[string, string]("node-a") + b := NewMap[string, string]("node-b") + + a.Set("k", "v") + time.Sleep(2 * time.Millisecond) + b.Set("k", "v") // b needs to know about the key to delete it with a newer ts + b.Delete("k") + + a.Merge(b) + + if a.Contains("k") { + t.Error("expected Contains(\"k\") == false after merging newer delete") + } +} + +func TestMap_SetWinsOverOlderDelete(t *testing.T) { + a := NewMap[string, string]("node-a") + b := NewMap[string, string]("node-b") + + // a deletes first, b re-sets with a newer timestamp + a.Set("k", "old") + a.Delete("k") + time.Sleep(2 * time.Millisecond) + b.Set("k", "new") + + a.Merge(b) + + v, ok := a.Get("k") + if !ok || v != "new" { + t.Errorf("expected newer Set to win over older Delete, got (%q, %v)", v, ok) + } +} + +func TestMap_MergeIdempotent(t *testing.T) { + a := NewMap[string, int]("node-a") + b := NewMap[string, int]("node-b") + + a.Set("x", 10) + b.Set("y", 20) + + a.Merge(b) + // Snapshot state after one merge. + keys1 := a.Len() + v1, _ := a.Get("x") + v2, _ := a.Get("y") + + // Merge again — should be idempotent. + a.Merge(b) + + if a.Len() != keys1 { + t.Errorf("idempotent: Len changed after second merge: %d vs %d", a.Len(), keys1) + } + gv1, _ := a.Get("x") + gv2, _ := a.Get("y") + if gv1 != v1 || gv2 != v2 { + t.Errorf("idempotent: values changed after second merge") + } +} + +func TestMap_Commutativity(t *testing.T) { + // Scenario 1: merge A→B then B→A + a1 := NewMap[string, int]("node-a") + b1 := NewMap[string, int]("node-b") + a1.Set("shared", 1) + time.Sleep(2 * time.Millisecond) + b1.Set("shared", 2) + a1.Set("only-a", 10) + b1.Set("only-b", 20) + + a1.Merge(b1) + b1.Merge(a1) + va1, _ := a1.Get("shared") + vb1, _ := b1.Get("shared") + + // Scenario 2: fresh nodes, merge B→A then A→B + a2 := NewMap[string, int]("node-a") + b2 := NewMap[string, int]("node-b") + a2.Set("shared", 1) + time.Sleep(2 * time.Millisecond) + b2.Set("shared", 2) + a2.Set("only-a", 10) + b2.Set("only-b", 20) + + b2.Merge(a2) + a2.Merge(b2) + va2, _ := a2.Get("shared") + vb2, _ := b2.Get("shared") + + if va1 != va2 { + t.Errorf("commutativity: a diverged: scenario1=%d scenario2=%d", va1, va2) + } + if vb1 != vb2 { + t.Errorf("commutativity: b diverged: scenario1=%d scenario2=%d", vb1, vb2) + } + if a1.Len() != a2.Len() { + t.Errorf("commutativity: Len diverged: scenario1=%d scenario2=%d", a1.Len(), a2.Len()) + } +} + +func TestMap_StringKeys(t *testing.T) { + a := NewMap[string, int]("node-a") + b := NewMap[string, int]("node-b") + + a.Set("hello", 42) + b.Set("world", 99) + + a.Merge(b) + + v1, ok1 := a.Get("hello") + v2, ok2 := a.Get("world") + if !ok1 || v1 != 42 { + t.Errorf("expected hello=42, got (%d, %v)", v1, ok1) + } + if !ok2 || v2 != 99 { + t.Errorf("expected world=99, got (%d, %v)", v2, ok2) + } +} + +func TestMap_IntKeys(t *testing.T) { + a := NewMap[int, string]("node-a") + b := NewMap[int, string]("node-b") + + a.Set(1, "one") + b.Set(2, "two") + + a.Merge(b) + + v1, ok1 := a.Get(1) + v2, ok2 := a.Get(2) + if !ok1 || v1 != "one" { + t.Errorf("expected 1=\"one\", got (%q, %v)", v1, ok1) + } + if !ok2 || v2 != "two" { + t.Errorf("expected 2=\"two\", got (%q, %v)", v2, ok2) + } +} diff --git a/crdt/set.go b/crdt/set.go new file mode 100644 index 0000000..6e7204f --- /dev/null +++ b/crdt/set.go @@ -0,0 +1,97 @@ +package crdt + +// setEntry holds an element and its live/deleted status for one Add operation. +// The unique HLC-based tag for this add is stored as the map key in setInner, +// not as a field here — it survives serialisation via the map key string. +type setEntry[T any] struct { + Elem T `json:"e"` + Deleted bool `json:"d,omitempty"` +} + +// setInner is the state type managed by the underlying CRDT. +type setInner[T comparable] struct { + Entries map[string]setEntry[T] `json:"entries"` +} + +// Set is an Add-Wins Observed-Remove Set (OR-Set) CRDT built on top of [CRDT]. +// +// Each Add creates a uniquely-tagged entry using the node's HLC. Remove only +// tombstones entries that exist at call time; a concurrent Add from another +// node produces a different tag, so after Merge the element is still present +// (add wins over remove). +type Set[T comparable] struct { + inner *CRDT[setInner[T]] +} + +// NewSet returns an empty Set CRDT bound to the given node ID. +func NewSet[T comparable](nodeID string) *Set[T] { + return &Set[T]{ + inner: NewCRDT(setInner[T]{ + Entries: make(map[string]setEntry[T]), + }, nodeID), + } +} + +// NodeID returns the unique identifier for this Set instance. +func (s *Set[T]) NodeID() string { return s.inner.NodeID() } + +// Add appends a new uniquely-tagged entry for elem. +// The tag is the current HLC timestamp serialised as a string map key. +func (s *Set[T]) Add(elem T) { + id := s.inner.Clock().Now() + s.inner.Edit(func(si *setInner[T]) { + si.Entries[id.String()] = setEntry[T]{Elem: elem} + }) +} + +// Remove marks all non-deleted entries whose Elem equals elem as deleted. +// Only entries visible at call time are tombstoned; concurrent adds on other +// nodes create entries with different tags that this Remove never sees. +func (s *Set[T]) Remove(elem T) { + s.inner.Edit(func(si *setInner[T]) { + for k, e := range si.Entries { + if !e.Deleted && e.Elem == elem { + e.Deleted = true + si.Entries[k] = e + } + } + }) +} + +// Contains reports whether elem has at least one live (non-deleted) entry. +func (s *Set[T]) Contains(elem T) bool { + state := s.inner.View() + for _, e := range state.Entries { + if !e.Deleted && e.Elem == elem { + return true + } + } + return false +} + +// Items returns a deduplicated slice of all live elements. +func (s *Set[T]) Items() []T { + state := s.inner.View() + seen := make(map[T]struct{}) + for _, e := range state.Entries { + if !e.Deleted { + seen[e.Elem] = struct{}{} + } + } + out := make([]T, 0, len(seen)) + for elem := range seen { + out = append(out, elem) + } + return out +} + +// Len returns the number of distinct live elements. +func (s *Set[T]) Len() int { + return len(s.Items()) +} + +// Merge performs a full state-based OR-Set merge with another Set node. +// Returns true if the local state changed. +func (s *Set[T]) Merge(other *Set[T]) bool { + return s.inner.Merge(other.inner) +} diff --git a/crdt/set_test.go b/crdt/set_test.go new file mode 100644 index 0000000..3793d50 --- /dev/null +++ b/crdt/set_test.go @@ -0,0 +1,173 @@ +package crdt + +import ( + "sort" + "testing" +) + +func TestSet_AddContains(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("a") + s.Add("b") + s.Add("c") + + for _, elem := range []string{"a", "b", "c"} { + if !s.Contains(elem) { + t.Errorf("expected Contains(%q) == true", elem) + } + } + if s.Contains("d") { + t.Error("expected Contains(\"d\") == false") + } +} + +func TestSet_Remove(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("x") + s.Remove("x") + if s.Contains("x") { + t.Error("expected Contains(\"x\") == false after Remove") + } +} + +func TestSet_ReAddAfterRemove(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("x") + s.Remove("x") + s.Add("x") + if !s.Contains("x") { + t.Error("expected Contains(\"x\") == true after re-Add") + } +} + +func TestSet_Items(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("alpha") + s.Add("beta") + s.Add("gamma") + if len(s.Items()) != 3 { + t.Errorf("expected 3 items, got %d", len(s.Items())) + } +} + +func TestSet_DuplicateAdd(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("dup") + s.Add("dup") + + items := s.Items() + if len(items) != 1 { + t.Errorf("expected 1 distinct item after duplicate adds, got %d", len(items)) + } + if !s.Contains("dup") { + t.Error("expected Contains(\"dup\") == true") + } +} + +func TestSet_MergeConvergence(t *testing.T) { + nodeA := NewSet[string]("node-a") + nodeB := NewSet[string]("node-b") + + nodeA.Add("a") + nodeB.Add("b") + + nodeA.Merge(nodeB) + nodeB.Merge(nodeA) + + for _, s := range []*Set[string]{nodeA, nodeB} { + if !s.Contains("a") { + t.Errorf("node %s: expected Contains(\"a\")", s.NodeID()) + } + if !s.Contains("b") { + t.Errorf("node %s: expected Contains(\"b\")", s.NodeID()) + } + } +} + +func TestSet_AddWins(t *testing.T) { + // Step 1: build shared initial state on node-a, then sync to node-b. + nodeA := NewSet[string]("node-a") + nodeA.Add("x") + + nodeB := NewSet[string]("node-b") + nodeB.Merge(nodeA) // nodeB now has "x" too + + // Step 2: node-a removes "x" (tombstones the shared entry). + nodeA.Remove("x") + + // Step 3: node-b independently adds "x" again (new entry, different HLC tag). + nodeB.Add("x") + + // Step 4: bidirectional merge. + nodeA.Merge(nodeB) + nodeB.Merge(nodeA) + + // Step 5: add-wins — both nodes must still contain "x". + if !nodeA.Contains("x") { + t.Error("add-wins violated: node-a does not contain \"x\" after merge") + } + if !nodeB.Contains("x") { + t.Error("add-wins violated: node-b does not contain \"x\" after merge") + } +} + +func TestSet_MergeIdempotent(t *testing.T) { + nodeA := NewSet[string]("node-a") + nodeB := NewSet[string]("node-b") + + nodeA.Add("hello") + nodeB.Add("world") + + nodeA.Merge(nodeB) + nodeA.Merge(nodeB) // second merge should be a no-op + + items := nodeA.Items() + sort.Strings(items) + if len(items) != 2 || items[0] != "hello" || items[1] != "world" { + t.Errorf("idempotent merge produced unexpected items: %v", items) + } +} + +func TestSet_RemoveNonExistent(t *testing.T) { + s := NewSet[string]("node-a") + // Should not panic. + s.Remove("ghost") + if s.Contains("ghost") { + t.Error("expected Contains(\"ghost\") == false") + } +} + +func TestSet_Commutativity(t *testing.T) { + // Build two independent nodes with different elements. + nodeA1 := NewSet[string]("node-a") + nodeB1 := NewSet[string]("node-b") + nodeA1.Add("a") + nodeB1.Add("b") + + nodeA2 := NewSet[string]("node-a") + nodeB2 := NewSet[string]("node-b") + nodeA2.Add("a") + nodeB2.Add("b") + + // Order 1: A→B then B→A on copies 1. + nodeA1.Merge(nodeB1) + nodeB1.Merge(nodeA1) + + // Order 2: B→A then A→B on copies 2. + nodeB2.Merge(nodeA2) + nodeA2.Merge(nodeB2) + + items1 := nodeA1.Items() + items2 := nodeA2.Items() + sort.Strings(items1) + sort.Strings(items2) + + if len(items1) != len(items2) { + t.Fatalf("commutativity violated: len %d vs %d", len(items1), len(items2)) + } + for i := range items1 { + if items1[i] != items2[i] { + t.Errorf("commutativity violated at index %d: %q vs %q", i, items1[i], items2[i]) + } + } +}