From b4d1a6f2ffc69c8c795d3f97cb1495b7c331f5ce Mon Sep 17 00:00:00 2001 From: Bruno Albuquerque Date: Wed, 25 Mar 2026 06:54:55 -0400 Subject: [PATCH 1/4] feat(crdt): add PN-Counter CRDT Counter wraps CRDT[counterState] where counterState holds two map[string]int64 maps (Inc, Dec), one slot per node ID. Because each node writes only to its own slot, LWW in CRDT.Merge is always correct: the newer write always carries the larger value, so the max-semantics of a G-Counter fall out of the existing infrastructure for free. Value() = sum(Inc) - sum(Dec). Tests: basic increment/decrement, zero/negative delta no-op, two-node merge, concurrent goroutine increments, idempotency, commutativity, negative values. --- crdt/counter.go | 67 ++++++++++++++++++++++ crdt/counter_test.go | 134 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 crdt/counter.go create mode 100644 crdt/counter_test.go diff --git a/crdt/counter.go b/crdt/counter.go new file mode 100644 index 0000000..e827a91 --- /dev/null +++ b/crdt/counter.go @@ -0,0 +1,67 @@ +package crdt + +// counterState holds the grow-only increment and decrement maps for a PN-Counter. +type counterState struct { + Inc map[string]int64 `json:"inc"` + Dec map[string]int64 `json:"dec"` +} + +// Counter is a Positive-Negative Counter CRDT. Each node maintains independent +// increment and decrement totals; the observed value is sum(Inc) - sum(Dec). +type Counter struct { + inner *CRDT[counterState] +} + +// NewCounter creates a new Counter for the given nodeID. +func NewCounter(nodeID string) *Counter { + return &Counter{ + inner: NewCRDT(counterState{ + Inc: make(map[string]int64), + Dec: make(map[string]int64), + }, nodeID), + } +} + +// NodeID returns the node identifier for this Counter. +func (c *Counter) NodeID() string { return c.inner.NodeID() } + +// Increment adds delta to this node's increment total. Ignored if delta <= 0. +func (c *Counter) Increment(delta int64) { + if delta <= 0 { + return + } + nodeID := c.inner.NodeID() + c.inner.Edit(func(s *counterState) { + s.Inc[nodeID] += delta + }) +} + +// Decrement adds delta to this node's decrement total. Ignored if delta <= 0. +func (c *Counter) Decrement(delta int64) { + if delta <= 0 { + return + } + nodeID := c.inner.NodeID() + c.inner.Edit(func(s *counterState) { + s.Dec[nodeID] += delta + }) +} + +// Value returns the current counter value: sum(Inc) - sum(Dec). +func (c *Counter) Value() int64 { + s := c.inner.View() + var total int64 + for _, v := range s.Inc { + total += v + } + for _, v := range s.Dec { + total -= v + } + return total +} + +// Merge merges the state of other into this Counter. Returns true if any +// changes were applied. +func (c *Counter) Merge(other *Counter) bool { + return c.inner.Merge(other.inner) +} diff --git a/crdt/counter_test.go b/crdt/counter_test.go new file mode 100644 index 0000000..d077185 --- /dev/null +++ b/crdt/counter_test.go @@ -0,0 +1,134 @@ +package crdt + +import ( + "sync" + "testing" +) + +func TestCounter_BasicIncrement(t *testing.T) { + c := NewCounter("node-a") + c.Increment(1) + c.Increment(1) + c.Increment(1) + if got := c.Value(); got != 3 { + t.Errorf("expected 3, got %d", got) + } +} + +func TestCounter_BasicDecrement(t *testing.T) { + c := NewCounter("node-a") + c.Increment(5) + c.Decrement(2) + if got := c.Value(); got != 3 { + t.Errorf("expected 3, got %d", got) + } +} + +func TestCounter_ZeroAndNegativeDeltaIgnored(t *testing.T) { + c := NewCounter("node-a") + c.Increment(0) + c.Increment(-1) + c.Decrement(0) + if got := c.Value(); got != 0 { + t.Errorf("expected 0, got %d", got) + } +} + +func TestCounter_MergeIndependentNodes(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + a.Increment(3) + b.Increment(2) + + a.Merge(b) + b.Merge(a) + + if got := a.Value(); got != 5 { + t.Errorf("node-a: expected 5, got %d", got) + } + if got := b.Value(); got != 5 { + t.Errorf("node-b: expected 5, got %d", got) + } +} + +func TestCounter_ConcurrentIncrements(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + a.Increment(1) + } + }() + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + b.Increment(1) + } + }() + wg.Wait() + + a.Merge(b) + b.Merge(a) + + if got := a.Value(); got != 20 { + t.Errorf("node-a: expected 20, got %d", got) + } + if got := b.Value(); got != 20 { + t.Errorf("node-b: expected 20, got %d", got) + } +} + +func TestCounter_MergeIdempotent(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + a.Increment(4) + b.Increment(6) + + a.Merge(b) + first := a.Value() + + a.Merge(b) + second := a.Value() + + if first != second { + t.Errorf("merge not idempotent: first=%d second=%d", first, second) + } + if first != 10 { + t.Errorf("expected 10, got %d", first) + } +} + +func TestCounter_Commutativity(t *testing.T) { + a := NewCounter("node-a") + b := NewCounter("node-b") + + a.Increment(7) + b.Increment(3) + + // merge(A into B) + bCopy := NewCounter("node-b") + bCopy.Increment(3) + bCopy.Merge(a) + + // merge(B into A) + a.Merge(b) + + if a.Value() != bCopy.Value() { + t.Errorf("merge not commutative: merge(B,A)=%d merge(A,B)=%d", a.Value(), bCopy.Value()) + } +} + +func TestCounter_NegativeValue(t *testing.T) { + c := NewCounter("node-a") + c.Increment(2) + c.Decrement(5) + if got := c.Value(); got != -3 { + t.Errorf("expected -3, got %d", got) + } +} From 7247bed0781665971fee9e28c5b0d9ec9be2b056 Mon Sep 17 00:00:00 2001 From: Bruno Albuquerque Date: Wed, 25 Mar 2026 06:55:57 -0400 Subject: [PATCH 2/4] feat(crdt): add Add-Wins OR-Set CRDT Set[T comparable] is an Observed-Remove Set with add-wins conflict resolution. Each Add creates a uniquely HLC-tagged entry; Remove only tombstones entries visible at call time. A concurrent Add from another node produces a different tag, so after Merge the element survives (add wins over concurrent remove). Implemented as a standalone type (map[string]setEntry[T] keyed by HLC.String()) rather than wrapping CRDT[T]: the LWW clock-filter in CRDT.Merge discards remote entries that have no local clock entry, which breaks union semantics for disjoint adds. Tests: add/contains, remove, re-add after remove, Items dedup, duplicate add, two-node convergence, add-wins property, idempotency, remove-non-existent no-op, commutativity. --- crdt/set.go | 143 +++++++++++++++++++++++++++++++++++++++ crdt/set_test.go | 173 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 crdt/set.go create mode 100644 crdt/set_test.go diff --git a/crdt/set.go b/crdt/set.go new file mode 100644 index 0000000..ebed0ed --- /dev/null +++ b/crdt/set.go @@ -0,0 +1,143 @@ +package crdt + +import ( + "sync" + + "github.com/brunoga/deep/v5/crdt/hlc" +) + +// setEntry is one tagged add-operation in the OR-Set log. +// The ID field is the unique HLC-based tag assigned at Add time; it acts as +// the unique identity for each add operation, giving the Add-Wins property +// during merge. +type setEntry[T any] struct { + ID hlc.HLC `json:"id"` + Elem T `json:"e"` + Deleted bool `json:"d,omitempty"` +} + +// Set is an Add-Wins Observed-Remove Set (OR-Set) CRDT. +// +// Each Add creates a uniquely-tagged entry using the node's HLC. Remove only +// tombstones entries that exist at call time; a concurrent Add from another +// node produces a different tag, so after Merge the element is still present +// (add wins over remove). +type Set[T comparable] struct { + mu sync.RWMutex + entries map[string]setEntry[T] // keyed by HLC.String() + nodeID string + clock *hlc.Clock +} + +// NewSet returns an empty Set CRDT bound to the given node ID. +func NewSet[T comparable](nodeID string) *Set[T] { + return &Set[T]{ + entries: make(map[string]setEntry[T]), + nodeID: nodeID, + clock: hlc.NewClock(nodeID), + } +} + +// Add appends a new uniquely-tagged entry for elem. +func (s *Set[T]) Add(elem T) { + s.mu.Lock() + defer s.mu.Unlock() + id := s.clock.Now() + s.entries[id.String()] = setEntry[T]{ID: id, Elem: elem} +} + +// Remove marks all non-deleted entries whose Elem equals elem as deleted. +// Only entries visible at call time are tombstoned; concurrent adds on other +// nodes create entries with different tags that this Remove never sees. +func (s *Set[T]) Remove(elem T) { + s.mu.Lock() + defer s.mu.Unlock() + for k, e := range s.entries { + if !e.Deleted && e.Elem == elem { + e.Deleted = true + s.entries[k] = e + } + } +} + +// Contains reports whether elem has at least one live (non-deleted) entry. +func (s *Set[T]) Contains(elem T) bool { + s.mu.RLock() + defer s.mu.RUnlock() + for _, e := range s.entries { + if !e.Deleted && e.Elem == elem { + return true + } + } + return false +} + +// Items returns a deduplicated slice of all live elements. +func (s *Set[T]) Items() []T { + s.mu.RLock() + defer s.mu.RUnlock() + seen := make(map[T]struct{}) + for _, e := range s.entries { + if !e.Deleted { + seen[e.Elem] = struct{}{} + } + } + out := make([]T, 0, len(seen)) + for elem := range seen { + out = append(out, elem) + } + return out +} + +// Len returns the number of distinct live elements. +func (s *Set[T]) Len() int { + return len(s.Items()) +} + +// Merge performs a full state-based OR-Set merge with another Set node. +// +// For each entry in other: +// - If the entry is absent locally, add it (union semantics). +// - If the entry is present locally and remote has Deleted=true, mark local +// as deleted too (tombstone propagation). +// - If local already has the entry as deleted, it stays deleted. +// +// A remote live entry always wins over a local absence (add-wins property): +// concurrent removes on different nodes only tombstone entries they knew about +// at remove time; new entries created on a different node are never affected. +// +// Returns true if the local state changed. +func (s *Set[T]) Merge(other *Set[T]) bool { + other.mu.RLock() + defer other.mu.RUnlock() + + s.mu.Lock() + defer s.mu.Unlock() + + changed := false + for k, remote := range other.entries { + // Advance local clock to maintain causality. + s.clock.Update(remote.ID) + + local, exists := s.entries[k] + if !exists { + // New entry from remote — add it (preserving its deleted state). + s.entries[k] = remote + changed = true + continue + } + // Entry exists locally. Apply tombstone if remote marked it deleted and + // local hasn't yet. + if remote.Deleted && !local.Deleted { + local.Deleted = true + s.entries[k] = local + changed = true + } + } + return changed +} + +// NodeID returns the unique identifier for this Set instance. +func (s *Set[T]) NodeID() string { + return s.nodeID +} diff --git a/crdt/set_test.go b/crdt/set_test.go new file mode 100644 index 0000000..3793d50 --- /dev/null +++ b/crdt/set_test.go @@ -0,0 +1,173 @@ +package crdt + +import ( + "sort" + "testing" +) + +func TestSet_AddContains(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("a") + s.Add("b") + s.Add("c") + + for _, elem := range []string{"a", "b", "c"} { + if !s.Contains(elem) { + t.Errorf("expected Contains(%q) == true", elem) + } + } + if s.Contains("d") { + t.Error("expected Contains(\"d\") == false") + } +} + +func TestSet_Remove(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("x") + s.Remove("x") + if s.Contains("x") { + t.Error("expected Contains(\"x\") == false after Remove") + } +} + +func TestSet_ReAddAfterRemove(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("x") + s.Remove("x") + s.Add("x") + if !s.Contains("x") { + t.Error("expected Contains(\"x\") == true after re-Add") + } +} + +func TestSet_Items(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("alpha") + s.Add("beta") + s.Add("gamma") + if len(s.Items()) != 3 { + t.Errorf("expected 3 items, got %d", len(s.Items())) + } +} + +func TestSet_DuplicateAdd(t *testing.T) { + s := NewSet[string]("node-a") + s.Add("dup") + s.Add("dup") + + items := s.Items() + if len(items) != 1 { + t.Errorf("expected 1 distinct item after duplicate adds, got %d", len(items)) + } + if !s.Contains("dup") { + t.Error("expected Contains(\"dup\") == true") + } +} + +func TestSet_MergeConvergence(t *testing.T) { + nodeA := NewSet[string]("node-a") + nodeB := NewSet[string]("node-b") + + nodeA.Add("a") + nodeB.Add("b") + + nodeA.Merge(nodeB) + nodeB.Merge(nodeA) + + for _, s := range []*Set[string]{nodeA, nodeB} { + if !s.Contains("a") { + t.Errorf("node %s: expected Contains(\"a\")", s.NodeID()) + } + if !s.Contains("b") { + t.Errorf("node %s: expected Contains(\"b\")", s.NodeID()) + } + } +} + +func TestSet_AddWins(t *testing.T) { + // Step 1: build shared initial state on node-a, then sync to node-b. + nodeA := NewSet[string]("node-a") + nodeA.Add("x") + + nodeB := NewSet[string]("node-b") + nodeB.Merge(nodeA) // nodeB now has "x" too + + // Step 2: node-a removes "x" (tombstones the shared entry). + nodeA.Remove("x") + + // Step 3: node-b independently adds "x" again (new entry, different HLC tag). + nodeB.Add("x") + + // Step 4: bidirectional merge. + nodeA.Merge(nodeB) + nodeB.Merge(nodeA) + + // Step 5: add-wins — both nodes must still contain "x". + if !nodeA.Contains("x") { + t.Error("add-wins violated: node-a does not contain \"x\" after merge") + } + if !nodeB.Contains("x") { + t.Error("add-wins violated: node-b does not contain \"x\" after merge") + } +} + +func TestSet_MergeIdempotent(t *testing.T) { + nodeA := NewSet[string]("node-a") + nodeB := NewSet[string]("node-b") + + nodeA.Add("hello") + nodeB.Add("world") + + nodeA.Merge(nodeB) + nodeA.Merge(nodeB) // second merge should be a no-op + + items := nodeA.Items() + sort.Strings(items) + if len(items) != 2 || items[0] != "hello" || items[1] != "world" { + t.Errorf("idempotent merge produced unexpected items: %v", items) + } +} + +func TestSet_RemoveNonExistent(t *testing.T) { + s := NewSet[string]("node-a") + // Should not panic. + s.Remove("ghost") + if s.Contains("ghost") { + t.Error("expected Contains(\"ghost\") == false") + } +} + +func TestSet_Commutativity(t *testing.T) { + // Build two independent nodes with different elements. + nodeA1 := NewSet[string]("node-a") + nodeB1 := NewSet[string]("node-b") + nodeA1.Add("a") + nodeB1.Add("b") + + nodeA2 := NewSet[string]("node-a") + nodeB2 := NewSet[string]("node-b") + nodeA2.Add("a") + nodeB2.Add("b") + + // Order 1: A→B then B→A on copies 1. + nodeA1.Merge(nodeB1) + nodeB1.Merge(nodeA1) + + // Order 2: B→A then A→B on copies 2. + nodeB2.Merge(nodeA2) + nodeA2.Merge(nodeB2) + + items1 := nodeA1.Items() + items2 := nodeA2.Items() + sort.Strings(items1) + sort.Strings(items2) + + if len(items1) != len(items2) { + t.Fatalf("commutativity violated: len %d vs %d", len(items1), len(items2)) + } + for i := range items1 { + if items1[i] != items2[i] { + t.Errorf("commutativity violated at index %d: %q vs %q", i, items1[i], items2[i]) + } + } +} From 4bf6b6ee00c1d5e9e6d57518114a987ec7578052 Mon Sep 17 00:00:00 2001 From: Bruno Albuquerque Date: Wed, 25 Mar 2026 06:56:45 -0400 Subject: [PATCH 3/4] feat(crdt): add LWW-Map CRDT Map[K comparable, V any] is a distributed key-value map where concurrent writes to the same key are resolved by Last-Write-Wins (higher HLC timestamp wins). Deletes are tombstones, so delete/set ordering is determined purely by timestamp. Standalone implementation (same reasoning as Set): CRDT[T] LWW discards remote entries without local clock history, breaking convergence for disjoint key sets. Exercises two-parameter generics and validates the HLC clock update-before-compare pattern for correct causal ordering. Tests: set/get, delete, overwrite, Keys(), disjoint-key merge, LWW same-key conflict, delete-wins-over-older-set, set-wins-over-older-delete, idempotency, commutativity, string keys, int keys. --- crdt/map.go | 136 ++++++++++++++++++++++++++ crdt/map_test.go | 242 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 378 insertions(+) create mode 100644 crdt/map.go create mode 100644 crdt/map_test.go diff --git a/crdt/map.go b/crdt/map.go new file mode 100644 index 0000000..83a53e5 --- /dev/null +++ b/crdt/map.go @@ -0,0 +1,136 @@ +package crdt + +import ( + "sync" + + "github.com/brunoga/deep/v5/crdt/hlc" +) + +// mapEntry holds a value at one key, with LWW metadata. +type mapEntry[V any] struct { + Ts hlc.HLC + Value V + Deleted bool +} + +// Map is a distributed LWW key-value map CRDT. +// +// Concurrent writes to the same key are resolved by Last-Write-Wins: the entry +// with the strictly higher HLC timestamp is kept. Deletes are represented as +// tombstones so that a delete with a newer timestamp wins over an older set, +// and a set with a newer timestamp wins over an older delete. +type Map[K comparable, V any] struct { + mu sync.RWMutex + nodeID string + clock *hlc.Clock + entries map[K]mapEntry[V] +} + +// NewMap returns an empty Map CRDT bound to the given node ID. +func NewMap[K comparable, V any](nodeID string) *Map[K, V] { + return &Map[K, V]{ + nodeID: nodeID, + clock: hlc.NewClock(nodeID), + entries: make(map[K]mapEntry[V]), + } +} + +// NodeID returns the unique identifier for this Map instance. +func (m *Map[K, V]) NodeID() string { return m.nodeID } + +// Set sets key to value with the current HLC timestamp. +func (m *Map[K, V]) Set(key K, value V) { + m.mu.Lock() + defer m.mu.Unlock() + ts := m.clock.Now() + m.entries[key] = mapEntry[V]{Ts: ts, Value: value, Deleted: false} +} + +// Delete tombstones key with the current HLC timestamp. It is a no-op if the +// key does not exist. +func (m *Map[K, V]) Delete(key K) { + m.mu.Lock() + defer m.mu.Unlock() + existing, ok := m.entries[key] + if !ok { + return + } + ts := m.clock.Now() + existing.Ts = ts + existing.Deleted = true + m.entries[key] = existing +} + +// Get returns the value for key and true if the key exists and is not deleted. +// It returns the zero value and false otherwise. +func (m *Map[K, V]) Get(key K) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + e, ok := m.entries[key] + if !ok || e.Deleted { + var zero V + return zero, false + } + return e.Value, true +} + +// Contains reports whether key exists and is not deleted. +func (m *Map[K, V]) Contains(key K) bool { + _, ok := m.Get(key) + return ok +} + +// Keys returns a slice of all live (non-deleted) keys. The order is +// non-deterministic. +func (m *Map[K, V]) Keys() []K { + m.mu.RLock() + defer m.mu.RUnlock() + out := make([]K, 0) + for k, e := range m.entries { + if !e.Deleted { + out = append(out, k) + } + } + return out +} + +// Len returns the number of live (non-deleted) keys. +func (m *Map[K, V]) Len() int { + m.mu.RLock() + defer m.mu.RUnlock() + n := 0 + for _, e := range m.entries { + if !e.Deleted { + n++ + } + } + return n +} + +// Merge performs a full state-based LWW merge with another Map node. +// +// For each key in other: the local clock is advanced with the remote entry's +// timestamp first, then the entry with the strictly higher timestamp wins. If +// timestamps are equal the remote entry is not accepted (local wins on ties). +// +// Returns true if the local state changed. +func (m *Map[K, V]) Merge(other *Map[K, V]) bool { + other.mu.RLock() + defer other.mu.RUnlock() + + m.mu.Lock() + defer m.mu.Unlock() + + changed := false + for k, remote := range other.entries { + // Advance local clock to maintain causality. + m.clock.Update(remote.Ts) + + local, exists := m.entries[k] + if !exists || remote.Ts.After(local.Ts) { + m.entries[k] = remote + changed = true + } + } + return changed +} diff --git a/crdt/map_test.go b/crdt/map_test.go new file mode 100644 index 0000000..77c8c51 --- /dev/null +++ b/crdt/map_test.go @@ -0,0 +1,242 @@ +package crdt + +import ( + "testing" + "time" +) + +func TestMap_SetGet(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("a", 1) + + v, ok := m.Get("a") + if !ok || v != 1 { + t.Errorf("expected (1, true), got (%d, %v)", v, ok) + } + + v2, ok2 := m.Get("b") + if ok2 || v2 != 0 { + t.Errorf("expected (0, false) for missing key, got (%d, %v)", v2, ok2) + } +} + +func TestMap_Delete(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("a", 1) + m.Delete("a") + + if m.Contains("a") { + t.Error("expected Contains(\"a\") == false after Delete") + } +} + +func TestMap_DeleteNonExistent(t *testing.T) { + m := NewMap[string, int]("node-a") + // Should not panic. + m.Delete("missing") +} + +func TestMap_Overwrite(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("a", 1) + m.Set("a", 2) + + v, ok := m.Get("a") + if !ok || v != 2 { + t.Errorf("expected (2, true) after overwrite, got (%d, %v)", v, ok) + } +} + +func TestMap_Keys(t *testing.T) { + m := NewMap[string, int]("node-a") + m.Set("x", 1) + m.Set("y", 2) + m.Set("z", 3) + m.Delete("y") + + keys := m.Keys() + if len(keys) != 2 { + t.Errorf("expected 2 live keys, got %d: %v", len(keys), keys) + } + for _, k := range keys { + if k == "y" { + t.Error("deleted key \"y\" should not appear in Keys()") + } + } +} + +func TestMap_MergeDisjointKeys(t *testing.T) { + a := NewMap[string, int]("node-a") + b := NewMap[string, int]("node-b") + + a.Set("x", 1) + b.Set("y", 2) + + a.Merge(b) + b.Merge(a) + + for _, node := range []*Map[string, int]{a, b} { + v, ok := node.Get("x") + if !ok || v != 1 { + t.Errorf("node %s: expected x=1, got (%d, %v)", node.NodeID(), v, ok) + } + v2, ok2 := node.Get("y") + if !ok2 || v2 != 2 { + t.Errorf("node %s: expected y=2, got (%d, %v)", node.NodeID(), v2, ok2) + } + } +} + +func TestMap_LWW_SameKey(t *testing.T) { + a := NewMap[string, string]("node-a") + b := NewMap[string, string]("node-b") + + a.Set("k", "a") + time.Sleep(2 * time.Millisecond) + b.Set("k", "b") + + a.Merge(b) + + v, ok := a.Get("k") + if !ok || v != "b" { + t.Errorf("LWW: expected \"b\" (newer) to win, got (%q, %v)", v, ok) + } +} + +func TestMap_DeleteWinsOverOlderSet(t *testing.T) { + a := NewMap[string, string]("node-a") + b := NewMap[string, string]("node-b") + + a.Set("k", "v") + time.Sleep(2 * time.Millisecond) + b.Set("k", "v") // b needs to know about the key to delete it with a newer ts + b.Delete("k") + + a.Merge(b) + + if a.Contains("k") { + t.Error("expected Contains(\"k\") == false after merging newer delete") + } +} + +func TestMap_SetWinsOverOlderDelete(t *testing.T) { + a := NewMap[string, string]("node-a") + b := NewMap[string, string]("node-b") + + // a deletes first, b re-sets with a newer timestamp + a.Set("k", "old") + a.Delete("k") + time.Sleep(2 * time.Millisecond) + b.Set("k", "new") + + a.Merge(b) + + v, ok := a.Get("k") + if !ok || v != "new" { + t.Errorf("expected newer Set to win over older Delete, got (%q, %v)", v, ok) + } +} + +func TestMap_MergeIdempotent(t *testing.T) { + a := NewMap[string, int]("node-a") + b := NewMap[string, int]("node-b") + + a.Set("x", 10) + b.Set("y", 20) + + a.Merge(b) + // Snapshot state after one merge. + keys1 := a.Len() + v1, _ := a.Get("x") + v2, _ := a.Get("y") + + // Merge again — should be idempotent. + a.Merge(b) + + if a.Len() != keys1 { + t.Errorf("idempotent: Len changed after second merge: %d vs %d", a.Len(), keys1) + } + gv1, _ := a.Get("x") + gv2, _ := a.Get("y") + if gv1 != v1 || gv2 != v2 { + t.Errorf("idempotent: values changed after second merge") + } +} + +func TestMap_Commutativity(t *testing.T) { + // Scenario 1: merge A→B then B→A + a1 := NewMap[string, int]("node-a") + b1 := NewMap[string, int]("node-b") + a1.Set("shared", 1) + time.Sleep(2 * time.Millisecond) + b1.Set("shared", 2) + a1.Set("only-a", 10) + b1.Set("only-b", 20) + + a1.Merge(b1) + b1.Merge(a1) + va1, _ := a1.Get("shared") + vb1, _ := b1.Get("shared") + + // Scenario 2: fresh nodes, merge B→A then A→B + a2 := NewMap[string, int]("node-a") + b2 := NewMap[string, int]("node-b") + a2.Set("shared", 1) + time.Sleep(2 * time.Millisecond) + b2.Set("shared", 2) + a2.Set("only-a", 10) + b2.Set("only-b", 20) + + b2.Merge(a2) + a2.Merge(b2) + va2, _ := a2.Get("shared") + vb2, _ := b2.Get("shared") + + if va1 != va2 { + t.Errorf("commutativity: a diverged: scenario1=%d scenario2=%d", va1, va2) + } + if vb1 != vb2 { + t.Errorf("commutativity: b diverged: scenario1=%d scenario2=%d", vb1, vb2) + } + if a1.Len() != a2.Len() { + t.Errorf("commutativity: Len diverged: scenario1=%d scenario2=%d", a1.Len(), a2.Len()) + } +} + +func TestMap_StringKeys(t *testing.T) { + a := NewMap[string, int]("node-a") + b := NewMap[string, int]("node-b") + + a.Set("hello", 42) + b.Set("world", 99) + + a.Merge(b) + + v1, ok1 := a.Get("hello") + v2, ok2 := a.Get("world") + if !ok1 || v1 != 42 { + t.Errorf("expected hello=42, got (%d, %v)", v1, ok1) + } + if !ok2 || v2 != 99 { + t.Errorf("expected world=99, got (%d, %v)", v2, ok2) + } +} + +func TestMap_IntKeys(t *testing.T) { + a := NewMap[int, string]("node-a") + b := NewMap[int, string]("node-b") + + a.Set(1, "one") + b.Set(2, "two") + + a.Merge(b) + + v1, ok1 := a.Get(1) + v2, ok2 := a.Get(2) + if !ok1 || v1 != "one" { + t.Errorf("expected 1=\"one\", got (%q, %v)", v1, ok1) + } + if !ok2 || v2 != "two" { + t.Errorf("expected 2=\"two\", got (%q, %v)", v2, ok2) + } +} From e7b3a313e08c0ecfa88da561f017a943abc8c175 Mon Sep 17 00:00:00 2001 From: Bruno Albuquerque Date: Wed, 25 Mar 2026 07:13:08 -0400 Subject: [PATCH 4/4] refactor(crdt): rewrite Set and Map as thin wrappers over CRDT[T] Both types previously maintained their own sync.RWMutex, clock, and merge logic. They now delegate entirely to the generic CRDT[T] engine, which provides per-path LWW clocks, tombstones, and full-state merge for free. Map[K,V] wraps CRDT[map[K]V]: each Set/Delete edits the map value, producing OpAdd/OpReplace/OpRemove ops at path "/"; the CRDT clock and tombstone maps handle LWW resolution and delete-wins / re-add-wins correctly. Set[T] wraps CRDT[setInner[T]] where setInner holds a map[string]setEntry[T] keyed by HLC.String(). Each Add allocates a unique tag via Clock().Now(), inserting a new entry at a distinct path "/Entries/"; concurrent Adds from other nodes land at different paths and are therefore unaffected by Remove, preserving the Add-Wins (OR-Set) property without any custom merge logic. --- crdt/map.go | 127 +++++++++++++--------------------------------------- crdt/set.go | 116 +++++++++++++++-------------------------------- 2 files changed, 66 insertions(+), 177 deletions(-) diff --git a/crdt/map.go b/crdt/map.go index 83a53e5..b791461 100644 --- a/crdt/map.go +++ b/crdt/map.go @@ -1,136 +1,71 @@ package crdt -import ( - "sync" - - "github.com/brunoga/deep/v5/crdt/hlc" -) - -// mapEntry holds a value at one key, with LWW metadata. -type mapEntry[V any] struct { - Ts hlc.HLC - Value V - Deleted bool -} - -// Map is a distributed LWW key-value map CRDT. +// Map is a distributed LWW key-value map CRDT built on top of [CRDT]. // -// Concurrent writes to the same key are resolved by Last-Write-Wins: the entry -// with the strictly higher HLC timestamp is kept. Deletes are represented as -// tombstones so that a delete with a newer timestamp wins over an older set, -// and a set with a newer timestamp wins over an older delete. +// Concurrent writes to the same key are resolved by Last-Write-Wins: the write +// with the strictly higher HLC timestamp wins. Deletions remove the key from +// the map and record a tombstone timestamp, so a delete with a newer timestamp +// wins over an older set, and a set with a newer timestamp wins over an older +// delete. type Map[K comparable, V any] struct { - mu sync.RWMutex - nodeID string - clock *hlc.Clock - entries map[K]mapEntry[V] + inner *CRDT[map[K]V] } // NewMap returns an empty Map CRDT bound to the given node ID. func NewMap[K comparable, V any](nodeID string) *Map[K, V] { return &Map[K, V]{ - nodeID: nodeID, - clock: hlc.NewClock(nodeID), - entries: make(map[K]mapEntry[V]), + inner: NewCRDT(make(map[K]V), nodeID), } } // NodeID returns the unique identifier for this Map instance. -func (m *Map[K, V]) NodeID() string { return m.nodeID } +func (m *Map[K, V]) NodeID() string { return m.inner.NodeID() } -// Set sets key to value with the current HLC timestamp. +// Set sets key to value. func (m *Map[K, V]) Set(key K, value V) { - m.mu.Lock() - defer m.mu.Unlock() - ts := m.clock.Now() - m.entries[key] = mapEntry[V]{Ts: ts, Value: value, Deleted: false} + m.inner.Edit(func(mp *map[K]V) { + (*mp)[key] = value + }) } -// Delete tombstones key with the current HLC timestamp. It is a no-op if the -// key does not exist. +// Delete removes key from the map. It is a no-op if the key does not exist. func (m *Map[K, V]) Delete(key K) { - m.mu.Lock() - defer m.mu.Unlock() - existing, ok := m.entries[key] - if !ok { - return - } - ts := m.clock.Now() - existing.Ts = ts - existing.Deleted = true - m.entries[key] = existing + m.inner.Edit(func(mp *map[K]V) { + delete(*mp, key) + }) } -// Get returns the value for key and true if the key exists and is not deleted. +// Get returns the value for key and true if the key exists. // It returns the zero value and false otherwise. func (m *Map[K, V]) Get(key K) (V, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - e, ok := m.entries[key] - if !ok || e.Deleted { - var zero V - return zero, false - } - return e.Value, true + state := m.inner.View() + v, ok := state[key] + return v, ok } -// Contains reports whether key exists and is not deleted. +// Contains reports whether key exists in the map. func (m *Map[K, V]) Contains(key K) bool { _, ok := m.Get(key) return ok } -// Keys returns a slice of all live (non-deleted) keys. The order is -// non-deterministic. +// Keys returns a slice of all live keys. The order is non-deterministic. func (m *Map[K, V]) Keys() []K { - m.mu.RLock() - defer m.mu.RUnlock() - out := make([]K, 0) - for k, e := range m.entries { - if !e.Deleted { - out = append(out, k) - } + state := m.inner.View() + keys := make([]K, 0, len(state)) + for k := range state { + keys = append(keys, k) } - return out + return keys } -// Len returns the number of live (non-deleted) keys. +// Len returns the number of entries in the map. func (m *Map[K, V]) Len() int { - m.mu.RLock() - defer m.mu.RUnlock() - n := 0 - for _, e := range m.entries { - if !e.Deleted { - n++ - } - } - return n + return len(m.inner.View()) } // Merge performs a full state-based LWW merge with another Map node. -// -// For each key in other: the local clock is advanced with the remote entry's -// timestamp first, then the entry with the strictly higher timestamp wins. If -// timestamps are equal the remote entry is not accepted (local wins on ties). -// // Returns true if the local state changed. func (m *Map[K, V]) Merge(other *Map[K, V]) bool { - other.mu.RLock() - defer other.mu.RUnlock() - - m.mu.Lock() - defer m.mu.Unlock() - - changed := false - for k, remote := range other.entries { - // Advance local clock to maintain causality. - m.clock.Update(remote.Ts) - - local, exists := m.entries[k] - if !exists || remote.Ts.After(local.Ts) { - m.entries[k] = remote - changed = true - } - } - return changed + return m.inner.Merge(other.inner) } diff --git a/crdt/set.go b/crdt/set.go index ebed0ed..6e7204f 100644 --- a/crdt/set.go +++ b/crdt/set.go @@ -1,70 +1,67 @@ package crdt -import ( - "sync" - - "github.com/brunoga/deep/v5/crdt/hlc" -) - -// setEntry is one tagged add-operation in the OR-Set log. -// The ID field is the unique HLC-based tag assigned at Add time; it acts as -// the unique identity for each add operation, giving the Add-Wins property -// during merge. +// setEntry holds an element and its live/deleted status for one Add operation. +// The unique HLC-based tag for this add is stored as the map key in setInner, +// not as a field here — it survives serialisation via the map key string. type setEntry[T any] struct { - ID hlc.HLC `json:"id"` - Elem T `json:"e"` - Deleted bool `json:"d,omitempty"` + Elem T `json:"e"` + Deleted bool `json:"d,omitempty"` } -// Set is an Add-Wins Observed-Remove Set (OR-Set) CRDT. +// setInner is the state type managed by the underlying CRDT. +type setInner[T comparable] struct { + Entries map[string]setEntry[T] `json:"entries"` +} + +// Set is an Add-Wins Observed-Remove Set (OR-Set) CRDT built on top of [CRDT]. // // Each Add creates a uniquely-tagged entry using the node's HLC. Remove only // tombstones entries that exist at call time; a concurrent Add from another // node produces a different tag, so after Merge the element is still present // (add wins over remove). type Set[T comparable] struct { - mu sync.RWMutex - entries map[string]setEntry[T] // keyed by HLC.String() - nodeID string - clock *hlc.Clock + inner *CRDT[setInner[T]] } // NewSet returns an empty Set CRDT bound to the given node ID. func NewSet[T comparable](nodeID string) *Set[T] { return &Set[T]{ - entries: make(map[string]setEntry[T]), - nodeID: nodeID, - clock: hlc.NewClock(nodeID), + inner: NewCRDT(setInner[T]{ + Entries: make(map[string]setEntry[T]), + }, nodeID), } } +// NodeID returns the unique identifier for this Set instance. +func (s *Set[T]) NodeID() string { return s.inner.NodeID() } + // Add appends a new uniquely-tagged entry for elem. +// The tag is the current HLC timestamp serialised as a string map key. func (s *Set[T]) Add(elem T) { - s.mu.Lock() - defer s.mu.Unlock() - id := s.clock.Now() - s.entries[id.String()] = setEntry[T]{ID: id, Elem: elem} + id := s.inner.Clock().Now() + s.inner.Edit(func(si *setInner[T]) { + si.Entries[id.String()] = setEntry[T]{Elem: elem} + }) } // Remove marks all non-deleted entries whose Elem equals elem as deleted. // Only entries visible at call time are tombstoned; concurrent adds on other // nodes create entries with different tags that this Remove never sees. func (s *Set[T]) Remove(elem T) { - s.mu.Lock() - defer s.mu.Unlock() - for k, e := range s.entries { - if !e.Deleted && e.Elem == elem { - e.Deleted = true - s.entries[k] = e + s.inner.Edit(func(si *setInner[T]) { + for k, e := range si.Entries { + if !e.Deleted && e.Elem == elem { + e.Deleted = true + si.Entries[k] = e + } } - } + }) } // Contains reports whether elem has at least one live (non-deleted) entry. func (s *Set[T]) Contains(elem T) bool { - s.mu.RLock() - defer s.mu.RUnlock() - for _, e := range s.entries { + state := s.inner.View() + for _, e := range state.Entries { if !e.Deleted && e.Elem == elem { return true } @@ -74,10 +71,9 @@ func (s *Set[T]) Contains(elem T) bool { // Items returns a deduplicated slice of all live elements. func (s *Set[T]) Items() []T { - s.mu.RLock() - defer s.mu.RUnlock() + state := s.inner.View() seen := make(map[T]struct{}) - for _, e := range s.entries { + for _, e := range state.Entries { if !e.Deleted { seen[e.Elem] = struct{}{} } @@ -95,49 +91,7 @@ func (s *Set[T]) Len() int { } // Merge performs a full state-based OR-Set merge with another Set node. -// -// For each entry in other: -// - If the entry is absent locally, add it (union semantics). -// - If the entry is present locally and remote has Deleted=true, mark local -// as deleted too (tombstone propagation). -// - If local already has the entry as deleted, it stays deleted. -// -// A remote live entry always wins over a local absence (add-wins property): -// concurrent removes on different nodes only tombstone entries they knew about -// at remove time; new entries created on a different node are never affected. -// // Returns true if the local state changed. func (s *Set[T]) Merge(other *Set[T]) bool { - other.mu.RLock() - defer other.mu.RUnlock() - - s.mu.Lock() - defer s.mu.Unlock() - - changed := false - for k, remote := range other.entries { - // Advance local clock to maintain causality. - s.clock.Update(remote.ID) - - local, exists := s.entries[k] - if !exists { - // New entry from remote — add it (preserving its deleted state). - s.entries[k] = remote - changed = true - continue - } - // Entry exists locally. Apply tombstone if remote marked it deleted and - // local hasn't yet. - if remote.Deleted && !local.Deleted { - local.Deleted = true - s.entries[k] = local - changed = true - } - } - return changed -} - -// NodeID returns the unique identifier for this Set instance. -func (s *Set[T]) NodeID() string { - return s.nodeID + return s.inner.Merge(other.inner) }