From e224d5842cbe325afb814e2eeb5f9576476d6a36 Mon Sep 17 00:00:00 2001 From: NthTensor Date: Fri, 27 Jun 2025 17:53:45 -0400 Subject: [PATCH] feat: add rayon-compat crate --- CHANGELOG.md | 18 +++++ Cargo.lock | 7 ++ Cargo.toml | 3 +- rayon-compat/Cargo.toml | 10 +++ rayon-compat/README.md | 14 ++++ rayon-compat/src/lib.rs | 171 ++++++++++++++++++++++++++++++++++++++++ src/job.rs | 15 +++- src/lib.rs | 1 + src/scope.rs | 88 +++++++++++---------- src/thread_pool.rs | 152 +++++++++++++++++++++-------------- src/unwind.rs | 37 +++++++++ 11 files changed, 411 insertions(+), 105 deletions(-) create mode 100644 rayon-compat/Cargo.toml create mode 100644 rayon-compat/README.md create mode 100644 rayon-compat/src/lib.rs create mode 100644 src/unwind.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eabdf4..4a574af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,24 @@ This project is currently in early [pre-release], and there may be arbitrary bre ## [Unreleased] +### Added + +- `rayon-compat` crate, for running rayon on top of forte. +- `Worker::migrated()` which is `true` when the current job has moved between threads. + +### Changed + +- Heartbeat frequency is now 100 microseconds. +- Heartbeats are now more evenly distributed between workers. +- The heartbeat thread goes to sleep when the pool is not in use. +- Shared work is now queued and claimed in the order it was shared. +- The `Scope` API is now identical to `rayon-core` and does not use `Pin`. +- `Scope::new`, `Scope::add_reference` and `Scope::remove_reference` are now private. + +### Security + +- Forte now implements exception safety. Panics can no longer cause UB. 
+
 
 ## [1.0.0-alpha.3]
 
 ### Added
diff --git a/Cargo.lock b/Cargo.lock
index d3fe02d..f3c010e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -603,6 +603,13 @@ dependencies = [
  "rayon-core",
 ]
 
+[[package]]
+name = "rayon-compat"
+version = "1.12.1"
+dependencies = [
+ "forte",
+]
+
 [[package]]
 name = "rayon-core"
 version = "1.12.1"
diff --git a/Cargo.toml b/Cargo.toml
index ae7424f..5385bca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,8 +8,7 @@ repository = "https://github.com/NthTensor/Forte"
 
 [workspace]
 resolver = "2"
-members = ["ci"]
-exclude = ["coz"]
+members = ["ci", "rayon-compat"]
 
 [dependencies]
 async-task = "4.7.1"
diff --git a/rayon-compat/Cargo.toml b/rayon-compat/Cargo.toml
new file mode 100644
index 0000000..ef76ab3
--- /dev/null
+++ b/rayon-compat/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "rayon-compat"
+version = "1.12.1"
+edition = "2024"
+
+[dependencies]
+forte = { path = ".." }
+
+[features]
+web_spin_lock = []
diff --git a/rayon-compat/README.md b/rayon-compat/README.md
new file mode 100644
index 0000000..da70597
--- /dev/null
+++ b/rayon-compat/README.md
@@ -0,0 +1,14 @@
+# Rayon Compat
+
+This is a way to run `rayon` on top of `forte`! The `rayon-compat` crate mocks the important bits of the API of `rayon_core` in a pretty simple and crude way, which is nonetheless enough to support most of what `rayon` needs.
+
+To use this crate, apply one of the following cargo patches:
+```
+// If you want to clone forte and use it locally
+[patch.crates-io]
+rayon-core = { path = "path to this repo", package = "rayon-compat" }
+
+// If you want to use the latest published version of forte
+[patch.crates-io]
+rayon-core = { git = "https://github.com/NthTensor/Forte", package = "rayon-compat" }
+```
diff --git a/rayon-compat/src/lib.rs b/rayon-compat/src/lib.rs
new file mode 100644
index 0000000..e130e91
--- /dev/null
+++ b/rayon-compat/src/lib.rs
@@ -0,0 +1,171 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+
+pub static THREAD_POOL: forte::ThreadPool = const { forte::ThreadPool::new() };
+
+pub static STARTED: AtomicBool = const { AtomicBool::new(false) };
+
+#[inline(always)]
+fn ensure_started() {
+    if !STARTED.load(Ordering::Relaxed) && !STARTED.swap(true, Ordering::Relaxed) {
+        THREAD_POOL.resize_to_available();
+    }
+}
+
+#[inline(always)]
+pub fn current_num_threads() -> usize {
+    64 // Forte prefers smaller tasks, so it's better to lie to rayon about the size of the pool
+}
+
+#[inline(always)]
+pub fn current_thread_index() -> Option<usize> {
+    forte::Worker::map_current(|worker| worker.index())
+}
+
+#[inline(always)]
+pub fn max_num_threads() -> usize {
+    usize::MAX // The number of forte workers is only bounded by the size of a vector.
+}
+
+// -----------------------------------------------------------------------------
+// Join
+
+#[derive(Debug)]
+pub struct FnContext {
+    /// True if the task was migrated.
+    migrated: bool,
+}
+
+impl FnContext {
+    #[inline(always)]
+    pub fn migrated(&self) -> bool {
+        self.migrated
+    }
+}
+
+#[inline(always)]
+pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
+where
+    A: FnOnce(FnContext) -> RA + Send,
+    B: FnOnce(FnContext) -> RB + Send,
+    RA: Send,
+    RB: Send,
+{
+    ensure_started();
+    THREAD_POOL.join(
+        |worker| {
+            let migrated = worker.migrated();
+            let ctx = FnContext { migrated };
+            oper_a(ctx)
+        },
+        |worker| {
+            let migrated = worker.migrated();
+            let ctx = FnContext { migrated };
+            oper_b(ctx)
+        },
+    )
+}
+
+#[inline(always)]
+pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
+where
+    A: FnOnce() -> RA + Send,
+    B: FnOnce() -> RB + Send,
+    RA: Send,
+    RB: Send,
+{
+    ensure_started();
+    THREAD_POOL.join(|_| oper_a(), |_| oper_b())
+}
+
+// -----------------------------------------------------------------------------
+// Scope
+
+pub use forte::Scope;
+
+#[inline(always)]
+pub fn scope<'scope, OP, R>(op: OP) -> R
+where
+    OP: FnOnce(&Scope<'scope>) -> R + Send,
+    R: Send,
+{
+    ensure_started();
+    forte::scope(op)
+}
+
+#[inline(always)]
+pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
+where
+    OP: FnOnce(&Scope<'scope>) -> R,
+{
+    ensure_started();
+    forte::scope(op)
+}
+
+// -----------------------------------------------------------------------------
+// Spawn
+
+#[inline(always)]
+pub fn spawn<F>(func: F)
+where
+    F: FnOnce() + Send + 'static,
+{
+    ensure_started();
+    THREAD_POOL.spawn(|_| func())
+}
+
+// -----------------------------------------------------------------------------
+// Yield
+
+pub use forte::Yield;
+
+pub fn yield_local() -> Yield {
+    let result = forte::Worker::map_current(forte::Worker::yield_local);
+    match result {
+        Some(status) => status,
+        _ => Yield::Idle,
+    }
+}
+
+pub fn yield_now() -> Yield {
+    let result = forte::Worker::map_current(forte::Worker::yield_now);
+    match result {
+        Some(status) => status,
+        _ => Yield::Idle,
+    }
+}
+
+// 
-----------------------------------------------------------------------------
+// Fake stuff that doesn't work. These are here only so that rayon can export
+// them.
+
+pub struct ThreadBuilder;
+
+pub struct ThreadPool;
+
+pub struct ThreadPoolBuildError;
+
+pub struct ThreadPoolBuilder;
+
+pub struct BroadcastContext;
+
+pub struct ScopeFifo;
+
+pub fn broadcast() {
+    unimplemented!()
+}
+
+pub fn spawn_broadcast() {
+    unimplemented!()
+}
+
+pub fn scope_fifo() {
+    unimplemented!()
+}
+
+pub fn in_place_scope_fifo() {
+    unimplemented!()
+}
+
+pub fn spawn_fifo() {
+    unimplemented!()
+}
diff --git a/src/job.rs b/src/job.rs
index 41d811d..854d3f2 100644
--- a/src/job.rs
+++ b/src/job.rs
@@ -16,9 +16,11 @@ use alloc::collections::VecDeque;
 use core::cell::UnsafeCell;
 use core::mem::ManuallyDrop;
 use core::ptr::NonNull;
+use std::thread::Result as ThreadResult;
 
 use crate::signal::Signal;
 use crate::thread_pool::Worker;
+use crate::unwind;
 
 // -----------------------------------------------------------------------------
 // Runnable
@@ -152,7 +154,7 @@ impl JobQueue {
 /// This is analogous to the chili type `JobStack` and the rayon type `StackJob`.
 pub struct StackJob<F, T> {
     f: UnsafeCell<ManuallyDrop<F>>,
-    signal: Signal<T>,
+    signal: Signal<ThreadResult<T>>,
 }
 impl<F, T> StackJob<F, T>
 where
@@ -210,7 +212,7 @@ where
     /// closure's return value is sent over this signal after the job is
     /// executed.
     #[inline(always)]
-    pub fn signal(&self) -> &Signal<T> {
+    pub fn signal(&self) -> &Signal<ThreadResult<T>> {
         &self.signal
     }
 }
@@ -235,6 +237,9 @@ where
         // SAFETY: The caller ensures `this` can be converted into an immutable
         // reference.
        let this = unsafe { this.cast::<Self>().as_ref() };
+        // Create an abort guard. If the closure panics, this will convert the
+        // panic into an abort. Doing so prevents use-after-free for other elements of the stack.
+        let abort_guard = unwind::AbortOnDrop;
         // SAFETY: This memory location is accessed only in this function and in
         // `unwrap`. 
The latter cannot have been called, because it drops the // stack job, so, since this function is called only once, we can @@ -242,13 +247,15 @@ where let f_ref = unsafe { &mut *this.f.get() }; // SAFETY: The caller ensures this function is called only once. let f = unsafe { ManuallyDrop::take(f_ref) }; - // Run the job. - let result = f(worker); + // Run the job. If the job panics, we propagate the panic back to the main thread. + let result = unwind::halt_unwinding(|| f(worker)); // SAFETY: This is valid for the access used by `send` because // `&this.signal` is an immutable reference to a `Signal`. Because // `send` is only called in this function, and this function is never // called again, `send` is never called again. unsafe { Signal::send(&this.signal, result) } + // Forget the abort guard, re-enabling panics. + core::mem::forget(abort_guard); } } diff --git a/src/lib.rs b/src/lib.rs index 5666b5d..4afcebf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,7 @@ mod job; mod scope; mod signal; mod thread_pool; +mod unwind; // ----------------------------------------------------------------------------- // Top-level exports diff --git a/src/scope.rs b/src/scope.rs index fa3a646..87ccfa2 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -5,7 +5,6 @@ use alloc::boxed::Box; use core::future::Future; use core::marker::PhantomData; use core::marker::PhantomPinned; -use core::pin::Pin; use async_task::Runnable; use async_task::Task; @@ -47,13 +46,15 @@ impl<'scope> Scope<'scope> { /// Every scope contains a lifetime `'scope`, which must outlive anything /// spawned onto the scope. /// - /// Before a scope can be used, it must be pinned. The recommended approach - /// is to use the `pin!` macro to get a `Pin<&mut Scope<'scope>>` which you - /// then convert into a `Pin<&Scope<'scope>>` via `into_ref`. - /// /// When a scope is dropped, it will block the thread until all work /// spawened on the scope is complete. 
- pub fn new() -> Scope<'scope> { + /// + /// # Safety + /// + /// The caller must pin the scope before it can be used (This cannot be + /// enforced on the type-level due to compatibility requirements with rayon) + /// and must ensure the scope is eventually dropped. + pub(crate) unsafe fn new() -> Scope<'scope> { Scope { count: AtomicU32::new(1), signal: Signal::new(), @@ -86,12 +87,16 @@ impl<'scope> Scope<'scope> { /// /// Panics if not called from within a worker. /// - pub fn spawn(self: Pin<&Self>, f: F) + pub fn spawn(&self, f: F) where - F: FnOnce(Pin<&Scope<'scope>>) + Send + 'scope, + F: FnOnce(&Scope<'scope>) + Send + 'scope, { // Create a job to execute the spawned function in the scope. - let scope_ptr = ScopePtr::new(self); + // + // SAFETY: This scope must be pinned, since the only way to create a + // scope is via `Scope::new` and that function requires the caller pin + // the scope before using it. + let scope_ptr = unsafe { ScopePtr::new(self) }; let job = HeapJob::new(move |_| { scope_ptr.run(f); }); @@ -142,7 +147,7 @@ impl<'scope> Scope<'scope> { /// /// Panics if not called within a worker. /// - pub fn spawn_future(self: Pin<&Self>, future: F) -> Task + pub fn spawn_future(&self, future: F) -> Task where F: Future + Send + 'scope, T: Send + 'scope, @@ -162,14 +167,18 @@ impl<'scope> Scope<'scope> { /// /// Panics if not called within a worker. /// - pub fn spawn_async(self: Pin<&Self>, f: Fn) -> Task + pub fn spawn_async(&self, f: Fn) -> Task where - Fn: FnOnce(Pin<&Scope<'scope>>) -> Fut + Send + 'scope, + Fn: FnOnce(&Scope<'scope>) -> Fut + Send + 'scope, Fut: Future + Send + 'scope, T: Send + 'scope, { // Wrap the function into a future using an async block. - let scope_ptr = ScopePtr::new(self); + // + // SAFETY: This scope must be pinned, since the only way to create a + // scope is via `Scope::new` and that function requires the caller pin + // the scope before using it. 
+ let scope_ptr = unsafe { ScopePtr::new(self) }; let future = async move { scope_ptr.run(f).await }; // The schedule function will turn the future into a job when woken. @@ -223,7 +232,7 @@ impl<'scope> Scope<'scope> { /// Every call to this should have a matching call to /// `Scope::remove_reference`, or the scope will block forever on /// completion. - pub fn add_reference(&self) { + fn add_reference(&self) { let counter = self.count.fetch_add(1, Ordering::SeqCst); tracing::trace!("scope reference counter increased to {}", counter + 1); } @@ -235,7 +244,7 @@ impl<'scope> Scope<'scope> { /// The caller must ensure that there is exactly one a matching call to /// `add_reference` for every call to this function, unless used within /// `Scope::complete`. - pub unsafe fn remove_reference(&self) { + unsafe fn remove_reference(&self) { let counter = self.count.fetch_sub(1, Ordering::SeqCst); tracing::trace!("scope reference counter decreased to {}", counter - 1); if counter == 1 { @@ -252,12 +261,6 @@ impl<'scope> Scope<'scope> { } } -impl<'scope> Default for Scope<'scope> { - fn default() -> Self { - Self::new() - } -} - impl Drop for Scope<'_> { fn drop(&mut self) { // When the scope is dropped, block to prevent deallocation until the @@ -281,8 +284,6 @@ impl Drop for Scope<'_> { mod scope_ptr { //! Defines a "lifetime-erased" reference-counting pointer to a scope. - use core::pin::Pin; - use super::Scope; /// A reference-counted pointer to a scope. Used to capture a scope pointer @@ -299,37 +300,39 @@ mod scope_ptr { impl<'scope> ScopePtr<'scope> { /// Creates a new reference-counted scope pointer which can be sent to other /// threads. - pub fn new(scope: Pin<&Scope<'scope>>) -> ScopePtr<'scope> { + /// + /// # SAFETY: + /// + /// The scope must be pinned (this cannot be enforced on the type level + /// due to compatibility requirements with rayon). 
+ pub unsafe fn new(scope: &Scope<'scope>) -> ScopePtr<'scope> { scope.add_reference(); - ScopePtr(scope.get_ref()) + ScopePtr(scope) } /// Passes the scope referred to by this pointer into a closure. pub fn run(&self, f: F) -> T where - F: FnOnce(Pin<&Scope<'scope>>) -> T + 'scope, + F: FnOnce(&Scope<'scope>) -> T + 'scope, { // SAFETY: This pointer is convertible to a shared reference. // - // + It was created from an immutable reference, and we never change - // it so it must be non-null and dereferenceable. + // + It was created from an immutable reference to a pinned scope. + // The only way for this to be invalidated is if the scope was + // dropped in the time since we created the pointer. // // + We incremented the scope's reference counter and will not // decrement it until this pointer is dropped. Since the scope - // will remain valid as long as the reference counter is above - // zero, we know it is valid. + // will not be dropped while the reference counter is above + // zero, we know the pointer is still valid. // // + The scope is never accessed mutably, so creating shared // references is allowed. // - let inner_ref = unsafe { &*self.0 }; - - // SAFETY: The scope was pinned when passed to `ScopePtr::new` so - // the pinning rules must already be satisfied. - let pinned_ref = unsafe { Pin::new_unchecked(inner_ref) }; + let scope_ref = unsafe { &*self.0 }; // Execute the closure on the shared reference. - f(pinned_ref) + f(scope_ref) } } @@ -337,13 +340,14 @@ mod scope_ptr { fn drop(&mut self) { // SAFETY: This pointer is convertible to a shared reference. // - // + It was created from an immutable reference, and we never change - // it so it must be non-null and dereferenceable. + // + It was created from an immutable reference to a pinned scope. + // The only way for this to be invalidated is if the scope was + // dropped in the time since we created the pointer. 
// - // + We incremented the scope's reference counter and we have not yet - // decremented it (although we are about to). Since the scope will - // remain valid as long as the reference counter is above zero, we - // know it is valid. + // + We incremented the scope's reference counter and will not + // decrement it until this pointer is dropped. Since the scope + // will not be dropped while the reference counter is above + // zero, we know the pointer is still valid. // // + The scope is never accessed mutably, so creating shared // references is allowed. diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 2c07ccb..58361d8 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -1,7 +1,6 @@ //! This module contains the api and worker logic for the Forte thread pool. -use alloc::collections::BTreeMap; -use alloc::collections::btree_map::Entry; +use alloc::collections::VecDeque; use alloc::format; use alloc::string::ToString; use alloc::vec::Vec; @@ -16,7 +15,6 @@ use core::ptr::NonNull; use core::task::Context; use core::task::Poll; use core::time::Duration; -use std::time::Instant; use async_task::Runnable; use async_task::Task; @@ -32,6 +30,7 @@ use crate::job::StackJob; use crate::platform::*; use crate::scope::Scope; use crate::signal::Signal; +use crate::unwind; // ----------------------------------------------------------------------------- // Thread pool worker leases @@ -49,7 +48,7 @@ pub struct Lease { /// The "heartbeat interval" controls the frequency at which workers share work. #[cfg(not(feature = "shuttle"))] -pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(500); +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100); /// The `ThreadPool` object is used to orchestrate and distribute work to a pool /// of threads, and is generally the main entry point to using `Forte`. 
@@ -122,17 +121,18 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(500); pub struct ThreadPool { state: Mutex, job_is_ready: Condvar, + new_participant: Condvar, } struct ThreadPoolState { - shared_jobs: BTreeMap, + shared_jobs: VecDeque, tenants: Vec>, managed_threads: ManagedThreads, } impl ThreadPoolState { fn claim_shared_job(&mut self) -> Option { - self.shared_jobs.pop_first().map(|(_, job_ref)| job_ref) + self.shared_jobs.pop_front() } /// Claims a lease on the thread pool. A lease can be passed to @@ -144,7 +144,6 @@ impl ThreadPoolState { let heartbeat = Arc::new(AtomicBool::new(false)); let tenant = Tenant { heartbeat: Arc::downgrade(&heartbeat), - last_heartbeat: Instant::now(), }; for (index, occupant) in self.tenants.iter_mut().enumerate() { @@ -172,8 +171,6 @@ impl ThreadPoolState { fn claim_leases(&mut self, thread_pool: &'static ThreadPool, num: usize) -> Vec { let mut leases = Vec::with_capacity(num); - let now = Instant::now(); - for (index, occupant) in self.tenants.iter_mut().enumerate() { if leases.len() == num { return leases; @@ -183,7 +180,6 @@ impl ThreadPoolState { let heartbeat = Arc::new(AtomicBool::new(false)); let tenant = Tenant { heartbeat: Arc::downgrade(&heartbeat), - last_heartbeat: now, }; *occupant = Some(tenant); leases.push(Lease { @@ -198,7 +194,6 @@ impl ThreadPoolState { let heartbeat = Arc::new(AtomicBool::new(false)); let tenant = Tenant { heartbeat: Arc::downgrade(&heartbeat), - last_heartbeat: now, }; self.tenants.push(Some(tenant)); leases.push(Lease { @@ -214,7 +209,6 @@ impl ThreadPoolState { struct Tenant { heartbeat: Weak, - last_heartbeat: Instant, } /// Manages threads spawned by the pool. 
@@ -251,7 +245,7 @@ impl ThreadPool { pub const fn new() -> ThreadPool { ThreadPool { state: Mutex::new(ThreadPoolState { - shared_jobs: BTreeMap::new(), + shared_jobs: VecDeque::new(), tenants: Vec::new(), managed_threads: ManagedThreads { workers: Vec::new(), @@ -259,12 +253,14 @@ impl ThreadPool { }, }), job_is_ready: Condvar::new(), + new_participant: Condvar::new(), } } /// Claims a lease on the thread pool which can be occupied by a worker /// (using [`Worker::occupy`]), allowing a thread to participate in the pool. pub fn claim_lease(&'static self) -> Lease { + self.new_participant.notify_one(); let mut state = self.state.lock().unwrap(); state.claim_lease(self) } @@ -613,8 +609,7 @@ impl ThreadPool { #[inline] pub fn scope<'scope, F, T>(&'static self, f: F) -> T where - F: FnOnce(Pin<&Scope<'scope>>) -> T + Send, - T: Send, + F: FnOnce(&Scope<'scope>) -> T, { self.with_worker(|worker| worker.scope(f)) } @@ -647,6 +642,7 @@ thread_local! { /// Workers have one core memory-safety guarantee: Any jobs added to the worker /// will eventually be executed. pub struct Worker { + pub(crate) migrated: Cell, pub(crate) lease: Lease, pub(crate) queue: JobQueue, } @@ -736,6 +732,7 @@ impl Worker { // problem that the same thread can occupy multiple workers on the same // thread. We many eventually need to design something to prevent this. let worker = Worker { + migrated: Cell::new(false), lease, queue: JobQueue::new(), }; @@ -749,7 +746,7 @@ impl Worker { // Execute the work queue until it's empty while let Some(job_ref) = worker.queue.pop_front() { - job_ref.execute(&worker); + worker.execute(job_ref, false); } // Swap back to pointing to the previous value (possibly null). @@ -762,6 +759,12 @@ impl Worker { result } + /// Returns the index of the worker in the leases list. + #[inline] + pub fn index(&self) -> usize { + self.lease.index + } + /// Tries to promote the oldest job in the local stack to a shared job. 
If /// the local job queue is empty, or if the shared queue is full, this does /// nothing. If the promotion is successful, it tries to wake another @@ -769,11 +772,9 @@ impl Worker { #[cold] fn promote(&self) { let mut state = self.lease.thread_pool.state.lock().unwrap(); - if let Entry::Vacant(e) = state.shared_jobs.entry(self.lease.index) { - if let Some(job) = self.queue.pop_front() { - e.insert(job); - self.lease.thread_pool.job_is_ready.notify_one(); - } + if let Some(job) = self.queue.pop_front() { + state.shared_jobs.push_back(job); + self.lease.thread_pool.job_is_ready.notify_one(); } } @@ -814,12 +815,17 @@ impl Worker { /// Tries to find a job to execute, either in the local queue or shared on /// the threadpool. + /// + /// The second value is true if the job was shared, or false if it was spawned locally. #[inline] - pub fn find_work(&self) -> Option { + pub fn find_work(&self) -> Option<(JobRef, bool)> { // We give preference first to things in our local deque, then in other // workers deques, and finally to injected jobs from the outside. The // idea is to finish what we started before we take on something new. - self.queue.pop_back().or_else(|| self.claim_shared_job()) + self.queue + .pop_back() + .map(|job| (job, false)) + .or_else(|| self.claim_shared_job().map(|job| (job, true))) } /// Claims a shared job from the thread pool. @@ -842,7 +848,7 @@ impl Worker { pub fn yield_local(&self) -> Yield { match self.queue.pop_back() { Some(job_ref) => { - job_ref.execute(self); + self.execute(job_ref, false); Yield::Executed } None => Yield::Idle, @@ -860,13 +866,30 @@ impl Worker { #[inline] pub fn yield_now(&self) -> Yield { match self.find_work() { - Some(job_ref) => { - job_ref.execute(self); + Some((job_ref, migrated)) => { + self.execute(job_ref, migrated); Yield::Executed } None => Yield::Idle, } } + + /// Returns `true` if the current job is executing on a different thread + /// from the one on which it was created. 
Returns `false` if not executing a + /// job, or if the current job was created on the current thread. + #[inline] + pub fn migrated(&self) -> bool { + self.migrated.get() + } + + /// Executes a job. This wrapper swaps in the correct thread-migration flag + /// before the job runs, then swaps it back to what it was before. + #[inline] + fn execute(&self, job_ref: JobRef, migrated: bool) { + let migrated = self.migrated.replace(migrated); + job_ref.execute(self); + self.migrated.set(migrated); + } } // ----------------------------------------------------------------------------- @@ -1063,12 +1086,17 @@ impl Worker { // Even if it's not the droid we were looking for, we must still // execute the job. - job.execute(self); + self.execute(job, false); } - // Wait for the job to complete, then return the result. + // Wait for the job to complete. let result_a = self.wait_for_signal(stack_job.signal()); - (result_a, result_b) + + // If the job panicked, resume the panic on this thread. + match result_a { + Ok(result_a) => (result_a, result_b), + Err(error) => unwind::resume_unwinding(error), + } } /// Creates a scope on which non-static work can be spawned. Spawned jobs @@ -1076,21 +1104,18 @@ impl Worker { /// spawn additional tasks into the scope. When the closure returns, it will /// block until all tasks that have been spawned into onto the scope complete. /// - /// It is also possible to create a new scope from a worker using - /// [`Scope::new`], but it must be pinned before it can be used. This - /// function mostly just does the pinning for you. - /// /// If you do not have access to a [`Worker`], you may call /// [`ThreadPool::scope`] or simply [`scope`]. 
#[inline] pub fn scope<'scope, F, T>(&self, f: F) -> T where - F: FnOnce(Pin<&Scope<'scope>>) -> T + Send, - T: Send, + F: FnOnce(&Scope<'scope>) -> T, { - let scope = pin!(Scope::new()); - let scope = scope.into_ref(); - f(scope) + // SAFETY: The scope is pinned upon creation and dropped when the + // function returns. + let scope = unsafe { pin!(Scope::new()) }; + let scope_ref = Pin::get_ref(scope.into_ref()); + f(scope_ref) } } @@ -1191,8 +1216,7 @@ where /// See also: [`Worker::scope`] and [`ThreadPool::scope`]. pub fn scope<'scope, F, T>(f: F) -> T where - F: FnOnce(Pin<&Scope<'scope>>) -> T + Send, - T: Send, + F: FnOnce(&Scope<'scope>) -> T, { Worker::with_current(|worker| { worker @@ -1217,7 +1241,7 @@ fn managed_worker(lease: Lease, halt: Arc, barrier: Arc) { Worker::occupy(lease, |worker| { while !halt.load(Ordering::Relaxed) { if let Some(job) = worker.queue.pop_back() { - job.execute(worker); + worker.execute(job, false); continue; } @@ -1226,7 +1250,7 @@ fn managed_worker(lease: Lease, halt: Arc, barrier: Arc) { while !halt.load(Ordering::Relaxed) { if let Some(job) = state.claim_shared_job() { drop(state); - job.execute(worker); + worker.execute(job, true); break; } @@ -1253,36 +1277,50 @@ fn managed_worker(lease: Lease, halt: Arc, barrier: Arc) { #[cfg(not(feature = "shuttle"))] fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc) { use std::thread; - use std::time::Instant; trace!("starting managed heartbeat thread"); - while !halt.load(Ordering::Relaxed) { - let mut state = thread_pool.state.lock().unwrap(); + // Stores the index of the tenant we intend to send the next heartbeat to. 
+ let mut queued_to_heartbeat = 0; + + let mut state = thread_pool.state.lock().unwrap(); - let mut num_tenants = 0; - let now = Instant::now(); - for tenancy in state.tenants.iter_mut() { - if let Some(tenant) = tenancy { + while !halt.load(Ordering::Relaxed) { + let num_slots = state.tenants.len(); + let mut num_occupied: u32 = 0; + let mut sent_heartbeat = false; + + // Iterate through all the tenants, starting at the one queued to wake + for i in 0..num_slots { + let tenant_index = (queued_to_heartbeat + i) % num_slots; + // Just ignore slots that don't have a current tenant. + if let Some(tenant) = &mut state.tenants[tenant_index] { + // Clean up any old tenants who's heartbeat atomics have been de-allocated. let Some(heartbeat) = tenant.heartbeat.upgrade() else { - *tenancy = None; + state.tenants[tenant_index] = None; continue; }; - if now.duration_since(tenant.last_heartbeat) >= HEARTBEAT_INTERVAL { + // Send a single heartbeat to the first live tenant we find. + if !sent_heartbeat { heartbeat.store(true, Ordering::Relaxed); - tenant.last_heartbeat = now; + sent_heartbeat = true; + // Start with the next tenant on the next invocation of the loop. + queued_to_heartbeat = (tenant_index + 1) % num_slots; } - num_tenants += 1; + // Count every occupied slot, even if we didn't sent them a heartbeat. + num_occupied += 1; } } - drop(state); - - if num_tenants > 0 { - let sleep_interval = HEARTBEAT_INTERVAL / num_tenants; + if num_occupied > 0 { + drop(state); + let sleep_interval = HEARTBEAT_INTERVAL / num_occupied; thread::sleep(sleep_interval); + state = thread_pool.state.lock().unwrap(); + } else { + state = thread_pool.new_participant.wait(state).unwrap(); } } } diff --git a/src/unwind.rs b/src/unwind.rs new file mode 100644 index 0000000..11f3fdf --- /dev/null +++ b/src/unwind.rs @@ -0,0 +1,37 @@ +//! Unwinding recovery utilities taken from rayon. 
+
+use alloc::boxed::Box;
+use core::any::Any;
+use core::panic::AssertUnwindSafe;
+use std::eprintln;
+use std::panic::catch_unwind;
+use std::panic::resume_unwind;
+use std::process::abort;
+use std::thread::Result;
+
+/// Executes `f` and captures any panic, translating that panic into an
+/// `Err` result. The assumption is that any panic will be propagated
+/// later with `resume_unwinding`, and hence `f` can be treated as
+/// exception safe.
+#[cold]
+pub fn halt_unwinding<F, R>(func: F) -> Result<R>
+where
+    F: FnOnce() -> R,
+{
+    catch_unwind(AssertUnwindSafe(func))
+}
+
+#[cold]
+pub fn resume_unwinding(payload: Box<dyn Any + Send>) -> ! {
+    resume_unwind(payload)
+}
+
+/// Aborts the program when dropped.
+pub struct AbortOnDrop;
+
+impl Drop for AbortOnDrop {
+    fn drop(&mut self) {
+        eprintln!("Forte: detected unexpected panic; aborting");
+        abort();
+    }
+}