diff --git a/.changeset/grumpy-pigs-hammer.md b/.changeset/grumpy-pigs-hammer.md new file mode 100644 index 000000000..1d10854cf --- /dev/null +++ b/.changeset/grumpy-pigs-hammer.md @@ -0,0 +1,5 @@ +--- +"@livekit/protocol": patch +--- + +fix flaky test diff --git a/utils/protoproxy_test.go b/utils/protoproxy_test.go index 2363f289e..7c9e16480 100644 --- a/utils/protoproxy_test.go +++ b/utils/protoproxy_test.go @@ -40,46 +40,32 @@ func TestProtoProxy(t *testing.T) { require.EqualValues(t, 0, proxy.Get().NumParticipants) // immediate change - proxy.MarkDirty(true) - time.Sleep(100 * time.Millisecond) + <-proxy.MarkDirty(true) + awaitProxyUpdate(t, proxy) require.EqualValues(t, 2, numParticipants.Load()) require.EqualValues(t, 1, proxy.Get().NumParticipants) - // queued updates + // queue an update while the cached value is still fresh so the worker + // picks it up on the next tick instead of triggering an immediate refresh. proxy.MarkDirty(false) - select { - case <-proxy.Updated(): - // consume previous notification - default: - } + assertNoProxyUpdate(t, proxy, 5*time.Millisecond) require.EqualValues(t, 1, proxy.Get().NumParticipants) // freeze and ensure that updates are not triggered freeze.Store(true) - // freezing and consuming the previous notification to ensure counter does not increase in updateFn - select { - case <-proxy.Updated(): - case <-time.After(100 * time.Millisecond): - t.Fatal("should have received an update") - } - // possible that ticker was updated while markDirty queued another update - require.GreaterOrEqual(t, int(proxy.Get().NumParticipants), 2) + awaitProxyUpdate(t, proxy) + require.EqualValues(t, 2, proxy.Get().NumParticipants) // trigger another update, but should not get notification as freeze is in place and the model should not have changed proxy.MarkDirty(false) - time.Sleep(500 * time.Millisecond) - select { - case <-proxy.Updated(): - t.Fatal("should not have received an update") - default: - } + assertNoProxyUpdate(t, proxy, 100*time.Millisecond) require.EqualValues(t, 2, proxy.Get().NumParticipants) // ensure we didn't leak proxy.Stop() - for i := 0; i < 10; i++ { + for range 10 { if runtime.NumGoroutine() <= numGoRoutines { break } @@ -112,7 +98,7 @@ func TestProtoProxy(t *testing.T) { }) t.Run("await resolve when there is no change", func(t *testing.T) { - proxy := NewProtoProxy[*livekit.Room](10*time.Millisecond, func() *livekit.Room { return nil }) + proxy := NewProtoProxy(10*time.Millisecond, func() *livekit.Room { return nil }) done := proxy.MarkDirty(true) time.Sleep(100 * time.Millisecond) select { @@ -123,11 +109,31 @@ func TestProtoProxy(t *testing.T) { }) } +func awaitProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room]) { + t.Helper() + + select { + case <-proxy.Updated(): + case <-time.After(250 * time.Millisecond): + require.FailNow(t, "timed out waiting for proxy update") + } +} + +func assertNoProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room], d time.Duration) { + t.Helper() + + select { + case <-proxy.Updated(): + require.FailNow(t, "should not have received an update") + case <-time.After(d): + } +} + func createTestProxy() (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) { // uses an update func that increments numParticipants each time var numParticipants atomic.Uint32 var freeze atomic.Bool - return NewProtoProxy[*livekit.Room]( + return NewProtoProxy( 10*time.Millisecond, func() *livekit.Room { if !freeze.Load() { diff --git a/utils/rate_test.go b/utils/rate_test.go index 58519f801..c523a37ac 100644 --- a/utils/rate_test.go +++ b/utils/rate_test.go @@ -24,6 +24,7 @@ package utils import ( + "runtime" "sync" "testing" "time" @@ -187,30 +188,51 @@ func runTest(t *testing.T, fn func(testRunner)) { constructor: tt.constructor, doneCh: make(chan struct{}), } - defer close(r.doneCh) - defer r.wg.Wait() fn(&r) - // it's possible that there are some goroutines still waiting - // in taking the bandwidth. We need to keep moving the clock forward - // until all goroutines are finished - go func() { - ticker := time.NewTicker(5 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - r.clock.Add(r.maxDuration) - case <-r.doneCh: - } - } - }() + r.advanceUntilDone() + close(r.doneCh) }) } } +func (r *runnerImpl) advanceUntilDone() { + if r.maxDuration <= 0 { + r.wg.Wait() + return + } + + waitDone := make(chan struct{}) + go func() { + r.wg.Wait() + close(waitDone) + }() + + step := r.clockAdvanceStep() + for { + select { + case <-waitDone: + return + default: + } + + r.clock.Add(step) + runtime.Gosched() + } +} + +func (r *runnerImpl) clockAdvanceStep() time.Duration { + step := r.maxDuration / 1_000 + if step < time.Millisecond { + return time.Millisecond + } + if step > 100*time.Millisecond { + return 100 * time.Millisecond + } + return step +} + // createLimiter builds a limiter with given options. func (r *runnerImpl) createLimiter(rate int, opts ...Option) Limiter { opts = append(opts, WithClock(r.clock))