From 4ec6049cd8c191e74a8547b04d8844306c7cc639 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sat, 7 Mar 2026 02:22:59 -0800 Subject: [PATCH 1/4] fix flaky test --- utils/protoproxy_test.go | 56 ++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/utils/protoproxy_test.go b/utils/protoproxy_test.go index 2363f289e..7c9e16480 100644 --- a/utils/protoproxy_test.go +++ b/utils/protoproxy_test.go @@ -40,46 +40,32 @@ func TestProtoProxy(t *testing.T) { require.EqualValues(t, 0, proxy.Get().NumParticipants) // immediate change - proxy.MarkDirty(true) - time.Sleep(100 * time.Millisecond) + <-proxy.MarkDirty(true) + awaitProxyUpdate(t, proxy) require.EqualValues(t, 2, numParticipants.Load()) require.EqualValues(t, 1, proxy.Get().NumParticipants) - // queued updates + // queue an update while the cached value is still fresh so the worker + // picks it up on the next tick instead of triggering an immediate refresh. proxy.MarkDirty(false) - select { - case <-proxy.Updated(): - // consume previous notification - default: - } + assertNoProxyUpdate(t, proxy, 5*time.Millisecond) require.EqualValues(t, 1, proxy.Get().NumParticipants) // freeze and ensure that updates are not triggered freeze.Store(true) - // freezing and consuming the previous notification to ensure counter does not increase in updateFn - select { - case <-proxy.Updated(): - case <-time.After(100 * time.Millisecond): - t.Fatal("should have received an update") - } - // possible that ticker was updated while markDirty queued another update - require.GreaterOrEqual(t, int(proxy.Get().NumParticipants), 2) + awaitProxyUpdate(t, proxy) + require.EqualValues(t, 2, proxy.Get().NumParticipants) // trigger another update, but should not get notification as freeze is in place and the model should not have changed proxy.MarkDirty(false) - time.Sleep(500 * time.Millisecond) - select { - case <-proxy.Updated(): - t.Fatal("should not have received an update") - default: - } + assertNoProxyUpdate(t, proxy, 100*time.Millisecond) require.EqualValues(t, 2, proxy.Get().NumParticipants) // ensure we didn't leak proxy.Stop() - for i := 0; i < 10; i++ { + for range 10 { if runtime.NumGoroutine() <= numGoRoutines { break } @@ -112,7 +98,7 @@ func TestProtoProxy(t *testing.T) { }) t.Run("await resolve when there is no change", func(t *testing.T) { - proxy := NewProtoProxy[*livekit.Room](10*time.Millisecond, func() *livekit.Room { return nil }) + proxy := NewProtoProxy(10*time.Millisecond, func() *livekit.Room { return nil }) done := proxy.MarkDirty(true) time.Sleep(100 * time.Millisecond) select { @@ -123,11 +109,31 @@ func TestProtoProxy(t *testing.T) { }) } +func awaitProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room]) { + t.Helper() + + select { + case <-proxy.Updated(): + case <-time.After(250 * time.Millisecond): + require.FailNow(t, "timed out waiting for proxy update") + } +} + +func assertNoProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room], d time.Duration) { + t.Helper() + + select { + case <-proxy.Updated(): + require.FailNow(t, "should not have received an update") + case <-time.After(d): + } +} + func createTestProxy() (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) { // uses an update func that increments numParticipants each time var numParticipants atomic.Uint32 var freeze atomic.Bool - return NewProtoProxy[*livekit.Room]( + return NewProtoProxy( 10*time.Millisecond, func() *livekit.Room { if !freeze.Load() { From 38a86a648ee8f93ddc63364a7237cd4670776402 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sat, 7 Mar 2026 02:29:01 -0800 Subject: [PATCH 2/4] Create grumpy-pigs-hammer.md --- .changeset/grumpy-pigs-hammer.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/grumpy-pigs-hammer.md diff --git a/.changeset/grumpy-pigs-hammer.md b/.changeset/grumpy-pigs-hammer.md new file mode 100644 index 000000000..1d10854cf --- /dev/null +++ b/.changeset/grumpy-pigs-hammer.md @@ -0,0 +1,5 @@ +--- +"@livekit/protocol": patch +--- + +fix flaky test From eb3b65d2bb70562b64afc7fac869edbcb7867664 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sat, 7 Mar 2026 02:36:28 -0800 Subject: [PATCH 3/4] fix rate limiter test --- utils/rate_test.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/utils/rate_test.go b/utils/rate_test.go index 58519f801..8e232e950 100644 --- a/utils/rate_test.go +++ b/utils/rate_test.go @@ -331,14 +331,24 @@ func TestDelayedRateLimiter(t *testing.T) { func TestPer(t *testing.T) { runTest(t, func(r testRunner) { + clock := r.getClock() rl := r.createLimiter(7, WithoutSlack, Per(time.Minute)) + perRequest := time.Minute / 7 - r.startTaking(rl) - r.startTaking(rl) + start := clock.Now() + require.Equal(t, start, rl.Take()) + + var ts time.Time + for i := 1; i <= 15; i++ { + clock.Add(perRequest) + ts = rl.Take() + require.Equal(t, start.Add(time.Duration(i)*perRequest), ts) + } - r.assertCountAt(1*time.Second, 1) - r.assertCountAt(1*time.Minute, 8) - r.assertCountAt(2*time.Minute, 15) + require.Less(t, start.Add(7*perRequest).Sub(start), time.Minute) + require.Greater(t, start.Add(8*perRequest).Sub(start), time.Minute) + require.Less(t, start.Add(14*perRequest).Sub(start), 2*time.Minute) + require.Greater(t, ts.Sub(start), 2*time.Minute) }) } From 54b56893c26b869fcba315dc47d6d99289b6baaa Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sat, 7 Mar 2026 02:49:10 -0800 Subject: [PATCH 4/4] update rate limiter test harness --- utils/rate_test.go | 76 +++++++++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/utils/rate_test.go b/utils/rate_test.go index 8e232e950..c523a37ac 100644 --- a/utils/rate_test.go +++ b/utils/rate_test.go @@ -24,6 +24,7 @@ package utils import ( + "runtime" "sync" "testing" "time" @@ -187,30 +188,51 @@ func runTest(t *testing.T, fn func(testRunner)) { constructor: tt.constructor, doneCh: make(chan struct{}), } - defer close(r.doneCh) - defer r.wg.Wait() fn(&r) - // it's possible that there are some goroutines still waiting - // in taking the bandwidth. We need to keep moving the clock forward - // until all goroutines are finished - go func() { - ticker := time.NewTicker(5 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - r.clock.Add(r.maxDuration) - case <-r.doneCh: - } - } - }() + r.advanceUntilDone() + close(r.doneCh) }) } } +func (r *runnerImpl) advanceUntilDone() { + if r.maxDuration <= 0 { + r.wg.Wait() + return + } + + waitDone := make(chan struct{}) + go func() { + r.wg.Wait() + close(waitDone) + }() + + step := r.clockAdvanceStep() + for { + select { + case <-waitDone: + return + default: + } + + r.clock.Add(step) + runtime.Gosched() + } +} + +func (r *runnerImpl) clockAdvanceStep() time.Duration { + step := r.maxDuration / 1_000 + if step < time.Millisecond { + return time.Millisecond + } + if step > 100*time.Millisecond { + return 100 * time.Millisecond + } + return step +} + // createLimiter builds a limiter with given options. func (r *runnerImpl) createLimiter(rate int, opts ...Option) Limiter { opts = append(opts, WithClock(r.clock)) @@ -331,24 +353,14 @@ func TestDelayedRateLimiter(t *testing.T) { func TestPer(t *testing.T) { runTest(t, func(r testRunner) { - clock := r.getClock() rl := r.createLimiter(7, WithoutSlack, Per(time.Minute)) - perRequest := time.Minute / 7 - - start := clock.Now() - require.Equal(t, start, rl.Take()) - var ts time.Time - for i := 1; i <= 15; i++ { - clock.Add(perRequest) - ts = rl.Take() - require.Equal(t, start.Add(time.Duration(i)*perRequest), ts) - } + r.startTaking(rl) + r.startTaking(rl) - require.Less(t, start.Add(7*perRequest).Sub(start), time.Minute) - require.Greater(t, start.Add(8*perRequest).Sub(start), time.Minute) - require.Less(t, start.Add(14*perRequest).Sub(start), 2*time.Minute) - require.Greater(t, ts.Sub(start), 2*time.Minute) + r.assertCountAt(1*time.Second, 1) + r.assertCountAt(1*time.Minute, 8) + r.assertCountAt(2*time.Minute, 15) }) }