diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index d2db1a930c2ad2..66115a1274150a 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -480,6 +480,29 @@ def _maybe_resume_transport(self): self._paused = False self._transport.resume_reading() + def _consume_buffer(self, n=None): + """Take *n* bytes from the buffer (all bytes when *n* is ``None``). + + Returns a :class:`bytes` object. Uses ``bytearray.take_bytes()`` + when possible, but falls back to a non-mutating replacement when + a ``BufferError`` is raised due to active memoryview exports. + """ + try: + if n is None: + return self._buffer.take_bytes() + return self._buffer.take_bytes(n) + except BufferError: + # A memoryview held by a caller (e.g. an async database driver) + # prevents in-place resize. Fall back to a copy-and-replace + # strategy that does not mutate the exported object. + if n is None: + data = bytes(self._buffer) + self._buffer = bytearray() + else: + data = bytes(self._buffer[:n]) + self._buffer = self._buffer[n:] + return data + def feed_eof(self): self._eof = True self._wakeup_waiter() @@ -562,9 +585,9 @@ async def readline(self): return e.partial except exceptions.LimitOverrunError as e: if self._buffer.startswith(sep, e.consumed): - del self._buffer[:e.consumed + seplen] + self._consume_buffer(e.consumed + seplen) else: - self._buffer.clear() + self._consume_buffer() self._maybe_resume_transport() raise ValueError(e.args[0]) return line @@ -667,7 +690,7 @@ async def readuntil(self, separator=b'\n'): # adds data which makes separator be found. That's why we check for # EOF *after* inspecting the buffer. if self._eof: - chunk = self._buffer.take_bytes() + chunk = self._consume_buffer() raise exceptions.IncompleteReadError(chunk, None) # _wait_for_data() will resume reading if stream was paused. @@ -677,7 +700,7 @@ async def readuntil(self, separator=b'\n'): raise exceptions.LimitOverrunError( 'Separator is found, but chunk is longer than limit', match_start) - chunk = self._buffer.take_bytes(match_end) + chunk = self._consume_buffer(match_end) self._maybe_resume_transport() return chunk @@ -723,7 +746,7 @@ async def read(self, n=-1): await self._wait_for_data('read') # This will work right even if buffer is less than n bytes - data = self._buffer.take_bytes(min(len(self._buffer), n)) + data = self._consume_buffer(min(len(self._buffer), n)) self._maybe_resume_transport() return data @@ -754,12 +777,12 @@ async def readexactly(self, n): while len(self._buffer) < n: if self._eof: - incomplete = self._buffer.take_bytes() + incomplete = self._consume_buffer() raise exceptions.IncompleteReadError(incomplete, n) await self._wait_for_data('readexactly') - data = self._buffer.take_bytes(n) + data = self._consume_buffer(n) self._maybe_resume_transport() return data diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index cae8c7c6f7c94c..adb40ce0f75a65 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1257,6 +1257,94 @@ async def main(): main_coro = main() asyncio.run(main_coro) + # --- BufferError regression tests (gh-146379) --- + # StreamReader must not raise BufferError when a memoryview is held + # on the internal buffer, e.g. by an async database driver. + + def test_readexactly_with_active_memoryview(self): + """readexactly must succeed even when a memoryview is active.""" + async def go(): + reader = asyncio.StreamReader() + reader.feed_data(b"AABBCCDD") + mv = memoryview(reader._buffer) + data = await reader.readexactly(4) + self.assertEqual(data, b"AABB") + self.assertEqual(bytes(reader._buffer), b"CCDD") + mv.release() + asyncio.run(go()) + + def test_read_with_active_memoryview(self): + """read(n) must succeed even when a memoryview is active.""" + async def go(): + reader = asyncio.StreamReader() + reader.feed_data(b"HELLO") + mv = memoryview(reader._buffer) + data = await reader.read(3) + self.assertEqual(data, b"HEL") + self.assertEqual(bytes(reader._buffer), b"LO") + mv.release() + asyncio.run(go()) + + def test_readline_with_active_memoryview(self): + """readline must succeed even when a memoryview is active.""" + async def go(): + reader = asyncio.StreamReader() + reader.feed_data(b"line1\nline2\n") + mv = memoryview(reader._buffer) + data = await reader.readline() + self.assertEqual(data, b"line1\n") + mv.release() + asyncio.run(go()) + + def test_readuntil_with_active_memoryview(self): + """readuntil must succeed even when a memoryview is active.""" + async def go(): + reader = asyncio.StreamReader() + reader.feed_data(b"payload|rest") + mv = memoryview(reader._buffer) + data = await reader.readuntil(b"|") + self.assertEqual(data, b"payload|") + self.assertEqual(bytes(reader._buffer), b"rest") + mv.release() + asyncio.run(go()) + + def test_readexactly_eof_with_active_memoryview(self): + """readexactly at EOF must not raise BufferError.""" + async def go(): + reader = asyncio.StreamReader() + reader.feed_data(b"short") + reader.feed_eof() + mv = memoryview(reader._buffer) + with self.assertRaises(asyncio.IncompleteReadError) as cm: + await reader.readexactly(100) + self.assertEqual(cm.exception.partial, b"short") + mv.release() + asyncio.run(go()) + + def test_readuntil_eof_with_active_memoryview(self): + """readuntil at EOF must not raise BufferError.""" + async def go(): + reader = asyncio.StreamReader() + reader.feed_data(b"no separator") + reader.feed_eof() + mv = memoryview(reader._buffer) + with self.assertRaises(asyncio.IncompleteReadError) as cm: + await reader.readuntil(b"|") + self.assertEqual(cm.exception.partial, b"no separator") + mv.release() + asyncio.run(go()) + + def test_readline_limit_overrun_with_active_memoryview(self): + """readline over limit with active memoryview must not raise BufferError.""" + async def go(): + reader = asyncio.StreamReader(limit=5) + reader.feed_data(b"x" * 6 + b"\n") + mv = memoryview(reader._buffer) + with self.assertRaises(ValueError): + await reader.readline() + mv.release() + asyncio.run(go()) + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Library/2026-03-24-13-54-27.gh-issue-146379.piap38.rst b/Misc/NEWS.d/next/Library/2026-03-24-13-54-27.gh-issue-146379.piap38.rst new file mode 100644 index 00000000000000..f8d3a3396d7069 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-24-13-54-27.gh-issue-146379.piap38.rst @@ -0,0 +1,4 @@ +Fix :class:`asyncio.StreamReader` raising :exc:`BufferError` when a +:class:`memoryview` is held on the internal buffer. The ``read``, +``readline``, ``readuntil``, and ``readexactly`` methods now fall back +to non-mutating buffer replacement when in-place resize is not possible.