diff --git a/CHANGELOG.md b/CHANGELOG.md index 72c71b3..f55ccb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.3.1 + +- remove spinlock where Java checks for Rust future completion (uses a thenCompose based CF chain to prevent unnecessary blocking) +- generate overloads for async functions that accept and use a custom Executor + ## 0.3.0 - update to uniffi 0.31.0 diff --git a/Cargo.lock b/Cargo.lock index d8db37d..be046bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1330,7 +1330,7 @@ dependencies = [ [[package]] name = "uniffi-bindgen-java" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "askama", diff --git a/Cargo.toml b/Cargo.toml index b469e5c..126becb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uniffi-bindgen-java" -version = "0.3.0" +version = "0.3.1" authors = ["IronCore Labs "] readme = "README.md" license = "MPL-2.0" diff --git a/src/templates/Async.java b/src/templates/Async.java index 2b8bc75..fc28b03 100644 --- a/src/templates/Async.java +++ b/src/templates/Async.java @@ -31,6 +31,12 @@ public UniffiFreeingFuture(long rustFuture, java.util.function.Consumer java.util.concurrent.CompletableFuture uniffiRustCallAsync( + java.util.concurrent.Executor uniffiExecutor, long rustFuture, PollingFunction pollFunc, java.util.function.BiFunction completeFunc, @@ -71,26 +78,45 @@ static java.util.concurrent.CompletableFut ){ java.util.concurrent.CompletableFuture future = new UniffiFreeingFuture<>(rustFuture, freeFunc); - java.util.concurrent.CompletableFuture.runAsync(() -> { - try { - byte pollResult; - do { - pollResult = poll(rustFuture, pollFunc); - } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + java.util.concurrent.CompletableFuture pollChain; + try { + pollChain = pollUntilReady(rustFuture, pollFunc, uniffiExecutor); + } catch (java.lang.Exception e) { + freeFunc.accept(rustFuture); + future.completeExceptionally(e); + return future; + } - if (!future.isCancelled()) { - F result = UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { - return completeFunc.apply(rustFuture, status); - }); - T liftedResult = liftFunc.apply(result); - future.complete(liftedResult); - } + pollChain.thenApplyAsync(ignored -> { + if (future.isCancelled()) { + return null; + } + try { + F result = UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { + return completeFunc.apply(rustFuture, status); + }); + return liftFunc.apply(result); } catch (java.lang.Exception e) { - future.completeExceptionally(e); - } finally { - if (!future.isCancelled()) { - freeFunc.accept(rustFuture); + throw new java.util.concurrent.CompletionException(e); + } + }, uniffiExecutor).whenComplete((result, throwable) -> { + if (future.isCancelled()) { + return; + } + try { + // If we failed in the chain somewhere, now complete the future with the failure + if (throwable != null) { + // Unwrap CompletionException to expose the original exception + java.lang.Throwable cause = throwable; + if (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) { + cause = cause.getCause(); + } + future.completeExceptionally(cause); + } else { + future.complete(result); } + } finally { + freeFunc.accept(rustFuture); } }); @@ -101,6 +127,7 @@ static java.util.concurrent.CompletableFut // overload specifically for Void cases, which aren't within the Object type. // This is only necessary because of Java's lack of proper Any/Unit static java.util.concurrent.CompletableFuture uniffiRustCallAsync( + java.util.concurrent.Executor uniffiExecutor, long rustFuture, PollingFunction pollFunc, java.util.function.BiConsumer completeFunc, @@ -110,41 +137,60 @@ static java.util.concurrent.CompletableFuture future = new UniffiFreeingFuture<>(rustFuture, freeFunc); - java.util.concurrent.CompletableFuture.runAsync(() -> { - try { - byte pollResult; - do { - pollResult = poll(rustFuture, pollFunc); - } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + java.util.concurrent.CompletableFuture pollChain; + try { + pollChain = pollUntilReady(rustFuture, pollFunc, uniffiExecutor); + } catch (java.lang.Exception e) { + freeFunc.accept(rustFuture); + future.completeExceptionally(e); + return future; + } - // even though the outer `future` has been cancelled, this inner `runAsync` is unsupervised - // and keeps running. When it calls `completeFunc` after being cancelled, it's status is `SUCCESS` - // (assuming the Rust part succeeded), and the function being called can lead to a core dump. - // Guarding with `isCancelled` here makes everything work, but feels like a cludge. - if (!future.isCancelled()) { - UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { - completeFunc.accept(rustFuture, status); - }); + pollChain.thenApplyAsync(ignored -> { + if (future.isCancelled()) { + return null; + } + try { + UniffiHelpers.uniffiRustCallWithError(errorHandler, status -> { + completeFunc.accept(rustFuture, status); + }); + } catch (java.lang.Exception e) { + throw new java.util.concurrent.CompletionException(e); + } + return null; + }, uniffiExecutor).whenComplete((result, throwable) -> { + if (future.isCancelled()) { + return; + } + try { + if (throwable != null) { + java.lang.Throwable cause = throwable; + if (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) { + cause = cause.getCause(); + } + future.completeExceptionally(cause); + } else { future.complete(null); } - } catch (java.lang.Throwable e) { - future.completeExceptionally(e); } finally { - if (!future.isCancelled()) { - freeFunc.accept(rustFuture); - } + freeFunc.accept(rustFuture); } }); return future; } - private static byte poll(long rustFuture, PollingFunction pollFunc) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException { + private static java.util.concurrent.CompletableFuture pollUntilReady(long rustFuture, PollingFunction pollFunc, java.util.concurrent.Executor uniffiExecutor) { java.util.concurrent.CompletableFuture pollFuture = new java.util.concurrent.CompletableFuture<>(); var handle = uniffiContinuationHandleMap.insert(pollFuture); pollFunc.apply(rustFuture, UniffiRustFutureContinuationCallbackImpl.INSTANCE, handle); - do {} while (!pollFuture.isDone()); // removing this makes futures not cancel (sometimes) - return pollFuture.get(); + return pollFuture.thenComposeAsync(pollResult -> { + if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY) { + return java.util.concurrent.CompletableFuture.completedFuture(null); + } else { + return pollUntilReady(rustFuture, pollFunc, uniffiExecutor); + } + }, uniffiExecutor); } {%- if ci.has_async_callback_interface_definition() %} diff --git a/src/templates/macros.java b/src/templates/macros.java index 5048e1b..833b0b4 100644 --- a/src/templates/macros.java +++ b/src/templates/macros.java @@ -52,8 +52,16 @@ {% endif %} {%- if callable.is_async() %} {#- Async methods use CompletableFuture which requires boxed types -#} + {#- No-executor overload — defaults to ForkJoinPool.commonPool(), delegates to Executor version -#} {{ func_decl }} java.util.concurrent.CompletableFuture<{% match callable.return_type() -%}{%- when Some with (return_type) -%}{{ return_type|boxed_type_name(ci, config) }}{%- when None %}java.lang.Void{%- endmatch %}> {{ callable.name()|fn_name }}( {%- call arg_list(callable, !callable.self_type().is_some()) -%} + ){ + return {{ callable.name()|fn_name }}({% call arg_name_list(callable) %}{% if !callable.arguments().is_empty() %}, {% endif %}java.util.concurrent.ForkJoinPool.commonPool()); + } + + {#- With-executor overload — does the actual async work -#} + {{ func_decl }} java.util.concurrent.CompletableFuture<{% match callable.return_type() -%}{%- when Some with (return_type) -%}{{ return_type|boxed_type_name(ci, config) }}{%- when None %}java.lang.Void{%- endmatch %}> {{ callable.name()|fn_name }}( + {%- call arg_list(callable, !callable.self_type().is_some()) -%}{% if !callable.arguments().is_empty() %}, {% endif %}java.util.concurrent.Executor uniffiExecutor ){ return {% call call_async(callable) %}; } @@ -87,6 +95,7 @@ {%- macro call_async(callable) -%} UniffiAsyncHelpers.uniffiRustCallAsync( + uniffiExecutor, {%- match callable.self_type() %} {%- when Some with (Type::Object { .. }) %} callWithHandle(uniffiHandle -> { @@ -134,6 +143,16 @@ {%- endfor %} {%- endmacro -%} +{#- +// Arglist of just argument names, for forwarding calls (e.g. delegation between overloads). +-#} +{%- macro arg_name_list(func) -%} +{%- for arg in func.arguments() -%} + {{ arg.name()|var_name }} +{%- if !loop.last %}, {% endif -%} +{%- endfor %} +{%- endmacro -%} + {#- // Arglist as used in Java declarations of methods, functions and constructors. // even if `is_decl` there won't be default values, Java doesn't support them in any reasonable way. diff --git a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java index caf361e..f706fff 100644 --- a/tests/scripts/TestFixtureFutures/TestFixtureFutures.java +++ b/tests/scripts/TestFixtureFutures/TestFixtureFutures.java @@ -411,6 +411,127 @@ public CompletableFuture tryDelay(String delayMs) { System.out.println(MessageFormat.format("useSharedResource (not cancelled): {0}ms", time)); } + // Spawns many concurrent futures to verify the thread pool isn't starved. + // With the old spinloop implementation, each future held a thread from the common pool; + // with thenCompose, no threads are held during polling. + { + int concurrency = 100; + var futures = new java.util.ArrayList>(concurrency); + var time = measureTimeMillis(() -> { + for (int i = 0; i < concurrency; i++) { + futures.add(Futures.sayAfter((short)10, "concurrent-" + i)); + } + for (var f : futures) { + var result = f.get(); + assert result.startsWith("Hello, concurrent-") : "unexpected result: " + result; + } + }); + + System.out.println(MessageFormat.format("high fan-out ({0} concurrent): {1}ms", concurrency, time)); + // All futures sleep 10ms and run concurrently; should complete well under 1s. + assert time < 2000 : MessageFormat.format("high fan-out too slow: {0}ms", time); + } + + // Cancel a future before any poll can complete and verify no crash or hang. + { + for (int i = 0; i < 100; i++) { + var job = Futures.sleep((short)5000); + job.cancel(true); + assert job.isCancelled(); + } + // Verify async machinery still works after many immediate cancellations. + var result = Futures.sayAfter((short)1, "still-alive").get(); + assert result.equals("Hello, still-alive!") : "async broken after cancellations"; + System.out.println("immediate cancellation (100 iterations) ... ok"); + } + + // There is a theoretical race in our async code: if cancel() fires between the + // isCancelled() check and the freeFunc call in whenComplete, both paths call + // rust_future_free on the same handle. Currently this is safe because uniffi's + // rust_future_free is effectively idempotent: + // - Handle::into_arc_borrowed increments the Arc refcount before creating the Arc, + // so the RustFuture allocation stays alive across multiple free calls. + // - RustFuture::free() just clears internal state (future=None, result=None) and + // cancels the scheduler; the second call is a no-op. + // + // This test remains in place to catch any regression if uniffi changes its handle + // management to be less tolerant of double-free. + { + for (int i = 0; i < 200; i++) { + // 1ms sleep means the future may complete around the same time we cancel + var job = Futures.sayAfter((short)1, "race-" + i); + // Small random-ish delay to vary the race timing + if (i % 3 == 0) { + Thread.yield(); + } + job.cancel(true); + } + // Verify the system is still healthy after many race attempts. + var result = Futures.sayAfter((short)1, "post-race").get(); + assert result.equals("Hello, post-race!") : "async broken after double-free race test"; + System.out.println("double-free race (200 iterations) ... ok"); + } + + // When a future is cancelled, our pollUntilReady chain may still have an in-flight + // poll when freeFunc is called. The orphaned chain can then call rust_future_poll on + // the freed handle. Currently this is safe because uniffi's Scheduler enters the + // Cancelled state on free, and any subsequent poll short-circuits to Ready via + // is_cancelled() without touching the inner future. + // + // This test remains in place to catch any regression if uniffi changes its + // post-free poll behavior. + { + for (int i = 0; i < 50; i++) { + // brokenSleep calls the waker multiple times, creating multiple polls. + // Cancelling while these polls are in-flight exercises the polling-after-free path. + var job = Futures.brokenSleep((short)100, (short)50); + // Vary timing to hit different points in the poll chain + if (i % 5 == 0) { + Thread.yield(); + } + job.cancel(true); + assert job.isCancelled(); + } + // Small delay to let any orphaned poll chains settle + TestFixtureFutures.delay(200).get(); + // Verify the system is still functional + var result = Futures.sayAfter((short)1, "post-poll-free").get(); + assert result.equals("Hello, post-poll-free!") : "async broken after poll-after-free test"; + System.out.println("poll after free (50 iterations) ... ok"); + } + + // Verifies that async operations use the provided Executor instead of the default ForkJoinPool.commonPool(). + { + var executorInvocations = new java.util.concurrent.atomic.AtomicInteger(0); + var innerExecutor = Executors.newCachedThreadPool(); + java.util.concurrent.Executor trackingExecutor = runnable -> { + executorInvocations.incrementAndGet(); + innerExecutor.execute(runnable); + }; + + try { + // Test top-level async function with custom executor + var result = Futures.sayAfter((short)1, "custom-exec", trackingExecutor).get(); + assert result.equals("Hello, custom-exec!") : "unexpected result: " + result; + assert executorInvocations.get() > 0 : "custom executor was not invoked for top-level function"; + + // Test async method on object with custom executor + var megaphone = Futures.newMegaphone(); + var prevCount = executorInvocations.get(); + var methodResult = megaphone.sayAfter((short)1, "method-exec", trackingExecutor).get(); + assert methodResult.equals("HELLO, METHOD-EXEC!") : "unexpected result: " + methodResult; + assert executorInvocations.get() > prevCount : "custom executor was not invoked for method call"; + + // Test void-returning async function with custom executor + prevCount = executorInvocations.get(); + Futures.sleep((short)1, trackingExecutor).get(); + assert executorInvocations.get() > prevCount : "custom executor was not invoked for void function"; + } finally { + innerExecutor.shutdown(); + } + + System.out.println("custom executor ... ok"); + } } finally { // bring down the scheduler, if it's not shut down it'll hold the main thread open. scheduler.shutdown();