From 2165efb19d7654f462a50f1817283c87cc2aef8c Mon Sep 17 00:00:00 2001 From: Oskar Hahn Date: Sat, 18 Oct 2025 07:34:25 +0200 Subject: [PATCH 1/3] Fix spelling mistakes --- README.md | 19 ++++---- doc.go | 136 ++++++++++++++++++++++++++---------------------------- topic.go | 28 +++++------ 3 files changed, 88 insertions(+), 95 deletions(-) diff --git a/README.md b/README.md index 4415400..cfecb68 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ [![Actions Status](https://github.com/ostcar/topic/workflows/Topic/badge.svg)](https://github.com/ostcar/topic/actions) [![PkgGoDev](https://pkg.go.dev/badge/github.com/ostcar/topic)](https://pkg.go.dev/github.com/ostcar/topic) -Package topic is an in process pubsub system where new values have to be pulled -instead of beeing pushed. + Package topic is an in-process pubsub system where new +values can be pulled instead of being pushed. The idea of pulling updates is inspired by [Kafka](https://kafka.apache.org/) or [Redis-Streams](https://redis.io/topics/streams-intro). A subscriber does not @@ -19,25 +19,24 @@ A topic can be created with: `top := topic.New[string]()`. To publish one or more values, use: `top.Publish("info1", "info2")`. To receive values for the first time use: `id, values, err := top.Receive(ctx, -0)`. The first value is a numeric id, it is needed for for next call of -`top.Receive()`. The second argument is a list of all values that where +0)`. The first value is a numeric id, it is needed for the next call of +`top.Receive()`. The second argument is a list of all values that were published by this topic. To receive newer values, use `id, values, err = top.Receive(ctx, id)`. It -returns all values that published after the given `id`. - -A topic is save for concurrent use. +returns all values that were published after the given `id`. +A topic is safe for concurrent use. ## Run tests -Contibutions are wellcome. Please make sure that the tests are running with: +Contributions are welcome. Please make sure that the tests are running with: ```go test``` - ## Who is using topic -Topic was build for [OpenSlides](https://openslides.com) and is used in +Topic was built for [OpenSlides](https://openslides.com) and is used in production for the [Autoupdate-Service](https://github.com/openslides/openslides-autoupdate-service) +for many years without any problems. diff --git a/doc.go b/doc.go index 78dfeb2..585e1be 100644 --- a/doc.go +++ b/doc.go @@ -1,146 +1,140 @@ /* -Package topic is a inmemory pubsub system where new values are pulled instead of -beeing pushed. +Package topic is an in-memory pubsub system where new values are pulled instead of +being pushed. The idea of pulling updates is inspired by Kafka or Redis-Streams. A subscriber does not have to register or unsubscribe to a topic and can take as much time as it needs to process the messages. Therefore, the system is less error prone. In common pubsub systems, the publisher pushes values to the receivers. The -problem with this pattern is, that the publisher could send messages faster, -then a slow receivers can process them. A buffer can help to delay the problem, +problem with this pattern is, that the publisher could send messages faster +than a slow receiver can process them. A buffer can help to delay the problem, but eventually the buffer could be full. If this happens, there are two options. Either the publisher has to wait on the slowest receiver or a slow receiver has -to drop messages. In the first case, the system is only as fast, as the slowest -receiver. On the second case, it is not guaranteed, that a receiver gets all +to drop messages. In the first case, the system is only as fast as the slowest +receiver. In the second case, it is not guaranteed that a receiver gets all messages. A third pattern is, that the publisher does not push the values, but the -receivers has to pull them. The publisher can save values without waiting on +receivers have to pull them. The publisher can save values without waiting on slow receivers. A receiver has all the time it needs to process messages and can -pull again as soon as the work is done. This packet implements the third +pull again as soon as the work is done. This package implements the third pattern. Another benefit of this pattern is, that a receiver does not have to register on the pubsub system. Since the publisher does not send the messages, it does not -have to know how many receivers there are. Therefore there a no register or +have to know how many receivers there are. Therefore there are no register or unregister methods in this package. - -Create new topic +# Create new topic To create a new topic use the topic.New() constructor: - top := topic.New[string]() - + top := topic.New[string]() -Publish messages +# Publish messages Messages can be published with the Publish()-method: - top.Publish("some value") + top.Publish("some value") -More then one message can be published at once: +More than one message can be published at once: - top.Publish("some value", "other value") + top.Publish("some value", "other value") Internally, the topic creates a new id that can be used to receive newer values. The Publish()-method returns this id. In most cases, the returned id can be ignored. - -Receive messages +# Receive messages Messages can be received with the Receive()-method: - id, values, err := top.Receive(context.Background(), 0) + id, values, err := top.Receive(context.Background(), 0) -The first returned value is the id creates by the last Publish()-call. The -second value is a slice of all all message that where published before. Each +The first returned value is the id created by the last Publish()-call. The +second value is a slice of all messages that were published before. Each value in the returned slice is unique. To receive newer values, Receive() can be called again with the id from the last call: - id, values, err := top.Receive(context.Background(), 0) - ... - id, values, err = top.Receive(context.Background(), id) + id, values, err := top.Receive(context.Background(), 0) + ... + id, values, err = top.Receive(context.Background(), id) When the given id is zero, then all messages are returned. If the id is greater -then zero, then only messages are returned, that where published by the topic +than zero, then only messages are returned that were published by the topic after the id was created. When there are no new values in the topic, then the Receive()-call blocks until there are new values. To add a timeout to the call, the context can be used: - ctx, cencel := context.WithTimeout(context.Background(), 10*time.Second) - defer cencel() - id, values, err = top.Receive(ctx, id) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + id, values, err = top.Receive(ctx, id) If there are no new values before the context is canceled, the topic returns with the error `context.DeadlineExceeded`. - -The usual pattern to subscibe to a topic is: - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - var id uint64 - var values []string - var err error - for { - id, values, err = top.Receive(ctx, id) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - // Timeout - break - } - // Handle other errors - } - // Process values - } +The usual pattern to subscribe to a topic is: + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + var id uint64 + var values []string + var err error + for { + id, values, err = top.Receive(ctx, id) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // Timeout + break + } + // Handle other errors + } + // Process values + } The loop will process all values published by the topic for one minute. - -Get Last ID +# Get Last ID The example above will process all messages in the topic. If only messages -should be processed, that where published after the loop starts, the method +should be processed that were published after the loop starts, the method LastID() can be used: - id := top.LastID() - id, values, err = top.Receive(context.Background(), id) + id := top.LastID() + id, values, err = top.Receive(context.Background(), id) The return value of LastID() is the highest id in the topic. So a Receive() call -on top.LastID() will only return data, that was published after the call. +on top.LastID() will only return data that was published after the call. A pattern to receive only new data is: - id := top.LastID() - var values []string - var err error - for { - id, values, err = top.Receive(context.Background(), id) - if err != nil { - // Handle error - } - // Process values - } - + id := top.LastID() + var values []string + var err error + for { + id, values, err = top.Receive(context.Background(), id) + if err != nil { + // Handle error + } + // Process values + } -Prune old values +# Prune old values -For this pattern to work, the topic has to save all values that where ever +For this pattern to work, the topic has to save all values that were ever published. To free some memory, old values can be deleted from time to time. This can be accomplished with the Prune() method: - top.Prune(10*time.Minute) + top.Prune(10*time.Minute) -This call will remove all values in the topic that are older then ten minutes. +This call will remove all values in the topic that are older than ten minutes. -Make sure, that all receivers have read the values before they are pruned. +Make sure that all receivers have read the values before they are pruned. If a Receive()-call tries to receive pruned values, it will return with the error `topic.ErrUnknownID`. diff --git a/topic.go b/topic.go index 35aba3f..73cc473 100644 --- a/topic.go +++ b/topic.go @@ -6,15 +6,15 @@ import ( "time" ) -// Topic is a datastructure that holds a set values. Values can be published to +// Topic is a datastructure that holds a set of values. Values can be published to // a topic. Each time a list of values is published, a new id is created. It is -// possible to receive all values at once or the values that published after a -// specivic id. +// possible to receive all values at once or the values that were published after a +// specific id. // // A Topic has to be created with the topic.New() function. For example // topic.New[string](). // -// A Topic is save for concourent use. +// A Topic is safe for concurrent use. // // The type of value is restricted to be a comparable. This is required, so the // topic.Receive function can return a list of unique values. This restriction @@ -24,7 +24,7 @@ type Topic[T comparable] struct { mu sync.RWMutex // The topic is implemented by a linked list and an index from each id to - // the node. Therefore nodes get be added, retrieved and deleted from the + // the node. Therefore nodes can be added, retrieved and deleted from the // top in constant time. head *node[T] tail *node[T] @@ -71,7 +71,7 @@ func (t *Topic[T]) Publish(value ...T) uint64 { t.index[newNode.id] = newNode // Closes the signal channel to signal all Receive()-calls. To overwrite the - // value afterwars is not a race condition. Since the go-implementation of a + // value afterwards is not a race condition. Since the go-implementation of a // channel is a pointer-type, a new object is created, while the // Receive()-calls keep listening on the old object. close(t.signal) @@ -81,13 +81,13 @@ func (t *Topic[T]) Publish(value ...T) uint64 { } // Receive returns a slice of unique values from the topic. If id is 0, all -// values are returned, else, all values that where inserted after the id are +// values are returned, else, all values that were inserted after the id are // returned. // -// If the id is lower then the lowest id in the topic, an error of type -// ErrUnknownTopicID is returned. +// If the id is lower than the lowest id in the topic, an error of type +// UnknownIDError is returned. // -// If there is no new data, Receive() blocks until threre is new data or the +// If there is no new data, Receive() blocks until there is new data or the // given channel is done. The same happens with id 0, when there is no data at // all in the topic. // @@ -137,10 +137,10 @@ func (t *Topic[T]) LastID() uint64 { return t.tail.id } -// Prune removes entries from the topic that are older then the given time. +// Prune removes entries from the topic that are older than the given time. // // Prune has a complexity of O(n) where n is the count of all nodes that are -// older then the given time. +// older than the given time. func (t *Topic[T]) Prune(until time.Time) { t.mu.Lock() defer t.mu.Unlock() @@ -149,8 +149,8 @@ func (t *Topic[T]) Prune(until time.Time) { return } - // Delete all nodes from the index, that are older then the given time. - // After the loop, n is the oldes index, that is still in the index. + // Delete all nodes from the index, that are older than the given time. + // After the loop, n is the oldest index, that is still in the index. n := t.head for ; n.t.Before(until) && n.next != nil; n = n.next { delete(t.index, n.id) From 5e325e204b867ac188d5eb91103aa90dfb0eac2c Mon Sep 17 00:00:00 2001 From: Oskar Hahn Date: Sat, 18 Oct 2025 07:41:44 +0200 Subject: [PATCH 2/3] Use struct{} instead of bool for set --- topic.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/topic.go b/topic.go index 73cc473..479b129 100644 --- a/topic.go +++ b/topic.go @@ -170,12 +170,12 @@ type node[T comparable] struct { // unique. If there are no values, an empty slice (not nil) is returned. func runNode[T comparable](n *node[T]) []T { var values []T - seen := make(map[T]bool) + seen := make(map[T]struct{}) for ; n != nil; n = n.next { for _, v := range n.value { - if !seen[v] { + if _, ok := seen[v]; !ok { values = append(values, v) - seen[v] = true + seen[v] = struct{}{} } } } From 57010482b4fabbbd304c84f56c648927922d5497 Mon Sep 17 00:00:00 2001 From: Oskar Hahn Date: Sun, 19 Oct 2025 08:16:57 +0200 Subject: [PATCH 3/3] Create new slice on prune to prevent data race on received values --- topic.go | 15 +++++++-------- topic_test.go | 26 ++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/topic.go b/topic.go index 7b1b61e..2361343 100644 --- a/topic.go +++ b/topic.go @@ -73,6 +73,9 @@ func (t *Topic[T]) Publish(value ...T) uint64 { // If there is no new data, Receive() blocks until there is new data or the // given channel is done. The same happens with id 0, when there is no data at // all in the topic. +// +// For performance reasons, this function returns the internal slice of the +// topic. It is not allowed to manipulate the values. func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) { t.mu.RLock() @@ -130,17 +133,13 @@ func (t *Topic[T]) Prune(until time.Time) { } if n >= len(t.data) { - t.data = t.data[:0] - t.insertTime = t.insertTime[:0] + t.data = nil + t.insertTime = nil t.offset += uint64(n) return } - copy(t.data, t.data[n:]) - copy(t.insertTime, t.insertTime[n:]) - - t.data = t.data[:len(t.data)-n] - t.insertTime = t.insertTime[:len(t.insertTime)-n] - + t.data = slices.Clone(t.data[n:]) + t.insertTime = slices.Clone(t.insertTime[n:]) t.offset += uint64(n) } diff --git a/topic_test.go b/topic_test.go index 5d21194..2f1a96a 100644 --- a/topic_test.go +++ b/topic_test.go @@ -116,10 +116,10 @@ func TestPrune(t *testing.T) { top.Prune(pruneTime) - _, got, err := top.Receive(t.Context(), 0) - if err != nil { - t.Fatalf("Receive(): %v", err) - } + ctxCanceled, cancel := context.WithCancel(t.Context()) + cancel() + + _, got, _ := top.Receive(ctxCanceled, 0) if !cmpSlice(got, tt.expect) { t.Errorf("Got %v, want %v", got, tt.expect) @@ -144,6 +144,24 @@ func TestPruneEmptyTopic(t *testing.T) { } } +func TestPruneUsedValue(t *testing.T) { + top := topic.New[string]() + top.Publish("val1") + top.Publish("val2") + top.Publish("val3") + ti := time.Now() + _, data, err := top.Receive(t.Context(), 0) + if err != nil { + t.Fatalf("Receive(): %v", err) + } + top.Publish("val4") + top.Prune(ti) + + if data[0] != "val1" { + t.Errorf("Received value changed to %s, expected `val1`", data[0]) + } +} + func TestErrUnknownID(t *testing.T) { top := topic.New[string]() top.Publish("v1")