Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 63 additions & 22 deletions patch_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ func (p *valuePatch) applyChecked(root, v reflect.Value, strict bool, path strin

func (p *valuePatch) applyResolved(root, v reflect.Value, path string, resolver ConflictResolver) error {
if resolver != nil {
if !resolver.Resolve(path, OpReplace, nil, nil, v) {
resolved, ok := resolver.Resolve(path, OpReplace, nil, nil, v, p.newVal)
if !ok {
return nil // Skipped by resolver
}
p.newVal = resolved
}
p.apply(root, v, path)
return nil
Expand Down Expand Up @@ -249,6 +251,13 @@ func (p *testPatch) applyChecked(root, v reflect.Value, strict bool, path string
}

func (p *testPatch) applyResolved(root, v reflect.Value, path string, resolver ConflictResolver) error {
if resolver != nil {
resolved, ok := resolver.Resolve(path, OpTest, nil, nil, v, p.expected)
if !ok {
return nil
}
p.expected = resolved
}
return p.applyChecked(root, v, true, path)
}

Expand Down Expand Up @@ -316,7 +325,8 @@ func (p *copyPatch) applyChecked(root, v reflect.Value, strict bool, path string

func (p *copyPatch) applyResolved(root, v reflect.Value, path string, resolver ConflictResolver) error {
if resolver != nil {
if !resolver.Resolve(path, OpCopy, nil, nil, v) {
_, ok := resolver.Resolve(path, OpCopy, nil, nil, v, reflect.Value{})
if !ok {
return nil
}
}
Expand Down Expand Up @@ -417,7 +427,8 @@ func (p *movePatch) applyChecked(root, v reflect.Value, strict bool, path string

func (p *movePatch) applyResolved(root, v reflect.Value, path string, resolver ConflictResolver) error {
if resolver != nil {
if !resolver.Resolve(path, OpMove, nil, nil, v) {
_, ok := resolver.Resolve(path, OpMove, nil, nil, v, reflect.Value{})
if !ok {
return nil
}
}
Expand Down Expand Up @@ -477,6 +488,12 @@ func (p *logPatch) applyChecked(root, v reflect.Value, strict bool, path string)
}

func (p *logPatch) applyResolved(root, v reflect.Value, path string, resolver ConflictResolver) error {
if resolver != nil {
_, ok := resolver.Resolve(path, OpLog, nil, nil, v, reflect.ValueOf(p.message))
if !ok {
return nil
}
}
return p.applyChecked(root, v, false, path)
}

Expand Down Expand Up @@ -1143,15 +1160,19 @@ func (p *mapPatch) applyResolved(root, v reflect.Value, path string, resolver Co
}

// Removals
for k, _ := range p.removed {
for k, val := range p.removed {
subPath := core.JoinPath(path, fmt.Sprintf("%v", k))
keyVal := p.getOriginalKey(k, v.Type().Key(), v)
current := v.MapIndex(keyVal)

if resolver != nil {
if !resolver.Resolve(subPath, OpRemove, k, nil, reflect.Value{}) {
_, ok := resolver.Resolve(subPath, OpRemove, k, nil, current, reflect.Value{})
if !ok {
continue
}
}
v.SetMapIndex(p.getOriginalKey(k, v.Type().Key(), v), reflect.Value{})
v.SetMapIndex(keyVal, reflect.Value{})
_ = val
}

// Modifications
Expand All @@ -1177,9 +1198,11 @@ func (p *mapPatch) applyResolved(root, v reflect.Value, path string, resolver Co
subPath := core.JoinPath(path, fmt.Sprintf("%v", k))

if resolver != nil {
if !resolver.Resolve(subPath, OpAdd, k, nil, val) {
resolved, ok := resolver.Resolve(subPath, OpAdd, k, nil, reflect.Value{}, val)
if !ok {
continue
}
val = resolved
}
v.SetMapIndex(p.getOriginalKey(k, v.Type().Key(), v), core.ConvertValue(val, v.Type().Elem()))
}
Expand Down Expand Up @@ -1315,9 +1338,9 @@ type sliceOp struct {
// It is used to implement CRDTs, 3-way merges, and other conflict resolution strategies.
type ConflictResolver interface {
// Resolve allows the resolver to intervene before an operation is applied.
// It returns true if the operation should be applied, false to skip it.
// The resolver can also modify the operation or target value directly.
Resolve(path string, op OpKind, key, prevKey any, val reflect.Value) bool
// It returns the value to be applied and true if the operation should proceed,
// or the zero reflect.Value and false to skip it.
Resolve(path string, op OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool)
}

// slicePatch handles complex edits (insertions, deletions, modifications) in a slice.
Expand Down Expand Up @@ -1470,14 +1493,20 @@ func (p *slicePatch) applyResolved(root, v reflect.Value, path string, resolver

switch op.Kind {
case OpAdd:
if resolver.Resolve(subPath, OpAdd, nil, nil, op.Val) {
newSlice = reflect.Append(newSlice, core.ConvertValue(op.Val, v.Type().Elem()))
resolved, ok := resolver.Resolve(subPath, OpAdd, nil, nil, reflect.Value{}, op.Val)
if ok {
newSlice = reflect.Append(newSlice, core.ConvertValue(resolved, v.Type().Elem()))
}
case OpRemove:
var current reflect.Value
if curIdx < v.Len() {
if resolver.Resolve(subPath, OpRemove, nil, nil, reflect.Value{}) {
curIdx++
} else {
current = v.Index(curIdx)
}
_, ok := resolver.Resolve(subPath, OpRemove, nil, nil, current, reflect.Value{})
if ok {
curIdx++
} else {
if curIdx < v.Len() {
newSlice = reflect.Append(newSlice, v.Index(curIdx))
curIdx++
}
Expand Down Expand Up @@ -1522,7 +1551,12 @@ func (p *slicePatch) applyResolved(root, v reflect.Value, path string, resolver

switch op.Kind {
case OpRemove:
if resolver.Resolve(subPath, OpRemove, op.Key, nil, reflect.Value{}) {
var current reflect.Value
if info, ok := existingMap[op.Key]; ok {
current = info.val
}
_, ok := resolver.Resolve(subPath, OpRemove, op.Key, nil, current, reflect.Value{})
if ok {
delete(existingMap, op.Key)
}
case OpReplace:
Expand All @@ -1539,7 +1573,8 @@ func (p *slicePatch) applyResolved(root, v reflect.Value, path string, resolver
for _, op := range p.ops {
if op.Kind == OpAdd {
subPath := core.JoinPath(path, fmt.Sprintf("%v", op.Key))
if resolver.Resolve(subPath, OpAdd, op.Key, op.PrevKey, op.Val) {
resolved, ok := resolver.Resolve(subPath, OpAdd, op.Key, op.PrevKey, reflect.Value{}, op.Val)
if ok {
insertIdx := 0
foundPrev := false
if op.PrevKey != nil {
Expand Down Expand Up @@ -1573,7 +1608,7 @@ func (p *slicePatch) applyResolved(root, v reflect.Value, path string, resolver
copy(orderedKeys[insertIdx+1:], orderedKeys[insertIdx:])
orderedKeys[insertIdx] = op.Key
}
existingMap[op.Key] = &elemInfo{val: core.ConvertValue(op.Val, v.Type().Elem())}
existingMap[op.Key] = &elemInfo{val: core.ConvertValue(resolved, v.Type().Elem())}

}
}
Expand Down Expand Up @@ -1964,6 +1999,16 @@ func (p *customDiffPatch) format(indent int) string {
return fmt.Sprintf("CustomPatch(%v)", p.patch)
}

func (p *customDiffPatch) dependencies(path string) (reads []string, writes []string) {
m := reflect.ValueOf(p.patch).MethodByName("Dependencies")
if m.IsValid() {
res := m.Call([]reflect.Value{reflect.ValueOf(path)})
// Expects ([]string, []string)
return res[0].Interface().([]string), res[1].Interface().([]string)
}
return nil, nil
}

func (p *customDiffPatch) walk(path string, fn func(path string, op OpKind, old, new any) error) error {
m := reflect.ValueOf(p.patch).MethodByName("Walk")
if m.IsValid() {
Expand Down Expand Up @@ -1998,7 +2043,3 @@ func (p *customDiffPatch) toJSONPatch(path string) []map[string]any {
func (p *customDiffPatch) summary(path string) string {
return "CustomPatch"
}

func (p *customDiffPatch) dependencies(path string) (reads []string, writes []string) {
return nil, nil
}
14 changes: 7 additions & 7 deletions patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func TestPatch_ApplyResolved(t *testing.T) {
target := Config{Value: 10}

// Resolver that rejects everything
err := patch.ApplyResolved(&target, ConflictResolverFunc(func(path string, op OpKind, old, new any, v reflect.Value) bool {
return false
err := patch.ApplyResolved(&target, ConflictResolverFunc(func(path string, op OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool) {
return reflect.Value{}, false
}))
if err != nil {
t.Fatalf("ApplyResolved failed: %v", err)
Expand All @@ -88,8 +88,8 @@ func TestPatch_ApplyResolved(t *testing.T) {
}

// Resolver that accepts everything
err = patch.ApplyResolved(&target, ConflictResolverFunc(func(path string, op OpKind, old, new any, v reflect.Value) bool {
return true
err = patch.ApplyResolved(&target, ConflictResolverFunc(func(path string, op OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool) {
return proposed, true
}))
if err != nil {
t.Fatalf("ApplyResolved failed: %v", err)
Expand All @@ -100,10 +100,10 @@ func TestPatch_ApplyResolved(t *testing.T) {
}
}

type ConflictResolverFunc func(path string, op OpKind, old, new any, v reflect.Value) bool
type ConflictResolverFunc func(path string, op OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool)

func (f ConflictResolverFunc) Resolve(path string, op OpKind, old, new any, v reflect.Value) bool {
return f(path, op, old, new, v)
func (f ConflictResolverFunc) Resolve(path string, op OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool) {
return f(path, op, key, prevKey, current, proposed)
}

func TestPatch_ConditionsExhaustive(t *testing.T) {
Expand Down
19 changes: 9 additions & 10 deletions resolvers/crdt/lww.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/brunoga/deep/v3/crdt/hlc"
)

// ... (skipping LWWResolver.Resolve change since I'll do it separately or just replace the whole file if easier)

// LWWResolver implements deep.ConflictResolver using Last-Write-Wins logic
// for a single operation or delta with a fixed timestamp.
type LWWResolver struct {
Expand All @@ -17,18 +15,16 @@ type LWWResolver struct {
OpTime hlc.HLC
}

func (r *LWWResolver) Resolve(path string, op deep.OpKind, key, prevKey any, val reflect.Value) bool {
func (r *LWWResolver) Resolve(path string, op deep.OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool) {
lClock := r.Clocks[path]
lTomb, hasLT := r.Tombstones[path]
lTime := lClock
if hasLT && lTomb.After(lTime) {
lTime = lTomb
}

// fmt.Printf("LWW Resolve %s: opTime=%v, lTime=%v\n", path, r.OpTime, lTime)

if !r.OpTime.After(lTime) {
return false
return reflect.Value{}, false
}

// Accepted. Update clocks for this path.
Expand All @@ -38,7 +34,7 @@ func (r *LWWResolver) Resolve(path string, op deep.OpKind, key, prevKey any, val
r.Clocks[path] = r.OpTime
}

return true
return proposed, true
}

// StateResolver implements deep.ConflictResolver for merging two full CRDT states.
Expand All @@ -50,7 +46,7 @@ type StateResolver struct {
RemoteTombstones map[string]hlc.HLC
}

func (r *StateResolver) Resolve(path string, op deep.OpKind, key, prevKey any, val reflect.Value) bool {
func (r *StateResolver) Resolve(path string, op deep.OpKind, key, prevKey any, current, proposed reflect.Value) (reflect.Value, bool) {
// Local Time
lClock := r.LocalClocks[path]
lTomb, hasLT := r.LocalTombstones[path]
Expand All @@ -63,12 +59,15 @@ func (r *StateResolver) Resolve(path string, op deep.OpKind, key, prevKey any, v
rClock, hasR := r.RemoteClocks[path]
rTomb, hasRT := r.RemoteTombstones[path]
if !hasR && !hasRT {
return false
return reflect.Value{}, false
}
rTime := rClock
if hasRT && rTomb.After(rTime) {
rTime = rTomb
}

return rTime.After(lTime)
if rTime.After(lTime) {
return proposed, true
}
return reflect.Value{}, false
}
22 changes: 18 additions & 4 deletions resolvers/crdt/lww_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@ func TestLWWResolver(t *testing.T) {
OpTime: hlc.HLC{WallTime: 101, Logical: 0, NodeID: "B"},
}

proposed := reflect.ValueOf("new")

// Newer op should be accepted
if !resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}) {
resolved, ok := resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}, proposed)
if !ok {
t.Error("Should accept newer operation")
}
if resolved.Interface() != "new" {
t.Error("Resolved value mismatch")
}
if clocks["f1"].WallTime != 101 {
t.Error("Clock should have been updated")
}

// Older op should be rejected
resolver.OpTime = hlc.HLC{WallTime: 99, Logical: 0, NodeID: "C"}
if resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}) {
_, ok = resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}, proposed)
if ok {
t.Error("Should reject older operation")
}
}
Expand All @@ -44,12 +51,19 @@ func TestStateResolver(t *testing.T) {
RemoteClocks: remoteClocks,
}

if !resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}) {
proposed := reflect.ValueOf("remote")

resolved, ok := resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}, proposed)
if !ok {
t.Error("Remote should win (newer)")
}
if resolved.Interface() != "remote" {
t.Error("Resolved value mismatch")
}

resolver.RemoteClocks["f1"] = hlc.HLC{WallTime: 99, Logical: 0, NodeID: "B"}
if resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}) {
_, ok = resolver.Resolve("f1", deep.OpReplace, nil, nil, reflect.Value{}, proposed)
if ok {
t.Error("Local should win (remote is older)")
}
}