Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (t *Topic[T]) Publish(value ...T) uint64 {
// If there is no new data, Receive() blocks until there is new data or the
// given channel is done. The same happens with id 0, when there is no data at
// all in the topic.
//
// For performance reasons, this function returns the internal slice of the
// topic. It is not allowed to manipulate the values.
func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) {
t.mu.RLock()

Expand Down Expand Up @@ -130,17 +133,13 @@ func (t *Topic[T]) Prune(until time.Time) {
}

if n >= len(t.data) {
t.data = t.data[:0]
t.insertTime = t.insertTime[:0]
t.data = nil
t.insertTime = nil
t.offset += uint64(n)
return
}

copy(t.data, t.data[n:])
copy(t.insertTime, t.insertTime[n:])

t.data = t.data[:len(t.data)-n]
t.insertTime = t.insertTime[:len(t.insertTime)-n]

t.data = slices.Clone(t.data[n:])
t.insertTime = slices.Clone(t.insertTime[n:])
t.offset += uint64(n)
}
26 changes: 22 additions & 4 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func TestPrune(t *testing.T) {

top.Prune(pruneTime)

_, got, err := top.Receive(t.Context(), 0)
if err != nil {
t.Fatalf("Receive(): %v", err)
}
ctxCanceled, cancel := context.WithCancel(t.Context())
cancel()

_, got, _ := top.Receive(ctxCanceled, 0)

if !cmpSlice(got, tt.expect) {
t.Errorf("Got %v, want %v", got, tt.expect)
Expand All @@ -144,6 +144,24 @@ func TestPruneEmptyTopic(t *testing.T) {
}
}

func TestPruneUsedValue(t *testing.T) {
top := topic.New[string]()
top.Publish("val1")
top.Publish("val2")
top.Publish("val3")
ti := time.Now()
_, data, err := top.Receive(t.Context(), 0)
if err != nil {
t.Fatalf("Receive(): %v", err)
}
top.Publish("val4")
top.Prune(ti)

if data[0] != "val1" {
t.Errorf("Received value changed to %s, expected `val1`", data[0])
}
}

func TestErrUnknownID(t *testing.T) {
top := topic.New[string]()
top.Publish("v1")
Expand Down
Loading