From df55ea59622984ae92a11813023c0ef9fc008db3 Mon Sep 17 00:00:00 2001 From: Murph Murphy Date: Tue, 24 Mar 2026 11:18:36 -0600 Subject: [PATCH 1/5] Switch from spinning on async completion to theCompose To keep the Java thread as unblocked as possible. --- src/templates/Async.java | 115 ++++++++++++------ .../TestFixtureFutures.java | 95 +++++++++++++++ 2 files changed, 171 insertions(+), 39 deletions(-) diff --git a/src/templates/Async.java b/src/templates/Async.java index 2b8bc75..7d88561 100644 --- a/src/templates/Async.java +++ b/src/templates/Async.java @@ -71,26 +71,44 @@ static java.util.concurrent.CompletableFut ){ java.util.concurrent.CompletableFuture future = new UniffiFreeingFuture<>(rustFuture, freeFunc); - java.util.concurrent.CompletableFuture.runAsync(() -> { - try { - byte pollResult; - do { - pollResult = poll(rustFuture, pollFunc); - } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + java.util.concurrent.CompletableFuture pollChain; + try { + pollChain = pollUntilReady(rustFuture, pollFunc); + } catch (java.lang.Exception e) { + freeFunc.accept(rustFuture); + future.completeExceptionally(e); + return future; + } - if (!future.isCancelled()) { - F result = UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { - return completeFunc.apply(rustFuture, status); - }); - T liftedResult = liftFunc.apply(result); - future.complete(liftedResult); - } + pollChain.thenApplyAsync(ignored -> { + if (future.isCancelled()) { + return null; + } + try { + F result = UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { + return completeFunc.apply(rustFuture, status); + }); + return liftFunc.apply(result); } catch (java.lang.Exception e) { - future.completeExceptionally(e); - } finally { - if (!future.isCancelled()) { - freeFunc.accept(rustFuture); + throw new java.util.concurrent.CompletionException(e); + } + }).whenComplete((result, throwable) -> { + if (future.isCancelled()) { + return; + } + try { + if (throwable != null) { + // Unwrap CompletionException to expose the original exception + java.lang.Throwable cause = throwable; + if (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) { + cause = cause.getCause(); + } + future.completeExceptionally(cause); + } else { + future.complete(result); } + } finally { + freeFunc.accept(rustFuture); } }); @@ -110,41 +128,60 @@ static java.util.concurrent.CompletableFuture future = new UniffiFreeingFuture<>(rustFuture, freeFunc); - java.util.concurrent.CompletableFuture.runAsync(() -> { - try { - byte pollResult; - do { - pollResult = poll(rustFuture, pollFunc); - } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + java.util.concurrent.CompletableFuture pollChain; + try { + pollChain = pollUntilReady(rustFuture, pollFunc); + } catch (java.lang.Exception e) { + freeFunc.accept(rustFuture); + future.completeExceptionally(e); + return future; + } - // even though the outer `future` has been cancelled, this inner `runAsync` is unsupervised - // and keeps running. When it calls `completeFunc` after being cancelled, it's status is `SUCCESS` - // (assuming the Rust part succeeded), and the function being called can lead to a core dump. - // Guarding with `isCancelled` here makes everything work, but feels like a cludge. - if (!future.isCancelled()) { - UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { - completeFunc.accept(rustFuture, status); - }); + pollChain.thenApplyAsync(ignored -> { + if (future.isCancelled()) { + return null; + } + try { + UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { + completeFunc.accept(rustFuture, status); + }); + } catch (java.lang.Exception e) { + throw new java.util.concurrent.CompletionException(e); + } + return null; + }).whenComplete((result, throwable) -> { + if (future.isCancelled()) { + return; + } + try { + if (throwable != null) { + java.lang.Throwable cause = throwable; + if (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) { + cause = cause.getCause(); + } + future.completeExceptionally(cause); + } else { future.complete(null); } - } catch (java.lang.Throwable e) { - future.completeExceptionally(e); } finally { - if (!future.isCancelled()) { - freeFunc.accept(rustFuture); - } + freeFunc.accept(rustFuture); } }); return future; } - private static byte poll(long rustFuture, PollingFunction pollFunc) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException { + private static java.util.concurrent.CompletableFuture pollUntilReady(long rustFuture, PollingFunction pollFunc) { java.util.concurrent.CompletableFuture pollFuture = new java.util.concurrent.CompletableFuture<>(); var handle = uniffiContinuationHandleMap.insert(pollFuture); pollFunc.apply(rustFuture, UniffiRustFutureContinuationCallbackImpl.INSTANCE, handle); - do {} while (!pollFuture.isDone()); // removing this makes futures not cancel (sometimes) - return pollFuture.get(); + return pollFuture.thenComposeAsync(pollResult -> { + if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY) { + return java.util.concurrent.CompletableFuture.completedFuture(null); + } else { + return pollUntilReady(rustFuture, pollFunc); + } + }); } {%- if ci.has_async_callback_interface_definition() %} diff --git a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java index caf361e..3ec9cc2 100644 --- a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java +++ b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java @@ -411,6 +411,101 @@ public CompletableFuture tryDelay(String delayMs) { System.out.println(MessageFormat.format("useSharedResource (not cancelled): {0}ms", time)); } + // Test high fan-out concurrency. + // Spawns many concurrent futures to verify the thread pool isn't starved. + // With the old spinloop implementation, each future held a thread from the common pool; + // with thenCompose, no threads are held during polling. + { + int concurrency = 100; + var futures = new java.util.ArrayList>(concurrency); + var time = measureTimeMillis(() -> { + for (int i = 0; i < concurrency; i++) { + futures.add(Futures.sayAfter((short)10, "concurrent-" + i)); + } + for (var f : futures) { + var result = f.get(); + assert result.startsWith("Hello, concurrent-") : "unexpected result: " + result; + } + }); + + System.out.println(MessageFormat.format("high fan-out ({0} concurrent): {1}ms", concurrency, time)); + // All futures sleep 10ms and run concurrently; should complete well under 1s. + assert time < 2000 : MessageFormat.format("high fan-out too slow: {0}ms", time); + } + + // Test immediate cancellation. + // Cancel a future before any poll can complete and verify no crash or hang. + { + for (int i = 0; i < 100; i++) { + var job = Futures.sleep((short)5000); + job.cancel(true); + assert job.isCancelled(); + } + // Verify async machinery still works after many immediate cancellations. + var result = Futures.sayAfter((short)1, "still-alive").get(); + assert result.equals("Hello, still-alive!") : "async broken after cancellations"; + System.out.println("immediate cancellation (100 iterations) ... ok"); + } + + // Test double-free race condition (regression test for uniffi internals). + // + // There is a theoretical race in our async code: if cancel() fires between the + // isCancelled() check and the freeFunc call in whenComplete, both paths call + // rust_future_free on the same handle. Currently this is safe because uniffi's + // rust_future_free is effectively idempotent: + // - Handle::into_arc_borrowed increments the Arc refcount before creating the Arc, + // so the RustFuture allocation stays alive across multiple free calls. + // - RustFuture::free() just clears internal state (future=None, result=None) and + // cancels the scheduler; the second call is a no-op. + // + // This test remains in place to catch any regression if uniffi changes its handle + // management to be less tolerant of double-free. + { + for (int i = 0; i < 200; i++) { + // 1ms sleep means the future may complete around the same time we cancel + var job = Futures.sayAfter((short)1, "race-" + i); + // Small random-ish delay to vary the race timing + if (i % 3 == 0) { + Thread.yield(); + } + job.cancel(true); + } + // Verify the system is still healthy after many race attempts. + var result = Futures.sayAfter((short)1, "post-race").get(); + assert result.equals("Hello, post-race!") : "async broken after double-free race test"; + System.out.println("double-free race (200 iterations) ... ok"); + } + + // Test polling after free (regression test for uniffi internals). + // + // When a future is cancelled, our pollUntilReady chain may still have an in-flight + // poll when freeFunc is called. The orphaned chain can then call rust_future_poll on + // the freed handle. Currently this is safe because uniffi's Scheduler enters the + // Cancelled state on free, and any subsequent poll short-circuits to Ready via + // is_cancelled() without touching the inner future. + // + // This test remains in place to catch any regression if uniffi changes its + // post-free poll behavior. + { + for (int i = 0; i < 50; i++) { + // brokenSleep calls the waker multiple times, creating multiple polls. + // Cancelling while these polls are in-flight exercises the polling-after-free path. + var job = Futures.brokenSleep((short)100, (short)50); + // Vary timing to hit different points in the poll chain + if (i % 5 == 0) { + Thread.yield(); + } + job.cancel(true); + assert job.isCancelled(); + } + // Small delay to let any orphaned poll chains settle + TestFixtureFutures.delay(200).get(); + // Verify the system is still functional + var result = Futures.sayAfter((short)1, "post-poll-free").get(); + assert result.equals("Hello, post-poll-free!") : "async broken after poll-after-free test"; + System.out.println("poll after free (50 iterations) ... ok"); + } + } finally { // bring down the scheduler, if it's not shut down it'll hold the main thread open. scheduler.shutdown(); From 1bb0d50bb0c72042295d78d4dc5504a98fd60005 Mon Sep 17 00:00:00 2001 From: Murph Murphy Date: Tue, 24 Mar 2026 12:35:44 -0600 Subject: [PATCH 2/5] Allow for a custom executor on generated async methods Still keep the default of the `ForkJoinPool.commonPool` so there are no breaking changes. --- src/templates/Async.java | 16 ++++++---- src/templates/macros.java | 19 +++++++++++ .../TestFixtureFutures.java | 32 +++++++++++++++++++ 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/templates/Async.java b/src/templates/Async.java index 7d88561..cd2b357 100644 --- a/src/templates/Async.java +++ b/src/templates/Async.java @@ -62,6 +62,7 @@ public void cancel() { } static java.util.concurrent.CompletableFuture uniffiRustCallAsync( + java.util.concurrent.Executor uniffiExecutor, long rustFuture, PollingFunction pollFunc, java.util.function.BiFunction completeFunc, @@ -73,7 +74,7 @@ static java.util.concurrent.CompletableFut java.util.concurrent.CompletableFuture pollChain; try { - pollChain = pollUntilReady(rustFuture, pollFunc); + pollChain = pollUntilReady(rustFuture, pollFunc, uniffiExecutor); } catch (java.lang.Exception e) { freeFunc.accept(rustFuture); future.completeExceptionally(e); @@ -92,7 +93,7 @@ static java.util.concurrent.CompletableFut } catch (java.lang.Exception e) { throw new java.util.concurrent.CompletionException(e); } - }).whenComplete((result, throwable) -> { + }, uniffiExecutor).whenComplete((result, throwable) -> { if (future.isCancelled()) { return; } @@ -119,6 +120,7 @@ static java.util.concurrent.CompletableFut // overload specifically for Void cases, which aren't within the Object type. // This is only necessary because of Java's lack of proper Any/Unit static java.util.concurrent.CompletableFuture uniffiRustCallAsync( + java.util.concurrent.Executor uniffiExecutor, long rustFuture, PollingFunction pollFunc, java.util.function.BiConsumer completeFunc, @@ -130,7 +132,7 @@ static java.util.concurrent.CompletableFuture pollChain; try { - pollChain = pollUntilReady(rustFuture, pollFunc); + pollChain = pollUntilReady(rustFuture, pollFunc, uniffiExecutor); } catch (java.lang.Exception e) { freeFunc.accept(rustFuture); future.completeExceptionally(e); @@ -149,7 +151,7 @@ static java.util.concurrent.CompletableFuture { + }, uniffiExecutor).whenComplete((result, throwable) -> { if (future.isCancelled()) { return; } @@ -171,7 +173,7 @@ static java.util.concurrent.CompletableFuture pollUntilReady(long rustFuture, PollingFunction pollFunc) { + private static java.util.concurrent.CompletableFuture pollUntilReady(long rustFuture, PollingFunction pollFunc, java.util.concurrent.Executor uniffiExecutor) { java.util.concurrent.CompletableFuture pollFuture = new java.util.concurrent.CompletableFuture<>(); var handle = uniffiContinuationHandleMap.insert(pollFuture); pollFunc.apply(rustFuture, UniffiRustFutureContinuationCallbackImpl.INSTANCE, handle); @@ -179,9 +181,9 @@ private static java.util.concurrent.CompletableFuture pollUntilR if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY) { return java.util.concurrent.CompletableFuture.completedFuture(null); } else { - return pollUntilReady(rustFuture, pollFunc); + return pollUntilReady(rustFuture, pollFunc, uniffiExecutor); } - }); + }, uniffiExecutor); } {%- if ci.has_async_callback_interface_definition() %} diff --git a/src/templates/macros.java b/src/templates/macros.java index 5048e1b..833b0b4 100644 --- a/src/templates/macros.java +++ b/src/templates/macros.java @@ -52,8 +52,16 @@ {% endif %} {%- if callable.is_async() %} {#- Async methods use CompletableFuture which requires boxed types -#} + {#- No-executor overload — defaults to ForkJoinPool.commonPool(), delegates to Executor version -#} {{ func_decl }} java.util.concurrent.CompletableFuture<{% match callable.return_type() -%}{%- when Some with (return_type) -%}{{ return_type|boxed_type_name(ci, config) }}{%- when None %}java.lang.Void{%- endmatch %}> {{ callable.name()|fn_name }}( {%- call arg_list(callable, !callable.self_type().is_some()) -%} + ){ + return {{ callable.name()|fn_name }}({% call arg_name_list(callable) %}{% if !callable.arguments().is_empty() %}, {% endif %}java.util.concurrent.ForkJoinPool.commonPool()); + } + + {#- With-executor overload — does the actual async work -#} + {{ func_decl }} java.util.concurrent.CompletableFuture<{% match callable.return_type() -%}{%- when Some with (return_type) -%}{{ return_type|boxed_type_name(ci, config) }}{%- when None %}java.lang.Void{%- endmatch %}> {{ callable.name()|fn_name }}( + {%- call arg_list(callable, !callable.self_type().is_some()) -%}{% if !callable.arguments().is_empty() %}, {% endif %}java.util.concurrent.Executor uniffiExecutor ){ return {% call call_async(callable) %}; } @@ -87,6 +95,7 @@ {%- macro call_async(callable) -%} UniffiAsyncHelpers.uniffiRustCallAsync( + uniffiExecutor, {%- match callable.self_type() %} {%- when Some with (Type::Object { .. }) %} callWithHandle(uniffiHandle -> { @@ -134,6 +143,16 @@ {%- endfor %} {%- endmacro -%} +{#- +// Arglist of just argument names, for forwarding calls (e.g. delegation between overloads). +-#} +{%- macro arg_name_list(func) -%} +{%- for arg in func.arguments() -%} + {{ arg.name()|var_name }} +{%- if !loop.last %}, {% endif -%} +{%- endfor %} +{%- endmacro -%} + {#- // Arglist as used in Java declarations of methods, functions and constructors. // even if `is_decl` there won't be default values, Java doesn't support them in any reasonable way. diff --git a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java index 3ec9cc2..3084cda 100644 --- a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java +++ b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java @@ -506,6 +506,38 @@ public CompletableFuture tryDelay(String delayMs) { System.out.println("poll after free (50 iterations) ... ok"); } + // Verifies that async operations use the provided Executor instead of the default ForkJoinPool.commonPool(). + { + var executorInvocations = new java.util.concurrent.atomic.AtomicInteger(0); + var innerExecutor = Executors.newCachedThreadPool(); + java.util.concurrent.Executor trackingExecutor = runnable -> { + executorInvocations.incrementAndGet(); + innerExecutor.execute(runnable); + }; + + try { + // Test top-level async function with custom executor + var result = Futures.sayAfter((short)1, "custom-exec", trackingExecutor).get(); + assert result.equals("Hello, custom-exec!") : "unexpected result: " + result; + assert executorInvocations.get() > 0 : "custom executor was not invoked for top-level function"; + + // Test async method on object with custom executor + var megaphone = Futures.newMegaphone(); + var prevCount = executorInvocations.get(); + var methodResult = megaphone.sayAfter((short)1, "method-exec", trackingExecutor).get(); + assert methodResult.equals("HELLO, METHOD-EXEC!") : "unexpected result: " + methodResult; + assert executorInvocations.get() > prevCount : "custom executor was not invoked for method call"; + + // Test void-returning async function with custom executor + prevCount = executorInvocations.get(); + Futures.sleep((short)1, trackingExecutor).get(); + assert executorInvocations.get() > prevCount : "custom executor was not invoked for void function"; + } finally { + innerExecutor.shutdown(); + } + + System.out.println("custom executor ... ok"); + } } finally { // bring down the scheduler, if it's not shut down it'll hold the main thread open. scheduler.shutdown(); From 9b3d76d3c0db81354de75ec8add60506670f2747 Mon Sep 17 00:00:00 2001 From: Murph Murphy Date: Tue, 24 Mar 2026 12:49:55 -0600 Subject: [PATCH 3/5] Small cleanup, changelog --- CHANGELOG.md | 5 +++++ Cargo.toml | 2 +- src/templates/Async.java | 7 +++++++ tests/scripts/TestFixtureFutures/TestFixtureFutures.java | 6 ------ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 72c71b3..6712d6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.3.1 (Unreleased) + +- remove spinlock where Java checks for Rust future completion (uses a thenCompose based CF chain to prevent unnecesary blocking) +- generate overloads for async functions that accept and use a custom Executor + ## 0.3.0 - update to uniffi 0.31.0 diff --git a/Cargo.toml b/Cargo.toml index b469e5c..f0e2be3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uniffi-bindgen-java" -version = "0.3.0" +version = "0.3.1-pre" authors = ["IronCore Labs "] readme = "README.md" license = "MPL-2.0" diff --git a/src/templates/Async.java b/src/templates/Async.java index cd2b357..fc28b03 100644 --- a/src/templates/Async.java +++ b/src/templates/Async.java @@ -31,6 +31,12 @@ public UniffiFreeingFuture(long rustFuture, java.util.function.Consumer java.util.concurrent.CompletableFut return; } try { + // If we failed in the chain somewhere, now complete the future with the failure if (throwable != null) { // Unwrap CompletionException to expose the original exception java.lang.Throwable cause = throwable; diff --git a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java index 3084cda..f706fff 100644 --- a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java +++ b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java @@ -411,7 +411,6 @@ public CompletableFuture tryDelay(String delayMs) { System.out.println(MessageFormat.format("useSharedResource (not cancelled): {0}ms", time)); } - // Test high fan-out concurrency. // Spawns many concurrent futures to verify the thread pool isn't starved. // With the old spinloop implementation, each future held a thread from the common pool; // with thenCompose, no threads are held during polling. @@ -433,7 +432,6 @@ public CompletableFuture tryDelay(String delayMs) { assert time < 2000 : MessageFormat.format("high fan-out too slow: {0}ms", time); } - // Test immediate cancellation. // Cancel a future before any poll can complete and verify no crash or hang. { for (int i = 0; i < 100; i++) { @@ -447,8 +445,6 @@ public CompletableFuture tryDelay(String delayMs) { System.out.println("immediate cancellation (100 iterations) ... ok"); } - // Test double-free race condition (regression test for uniffi internals). - // // There is a theoretical race in our async code: if cancel() fires between the // isCancelled() check and the freeFunc call in whenComplete, both paths call // rust_future_free on the same handle. Currently this is safe because uniffi's @@ -476,8 +472,6 @@ public CompletableFuture tryDelay(String delayMs) { System.out.println("double-free race (200 iterations) ... ok"); } - // Test polling after free (regression test for uniffi internals). - // // When a future is cancelled, our pollUntilReady chain may still have an in-flight // poll when freeFunc is called. The orphaned chain can then call rust_future_poll on // the freed handle. Currently this is safe because uniffi's Scheduler enters the From 4514a4e962a1efb6638386b3f0fe92e04deb50e4 Mon Sep 17 00:00:00 2001 From: Murph Murphy Date: Tue, 24 Mar 2026 14:28:56 -0600 Subject: [PATCH 4/5] Update CHANGELOG.md Co-authored-by: Craig Colegrove <34786857+giarc3@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6712d6d..614c363 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## 0.3.1 (Unreleased) -- remove spinlock where Java checks for Rust future completion (uses a thenCompose based CF chain to prevent unnecesary blocking) +- remove spinlock where Java checks for Rust future completion (uses a thenCompose based CF chain to prevent unnecessary blocking) - generate overloads for async functions that accept and use a custom Executor ## 0.3.0 From 467a7fe2d8b6cd5c29240d107c8df932a9e4547b Mon Sep 17 00:00:00 2001 From: Murph Murphy Date: Tue, 24 Mar 2026 14:31:00 -0600 Subject: [PATCH 5/5] Prep for release, next one after this is likely to be major --- CHANGELOG.md | 2 +- Cargo.lock | 2 +- Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 614c363..f55ccb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.3.1 (Unreleased) +## 0.3.1 - remove spinlock where Java checks for Rust future completion (uses a thenCompose based CF chain to prevent unnecessary blocking) - generate overloads for async functions that accept and use a custom Executor diff --git a/Cargo.lock b/Cargo.lock index d8db37d..be046bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1330,7 +1330,7 @@ dependencies = [ [[package]] name = "uniffi-bindgen-java" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "askama", diff --git a/Cargo.toml b/Cargo.toml index f0e2be3..126becb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uniffi-bindgen-java" -version = "0.3.1-pre" +version = "0.3.1" authors = ["IronCore Labs "] readme = "README.md" license = "MPL-2.0"