diff --git a/lib/io/event/selector/select.rb b/lib/io/event/selector/select.rb index 49fc29ae..4fdf42a2 100644 --- a/lib/io/event/selector/select.rb +++ b/lib/io/event/selector/select.rb @@ -288,12 +288,14 @@ def select(duration = nil) duration = 0 end + closed = nil readable = Array.new writable = Array.new priority = Array.new @waiting.delete_if do |io, waiter| if io.closed? + (closed ||= Array.new) << waiter true else waiter.each do |fiber, events| @@ -314,6 +316,14 @@ def select(duration = nil) end end + closed&.each do |waiter| + waiter.each do |fiber, _| + fiber.raise(IOError, "closed stream") if fiber.alive? + rescue + # The fiber didn't handle the exception; it is now terminated. + end + end + duration = 0 unless @ready.empty? error = nil @@ -338,7 +348,26 @@ def select(duration = nil) end if error - # Requeue the error into the pending exception queue: + # `IO.select` can raise both IOError and Errno::EBADF when one of the given IOs is closed. In that case, we enumerate all waiting IOs to find the closed one(s) and raise on their waiters. Then, we return 0 so the event loop retries cleanly. + if error.is_a?(IOError) || error.is_a?(Errno::EBADF) + closed = [] + @waiting.delete_if do |io, waiter| + if io.closed? + waiter.each{|fiber, _| closed << fiber if fiber.alive?} + true + end + end + + closed.each do |fiber| + fiber.raise(IOError, "closed stream") + rescue + # The fiber didn't handle the exception; it is now terminated. + end + + return 0 + end + + # For all other errors (e.g. thread interrupts), re-queue on the scheduler thread: Thread.current.raise(error) return 0 end @@ -346,15 +375,17 @@ def select(duration = nil) ready = Hash.new(0).compare_by_identity readable&.each do |io| - ready[io] |= IO::READABLE + # Skip any IO that was closed/reused after IO.select returned - its fd number + # may now belong to a different file, so resuming the waiter would be wrong: + ready[io] |= IO::READABLE unless io.closed? end writable&.each do |io| - ready[io] |= IO::WRITABLE + ready[io] |= IO::WRITABLE unless io.closed? end priority&.each do |io| - ready[io] |= IO::PRIORITY + ready[io] |= IO::PRIORITY unless io.closed? end ready.each do |io, events| diff --git a/test/io/event/selector/closed_io.rb b/test/io/event/selector/closed_io.rb new file mode 100644 index 00000000..457495c0 --- /dev/null +++ b/test/io/event/selector/closed_io.rb @@ -0,0 +1,102 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event" +require "io/event/selector" +require "socket" + +require_relative "../../../../fixtures/io/event/test_scheduler" + +ClosedIO = Sus::Shared("closed io while selecting") do + with "a pipe" do + let(:pipe) {IO.pipe} + let(:input) {pipe.first} + let(:output) {pipe.last} + + after do + input.close unless input.closed? + output.close unless output.closed? + end + + it "does not raise when IO is closed from the same fiber before selecting" do + skip_unless_minimum_ruby_version("4") + + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current)) + Fiber.set_scheduler(scheduler) + + wait_fiber = Fiber.new do + input.wait_readable + rescue IOError + # acceptable: the IO was closed while waiting + end + + # Close must happen in a separate fiber so that rb_thread_io_close_wait + # can yield (via kernel_sleep) back to the loop fiber instead of deadlocking: + close_fiber = Fiber.new do + input.close + end + + wait_fiber.transfer + close_fiber.transfer + + scheduler.run + ensure + Fiber.set_scheduler(nil) + scheduler&.close + end + + thread.join + end + + it "does not raise when IO is closed from another thread while selecting" do + skip_unless_minimum_ruby_version("4") + + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current)) + Fiber.set_scheduler(scheduler) + + wait_fiber = Fiber.new do + input.wait_readable + rescue IOError + # acceptable: the IO was closed while waiting + end + + wait_fiber.transfer + + # Close the IO from another thread while the selector is blocking: + closer = Thread.new do + sleep(0.01) + input.close + end + + scheduler.run + ensure + closer&.join + Fiber.set_scheduler(nil) + scheduler&.close + end + + error = nil + begin + thread.join + rescue => error + end + expect(error).to be_nil + end + end +end + +IO::Event::Selector.constants.each do |name| + klass = IO::Event::Selector.const_get(name) + + describe(klass, unique: name) do + it_behaves_like ClosedIO + end +end