Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/grumpy-pigs-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/protocol": patch
---

fix flaky test
56 changes: 31 additions & 25 deletions utils/protoproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,46 +40,32 @@ func TestProtoProxy(t *testing.T) {
require.EqualValues(t, 0, proxy.Get().NumParticipants)

// immediate change
proxy.MarkDirty(true)
time.Sleep(100 * time.Millisecond)
<-proxy.MarkDirty(true)
awaitProxyUpdate(t, proxy)

require.EqualValues(t, 2, numParticipants.Load())
require.EqualValues(t, 1, proxy.Get().NumParticipants)

// queued updates
// queue an update while the cached value is still fresh so the worker
// picks it up on the next tick instead of triggering an immediate refresh.
proxy.MarkDirty(false)
select {
case <-proxy.Updated():
// consume previous notification
default:
}
assertNoProxyUpdate(t, proxy, 5*time.Millisecond)
require.EqualValues(t, 1, proxy.Get().NumParticipants)

// freeze and ensure that updates are not triggered
freeze.Store(true)
// freezing and consuming the previous notification to ensure counter does not increase in updateFn
select {
case <-proxy.Updated():
case <-time.After(100 * time.Millisecond):
t.Fatal("should have received an update")
}
// possible that ticker was updated while markDirty queued another update
require.GreaterOrEqual(t, int(proxy.Get().NumParticipants), 2)
awaitProxyUpdate(t, proxy)
require.EqualValues(t, 2, proxy.Get().NumParticipants)

// trigger another update, but should not get notification as freeze is in place and the model should not have changed
proxy.MarkDirty(false)
time.Sleep(500 * time.Millisecond)
select {
case <-proxy.Updated():
t.Fatal("should not have received an update")
default:
}
assertNoProxyUpdate(t, proxy, 100*time.Millisecond)
require.EqualValues(t, 2, proxy.Get().NumParticipants)

// ensure we didn't leak
proxy.Stop()

for i := 0; i < 10; i++ {
for range 10 {
if runtime.NumGoroutine() <= numGoRoutines {
break
}
Expand Down Expand Up @@ -112,7 +98,7 @@ func TestProtoProxy(t *testing.T) {
})

t.Run("await resolve when there is no change", func(t *testing.T) {
proxy := NewProtoProxy[*livekit.Room](10*time.Millisecond, func() *livekit.Room { return nil })
proxy := NewProtoProxy(10*time.Millisecond, func() *livekit.Room { return nil })
done := proxy.MarkDirty(true)
time.Sleep(100 * time.Millisecond)
select {
Expand All @@ -123,11 +109,31 @@ func TestProtoProxy(t *testing.T) {
})
}

func awaitProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room]) {
t.Helper()

select {
case <-proxy.Updated():
case <-time.After(250 * time.Millisecond):
require.FailNow(t, "timed out waiting for proxy update")
}
}

func assertNoProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room], d time.Duration) {
t.Helper()

select {
case <-proxy.Updated():
require.FailNow(t, "should not have received an update")
case <-time.After(d):
}
}

func createTestProxy() (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) {
// uses an update func that increments numParticipants each time
var numParticipants atomic.Uint32
var freeze atomic.Bool
return NewProtoProxy[*livekit.Room](
return NewProtoProxy(
10*time.Millisecond,
func() *livekit.Room {
if !freeze.Load() {
Expand Down
56 changes: 39 additions & 17 deletions utils/rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package utils

import (
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -187,30 +188,51 @@ func runTest(t *testing.T, fn func(testRunner)) {
constructor: tt.constructor,
doneCh: make(chan struct{}),
}
defer close(r.doneCh)
defer r.wg.Wait()

fn(&r)

// it's possible that there are some goroutines still waiting
// in taking the bandwidth. We need to keep moving the clock forward
// until all goroutines are finished
go func() {
ticker := time.NewTicker(5 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
r.clock.Add(r.maxDuration)
case <-r.doneCh:
}
}
}()
r.advanceUntilDone()
close(r.doneCh)
})
}
}

func (r *runnerImpl) advanceUntilDone() {
if r.maxDuration <= 0 {
r.wg.Wait()
return
}

waitDone := make(chan struct{})
go func() {
r.wg.Wait()
close(waitDone)
}()

step := r.clockAdvanceStep()
for {
select {
case <-waitDone:
return
default:
}

r.clock.Add(step)
runtime.Gosched()
}
}

func (r *runnerImpl) clockAdvanceStep() time.Duration {
step := r.maxDuration / 1_000
if step < time.Millisecond {
return time.Millisecond
}
if step > 100*time.Millisecond {
return 100 * time.Millisecond
}
return step
}

// createLimiter builds a limiter with given options.
func (r *runnerImpl) createLimiter(rate int, opts ...Option) Limiter {
opts = append(opts, WithClock(r.clock))
Expand Down
Loading