Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ version = "0.5.0"
easy-parallel = "3.1.0"
fastrand = "2.0.0"
socket2 = "0.6.0"
tracing-subscriber = "0.3"

[target.'cfg(unix)'.dev-dependencies]
libc = "0.2"

[target.'cfg(all(unix, not(target_os="vita")))'.dev-dependencies]
signal-hook = "0.3.17"

[target.'cfg(windows)'.dev-dependencies]
tempfile = "3.7"
29 changes: 22 additions & 7 deletions src/iocp/afd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use windows_sys::Win32::Networking::WinSock::{
};
use windows_sys::Win32::Storage::FileSystem::{FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE};
use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress};
use windows_sys::Win32::System::IO::IO_STATUS_BLOCK;
use windows_sys::Win32::System::IO::{IO_STATUS_BLOCK, OVERLAPPED};

#[derive(Default)]
#[repr(C)]
pub(super) struct AfdPollInfo {
pub(crate) struct AfdPollInfo {
/// The timeout for this poll.
timeout: i64,

Expand Down Expand Up @@ -535,12 +535,27 @@ where
}
}

// The OVERLAPPED struct is larger than the IO_STATUS_BLOCK struct.
// This way it is possible to use the memory as either/or without
// the risk of reading or writing out-of-bounds.
#[repr(C)]
pub(crate) union PaddedIOStatusBlock {
pub(crate) io_status_block: IO_STATUS_BLOCK,
pub(crate) overlapped: OVERLAPPED,
}

impl Default for PaddedIOStatusBlock {
fn default() -> Self {
unsafe { core::mem::zeroed() }
}
}

pin_project_lite::pin_project! {
/// An I/O status block paired with some auxiliary data.
#[repr(C)]
pub(super) struct IoStatusBlock<T> {
pub(crate) struct IoStatusBlock<T> {
// The I/O status block.
iosb: UnsafeCell<IO_STATUS_BLOCK>,
padded_io_status_block: UnsafeCell<PaddedIOStatusBlock>,

// Whether or not the block is in use.
in_use: AtomicBool,
Expand Down Expand Up @@ -571,7 +586,7 @@ unsafe impl<T: Sync> Sync for IoStatusBlock<T> {}
impl<T> From<T> for IoStatusBlock<T> {
fn from(data: T) -> Self {
Self {
iosb: UnsafeCell::new(unsafe { std::mem::zeroed() }),
padded_io_status_block: UnsafeCell::new(unsafe { std::mem::zeroed() }),
in_use: AtomicBool::new(false),
data,
_marker: PhantomPinned,
Expand All @@ -580,8 +595,8 @@ impl<T> From<T> for IoStatusBlock<T> {
}

impl<T> IoStatusBlock<T> {
pub(super) fn iosb(self: Pin<&Self>) -> &UnsafeCell<IO_STATUS_BLOCK> {
self.project_ref().iosb
pub(crate) fn padded_io_status_block(self: Pin<&Self>) -> &UnsafeCell<PaddedIOStatusBlock> {
self.project_ref().padded_io_status_block
}

pub(super) fn data(self: Pin<&Self>) -> Pin<&T> {
Expand Down
50 changes: 21 additions & 29 deletions src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
mod afd;
mod port;

use afd::{base_socket, Afd, AfdPollInfo, AfdPollMask, HasAfdInfo, IoStatusBlock};
use afd::{base_socket, Afd, AfdPollInfo, AfdPollMask, HasAfdInfo};
use port::{IoCompletionPort, OverlappedEntry};

pub(crate) use afd::IoStatusBlock;
pub(crate) use port::{Completion, CompletionHandle};

use windows_sys::Win32::Foundation::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, STATUS_CANCELLED};
use windows_sys::Win32::System::Threading::{
RegisterWaitForSingleObject, UnregisterWait, INFINITE, WT_EXECUTELONGFUNCTION,
Expand Down Expand Up @@ -511,7 +514,7 @@ impl Poller {
}

/// Push an IOCP packet into the queue.
pub(super) fn post(&self, packet: CompletionPacket) -> io::Result<()> {
pub(super) fn post(&self, packet: crate::os::iocp::CompletionPacket) -> io::Result<()> {
self.port.post(0, 0, packet.0)
}

Expand Down Expand Up @@ -709,38 +712,17 @@ impl EventExtra {
}
}

/// A packet used to wake up the poller with an event.
#[derive(Debug, Clone)]
pub struct CompletionPacket(Packet);

impl CompletionPacket {
/// Create a new completion packet with a custom event.
pub fn new(event: Event) -> Self {
Self(Arc::pin(IoStatusBlock::from(PacketInner::Custom { event })))
}

/// Get the event associated with this packet.
pub fn event(&self) -> &Event {
let data = self.0.as_ref().data().project_ref();

match data {
PacketInnerProj::Custom { event } => event,
_ => unreachable!(),
}
}
}

/// The type of our completion packet.
///
/// It needs to be pinned, since it contains data that is expected by IOCP not to be moved.
type Packet = Pin<Arc<PacketUnwrapped>>;
pub(crate) type Packet = Pin<Arc<PacketUnwrapped>>;
type PacketUnwrapped = IoStatusBlock<PacketInner>;

pin_project! {
/// The inner type of the packet.
#[project_ref = PacketInnerProj]
#[project = PacketInnerProjMut]
enum PacketInner {
pub(crate) enum PacketInner {
// A packet for a socket.
Socket {
// The AFD packet state.
Expand Down Expand Up @@ -796,6 +778,16 @@ impl HasAfdInfo for PacketInner {
}

impl PacketUnwrapped {
/// If this is an event packet, get the event.
pub(crate) fn event(self: Pin<&Self>) -> &Event {
let data = self.data().project_ref();

match data {
PacketInnerProj::Custom { event } => event,
_ => unreachable!(),
}
}

/// Set the new events that this socket is waiting on.
///
/// Returns `true` if we need to be updated.
Expand Down Expand Up @@ -995,10 +987,10 @@ impl PacketUnwrapped {

unsafe {
// SAFETY: The packet is not in transit.
let iosb = &mut *self.as_ref().iosb().get();
let iosb = &mut *self.as_ref().padded_io_status_block().get();

// Check the status.
match iosb.Anonymous.Status {
match iosb.io_status_block.Anonymous.Status {
STATUS_CANCELLED => {
// Poll request was cancelled.
}
Expand Down Expand Up @@ -1113,7 +1105,7 @@ impl PacketUnwrapped {

/// Per-socket state.
#[derive(Debug)]
struct SocketState {
pub(crate) struct SocketState {
/// The raw socket handle.
socket: RawSocket,

Expand Down Expand Up @@ -1158,7 +1150,7 @@ enum SocketStatus {

/// Per-waitable handle state.
#[derive(Debug)]
struct WaitableState {
pub(crate) struct WaitableState {
/// The handle that this state is for.
handle: RawHandle,

Expand Down
4 changes: 2 additions & 2 deletions src/iocp/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use windows_sys::Win32::System::IO::{
/// # Safety
///
/// This must be a valid completion block.
pub(super) unsafe trait Completion {
pub(crate) unsafe trait Completion {
/// Signal to the completion block that we are about to start an operation.
fn try_lock(self: Pin<&Self>) -> bool;

Expand All @@ -40,7 +40,7 @@ pub(super) unsafe trait Completion {
/// # Safety
///
/// This must be a valid completion block.
pub(super) unsafe trait CompletionHandle: Deref + Sized {
pub(crate) unsafe trait CompletionHandle: Deref + Sized {
/// Type of the completion block.
type Completion: Completion;

Expand Down
57 changes: 56 additions & 1 deletion src/os/iocp.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,68 @@
//! Functionality that is only available for IOCP-based platforms.

pub use crate::sys::CompletionPacket;
use crate::sys::{Completion, CompletionHandle, IoStatusBlock, Packet, PacketInner};

use super::__private::PollerSealed;
use crate::{Event, PollMode, Poller};

use std::io;
use std::os::windows::io::{AsRawHandle, RawHandle};
use std::os::windows::prelude::{AsHandle, BorrowedHandle};
use std::pin::Pin;
use std::sync::Arc;

/// A packet used to wake up the poller with an event.
#[derive(Debug, Clone)]
pub struct CompletionPacket(pub(crate) Packet);

impl CompletionPacket {
/// Create a new completion packet with a custom event.
pub fn new(event: Event) -> Self {
Self(Arc::pin(IoStatusBlock::from(PacketInner::Custom { event })))
}

/// Get the event associated with this packet.
pub fn event(&self) -> &Event {
self.0.as_ref().event()
}

/// Get a pointer to the underlying I/O status block.
///
/// This pointer can be used as an `OVERLAPPED` block in Windows APIs. Calling this function
/// marks the block as "in use". Trying to call this function again before the operation is
/// indicated as complete by the poller will result in a panic.
pub fn as_overlapped_ptr(&self) -> *mut () {
if !self.0.as_ref().get().try_lock() {
panic!("completion packet is already in use");
}
// The key point here is to increment the Arc reference count by cloning it.
// Otherwise, the Arc<> will be dropped in the method Poller::wait_deadline
// after it is re-created via from_raw() once the overlapped io has completed.
unsafe {
Arc::into_raw(Pin::into_inner_unchecked(self.0.clone())) as *mut ()
}
}

/// Get the number of transferred bytes after an OVERLAPPED IO has finished.
pub fn transferred_bytes(&self) -> usize {
if !self.0.as_ref().get().try_lock() {
panic!("completion packet is currently in use");
}

unsafe {
(*self.0.as_ref().padded_io_status_block().get()).overlapped.InternalHigh
}
}

/// Cancel the in flight operation.
///
/// # Safety
///
/// The packet must be in flight and the operation must be cancelled already.
pub unsafe fn cancel(&mut self) {
self.0.as_ref().get().unlock();
}
}

/// Extension trait for the [`Poller`] type that provides functionality specific to IOCP-based
/// platforms.
Expand Down
Loading