From 71848149a871a6cc9eee49eb081e374f807341fa Mon Sep 17 00:00:00 2001 From: Stephan Puchegger Date: Thu, 5 Feb 2026 15:37:16 +0100 Subject: [PATCH 1/5] rebased original io completionport branch --- Cargo.toml | 4 + src/iocp/afd.rs | 4 +- src/iocp/mod.rs | 46 ++++------ src/iocp/port.rs | 4 +- src/os/iocp.rs | 41 ++++++++- tests/windows_overlapped.rs | 178 ++++++++++++++++++++++++++++++++++++ 6 files changed, 245 insertions(+), 32 deletions(-) create mode 100644 tests/windows_overlapped.rs diff --git a/Cargo.toml b/Cargo.toml index e99607c8..5c6e19ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,9 +59,13 @@ version = "0.5.0" easy-parallel = "3.1.0" fastrand = "2.0.0" socket2 = "0.6.0" +tracing-subscriber = "0.3" [target.'cfg(unix)'.dev-dependencies] libc = "0.2" [target.'cfg(all(unix, not(target_os="vita")))'.dev-dependencies] signal-hook = "0.3.17" + +[target.'cfg(windows)'.dev_dependencies] +tempfile = "3.7" diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index f7c7c3fe..4f966244 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -29,7 +29,7 @@ use windows_sys::Win32::System::IO::IO_STATUS_BLOCK; #[derive(Default)] #[repr(C)] -pub(super) struct AfdPollInfo { +pub(crate) struct AfdPollInfo { /// The timeout for this poll. timeout: i64, @@ -538,7 +538,7 @@ where pin_project_lite::pin_project! { /// An I/O status block paired with some auxiliary data. #[repr(C)] - pub(super) struct IoStatusBlock { + pub(crate) struct IoStatusBlock { // The I/O status block. iosb: UnsafeCell, diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index facbe066..c2b0c538 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -28,9 +28,12 @@ mod afd; mod port; -use afd::{base_socket, Afd, AfdPollInfo, AfdPollMask, HasAfdInfo, IoStatusBlock}; +use afd::{base_socket, Afd, AfdPollInfo, AfdPollMask, HasAfdInfo}; use port::{IoCompletionPort, OverlappedEntry}; +pub(crate) use afd::IoStatusBlock; +pub(crate) use port::{Completion, CompletionHandle}; + use windows_sys::Win32::Foundation::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, STATUS_CANCELLED}; use windows_sys::Win32::System::Threading::{ RegisterWaitForSingleObject, UnregisterWait, INFINITE, WT_EXECUTELONGFUNCTION, @@ -511,7 +514,7 @@ impl Poller { } /// Push an IOCP packet into the queue. - pub(super) fn post(&self, packet: CompletionPacket) -> io::Result<()> { + pub(super) fn post(&self, packet: crate::os::iocp::CompletionPacket) -> io::Result<()> { self.port.post(0, 0, packet.0) } @@ -709,38 +712,17 @@ impl EventExtra { } } -/// A packet used to wake up the poller with an event. -#[derive(Debug, Clone)] -pub struct CompletionPacket(Packet); - -impl CompletionPacket { - /// Create a new completion packet with a custom event. - pub fn new(event: Event) -> Self { - Self(Arc::pin(IoStatusBlock::from(PacketInner::Custom { event }))) - } - - /// Get the event associated with this packet. - pub fn event(&self) -> &Event { - let data = self.0.as_ref().data().project_ref(); - - match data { - PacketInnerProj::Custom { event } => event, - _ => unreachable!(), - } - } -} - /// The type of our completion packet. /// /// It needs to be pinned, since it contains data that is expected by IOCP not to be moved. -type Packet = Pin>; +pub(crate) type Packet = Pin>; type PacketUnwrapped = IoStatusBlock; pin_project! { /// The inner type of the packet. #[project_ref = PacketInnerProj] #[project = PacketInnerProjMut] - enum PacketInner { + pub(crate) enum PacketInner { // A packet for a socket. Socket { // The AFD packet state. @@ -796,6 +778,16 @@ impl HasAfdInfo for PacketInner { } impl PacketUnwrapped { + /// If this is an event packet, get the event. + pub(crate) fn event(self: Pin<&Self>) -> &Event { + let data = self.data().project_ref(); + + match data { + PacketInnerProj::Custom { event } => event, + _ => unreachable!(), + } + } + /// Set the new events that this socket is waiting on. /// /// Returns `true` if we need to be updated. @@ -1113,7 +1105,7 @@ impl PacketUnwrapped { /// Per-socket state. #[derive(Debug)] -struct SocketState { +pub(crate) struct SocketState { /// The raw socket handle. socket: RawSocket, @@ -1158,7 +1150,7 @@ enum SocketStatus { /// Per-waitable handle state. #[derive(Debug)] -struct WaitableState { +pub(crate) struct WaitableState { /// The handle that this state is for. handle: RawHandle, diff --git a/src/iocp/port.rs b/src/iocp/port.rs index 6d9b8bee..2887f500 100644 --- a/src/iocp/port.rs +++ b/src/iocp/port.rs @@ -27,7 +27,7 @@ use windows_sys::Win32::System::IO::{ /// # Safety /// /// This must be a valid completion block. -pub(super) unsafe trait Completion { +pub(crate) unsafe trait Completion { /// Signal to the completion block that we are about to start an operation. fn try_lock(self: Pin<&Self>) -> bool; @@ -40,7 +40,7 @@ pub(super) unsafe trait Completion { /// # Safety /// /// This must be a valid completion block. -pub(super) unsafe trait CompletionHandle: Deref + Sized { +pub(crate) unsafe trait CompletionHandle: Deref + Sized { /// Type of the completion block. type Completion: Completion; diff --git a/src/os/iocp.rs b/src/os/iocp.rs index 3370118e..d51a9fc8 100644 --- a/src/os/iocp.rs +++ b/src/os/iocp.rs @@ -1,6 +1,6 @@ //! Functionality that is only available for IOCP-based platforms. -pub use crate::sys::CompletionPacket; +use crate::sys::{Completion, CompletionHandle, IoStatusBlock, Packet, PacketInner}; use super::__private::PollerSealed; use crate::{Event, PollMode, Poller}; @@ -8,6 +8,45 @@ use crate::{Event, PollMode, Poller}; use std::io; use std::os::windows::io::{AsRawHandle, RawHandle}; use std::os::windows::prelude::{AsHandle, BorrowedHandle}; +use std::sync::Arc; + +/// A packet used to wake up the poller with an event. +#[derive(Debug, Clone)] +pub struct CompletionPacket(pub(crate) Packet); + +impl CompletionPacket { + /// Create a new completion packet with a custom event. + pub fn new(event: Event) -> Self { + Self(Arc::pin(IoStatusBlock::from(PacketInner::Custom { event }))) + } + + /// Get the event associated with this packet. + pub fn event(&self) -> &Event { + self.0.as_ref().event() + } + + /// Get a pointer to the underlying I/O status block. + /// + /// This pointer can be used as an `OVERLAPPED` block in Windows APIs. Calling this function + /// marks the block as "in use". Trying to call this function again before the operation is + /// indicated as complete by the poller will result in a panic. + pub fn as_ptr(&self) -> *mut () { + if !self.0.as_ref().get().try_lock() { + panic!("completion packet is already in use"); + } + + self.0.as_ref().get_ref() as *const _ as *const () as *mut () + } + + /// Cancel the in flight operation. + /// + /// # Safety + /// + /// The packet must be in flight and the operation must be cancelled already. + pub unsafe fn cancel(&mut self) { + self.0.as_ref().get().unlock(); + } +} /// Extension trait for the [`Poller`] type that provides functionality specific to IOCP-based /// platforms. diff --git a/tests/windows_overlapped.rs b/tests/windows_overlapped.rs new file mode 100644 index 00000000..7138529c --- /dev/null +++ b/tests/windows_overlapped.rs @@ -0,0 +1,178 @@ +//! Take advantage of overlapped I/O on Windows using CompletionPacket. +#![cfg(windows)] + +use polling::os::iocp::CompletionPacket; +use polling::{Event, Events, Poller}; + +use std::io; +use std::os::windows::ffi::OsStrExt; +use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle}; +use windows_sys::core::BOOL; +use windows_sys::Win32::{Foundation as wf, Storage::FileSystem as wfs, System::IO as wio}; + +#[test] +fn win32_file_io() { + // Create two completion packets: one for reading, one for writing. + let read_packet = CompletionPacket::new(Event::readable(1)); + let write_packet = CompletionPacket::new(Event::writable(2)); + + // Create a poller. + let poller = Poller::new().unwrap(); + let mut events = Events::new(); + + // Open a file for writing. + let dir = tempfile::tempdir().unwrap(); + let file_path = dir.path().join("test.txt"); + let fname = file_path + .as_os_str() + .encode_wide() + .chain(Some(0)) + .collect::>(); + let file_handle = unsafe { + let raw_handle = wfs::CreateFileW( + fname.as_ptr(), + wf::GENERIC_WRITE | wf::GENERIC_READ, + 0, + std::ptr::null_mut(), + wfs::CREATE_ALWAYS, + wfs::FILE_FLAG_OVERLAPPED, + std::ptr::null_mut(), + ); + + if raw_handle == wf::INVALID_HANDLE_VALUE { + panic!("CreateFileW failed: {}", io::Error::last_os_error()); + } + + OwnedHandle::from_raw_handle(raw_handle as _) + }; + + // Associate this file with the poller. + unsafe { + let poller_handle = poller.as_raw_handle(); + if wio::CreateIoCompletionPort(file_handle.as_raw_handle() as _, poller_handle as _, 1, 0) + == std::ptr::null_mut() + { + panic!( + "CreateIoCompletionPort failed: {}", + io::Error::last_os_error() + ); + } + } + + // Repeatedly write to the pipe. + let input_text = "Now is the time for all good men to come to the aid of their party"; + let mut len = input_text.len(); + while len > 0 { + // Begin to write. + let ptr = write_packet.as_ptr() as *mut _; + unsafe { + if wfs::WriteFile( + file_handle.as_raw_handle() as _, + input_text.as_ptr() as _, + len as _, + std::ptr::null_mut(), + ptr, + ) == 0 + && wf::GetLastError() != wf::ERROR_IO_PENDING + { + panic!("WriteFile failed: {}", io::Error::last_os_error()); + } + } + + // Wait for the overlapped operation to complete. + 'waiter: loop { + events.clear(); + println!("Starting wait..."); + poller.wait(&mut events, None).unwrap(); + println!("Got events"); + + for event in events.iter() { + if event.writable && event.key == 2 { + break 'waiter; + } + } + } + + // Decrement the length by the number of bytes written. + let bytes_written = input_text.len(); + len -= bytes_written; + } + + // Close the file and re-open it for reading. + drop(file_handle); + let file_handle = unsafe { + let raw_handle = wfs::CreateFileW( + fname.as_ptr(), + wf::GENERIC_READ | wf::GENERIC_WRITE, + 0, + std::ptr::null_mut(), + wfs::OPEN_EXISTING, + wfs::FILE_FLAG_OVERLAPPED, + std::ptr::null_mut(), + ); + + if raw_handle == wf::INVALID_HANDLE_VALUE { + panic!("CreateFileW failed: {}", io::Error::last_os_error()); + } + + OwnedHandle::from_raw_handle(raw_handle as _) + }; + + // Associate this file with the poller. + unsafe { + let poller_handle = poller.as_raw_handle(); + if wio::CreateIoCompletionPort(file_handle.as_raw_handle() as _, poller_handle as _, 2, 0) + == std::ptr::null_mut() + { + panic!( + "CreateIoCompletionPort failed: {}", + io::Error::last_os_error() + ); + } + } + + // Repeatedly read from the pipe. + let mut buffer = vec![0u8; 1024]; + let mut buffer_cursor = &mut *buffer; + let mut len = 1024; + let mut bytes_received = 0; + + while bytes_received < input_text.len() { + // Begin the read. + let ptr = read_packet.as_ptr().cast(); + unsafe { + if wfs::ReadFile( + file_handle.as_raw_handle() as _, + buffer_cursor.as_mut_ptr() as _, + len as _, + std::ptr::null_mut(), + ptr, + ) == 0 + && wf::GetLastError() != wf::ERROR_IO_PENDING + { + panic!("ReadFile failed: {}", io::Error::last_os_error()); + } + } + + // Wait for the overlapped operation to complete. + 'waiter: loop { + events.clear(); + poller.wait(&mut events, None).unwrap(); + + for event in events.iter() { + if event.readable && event.key == 1 { + break 'waiter; + } + } + } + + // Increment the cursor and decrement the length by the number of bytes read. + let bytes_read = input_text.len(); + buffer_cursor = &mut buffer_cursor[bytes_read..]; + len -= bytes_read; + bytes_received += bytes_read; + } + + assert_eq!(bytes_received, input_text.len()); + assert_eq!(&buffer[..bytes_received], input_text.as_bytes()); +} \ No newline at end of file From 64061d1ca17a4062542ae265ddfacee2ed7b3771 Mon Sep 17 00:00:00 2001 From: Stephan Puchegger Date: Fri, 13 Feb 2026 12:52:07 +0100 Subject: [PATCH 2/5] Fix deprecation (or typo) in Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5c6e19ab..1a813b12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,5 +67,5 @@ libc = "0.2" [target.'cfg(all(unix, not(target_os="vita")))'.dev-dependencies] signal-hook = "0.3.17" -[target.'cfg(windows)'.dev_dependencies] +[target.'cfg(windows)'.dev-dependencies] tempfile = "3.7" From 5198a6dddd4ef134ce165dbe30ba8f00d2bf204e Mon Sep 17 00:00:00 2001 From: Stephan Puchegger Date: Fri, 13 Feb 2026 12:54:30 +0100 Subject: [PATCH 3/5] The windows_oeverlapped test kind of works now - except the structural problem with the double-free of the (Completion)packet. --- examples/tcp_client.rs | 2 +- src/epoll.rs | 8 +++++ src/iocp/afd.rs | 25 ++++++++++++---- src/iocp/mod.rs | 30 +++++++++++++++---- src/iocp/port.rs | 2 ++ src/kqueue.rs | 8 +++++ src/lib.rs | 49 +++++++++++++++++++++++------- tests/windows_overlapped.rs | 60 ++++++++++++++++++++----------------- 8 files changed, 134 insertions(+), 50 deletions(-) diff --git a/examples/tcp_client.rs b/examples/tcp_client.rs index 431c0ae1..fd5219e6 100644 --- a/examples/tcp_client.rs +++ b/examples/tcp_client.rs @@ -7,7 +7,7 @@ fn main() -> io::Result<()> { let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?; let poller = polling::Poller::new()?; unsafe { - poller.add(&socket, Event::new(0, true, true))?; + poller.add(&socket, Event::new(0, true, true, false))?; } let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080); socket.set_nonblocking(true)?; diff --git a/src/epoll.rs b/src/epoll.rs index 4137cf10..fb3735ea 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -408,6 +408,14 @@ impl EventExtra { pub fn is_err(&self) -> Option { Some(self.flags.contains(epoll::EventFlags::ERR)) } + + #[inline] + pub fn set_transferred_bytes(&mut self, _transferred_bytes: usize) { + // No-op. + } + + #[inline] + pub fn transferred_bytes(&self) -> usize { 0 } } /// The notifier for Linux. diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index 4f966244..cd5a462c 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -25,7 +25,7 @@ use windows_sys::Win32::Networking::WinSock::{ }; use windows_sys::Win32::Storage::FileSystem::{FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE}; use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress}; -use windows_sys::Win32::System::IO::IO_STATUS_BLOCK; +use windows_sys::Win32::System::IO::{IO_STATUS_BLOCK, OVERLAPPED}; #[derive(Default)] #[repr(C)] @@ -535,12 +535,27 @@ where } } +// The OVERLAPPED struct is larger than the IO_STATUS_BLOCK struct. +// This way it is possible to use the memory as either/or without +// the risk of reading or writing out-of-bounds. +#[repr(C)] +pub(crate) union PaddedIOStatusBlock { + pub(crate) io_status_block: IO_STATUS_BLOCK, + pub(crate) overlapped: OVERLAPPED, +} + +impl Default for PaddedIOStatusBlock { + fn default() -> Self { + unsafe { core::mem::zeroed() } + } +} + pin_project_lite::pin_project! { /// An I/O status block paired with some auxiliary data. #[repr(C)] pub(crate) struct IoStatusBlock { // The I/O status block. - iosb: UnsafeCell, + padded_io_status_block: UnsafeCell, // Whether or not the block is in use. in_use: AtomicBool, @@ -571,7 +586,7 @@ unsafe impl Sync for IoStatusBlock {} impl From for IoStatusBlock { fn from(data: T) -> Self { Self { - iosb: UnsafeCell::new(unsafe { std::mem::zeroed() }), + padded_io_status_block: UnsafeCell::new(unsafe { std::mem::zeroed() }), in_use: AtomicBool::new(false), data, _marker: PhantomPinned, @@ -580,8 +595,8 @@ impl From for IoStatusBlock { } impl IoStatusBlock { - pub(super) fn iosb(self: Pin<&Self>) -> &UnsafeCell { - self.project_ref().iosb + pub(super) fn padded_io_status_block(self: Pin<&Self>) -> &UnsafeCell { + self.project_ref().padded_io_status_block } pub(super) fn data(self: Pin<&Self>) -> Pin<&T> { diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index c2b0c538..a2eeadbf 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -479,10 +479,11 @@ impl Poller { // Process all of the events. for entry in events.completions.drain(..) { + let transferred_bytes = entry.transferred_bytes(); let packet = entry.into_packet(); // Feed the event into the packet. - match packet.feed_event(self)? { + match packet.feed_event(self, transferred_bytes)? { FeedEventResult::NoEvent => {} FeedEventResult::Event(event) => { events.packets.push(event); @@ -664,6 +665,8 @@ impl Events { pub struct EventExtra { /// Flags associated with this event. flags: AfdPollMask, + /// Number of bytes transferred + transferred_bytes: usize, } impl EventExtra { @@ -672,6 +675,7 @@ impl EventExtra { pub const fn empty() -> EventExtra { EventExtra { flags: AfdPollMask::empty(), + transferred_bytes: 0, } } @@ -692,7 +696,7 @@ impl EventExtra { pub fn set_hup(&mut self, active: bool) { self.flags.set(AfdPollMask::ABORT, active); } - + /// Set up a listener for PRI events. #[inline] pub fn set_pri(&mut self, active: bool) { @@ -710,6 +714,15 @@ impl EventExtra { pub fn is_err(&self) -> Option { Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL)) } + + /// Set the number of transferred bytes + #[inline] + pub fn set_transferred_bytes(&mut self, transferred_bytes: usize) { self.transferred_bytes = transferred_bytes; } + + /// Get the number of transferred bytes + #[inline] + pub fn transferred_bytes(&self) -> usize { self.transferred_bytes } + } /// The type of our completion packet. @@ -944,14 +957,19 @@ impl PacketUnwrapped { /// /// This indicates that this packet was indicated as "ready" by the IOCP and needs to be /// processed. - fn feed_event(self: Pin>, poller: &Poller) -> io::Result { + fn feed_event( + self: Pin>, + poller: &Poller, + transferred_bytes: u32) -> io::Result { let inner = self.as_ref().data().project_ref(); let (afd_info, socket) = match inner { PacketInnerProj::Socket { packet, socket } => (packet, socket), PacketInnerProj::Custom { event } => { + let mut event = event.clone(); + event.set_transferred_bytes(transferred_bytes as usize); // This is a custom event. - return Ok(FeedEventResult::Event(*event)); + return Ok(FeedEventResult::Event(event)); } PacketInnerProj::Wakeup { .. } => { // The poller was notified. @@ -987,10 +1005,10 @@ impl PacketUnwrapped { unsafe { // SAFETY: The packet is not in transit. - let iosb = &mut *self.as_ref().iosb().get(); + let iosb = &mut *self.as_ref().padded_io_status_block().get(); // Check the status. - match iosb.Anonymous.Status { + match iosb.io_status_block.Anonymous.Status { STATUS_CANCELLED => { // Poll request was cancelled. } diff --git a/src/iocp/port.rs b/src/iocp/port.rs index 2887f500..4a24ca49 100644 --- a/src/iocp/port.rs +++ b/src/iocp/port.rs @@ -279,6 +279,8 @@ impl OverlappedEntry { packet } + pub(super) fn transferred_bytes(&self) -> u32 { self.entry.dwNumberOfBytesTransferred } + /// Get the packet reference that this entry refers to. /// /// # Safety diff --git a/src/kqueue.rs b/src/kqueue.rs index 8af4f4b3..9f393459 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -411,6 +411,14 @@ impl EventExtra { pub fn is_err(&self) -> Option { None } + + #[inline] + pub fn set_transferred_bytes(&mut self, _transferred_bytes: usize) { + // No-op. + } + + #[inline] + pub fn transferred_bytes(&self) -> usize { 0 } } pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { diff --git a/src/lib.rs b/src/lib.rs index f4e8c69c..aafca048 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,6 +133,8 @@ pub struct Event { pub readable: bool, /// Can it do a write operation without blocking? pub writable: bool, + /// Has the io operation completed? + pub completed: bool, /// System-specific event data. extra: sys::EventExtra, } @@ -183,45 +185,50 @@ pub enum PollMode { impl Event { /// Create a new event. - pub const fn new(key: usize, readable: bool, writable: bool) -> Event { + pub const fn new(key: usize, readable: bool, writable: bool, completed: bool) -> Event { Event { key, readable, writable, + completed, extra: sys::EventExtra::empty(), } } /// All kinds of events (readable and writable). /// - /// Equivalent to: `Event::new(key, true, true)` + /// Equivalent to: `Event::new(key, true, true, false)` #[inline] pub const fn all(key: usize) -> Event { - Event::new(key, true, true) + Event::new(key, true, true, false) } /// Only the readable event. /// - /// Equivalent to: `Event::new(key, true, false)` + /// Equivalent to: `Event::new(key, true, false, false)` #[inline] pub const fn readable(key: usize) -> Event { - Event::new(key, true, false) + Event::new(key, true, false, false) } /// Only the writable event. /// - /// Equivalent to: `Event::new(key, false, true)` + /// Equivalent to: `Event::new(key, false, true, false)` #[inline] - pub const fn writable(key: usize) -> Event { - Event::new(key, false, true) - } + pub const fn writable(key: usize) -> Event { Event::new(key, false, true, false) } + + /// Only the completed event. + /// + /// Equivalent to: `Event::new(key, false, false, true)` + #[inline] + pub const fn completed(key: usize) -> Event { Event::new(key, false, false, true) } /// No events. /// /// Equivalent to: `Event::new(key, false, false)` #[inline] pub const fn none(key: usize) -> Event { - Event::new(key, false, false) + Event::new(key, false, false, false) } /// Add interruption events to this interest. @@ -334,6 +341,26 @@ impl Event { self.extra.is_pri() } + /// The number of transferred bytes. + /// + /// This is only supported on the following platforms: + /// + /// - IOCP + /// + /// On other platforms, this will always return 0. + #[inline] + pub fn transferred_bytes(&self) -> usize { self.extra.transferred_bytes() } + + /// Set the number of transferred bytes. + /// + /// This is only supported on the following platforms: + /// + /// - IOCP + /// + /// On other platforms, this function is a no-op. + #[inline] + pub(crate) fn set_transferred_bytes(&mut self, bytes: usize) { self.extra.set_transferred_bytes(bytes); } + /// Tells if this event is the result of a connection failure. /// /// This function checks if a TCP connection has failed. It corresponds to the `EPOLLERR` or `EPOLLHUP` event in Linux @@ -351,7 +378,7 @@ impl Event { /// let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?; /// let poller = polling::Poller::new()?; /// unsafe { - /// poller.add(&socket, Event::new(0, true, true))?; + /// poller.add(&socket, Event::new(0, true, true, false))?; /// } /// let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080); /// socket.set_nonblocking(true)?; diff --git a/tests/windows_overlapped.rs b/tests/windows_overlapped.rs index 7138529c..873c3a50 100644 --- a/tests/windows_overlapped.rs +++ b/tests/windows_overlapped.rs @@ -7,15 +7,10 @@ use polling::{Event, Events, Poller}; use std::io; use std::os::windows::ffi::OsStrExt; use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle}; -use windows_sys::core::BOOL; use windows_sys::Win32::{Foundation as wf, Storage::FileSystem as wfs, System::IO as wio}; #[test] fn win32_file_io() { - // Create two completion packets: one for reading, one for writing. - let read_packet = CompletionPacket::new(Event::readable(1)); - let write_packet = CompletionPacket::new(Event::writable(2)); - // Create a poller. let poller = Poller::new().unwrap(); let mut events = Events::new(); @@ -49,8 +44,11 @@ fn win32_file_io() { // Associate this file with the poller. unsafe { let poller_handle = poller.as_raw_handle(); - if wio::CreateIoCompletionPort(file_handle.as_raw_handle() as _, poller_handle as _, 1, 0) - == std::ptr::null_mut() + if wio::CreateIoCompletionPort( + file_handle.as_raw_handle() as _, + poller_handle as _, + 1, + 0) == std::ptr::null_mut() { panic!( "CreateIoCompletionPort failed: {}", @@ -61,24 +59,30 @@ fn win32_file_io() { // Repeatedly write to the pipe. let input_text = "Now is the time for all good men to come to the aid of their party"; + let write_buffer : &[u8] = input_text.as_bytes(); + let mut write_buffer_cursor = & *write_buffer; let mut len = input_text.len(); + + let write_packet = CompletionPacket::new(Event::writable(2)); + while len > 0 { // Begin to write. + // TODO: CompletionPacket is already in use!!!!! But this should be reset + // once the packet completes. in poller.wait(...) let ptr = write_packet.as_ptr() as *mut _; unsafe { if wfs::WriteFile( file_handle.as_raw_handle() as _, - input_text.as_ptr() as _, + write_buffer_cursor.as_ptr() as _, len as _, std::ptr::null_mut(), - ptr, - ) == 0 - && wf::GetLastError() != wf::ERROR_IO_PENDING + ptr) == 0 && wf::GetLastError() != wf::ERROR_IO_PENDING { panic!("WriteFile failed: {}", io::Error::last_os_error()); } } + // Wait for the overlapped operation to complete. 'waiter: loop { events.clear(); @@ -88,16 +92,17 @@ fn win32_file_io() { for event in events.iter() { if event.writable && event.key == 2 { + let bytes_written = event.transferred_bytes(); + write_buffer_cursor = & write_buffer_cursor[bytes_written as usize..]; + len -= bytes_written as usize; break 'waiter; + } } } - - // Decrement the length by the number of bytes written. - let bytes_written = input_text.len(); - len -= bytes_written; } +/* // Close the file and re-open it for reading. drop(file_handle); let file_handle = unsafe { @@ -121,8 +126,11 @@ fn win32_file_io() { // Associate this file with the poller. unsafe { let poller_handle = poller.as_raw_handle(); - if wio::CreateIoCompletionPort(file_handle.as_raw_handle() as _, poller_handle as _, 2, 0) - == std::ptr::null_mut() + if wio::CreateIoCompletionPort( + file_handle.as_raw_handle() as _, + poller_handle as _, + 2, + 0) == std::ptr::null_mut() { panic!( "CreateIoCompletionPort failed: {}", @@ -137,6 +145,7 @@ fn win32_file_io() { let mut len = 1024; let mut bytes_received = 0; + let read_packet = CompletionPacket::new(Event::readable(1)); while bytes_received < input_text.len() { // Begin the read. let ptr = read_packet.as_ptr().cast(); @@ -146,9 +155,7 @@ fn win32_file_io() { buffer_cursor.as_mut_ptr() as _, len as _, std::ptr::null_mut(), - ptr, - ) == 0 - && wf::GetLastError() != wf::ERROR_IO_PENDING + ptr) == 0 && wf::GetLastError() != wf::ERROR_IO_PENDING { panic!("ReadFile failed: {}", io::Error::last_os_error()); } @@ -161,18 +168,17 @@ fn win32_file_io() { for event in events.iter() { if event.readable && event.key == 1 { + let bytes_read = event.transferred_bytes(); + buffer_cursor = &mut buffer_cursor[bytes_read ..]; + len -= bytes_read; + bytes_received += bytes_read; break 'waiter; } } } - - // Increment the cursor and decrement the length by the number of bytes read. - let bytes_read = input_text.len(); - buffer_cursor = &mut buffer_cursor[bytes_read..]; - len -= bytes_read; - bytes_received += bytes_read; } assert_eq!(bytes_received, input_text.len()); assert_eq!(&buffer[..bytes_received], input_text.as_bytes()); -} \ No newline at end of file +*/ +} From 592784fb1b78e580708e6df45f128d132322d503 Mon Sep 17 00:00:00 2001 From: Stephan Puchegger Date: Thu, 26 Feb 2026 14:23:42 +0100 Subject: [PATCH 4/5] Fixed the remaining bug: It was basically a double free. --- src/os/iocp.rs | 8 +++++--- tests/windows_overlapped.rs | 9 +++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/os/iocp.rs b/src/os/iocp.rs index d51a9fc8..9636dbc1 100644 --- a/src/os/iocp.rs +++ b/src/os/iocp.rs @@ -8,6 +8,7 @@ use crate::{Event, PollMode, Poller}; use std::io; use std::os::windows::io::{AsRawHandle, RawHandle}; use std::os::windows::prelude::{AsHandle, BorrowedHandle}; +use std::pin::Pin; use std::sync::Arc; /// A packet used to wake up the poller with an event. @@ -30,12 +31,13 @@ impl CompletionPacket { /// This pointer can be used as an `OVERLAPPED` block in Windows APIs. Calling this function /// marks the block as "in use". Trying to call this function again before the operation is /// indicated as complete by the poller will result in a panic. - pub fn as_ptr(&self) -> *mut () { + pub fn as_overlapped(&self) -> *mut () { if !self.0.as_ref().get().try_lock() { panic!("completion packet is already in use"); } - - self.0.as_ref().get_ref() as *const _ as *const () as *mut () + unsafe { + Arc::into_raw(Pin::into_inner_unchecked(self.0.clone())) as *mut () + } } /// Cancel the in flight operation. diff --git a/tests/windows_overlapped.rs b/tests/windows_overlapped.rs index 873c3a50..0a5b4a24 100644 --- a/tests/windows_overlapped.rs +++ b/tests/windows_overlapped.rs @@ -67,9 +67,7 @@ fn win32_file_io() { while len > 0 { // Begin to write. - // TODO: CompletionPacket is already in use!!!!! But this should be reset - // once the packet completes. in poller.wait(...) - let ptr = write_packet.as_ptr() as *mut _; + let ptr = write_packet.as_overlapped().cast(); unsafe { if wfs::WriteFile( file_handle.as_raw_handle() as _, @@ -102,7 +100,7 @@ fn win32_file_io() { } } -/* + // Close the file and re-open it for reading. drop(file_handle); let file_handle = unsafe { @@ -148,7 +146,7 @@ fn win32_file_io() { let read_packet = CompletionPacket::new(Event::readable(1)); while bytes_received < input_text.len() { // Begin the read. - let ptr = read_packet.as_ptr().cast(); + let ptr = read_packet.as_overlapped().cast(); unsafe { if wfs::ReadFile( file_handle.as_raw_handle() as _, @@ -180,5 +178,4 @@ fn win32_file_io() { assert_eq!(bytes_received, input_text.len()); assert_eq!(&buffer[..bytes_received], input_text.as_bytes()); -*/ } From c05700613669824a46124f9ed2c599540acefe7d Mon Sep 17 00:00:00 2001 From: Stephan Puchegger Date: Thu, 26 Feb 2026 17:16:15 +0100 Subject: [PATCH 5/5] Simplify the handling of how many bytes were actually transferred. --- examples/tcp_client.rs | 2 +- src/epoll.rs | 8 ------ src/iocp/afd.rs | 2 +- src/iocp/mod.rs | 26 +++----------------- src/iocp/port.rs | 2 -- src/kqueue.rs | 8 ------ src/lib.rs | 49 +++++++++---------------------------- src/os/iocp.rs | 16 +++++++++++- tests/windows_overlapped.rs | 9 +++---- 9 files changed, 36 insertions(+), 86 deletions(-) diff --git a/examples/tcp_client.rs b/examples/tcp_client.rs index fd5219e6..431c0ae1 100644 --- a/examples/tcp_client.rs +++ b/examples/tcp_client.rs @@ -7,7 +7,7 @@ fn main() -> io::Result<()> { let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?; let poller = polling::Poller::new()?; unsafe { - poller.add(&socket, Event::new(0, true, true, false))?; + poller.add(&socket, Event::new(0, true, true))?; } let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080); socket.set_nonblocking(true)?; diff --git a/src/epoll.rs b/src/epoll.rs index fb3735ea..4137cf10 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -408,14 +408,6 @@ impl EventExtra { pub fn is_err(&self) -> Option { Some(self.flags.contains(epoll::EventFlags::ERR)) } - - #[inline] - pub fn set_transferred_bytes(&mut self, _transferred_bytes: usize) { - // No-op. - } - - #[inline] - pub fn transferred_bytes(&self) -> usize { 0 } } /// The notifier for Linux. diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index cd5a462c..33b32945 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -595,7 +595,7 @@ impl From for IoStatusBlock { } impl IoStatusBlock { - pub(super) fn padded_io_status_block(self: Pin<&Self>) -> &UnsafeCell { + pub(crate) fn padded_io_status_block(self: Pin<&Self>) -> &UnsafeCell { self.project_ref().padded_io_status_block } diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index a2eeadbf..a8771cf4 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -479,11 +479,10 @@ impl Poller { // Process all of the events. for entry in events.completions.drain(..) { - let transferred_bytes = entry.transferred_bytes(); let packet = entry.into_packet(); // Feed the event into the packet. - match packet.feed_event(self, transferred_bytes)? { + match packet.feed_event(self)? { FeedEventResult::NoEvent => {} FeedEventResult::Event(event) => { events.packets.push(event); @@ -665,8 +664,6 @@ impl Events { pub struct EventExtra { /// Flags associated with this event. flags: AfdPollMask, - /// Number of bytes transferred - transferred_bytes: usize, } impl EventExtra { @@ -675,7 +672,6 @@ impl EventExtra { pub const fn empty() -> EventExtra { EventExtra { flags: AfdPollMask::empty(), - transferred_bytes: 0, } } @@ -696,7 +692,7 @@ impl EventExtra { pub fn set_hup(&mut self, active: bool) { self.flags.set(AfdPollMask::ABORT, active); } - + /// Set up a listener for PRI events. #[inline] pub fn set_pri(&mut self, active: bool) { @@ -714,15 +710,6 @@ impl EventExtra { pub fn is_err(&self) -> Option { Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL)) } - - /// Set the number of transferred bytes - #[inline] - pub fn set_transferred_bytes(&mut self, transferred_bytes: usize) { self.transferred_bytes = transferred_bytes; } - - /// Get the number of transferred bytes - #[inline] - pub fn transferred_bytes(&self) -> usize { self.transferred_bytes } - } /// The type of our completion packet. @@ -957,19 +944,14 @@ impl PacketUnwrapped { /// /// This indicates that this packet was indicated as "ready" by the IOCP and needs to be /// processed. - fn feed_event( - self: Pin>, - poller: &Poller, - transferred_bytes: u32) -> io::Result { + fn feed_event(self: Pin>, poller: &Poller) -> io::Result { let inner = self.as_ref().data().project_ref(); let (afd_info, socket) = match inner { PacketInnerProj::Socket { packet, socket } => (packet, socket), PacketInnerProj::Custom { event } => { - let mut event = event.clone(); - event.set_transferred_bytes(transferred_bytes as usize); // This is a custom event. - return Ok(FeedEventResult::Event(event)); + return Ok(FeedEventResult::Event(*event)); } PacketInnerProj::Wakeup { .. } => { // The poller was notified. diff --git a/src/iocp/port.rs b/src/iocp/port.rs index 4a24ca49..2887f500 100644 --- a/src/iocp/port.rs +++ b/src/iocp/port.rs @@ -279,8 +279,6 @@ impl OverlappedEntry { packet } - pub(super) fn transferred_bytes(&self) -> u32 { self.entry.dwNumberOfBytesTransferred } - /// Get the packet reference that this entry refers to. /// /// # Safety diff --git a/src/kqueue.rs b/src/kqueue.rs index 9f393459..8af4f4b3 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -411,14 +411,6 @@ impl EventExtra { pub fn is_err(&self) -> Option { None } - - #[inline] - pub fn set_transferred_bytes(&mut self, _transferred_bytes: usize) { - // No-op. - } - - #[inline] - pub fn transferred_bytes(&self) -> usize { 0 } } pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { diff --git a/src/lib.rs b/src/lib.rs index aafca048..f4e8c69c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,8 +133,6 @@ pub struct Event { pub readable: bool, /// Can it do a write operation without blocking? pub writable: bool, - /// Has the io operation completed? - pub completed: bool, /// System-specific event data. extra: sys::EventExtra, } @@ -185,50 +183,45 @@ pub enum PollMode { impl Event { /// Create a new event. - pub const fn new(key: usize, readable: bool, writable: bool, completed: bool) -> Event { + pub const fn new(key: usize, readable: bool, writable: bool) -> Event { Event { key, readable, writable, - completed, extra: sys::EventExtra::empty(), } } /// All kinds of events (readable and writable). /// - /// Equivalent to: `Event::new(key, true, true, false)` + /// Equivalent to: `Event::new(key, true, true)` #[inline] pub const fn all(key: usize) -> Event { - Event::new(key, true, true, false) + Event::new(key, true, true) } /// Only the readable event. /// - /// Equivalent to: `Event::new(key, true, false, false)` + /// Equivalent to: `Event::new(key, true, false)` #[inline] pub const fn readable(key: usize) -> Event { - Event::new(key, true, false, false) + Event::new(key, true, false) } /// Only the writable event. /// - /// Equivalent to: `Event::new(key, false, true, false)` + /// Equivalent to: `Event::new(key, false, true)` #[inline] - pub const fn writable(key: usize) -> Event { Event::new(key, false, true, false) } - - /// Only the completed event. - /// - /// Equivalent to: `Event::new(key, false, false, true)` - #[inline] - pub const fn completed(key: usize) -> Event { Event::new(key, false, false, true) } + pub const fn writable(key: usize) -> Event { + Event::new(key, false, true) + } /// No events. /// /// Equivalent to: `Event::new(key, false, false)` #[inline] pub const fn none(key: usize) -> Event { - Event::new(key, false, false, false) + Event::new(key, false, false) } /// Add interruption events to this interest. @@ -341,26 +334,6 @@ impl Event { self.extra.is_pri() } - /// The number of transferred bytes. - /// - /// This is only supported on the following platforms: - /// - /// - IOCP - /// - /// On other platforms, this will always return 0. - #[inline] - pub fn transferred_bytes(&self) -> usize { self.extra.transferred_bytes() } - - /// Set the number of transferred bytes. - /// - /// This is only supported on the following platforms: - /// - /// - IOCP - /// - /// On other platforms, this function is a no-op. - #[inline] - pub(crate) fn set_transferred_bytes(&mut self, bytes: usize) { self.extra.set_transferred_bytes(bytes); } - /// Tells if this event is the result of a connection failure. /// /// This function checks if a TCP connection has failed. It corresponds to the `EPOLLERR` or `EPOLLHUP` event in Linux @@ -378,7 +351,7 @@ impl Event { /// let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?; /// let poller = polling::Poller::new()?; /// unsafe { - /// poller.add(&socket, Event::new(0, true, true, false))?; + /// poller.add(&socket, Event::new(0, true, true))?; /// } /// let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080); /// socket.set_nonblocking(true)?; diff --git a/src/os/iocp.rs b/src/os/iocp.rs index 9636dbc1..22512511 100644 --- a/src/os/iocp.rs +++ b/src/os/iocp.rs @@ -31,15 +31,29 @@ impl CompletionPacket { /// This pointer can be used as an `OVERLAPPED` block in Windows APIs. Calling this function /// marks the block as "in use". Trying to call this function again before the operation is /// indicated as complete by the poller will result in a panic. - pub fn as_overlapped(&self) -> *mut () { + pub fn as_overlapped_ptr(&self) -> *mut () { if !self.0.as_ref().get().try_lock() { panic!("completion packet is already in use"); } + // The key point here is to increment the Arc reference count by cloning it. + // Otherwise, the Arc<> will be dropped in the method Poller::wait_deadline + // after it is re-created via from_raw() once the overlapped io has completed. unsafe { Arc::into_raw(Pin::into_inner_unchecked(self.0.clone())) as *mut () } } + /// Get the number of transferred bytes after an OVERLAPPED IO has finished. + pub fn transferred_bytes(&self) -> usize { + if !self.0.as_ref().get().try_lock() { + panic!("completion packet is currently in use"); + } + + unsafe { + (*self.0.as_ref().padded_io_status_block().get()).overlapped.InternalHigh + } + } + /// Cancel the in flight operation. /// /// # Safety diff --git a/tests/windows_overlapped.rs b/tests/windows_overlapped.rs index 0a5b4a24..0564f107 100644 --- a/tests/windows_overlapped.rs +++ b/tests/windows_overlapped.rs @@ -67,7 +67,7 @@ fn win32_file_io() { while len > 0 { // Begin to write. - let ptr = write_packet.as_overlapped().cast(); + let ptr = write_packet.as_overlapped_ptr().cast(); unsafe { if wfs::WriteFile( file_handle.as_raw_handle() as _, @@ -90,11 +90,10 @@ fn win32_file_io() { for event in events.iter() { if event.writable && event.key == 2 { - let bytes_written = event.transferred_bytes(); + let bytes_written = write_packet.transferred_bytes(); write_buffer_cursor = & write_buffer_cursor[bytes_written as usize..]; len -= bytes_written as usize; break 'waiter; - } } } @@ -146,7 +145,7 @@ fn win32_file_io() { let read_packet = CompletionPacket::new(Event::readable(1)); while bytes_received < input_text.len() { // Begin the read. - let ptr = read_packet.as_overlapped().cast(); + let ptr = read_packet.as_overlapped_ptr().cast(); unsafe { if wfs::ReadFile( file_handle.as_raw_handle() as _, @@ -166,7 +165,7 @@ fn win32_file_io() { for event in events.iter() { if event.readable && event.key == 1 { - let bytes_read = event.transferred_bytes(); + let bytes_read = read_packet.transferred_bytes(); buffer_cursor = &mut buffer_cursor[bytes_read ..]; len -= bytes_read; bytes_received += bytes_read;