Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ This project is currently in early [pre-release], and there may be arbitrary bre

## [Unreleased]

### Added

- `rayon-compat` crate, for running rayon on top of forte.
- `Worker::migrated()` which is `true` when the current job has moved between threads.

### Changed

- Heartbeat frequency is now 100 microseconds.
- Heartbeats are now more evenly distributed between workers.
- The heartbeat thread goes to sleep when the pool is not in use.
- Shared work is now queued and claimed in the order it was shared.
- The `Scope` API is now identical to `rayon-core` and does not use `Pin<T>`.
- `Scope::new`, `Scope::add_reference` and `Scope::remove_reference` are now private.

### Security

- Forte now implements exception safety. Panics can no longer cause UB.

## [1.0.0-alpha.3]

### Added
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ repository = "https://github.com/NthTensor/Forte"

[workspace]
resolver = "2"
members = ["ci"]
exclude = ["coz"]
members = ["ci", "rayon-compat"]

[dependencies]
async-task = "4.7.1"
Expand Down
10 changes: 10 additions & 0 deletions rayon-compat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "rayon-compat"
version = "1.12.1"
edition = "2024"

[dependencies]
forte = { path = ".." }

[features]
web_spin_lock = []
14 changes: 14 additions & 0 deletions rayon-compat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Rayon Compat

This is a way to run `rayon` on top of `forte`! The `rayon-compat` crate mocks the important bits of the api of `rayon_core` in a pretty simple and crude way, which is none-the-less enough to support most of what `rayon` needs.

To use this crate, apply the following cargo patch like one of these:
```
// If you want to clone forte and use it locally
[patch.crates-io]
rayon-core = { path = "path to this repo", package = "rayon-compat" }

// If you want to use the latest published version of forte
[patch.crates-io]
rayon-core = { path = "https://github.com/NthTensor/Forte", package = "rayon-compat" }
```
171 changes: 171 additions & 0 deletions rayon-compat/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::sync::atomic::{AtomicBool, Ordering};

pub static THREAD_POOL: forte::ThreadPool = const { forte::ThreadPool::new() };

pub static STARTED: AtomicBool = const { AtomicBool::new(false) };

#[inline(always)]
fn ensure_started() {
if !STARTED.load(Ordering::Relaxed) && !STARTED.swap(true, Ordering::Relaxed) {
THREAD_POOL.resize_to_available();
}
}

#[inline(always)]
pub fn current_num_threads() -> usize {
64 // Forte prefers smaller tasks, so it's better to lie to rayon about the size of the pool
}

#[inline(always)]
pub fn current_thread_index() -> Option<usize> {
forte::Worker::map_current(|worker| worker.index())
}

#[inline(always)]
pub fn max_num_threads() -> usize {
usize::MAX // The number of forte workers is only bounded by the size of a vector.
}

// -----------------------------------------------------------------------------
// Join

#[derive(Debug)]
pub struct FnContext {
/// True if the task was migrated.
migrated: bool,
}

impl FnContext {
#[inline(always)]
pub fn migrated(&self) -> bool {
self.migrated
}
}

#[inline(always)]
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
A: FnOnce(FnContext) -> RA + Send,
B: FnOnce(FnContext) -> RB + Send,
RA: Send,
RB: Send,
{
ensure_started();
THREAD_POOL.join(
|worker| {
let migrated = worker.migrated();
let ctx = FnContext { migrated };
oper_a(ctx)
},
|worker| {
let migrated = worker.migrated();
let ctx = FnContext { migrated };
oper_b(ctx)
},
)
}

#[inline(always)]
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
ensure_started();
THREAD_POOL.join(|_| oper_a(), |_| oper_b())
}

// -----------------------------------------------------------------------------
// Scope

pub use forte::Scope;

#[inline(always)]
pub fn scope<'scope, OP, R>(op: OP) -> R
where
OP: FnOnce(&Scope<'scope>) -> R + Send,
R: Send,
{
ensure_started();
forte::scope(op)
}

#[inline(always)]
pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
where
OP: FnOnce(&Scope<'scope>) -> R,
{
ensure_started();
forte::scope(op)
}

// -----------------------------------------------------------------------------
// Spawn

#[inline(always)]
pub fn spawn<F>(func: F)
where
F: FnOnce() + Send + 'static,
{
ensure_started();
THREAD_POOL.spawn(|_| func())
}

// -----------------------------------------------------------------------------
// Yield

pub use forte::Yield;

pub fn yield_local() -> Yield {
let result = forte::Worker::map_current(forte::Worker::yield_local);
match result {
Some(status) => status,
_ => Yield::Idle,
}
}

pub fn yield_now() -> Yield {
let result = forte::Worker::map_current(forte::Worker::yield_now);
match result {
Some(status) => status,
_ => Yield::Idle,
}
}

// -----------------------------------------------------------------------------
// Fake stuff that dosn't work. These are here only so so that rayon can export
// them.

pub struct ThreadBuilder;

pub struct ThreadPool;

pub struct ThreadPoolBuildError;

pub struct ThreadPoolBuilder;

pub struct BroadcastContext;

pub struct ScopeFifo;

pub fn broadcast() {
unimplemented!()
}

pub fn spawn_broadcast() {
unimplemented!()
}

pub fn scope_fifo() {
unimplemented!()
}

pub fn in_place_scope_fifo() {
unimplemented!()
}

pub fn spawn_fifo() {
unimplemented!()
}
15 changes: 11 additions & 4 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use alloc::collections::VecDeque;
use core::cell::UnsafeCell;
use core::mem::ManuallyDrop;
use core::ptr::NonNull;
use std::thread::Result as ThreadResult;

use crate::signal::Signal;
use crate::thread_pool::Worker;
use crate::unwind;

// -----------------------------------------------------------------------------
// Runnable
Expand Down Expand Up @@ -152,7 +154,7 @@ impl JobQueue {
/// This is analogous to the chili type `JobStack` and the rayon type `StackJob`.
pub struct StackJob<F, T> {
f: UnsafeCell<ManuallyDrop<F>>,
signal: Signal<T>,
signal: Signal<ThreadResult<T>>,
}

impl<F, T> StackJob<F, T>
Expand Down Expand Up @@ -210,7 +212,7 @@ where
/// closure's return value is sent over this signal after the job is
/// executed.
#[inline(always)]
pub fn signal(&self) -> &Signal<T> {
pub fn signal(&self) -> &Signal<ThreadResult<T>> {
&self.signal
}
}
Expand All @@ -235,20 +237,25 @@ where
// SAFETY: The caller ensures `this` can be converted into an immutable
// reference.
let this = unsafe { this.cast::<Self>().as_ref() };
// Create an abort guard. If the closure panics, this will convert the
// panic into an abort. Doing so prevents use-after-free for other elements of the stack.
let abort_guard = unwind::AbortOnDrop;
// SAFETY: This memory location is accessed only in this function and in
// `unwrap`. The latter cannot have been called, because it drops the
// stack job, so, since this function is called only once, we can
// guarantee that we have exclusive access.
let f_ref = unsafe { &mut *this.f.get() };
// SAFETY: The caller ensures this function is called only once.
let f = unsafe { ManuallyDrop::take(f_ref) };
// Run the job.
let result = f(worker);
// Run the job. If the job panics, we propagate the panic back to the main thread.
let result = unwind::halt_unwinding(|| f(worker));
// SAFETY: This is valid for the access used by `send` because
// `&this.signal` is an immutable reference to a `Signal`. Because
// `send` is only called in this function, and this function is never
// called again, `send` is never called again.
unsafe { Signal::send(&this.signal, result) }
// Forget the abort guard, re-enabling panics.
core::mem::forget(abort_guard);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod job;
mod scope;
mod signal;
mod thread_pool;
mod unwind;

// -----------------------------------------------------------------------------
// Top-level exports
Expand Down
Loading
Loading