diff --git a/sender/rtc_sender_test.go b/sender/rtc_sender_test.go index b0b68aa..0ad03a5 100644 --- a/sender/rtc_sender_test.go +++ b/sender/rtc_sender_test.go @@ -8,6 +8,7 @@ package sender import ( "errors" + "fmt" "image" "testing" "time" @@ -16,6 +17,7 @@ import ( "github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/prop" + "github.com/pion/webrtc/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -504,6 +506,75 @@ func TestRTCSender_ForceKeyFrame_EncoderError(t *testing.T) { assert.ErrorIs(t, err, errEncoderBusy) } +// slowEncodedReadCloser simulates a VP8 encoder that takes a fixed duration +// per Read() call, used to verify that processEncodedFrames encodes in parallel. +type slowEncodedReadCloser struct { + delay time.Duration +} + +func (s *slowEncodedReadCloser) Read() (mediadevices.EncodedBuffer, func(), error) { + time.Sleep(s.delay) + + return mediadevices.EncodedBuffer{ + Data: []byte{0x10}, // Non-empty VP8 frame (non-keyframe). + Samples: 1, + }, func() {}, nil +} + +func (s *slowEncodedReadCloser) Close() error { return nil } + +func (s *slowEncodedReadCloser) Controller() codec.EncoderController { return nil } + +func TestRTCSender_ProcessEncodedFrames_Parallel(t *testing.T) { + const ( + numTracks = 10 + encodeDelay = 50 * time.Millisecond + // Parallel: all tracks encode concurrently, so wall time ≈ encodeDelay. + // Sequential would take numTracks * encodeDelay = 500ms. + // Threshold at 3x single-encode gives generous margin to avoid flakiness. + maxWallTime = 3 * encodeDelay + ) + + sender, err := NewRTCSender() + require.NoError(t, err) + + // Clean up manually since Close() expects non-nil videoSource/mediaTrack. + defer func() { + sender.tracksMu.Lock() + sender.tracks = make(map[string]*EncodedTrack) + sender.tracksMu.Unlock() + }() + + // Inject mock tracks directly to bypass real VP8 encoder setup. + sender.tracksMu.Lock() + for i := range numTracks { + trackID := fmt.Sprintf("cam-%d", i) + + videoTrack, trackErr := webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, + trackID, trackID, + ) + require.NoError(t, trackErr) + + sender.tracks[trackID] = &EncodedTrack{ + info: VideoTrackInfo{TrackID: trackID, Width: 640, Height: 480}, + videoTrack: videoTrack, + encodedReader: &slowEncodedReadCloser{delay: encodeDelay}, + bitrateTracker: codec.NewBitrateTracker(300 * time.Millisecond), + } + } + sender.tracksMu.Unlock() + + start := time.Now() + sender.processEncodedFrames() + elapsed := time.Since(start) + + assert.Less(t, elapsed, maxWallTime, + "processEncodedFrames took %v with %d tracks (each %v); "+ + "expected < %v for parallel execution", + elapsed, numTracks, encodeDelay, maxWallTime) +} + func TestStaticErrors(t *testing.T) { // Test that all static errors are properly defined assert.NotNil(t, ErrTrackAlreadyExists)