Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions lib/io/event/selector/select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,14 @@ def select(duration = nil)
duration = 0
end

closed = nil
readable = Array.new
writable = Array.new
priority = Array.new

@waiting.delete_if do |io, waiter|
if io.closed?
(closed ||= Array.new) << waiter
true
else
waiter.each do |fiber, events|
Expand All @@ -314,6 +316,14 @@ def select(duration = nil)
end
end

closed&.each do |waiter|
waiter.each do |fiber, _|
fiber.raise(IOError, "closed stream") if fiber.alive?
rescue
# The fiber didn't handle the exception; it is now terminated.
end
end

duration = 0 unless @ready.empty?
error = nil

Expand All @@ -338,23 +348,44 @@ def select(duration = nil)
end

if error
# Requeue the error into the pending exception queue:
# `IO.select` can raise both IOError and Errno::EBADF when one of the given IOs is closed. In that case, we enumerate all waiting IOs to find the closed one(s) and raise on their waiters. Then, we return 0 so the event loop retries cleanly.
if error.is_a?(IOError) || error.is_a?(Errno::EBADF)
closed = []
@waiting.delete_if do |io, waiter|
if io.closed?
waiter.each{|fiber, _| closed << fiber if fiber.alive?}
true
end
end

closed.each do |fiber|
fiber.raise(IOError, "closed stream")
rescue
# The fiber didn't handle the exception; it is now terminated.
end

return 0
end

# For all other errors (e.g. thread interrupts), re-queue on the scheduler thread:
Thread.current.raise(error)
return 0
end

ready = Hash.new(0).compare_by_identity

readable&.each do |io|
ready[io] |= IO::READABLE
# Skip any IO that was closed/reused after IO.select returned - its fd number
# may now belong to a different file, so resuming the waiter would be wrong:
ready[io] |= IO::READABLE unless io.closed?
end

writable&.each do |io|
ready[io] |= IO::WRITABLE
ready[io] |= IO::WRITABLE unless io.closed?
end

priority&.each do |io|
ready[io] |= IO::PRIORITY
ready[io] |= IO::PRIORITY unless io.closed?
end

ready.each do |io, events|
Expand Down
102 changes: 102 additions & 0 deletions test/io/event/selector/closed_io.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require "io/event"
require "io/event/selector"
require "socket"

require_relative "../../../../fixtures/io/event/test_scheduler"

ClosedIO = Sus::Shared("closed io while selecting") do
with "a pipe" do
let(:pipe) {IO.pipe}
let(:input) {pipe.first}
let(:output) {pipe.last}

after do
input.close unless input.closed?
output.close unless output.closed?
end

it "does not raise when IO is closed from the same fiber before selecting" do
skip_unless_minimum_ruby_version("4")

thread = Thread.new do
Thread.current.report_on_exception = false

scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current))
Fiber.set_scheduler(scheduler)

wait_fiber = Fiber.new do
input.wait_readable
rescue IOError
# acceptable: the IO was closed while waiting
end

# Close must happen in a separate fiber so that rb_thread_io_close_wait
# can yield (via kernel_sleep) back to the loop fiber instead of deadlocking:
close_fiber = Fiber.new do
input.close
end

wait_fiber.transfer
close_fiber.transfer

scheduler.run
ensure
Fiber.set_scheduler(nil)
scheduler&.close
end

thread.join
end

it "does not raise when IO is closed from another thread while selecting" do
skip_unless_minimum_ruby_version("4")

thread = Thread.new do
Thread.current.report_on_exception = false

scheduler = IO::Event::TestScheduler.new(selector: subject.new(Fiber.current))
Fiber.set_scheduler(scheduler)

wait_fiber = Fiber.new do
input.wait_readable
rescue IOError
# acceptable: the IO was closed while waiting
end

wait_fiber.transfer

# Close the IO from another thread while the selector is blocking:
closer = Thread.new do
sleep(0.01)
input.close
end

scheduler.run
ensure
closer&.join
Fiber.set_scheduler(nil)
scheduler&.close
end

error = nil
begin
thread.join
rescue => error
end
expect(error).to be_nil
end
end
end

IO::Event::Selector.constants.each do |name|
klass = IO::Event::Selector.const_get(name)

describe(klass, unique: name) do
it_behaves_like ClosedIO
end
end
Loading