Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.3.1

- remove spinlock where Java checks for Rust future completion (uses a thenCompose based CF chain to prevent unnecessary blocking)
- generate overloads for async functions that accept and use a custom Executor

## 0.3.0

- update to uniffi 0.31.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "uniffi-bindgen-java"
version = "0.3.0"
version = "0.3.1"
authors = ["IronCore Labs <info@ironcorelabs.com>"]
readme = "README.md"
license = "MPL-2.0"
Expand Down
124 changes: 85 additions & 39 deletions src/templates/Async.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public UniffiFreeingFuture(long rustFuture, java.util.function.Consumer<java.lan
this.rustFuture = rustFuture;
}

// Cancellation calls freeFunc immediately, which frees the underlying Rust future.
// This races with the thenApplyAsync pipeline stage that calls completeFunc on
// the same handle. The pipeline guards against this with isCancelled() checks
// before touching the Rust future, but a narrow race window remains if cancel()
// fires after the check passes. This is safe in practice because uniffi's
// rust_future_free is idempotent (see the double-free and poll-after-free tests).
@Override
public boolean cancel(boolean ignored) {
boolean cancelled = super.cancel(ignored);
Expand Down Expand Up @@ -62,6 +68,7 @@ public void cancel() {
}

static <T, F, E extends java.lang.Exception> java.util.concurrent.CompletableFuture<T> uniffiRustCallAsync(
java.util.concurrent.Executor uniffiExecutor,
long rustFuture,
PollingFunction pollFunc,
java.util.function.BiFunction<java.lang.Long, UniffiRustCallStatus, F> completeFunc,
Expand All @@ -71,26 +78,45 @@ static <T, F, E extends java.lang.Exception> java.util.concurrent.CompletableFut
){
java.util.concurrent.CompletableFuture<T> future = new UniffiFreeingFuture<>(rustFuture, freeFunc);

java.util.concurrent.CompletableFuture.runAsync(() -> {
try {
byte pollResult;
do {
pollResult = poll(rustFuture, pollFunc);
} while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY);
java.util.concurrent.CompletableFuture<java.lang.Void> pollChain;
try {
pollChain = pollUntilReady(rustFuture, pollFunc, uniffiExecutor);
} catch (java.lang.Exception e) {
freeFunc.accept(rustFuture);
future.completeExceptionally(e);
return future;
}

if (!future.isCancelled()) {
F result = UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> {
return completeFunc.apply(rustFuture, status);
});
T liftedResult = liftFunc.apply(result);
future.complete(liftedResult);
}
pollChain.thenApplyAsync(ignored -> {
if (future.isCancelled()) {
return null;
}
try {
F result = UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> {
return completeFunc.apply(rustFuture, status);
});
return liftFunc.apply(result);
} catch (java.lang.Exception e) {
future.completeExceptionally(e);
} finally {
if (!future.isCancelled()) {
freeFunc.accept(rustFuture);
throw new java.util.concurrent.CompletionException(e);
}
}, uniffiExecutor).whenComplete((result, throwable) -> {
if (future.isCancelled()) {
return;
}
try {
// If we failed in the chain somewhere, now complete the future with the failure
if (throwable != null) {
// Unwrap CompletionException to expose the original exception
java.lang.Throwable cause = throwable;
if (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) {
cause = cause.getCause();
}
future.completeExceptionally(cause);
} else {
future.complete(result);
}
} finally {
freeFunc.accept(rustFuture);
}
});

Expand All @@ -101,6 +127,7 @@ static <T, F, E extends java.lang.Exception> java.util.concurrent.CompletableFut
// overload specifically for Void cases, which aren't within the Object type.
// This is only necessary because of Java's lack of proper Any/Unit
static <E extends java.lang.Exception> java.util.concurrent.CompletableFuture<java.lang.Void> uniffiRustCallAsync(
java.util.concurrent.Executor uniffiExecutor,
long rustFuture,
PollingFunction pollFunc,
java.util.function.BiConsumer<java.lang.Long, UniffiRustCallStatus> completeFunc,
Expand All @@ -110,41 +137,60 @@ static <E extends java.lang.Exception> java.util.concurrent.CompletableFuture<ja
){
java.util.concurrent.CompletableFuture<java.lang.Void> future = new UniffiFreeingFuture<>(rustFuture, freeFunc);

java.util.concurrent.CompletableFuture.runAsync(() -> {
try {
byte pollResult;
do {
pollResult = poll(rustFuture, pollFunc);
} while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY);
java.util.concurrent.CompletableFuture<java.lang.Void> pollChain;
try {
pollChain = pollUntilReady(rustFuture, pollFunc, uniffiExecutor);
} catch (java.lang.Exception e) {
freeFunc.accept(rustFuture);
future.completeExceptionally(e);
return future;
}

// even though the outer `future` has been cancelled, this inner `runAsync` is unsupervised
// and keeps running. When it calls `completeFunc` after being cancelled, it's status is `SUCCESS`
// (assuming the Rust part succeeded), and the function being called can lead to a core dump.
// Guarding with `isCancelled` here makes everything work, but feels like a cludge.
if (!future.isCancelled()) {
UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> {
completeFunc.accept(rustFuture, status);
});
pollChain.<java.lang.Void>thenApplyAsync(ignored -> {
if (future.isCancelled()) {
return null;
}
try {
UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> {
completeFunc.accept(rustFuture, status);
});
} catch (java.lang.Exception e) {
throw new java.util.concurrent.CompletionException(e);
}
return null;
}, uniffiExecutor).whenComplete((result, throwable) -> {
if (future.isCancelled()) {
return;
}
try {
if (throwable != null) {
java.lang.Throwable cause = throwable;
if (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) {
cause = cause.getCause();
}
future.completeExceptionally(cause);
} else {
future.complete(null);
}
} catch (java.lang.Throwable e) {
future.completeExceptionally(e);
} finally {
if (!future.isCancelled()) {
freeFunc.accept(rustFuture);
}
freeFunc.accept(rustFuture);
}
});

return future;
}

private static byte poll(long rustFuture, PollingFunction pollFunc) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException {
private static java.util.concurrent.CompletableFuture<java.lang.Void> pollUntilReady(long rustFuture, PollingFunction pollFunc, java.util.concurrent.Executor uniffiExecutor) {
java.util.concurrent.CompletableFuture<java.lang.Byte> pollFuture = new java.util.concurrent.CompletableFuture<>();
var handle = uniffiContinuationHandleMap.insert(pollFuture);
pollFunc.apply(rustFuture, UniffiRustFutureContinuationCallbackImpl.INSTANCE, handle);
do {} while (!pollFuture.isDone()); // removing this makes futures not cancel (sometimes)
return pollFuture.get();
return pollFuture.thenComposeAsync(pollResult -> {
if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY) {
return java.util.concurrent.CompletableFuture.completedFuture(null);
} else {
return pollUntilReady(rustFuture, pollFunc, uniffiExecutor);
}
}, uniffiExecutor);
}

{%- if ci.has_async_callback_interface_definition() %}
Expand Down
19 changes: 19 additions & 0 deletions src/templates/macros.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,16 @@
{% endif %}
{%- if callable.is_async() %}
{#- Async methods use CompletableFuture<T> which requires boxed types -#}
{#- No-executor overload — defaults to ForkJoinPool.commonPool(), delegates to Executor version -#}
{{ func_decl }} java.util.concurrent.CompletableFuture<{% match callable.return_type() -%}{%- when Some with (return_type) -%}{{ return_type|boxed_type_name(ci, config) }}{%- when None %}java.lang.Void{%- endmatch %}> {{ callable.name()|fn_name }}(
{%- call arg_list(callable, !callable.self_type().is_some()) -%}
){
return {{ callable.name()|fn_name }}({% call arg_name_list(callable) %}{% if !callable.arguments().is_empty() %}, {% endif %}java.util.concurrent.ForkJoinPool.commonPool());
}

{#- With-executor overload — does the actual async work -#}
{{ func_decl }} java.util.concurrent.CompletableFuture<{% match callable.return_type() -%}{%- when Some with (return_type) -%}{{ return_type|boxed_type_name(ci, config) }}{%- when None %}java.lang.Void{%- endmatch %}> {{ callable.name()|fn_name }}(
{%- call arg_list(callable, !callable.self_type().is_some()) -%}{% if !callable.arguments().is_empty() %}, {% endif %}java.util.concurrent.Executor uniffiExecutor
){
return {% call call_async(callable) %};
}
Expand Down Expand Up @@ -87,6 +95,7 @@

{%- macro call_async(callable) -%}
UniffiAsyncHelpers.uniffiRustCallAsync(
uniffiExecutor,
{%- match callable.self_type() %}
{%- when Some with (Type::Object { .. }) %}
callWithHandle(uniffiHandle -> {
Expand Down Expand Up @@ -134,6 +143,16 @@
{%- endfor %}
{%- endmacro -%}

{#-
// Arglist of just argument names, for forwarding calls (e.g. delegation between overloads).
-#}
{%- macro arg_name_list(func) -%}
{%- for arg in func.arguments() -%}
{{ arg.name()|var_name }}
{%- if !loop.last %}, {% endif -%}
{%- endfor %}
{%- endmacro -%}

{#-
// Arglist as used in Java declarations of methods, functions and constructors.
// even if `is_decl` there won't be default values, Java doesn't support them in any reasonable way.
Expand Down
121 changes: 121 additions & 0 deletions tests/scripts/TestFixtureFutures/TestFixtureFutures.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,127 @@ public CompletableFuture<java.lang.Void> tryDelay(String delayMs) {

System.out.println(MessageFormat.format("useSharedResource (not cancelled): {0}ms", time));
}
// Spawns many concurrent futures to verify the thread pool isn't starved.
// With the old spinloop implementation, each future held a thread from the common pool;
// with thenCompose, no threads are held during polling.
{
int concurrency = 100;
var futures = new java.util.ArrayList<CompletableFuture<String>>(concurrency);
var time = measureTimeMillis(() -> {
for (int i = 0; i < concurrency; i++) {
futures.add(Futures.sayAfter((short)10, "concurrent-" + i));
}
for (var f : futures) {
var result = f.get();
assert result.startsWith("Hello, concurrent-") : "unexpected result: " + result;
}
});

System.out.println(MessageFormat.format("high fan-out ({0} concurrent): {1}ms", concurrency, time));
// All futures sleep 10ms and run concurrently; should complete well under 1s.
assert time < 2000 : MessageFormat.format("high fan-out too slow: {0}ms", time);
}

// Cancel a future before any poll can complete and verify no crash or hang.
{
for (int i = 0; i < 100; i++) {
var job = Futures.sleep((short)5000);
job.cancel(true);
assert job.isCancelled();
}
// Verify async machinery still works after many immediate cancellations.
var result = Futures.sayAfter((short)1, "still-alive").get();
assert result.equals("Hello, still-alive!") : "async broken after cancellations";
System.out.println("immediate cancellation (100 iterations) ... ok");
}

// There is a theoretical race in our async code: if cancel() fires between the
// isCancelled() check and the freeFunc call in whenComplete, both paths call
// rust_future_free on the same handle. Currently this is safe because uniffi's
// rust_future_free is effectively idempotent:
// - Handle::into_arc_borrowed increments the Arc refcount before creating the Arc,
// so the RustFuture allocation stays alive across multiple free calls.
// - RustFuture::free() just clears internal state (future=None, result=None) and
// cancels the scheduler; the second call is a no-op.
//
// This test remains in place to catch any regression if uniffi changes its handle
// management to be less tolerant of double-free.
{
for (int i = 0; i < 200; i++) {
// 1ms sleep means the future may complete around the same time we cancel
var job = Futures.sayAfter((short)1, "race-" + i);
// Small random-ish delay to vary the race timing
if (i % 3 == 0) {
Thread.yield();
}
job.cancel(true);
}
// Verify the system is still healthy after many race attempts.
var result = Futures.sayAfter((short)1, "post-race").get();
assert result.equals("Hello, post-race!") : "async broken after double-free race test";
System.out.println("double-free race (200 iterations) ... ok");
}

// When a future is cancelled, our pollUntilReady chain may still have an in-flight
// poll when freeFunc is called. The orphaned chain can then call rust_future_poll on
// the freed handle. Currently this is safe because uniffi's Scheduler enters the
// Cancelled state on free, and any subsequent poll short-circuits to Ready via
// is_cancelled() without touching the inner future.
//
// This test remains in place to catch any regression if uniffi changes its
// post-free poll behavior.
{
for (int i = 0; i < 50; i++) {
// brokenSleep calls the waker multiple times, creating multiple polls.
// Cancelling while these polls are in-flight exercises the polling-after-free path.
var job = Futures.brokenSleep((short)100, (short)50);
// Vary timing to hit different points in the poll chain
if (i % 5 == 0) {
Thread.yield();
}
job.cancel(true);
assert job.isCancelled();
}
// Small delay to let any orphaned poll chains settle
TestFixtureFutures.delay(200).get();
// Verify the system is still functional
var result = Futures.sayAfter((short)1, "post-poll-free").get();
assert result.equals("Hello, post-poll-free!") : "async broken after poll-after-free test";
System.out.println("poll after free (50 iterations) ... ok");
}

// Verifies that async operations use the provided Executor instead of the default ForkJoinPool.commonPool().
{
var executorInvocations = new java.util.concurrent.atomic.AtomicInteger(0);
var innerExecutor = Executors.newCachedThreadPool();
java.util.concurrent.Executor trackingExecutor = runnable -> {
executorInvocations.incrementAndGet();
innerExecutor.execute(runnable);
};

try {
// Test top-level async function with custom executor
var result = Futures.sayAfter((short)1, "custom-exec", trackingExecutor).get();
assert result.equals("Hello, custom-exec!") : "unexpected result: " + result;
assert executorInvocations.get() > 0 : "custom executor was not invoked for top-level function";

// Test async method on object with custom executor
var megaphone = Futures.newMegaphone();
var prevCount = executorInvocations.get();
var methodResult = megaphone.sayAfter((short)1, "method-exec", trackingExecutor).get();
assert methodResult.equals("HELLO, METHOD-EXEC!") : "unexpected result: " + methodResult;
assert executorInvocations.get() > prevCount : "custom executor was not invoked for method call";

// Test void-returning async function with custom executor
prevCount = executorInvocations.get();
Futures.sleep((short)1, trackingExecutor).get();
assert executorInvocations.get() > prevCount : "custom executor was not invoked for void function";
} finally {
innerExecutor.shutdown();
}

System.out.println("custom executor ... ok");
}
} finally {
// bring down the scheduler, if it's not shut down it'll hold the main thread open.
scheduler.shutdown();
Expand Down
Loading