From e075233efbdc09cf1ad7c3cbd42eab837a0c0dd0 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 4 Apr 2026 21:55:26 +1300 Subject: [PATCH 1/5] Fix handling of closed IO objects in `IO::Event::Selector::Select`. --- lib/io/event/selector/select.rb | 39 ++++++++++-- test/io/event/selector/closed_io.rb | 93 +++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 4 deletions(-) create mode 100644 test/io/event/selector/closed_io.rb diff --git a/lib/io/event/selector/select.rb b/lib/io/event/selector/select.rb index 49fc29a..4fdf42a 100644 --- a/lib/io/event/selector/select.rb +++ b/lib/io/event/selector/select.rb @@ -288,12 +288,14 @@ def select(duration = nil) duration = 0 end + closed = nil readable = Array.new writable = Array.new priority = Array.new @waiting.delete_if do |io, waiter| if io.closed? + (closed ||= Array.new) << waiter true else waiter.each do |fiber, events| @@ -314,6 +316,14 @@ def select(duration = nil) end end + closed&.each do |waiter| + waiter.each do |fiber, _| + fiber.raise(IOError, "closed stream") if fiber.alive? + rescue + # The fiber didn't handle the exception; it is now terminated. + end + end + duration = 0 unless @ready.empty? error = nil @@ -338,7 +348,26 @@ def select(duration = nil) end if error - # Requeue the error into the pending exception queue: + # `IO.select` can raise both IOError and Errno::EBADF when one of the given IOs is closed. In that case, we enumerate all waiting IOs to find the closed one(s) and raise on their waiters. Then, we return 0 so the event loop retries cleanly. + if error.is_a?(IOError) || error.is_a?(Errno::EBADF) + closed = [] + @waiting.delete_if do |io, waiter| + if io.closed? + waiter.each{|fiber, _| closed << fiber if fiber.alive?} + true + end + end + + closed.each do |fiber| + fiber.raise(IOError, "closed stream") + rescue + # The fiber didn't handle the exception; it is now terminated. + end + + return 0 + end + + # For all other errors (e.g. thread interrupts), re-queue on the scheduler thread: Thread.current.raise(error) return 0 end @@ -346,15 +375,17 @@ def select(duration = nil) ready = Hash.new(0).compare_by_identity readable&.each do |io| - ready[io] |= IO::READABLE + # Skip any IO that was closed/reused after IO.select returned - its fd number + # may now belong to a different file, so resuming the waiter would be wrong: + ready[io] |= IO::READABLE unless io.closed? end writable&.each do |io| - ready[io] |= IO::WRITABLE + ready[io] |= IO::WRITABLE unless io.closed? end priority&.each do |io| - ready[io] |= IO::PRIORITY + ready[io] |= IO::PRIORITY unless io.closed? end ready.each do |io, events| diff --git a/test/io/event/selector/closed_io.rb b/test/io/event/selector/closed_io.rb new file mode 100644 index 0000000..7d8d2d2 --- /dev/null +++ b/test/io/event/selector/closed_io.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event" +require "io/event/selector" +require "socket" + +ClosedIO = Sus::Shared("closed io while selecting") do + with "a pipe" do + let(:pipe) {IO.pipe} + let(:input) {pipe.first} + let(:output) {pipe.last} + + after do + input.close unless input.closed? + output.close unless output.closed? + end + + it "does not raise when IO is closed from the same fiber before selecting" do + thread = Thread.new do + Thread.current.report_on_exception = false + + selector = subject.new(Fiber.current) + + wait_fiber = Fiber.new do + selector.io_wait(Fiber.current, input, IO::READABLE) + rescue IOError + # acceptable: the IO was closed while waiting + end + + wait_fiber.transfer + + # Close the IO before calling select (deterministic, no race): + input.close + + Thread.handle_interrupt(::SignalException => :never) do + selector.select(0) + end + ensure + selector&.close + end + + thread.join + end + + it "does not raise when IO is closed from another thread while selecting" do + thread = Thread.new do + Thread.current.report_on_exception = false + + selector = subject.new(Fiber.current) + + wait_fiber = Fiber.new do + selector.io_wait(Fiber.current, input, IO::READABLE) + rescue IOError + # acceptable: the IO was closed while waiting + end + + wait_fiber.transfer + + # Close the IO from another thread while IO.select is blocking — this is + # the race condition that triggers IOError: closed stream in Select#select: + closer = Thread.new do + sleep(0.01) + input.close + end + + Thread.handle_interrupt(::SignalException => :never) do + selector.select(1.0) + end + ensure + closer&.join + selector&.close + end + + error = nil + begin + thread.join + rescue => error + end + expect(error).to be_nil + end + end +end + +IO::Event::Selector.constants.each do |name| + klass = IO::Event::Selector.const_get(name) + + describe(klass, unique: name) do + it_behaves_like ClosedIO + end +end From 5b19a9dff923acb3391e36951bd58da8a8db1372 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 5 Apr 2026 15:58:53 +1200 Subject: [PATCH 2/5] Fix handling of EBADF in URing backend. --- ext/io/event/selector/uring.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ext/io/event/selector/uring.c b/ext/io/event/selector/uring.c index fea61bb..10e0f99 100644 --- a/ext/io/event/selector/uring.c +++ b/ext/io/event/selector/uring.c @@ -557,7 +557,12 @@ VALUE io_wait_transfer(VALUE _arguments) { if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result); int32_t result = arguments->waiting->result; - if (result < 0) { + if (result == -EBADF) { + // The file descriptor was closed before or during the poll operation. + // Raise IOError to match Ruby's convention for operations on closed streams, + // allowing callers to rescue IOError rather than seeing Errno::EBADF propagate. + rb_raise(rb_eIOError, "closed stream"); + } else if (result < 0) { rb_syserr_fail(-result, "io_wait_transfer:io_uring_poll_add"); } else if (result > 0) { // We explicitly filter the resulting events based on the requested events. From 49f0eb662ec237bf1f288c30d47f132f20fd8858 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 5 Apr 2026 16:10:34 +1200 Subject: [PATCH 3/5] Try fixing test instead. --- ext/io/event/selector/uring.c | 7 +------ test/io/event/selector/closed_io.rb | 25 +++++++++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/ext/io/event/selector/uring.c b/ext/io/event/selector/uring.c index 10e0f99..fea61bb 100644 --- a/ext/io/event/selector/uring.c +++ b/ext/io/event/selector/uring.c @@ -557,12 +557,7 @@ VALUE io_wait_transfer(VALUE _arguments) { if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result); int32_t result = arguments->waiting->result; - if (result == -EBADF) { - // The file descriptor was closed before or during the poll operation. - // Raise IOError to match Ruby's convention for operations on closed streams, - // allowing callers to rescue IOError rather than seeing Errno::EBADF propagate. - rb_raise(rb_eIOError, "closed stream"); - } else if (result < 0) { + if (result < 0) { rb_syserr_fail(-result, "io_wait_transfer:io_uring_poll_add"); } else if (result > 0) { // We explicitly filter the resulting events based on the requested events. diff --git a/test/io/event/selector/closed_io.rb b/test/io/event/selector/closed_io.rb index 7d8d2d2..b9fd50a 100644 --- a/test/io/event/selector/closed_io.rb +++ b/test/io/event/selector/closed_io.rb @@ -7,6 +7,8 @@ require "io/event/selector" require "socket" +require_relative "../../../../fixtures/io/event/test_scheduler" + ClosedIO = Sus::Shared("closed io while selecting") do with "a pipe" do let(:pipe) {IO.pipe} @@ -22,10 +24,11 @@ thread = Thread.new do Thread.current.report_on_exception = false - selector = subject.new(Fiber.current) + scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current)) + Fiber.set_scheduler(scheduler) wait_fiber = Fiber.new do - selector.io_wait(Fiber.current, input, IO::READABLE) + input.wait_readable rescue IOError # acceptable: the IO was closed while waiting end @@ -36,10 +39,11 @@ input.close Thread.handle_interrupt(::SignalException => :never) do - selector.select(0) + scheduler.selector.select(0) end ensure - selector&.close + Fiber.set_scheduler(nil) + scheduler&.close end thread.join @@ -49,29 +53,30 @@ thread = Thread.new do Thread.current.report_on_exception = false - selector = subject.new(Fiber.current) + scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current)) + Fiber.set_scheduler(scheduler) wait_fiber = Fiber.new do - selector.io_wait(Fiber.current, input, IO::READABLE) + input.wait_readable rescue IOError # acceptable: the IO was closed while waiting end wait_fiber.transfer - # Close the IO from another thread while IO.select is blocking — this is - # the race condition that triggers IOError: closed stream in Select#select: + # Close the IO from another thread while the selector is blocking: closer = Thread.new do sleep(0.01) input.close end Thread.handle_interrupt(::SignalException => :never) do - selector.select(1.0) + scheduler.selector.select(1.0) end ensure closer&.join - selector&.close + Fiber.set_scheduler(nil) + scheduler&.close end error = nil From 66f048ee7468cc9adb13cdde7ef3e0c526920b55 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 5 Apr 2026 16:33:58 +1200 Subject: [PATCH 4/5] Fix test failures. --- test/io/event/selector/closed_io.rb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/test/io/event/selector/closed_io.rb b/test/io/event/selector/closed_io.rb index b9fd50a..5512911 100644 --- a/test/io/event/selector/closed_io.rb +++ b/test/io/event/selector/closed_io.rb @@ -33,14 +33,16 @@ # acceptable: the IO was closed while waiting end - wait_fiber.transfer + # Close must happen in a separate fiber so that rb_thread_io_close_wait + # can yield (via kernel_sleep) back to the loop fiber instead of deadlocking: + close_fiber = Fiber.new do + input.close + end - # Close the IO before calling select (deterministic, no race): - input.close + wait_fiber.transfer + close_fiber.transfer - Thread.handle_interrupt(::SignalException => :never) do - scheduler.selector.select(0) - end + scheduler.run ensure Fiber.set_scheduler(nil) scheduler&.close @@ -70,9 +72,7 @@ input.close end - Thread.handle_interrupt(::SignalException => :never) do - scheduler.selector.select(1.0) - end + scheduler.run ensure closer&.join Fiber.set_scheduler(nil) From 58b3f6278b3a45719e9bcef0bf9189e97876b7e9 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 5 Apr 2026 16:43:16 +1200 Subject: [PATCH 5/5] Only run tests on Ruby 4+. --- test/io/event/selector/closed_io.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/io/event/selector/closed_io.rb b/test/io/event/selector/closed_io.rb index 5512911..457495c 100644 --- a/test/io/event/selector/closed_io.rb +++ b/test/io/event/selector/closed_io.rb @@ -21,6 +21,8 @@ end it "does not raise when IO is closed from the same fiber before selecting" do + skip_unless_minimum_ruby_version("4") + thread = Thread.new do Thread.current.report_on_exception = false @@ -52,6 +54,8 @@ end it "does not raise when IO is closed from another thread while selecting" do + skip_unless_minimum_ruby_version("4") + thread = Thread.new do Thread.current.report_on_exception = false