GROOVY-9381: Support async/await like ES7#2386
Conversation
86c3923 to
8513eb6
Compare
There was a problem hiding this comment.
Pull request overview
Implements Groovy language support for ES7-style async/await, including async methods/closures/lambdas, async iteration (for await), and async generators (yield return), backed by new runtime abstractions (Awaitable, AsyncStream) and adapter SPI for integrating third-party async/reactive types.
Changes:
- Extend the Groovy grammar/parser to support
asyncmodifier,awaitunary expression,for await (...), andyield return. - Add runtime APIs (
AsyncUtils,Awaitable,AsyncStream, adapter registry) plus AST transformation to rewrite@Async/asyncmethods. - Add extensive test coverage, including virtual-thread behavior and Reactor/RxJava integration tests, and introduce new test dependencies.
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
versions.properties |
Adds Reactor/RxJava version pins for new integration tests. |
build.gradle |
Adds Reactor and RxJava as test dependencies. |
gradle/verification-metadata.xml |
Adds verification metadata for the new dependencies. |
src/antlr/GroovyLexer.g4 |
Introduces ASYNC/AWAIT tokens. |
src/antlr/GroovyParser.g4 |
Adds parsing rules for async closures/lambdas, await expressions, for await, and yield return. |
src/main/java/org/codehaus/groovy/ast/ModifierNode.java |
Treats async as a modifier token with no bytecode opcode impact. |
src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java |
Desugars for await, emits await/yield return AST calls, injects @Async for async-modified methods, and wraps async closures/generators. |
src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java |
Implements the @Async AST transformation, including generator handling and executor resolution. |
src/main/java/groovy/transform/AsyncUtils.java |
Provides runtime async/await helpers, generator support, executor management, and exception unwrapping. |
src/main/java/groovy/transform/Async.java |
Defines the @Async annotation (Javadoc currently mismatched with new Awaitable return type). |
src/main/java/groovy/concurrent/* |
Adds new core async abstractions and implementations: Awaitable, GroovyPromise, AsyncStream, AsyncStreamGenerator, AwaitResult, and adapter SPI/registry. |
src/test/groovy/org/codehaus/groovy/transform/* |
Adds comprehensive tests for transformation, exception behavior, adapters/framework integration, generators, for await, and virtual threads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Register custom adapter | ||
| AwaitableAdapterRegistry.register(new AwaitableAdapter() { | ||
| boolean supportsAwaitable(Class<?> type) { CustomPromise.isAssignableFrom(type) } | ||
| def <T> Awaitable<T> toAwaitable(Object source) { | ||
| return Awaitable.of(((CustomPromise) source).value) | ||
| } | ||
| }) |
There was a problem hiding this comment.
This test registers a custom AwaitableAdapter into the global AwaitableAdapterRegistry but never unregisters it. Because the registry is static, repeated registrations across tests will accumulate and can change adapter precedence/order over time. Please ensure the registry state is restored after the script (once an unregister/reset API exists), or avoid per-test registration.
| * <li>The method body is wrapped in {@code GroovyPromise.of(CompletableFuture.supplyAsync(...))} | ||
| * (or {@code runAsync} for void methods)</li> | ||
| * <li>The return type becomes {@link Awaitable}{@code <T>}</li> |
There was a problem hiding this comment.
This class-level Javadoc says the method body is wrapped in GroovyPromise.of(CompletableFuture.supplyAsync/runAsync), but the implementation now delegates to AsyncUtils.executeAsync/executeAsyncVoid (and uses generateAsyncStream for generators). Please update the description to reflect the current code path.
| * <li>The method body is wrapped in {@code GroovyPromise.of(CompletableFuture.supplyAsync(...))} | |
| * (or {@code runAsync} for void methods)</li> | |
| * <li>The return type becomes {@link Awaitable}{@code <T>}</li> | |
| * <li>The method body is executed asynchronously via {@link AsyncUtils#executeAsync} | |
| * (or {@link AsyncUtils#executeAsyncVoid} for {@code void} methods)</li> | |
| * <li>Generator methods are transformed to use {@link AsyncUtils#generateAsyncStream}, | |
| * returning an {@link AsyncStream}{@code <T>}</li> | |
| * <li>The return type becomes {@link Awaitable}{@code <T>} (or {@link AsyncStream}{@code <T>} for generators)</li> |
| @Override | ||
| public boolean cancel() { | ||
| return future.cancel(true); | ||
| } |
There was a problem hiding this comment.
GroovyPromise.cancel() delegates to CompletableFuture.cancel(true), but CompletableFuture does not reliably interrupt/stop work started via supplyAsync/runAsync. That means cancelled tasks may keep running in the background (e.g., sleeping), which can exhaust executors or slow test runs. If cancellation is part of the Awaitable contract, consider implementing cancellation propagation (e.g., via ExecutorService.submit/Future and linking cancel to the underlying task).
| import java.util.concurrent.CancellationException | ||
|
|
||
| async slowTask() { | ||
| Thread.sleep(10_000) |
There was a problem hiding this comment.
This cancellation test uses a 10-second sleep in the async body and then immediately cancels. Because CompletableFuture cancellation does not reliably interrupt the underlying supplier, the sleeping task may continue running in the background and slow down or destabilize the test suite. Consider rewriting the test to avoid long sleeps (e.g., coordinate with a latch) and/or only assert cancellation of the Awaitable without depending on interruption.
| Thread.sleep(10_000) | |
| Thread.sleep(100) |
| // Inject synthetic $asyncGen as first parameter — rebuild closure | ||
| Parameter genParam = new Parameter(ClassHelper.DYNAMIC_TYPE, "$asyncGen"); | ||
| Parameter[] existingParams = closure.getParameters(); | ||
| Parameter[] newParams; | ||
| if (hasUserParams) { | ||
| newParams = new Parameter[existingParams.length + 1]; | ||
| newParams[0] = genParam; | ||
| System.arraycopy(existingParams, 0, newParams, 1, existingParams.length); | ||
| } else { | ||
| newParams = new Parameter[]{genParam}; | ||
| } | ||
| ClosureExpression genClosure = new ClosureExpression(newParams, closure.getCode()); |
There was a problem hiding this comment.
The compiler injects a synthetic parameter named "$asyncGen" into generator closures. Because Groovy allows user-defined parameters/variables with '$' names, a user closure that already declares "$asyncGen" would now get a duplicate/conflicting parameter name. Consider generating a less-collidable synthetic name (e.g., with a unique suffix) or detecting conflicts and renaming.
| @BeforeEach | ||
| void registerAdapters() { | ||
| reactorAdapter = new ReactorAwaitableAdapter() | ||
| rxJavaAdapter = new RxJavaAwaitableAdapter() | ||
| AwaitableAdapterRegistry.register(reactorAdapter) | ||
| AwaitableAdapterRegistry.register(rxJavaAdapter) | ||
| } |
There was a problem hiding this comment.
This test registers adapters in @BeforeEach but never removes them; because AwaitableAdapterRegistry is global/static, adapters will accumulate across the test JVM and can make tests order-dependent. Once an unregister/reset API exists, please restore the registry state in @AfterEach (or register once per class).
| void testAsyncClosureCancellation() { | ||
| assertScript ''' | ||
| import static groovy.transform.AsyncUtils.* | ||
| import java.util.concurrent.CancellationException | ||
|
|
||
| def task = async { Thread.sleep(10_000); 42 } | ||
| task.cancel() | ||
| assert task.isCancelled() | ||
| assert task.isCompletedExceptionally() | ||
| try { | ||
| await(task) | ||
| assert false | ||
| } catch (CancellationException e) { | ||
| // expected |
There was a problem hiding this comment.
This cancellation test sleeps for 10 seconds inside the async closure and then cancels. Since CompletableFuture cancellation doesn’t reliably stop the underlying work, the sleeping task may continue running and can slow the build or exhaust the executor. Consider replacing the sleep with a latch/park that can be released, or reducing the sleep and asserting behavior without leaving background work behind.
| /** | ||
| * Method annotation to make a method execute asynchronously and return a | ||
| * {@link java.util.concurrent.CompletableFuture CompletableFuture}. | ||
| * <p> | ||
| * When applied to a method, the {@code @Async} transformation will: | ||
| * <ul> | ||
| * <li>Change the method's return type to {@code CompletableFuture<T>} (where {@code T} is the original return type)</li> | ||
| * <li>Wrap the method body to execute asynchronously via {@code CompletableFuture.supplyAsync()}</li> |
There was a problem hiding this comment.
The @Async annotation Javadoc still says the transformation returns CompletableFuture, but the implementation now returns groovy.concurrent.Awaitable (and AsyncStream for generator methods). Please update the Javadoc and examples to match the actual API/behavior so users aren’t misled.
| /** | ||
| * Registers an adapter with higher priority than existing ones. | ||
| */ | ||
| public static void register(AwaitableAdapter adapter) { | ||
| ADAPTERS.add(0, adapter); | ||
| } |
There was a problem hiding this comment.
AwaitableAdapterRegistry.register() only ever prepends adapters and there’s no way to unregister/restore the previous state. Because adapters are global/static, repeated registrations (especially in tests) will accumulate across the JVM and can make tests order-dependent. Consider returning a handle (e.g., AutoCloseable) to unregister, or adding a remove/unregister/reset API for test isolation.
| if (source instanceof Future) { | ||
| Future<T> future = (Future<T>) source; | ||
| CompletableFuture<T> cf = new CompletableFuture<>(); | ||
| if (future.isDone()) { | ||
| completeFrom(cf, future); | ||
| } else { | ||
| CompletableFuture.runAsync(() -> completeFrom(cf, future)); | ||
| } | ||
| return new GroovyPromise<>(cf); |
There was a problem hiding this comment.
The built-in Future adapter completes a CompletableFuture by calling future.get() inside CompletableFuture.runAsync() with no executor specified (so it uses ForkJoinPool.commonPool). Because future.get() blocks, this can starve the common pool and deadlock if the Future’s work also depends on that pool. Consider using a dedicated blocking-friendly executor (or virtual threads when available), or using ForkJoinPool.ManagedBlocker to avoid starvation.
Address all 12 review comments from PR #2386: - Fix Javadocs: AsyncASTTransformation, Async annotation, GroovyPromise.cancel() - Add AwaitableAdapterRegistry.unregister() + register() returning AutoCloseable - Add AwaitableAdapterRegistry.setBlockingExecutor() to avoid pool starvation - Fix containsYieldReturn/containsYieldReturnCalls to check owner type - Use collision-safe synthetic names: $__asyncGen__, $__asyncStream__N - Reduce Thread.sleep times in cancellation tests (10s → 100ms) - Add @AfterEach cleanup in AsyncFrameworkIntegrationTest Package refactoring — separate user-facing from internal APIs: - Move GroovyPromise to org.apache.groovy.runtime.async (internal) - Move AsyncStreamGenerator to org.apache.groovy.runtime.async (internal) - Create org.apache.groovy.runtime.async.AsyncSupport (compiler support) - Create groovy.concurrent.AsyncUtils (user-facing API facade) - Retain groovy.transform.AsyncUtils as deprecated backward-compat facade - Update AstBuilder/AsyncASTTransformation to reference AsyncSupport - Migrate all test imports to new package locations Coverage improvements: - Add 18 new tests covering AwaitableAdapterRegistry (unregister, AutoCloseable, setBlockingExecutor, Iterator streams, error paths), AsyncStreamGenerator (error propagation, multiple yields), AsyncUtils (null await, CompletionStage, plain Future, deepUnwrap), backward compatibility facade, and AsyncStream.empty() All 376 async tests + 180 project tasks pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
8937743 to
8b620de
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 25 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def customPool = Executors.newFixedThreadPool(2, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("custom-async-" + t.getId()) | ||
| t | ||
| }) | ||
| setExecutor(customPool) | ||
|
|
||
| def awaitable = async { | ||
| Thread.currentThread().getName() | ||
| } | ||
| def threadName = await(awaitable) | ||
| assert threadName.startsWith("custom-async-") | ||
| } finally { | ||
| setExecutor(savedExecutor) | ||
| } |
There was a problem hiding this comment.
This test creates a fixed thread pool and sets it as the async executor, but the pool is never shut down. Since the thread factory creates non-daemon threads, this can leak threads and hang the test JVM. Ensure the custom executor is shut down (and ideally created with daemon threads) in the finally block after resetting the executor.
| // Set a custom executor | ||
| setExecutor(Executors.newSingleThreadExecutor()) | ||
| assert getExecutor() != originalExecutor | ||
| // Reset to null — should restore default | ||
| setExecutor(null) | ||
| def restored = getExecutor() | ||
| assert restored != null | ||
| // Verify it works | ||
| def awaitable = groovy.concurrent.AsyncUtils.async { 42 } | ||
| assert groovy.concurrent.AsyncUtils.await(awaitable) == 42 | ||
| // Restore original | ||
| setExecutor(originalExecutor) |
There was a problem hiding this comment.
setExecutor(Executors.newSingleThreadExecutor()) allocates an executor that is never shut down (the reference is lost). Resetting the global executor to null/original does not stop the created thread, which can leak threads across tests. Store the executor in a variable and shut it down in a finally block.
| // Set a custom executor | |
| setExecutor(Executors.newSingleThreadExecutor()) | |
| assert getExecutor() != originalExecutor | |
| // Reset to null — should restore default | |
| setExecutor(null) | |
| def restored = getExecutor() | |
| assert restored != null | |
| // Verify it works | |
| def awaitable = groovy.concurrent.AsyncUtils.async { 42 } | |
| assert groovy.concurrent.AsyncUtils.await(awaitable) == 42 | |
| // Restore original | |
| setExecutor(originalExecutor) | |
| def customExecutor = Executors.newSingleThreadExecutor() | |
| try { | |
| // Set a custom executor | |
| setExecutor(customExecutor) | |
| assert getExecutor() != originalExecutor | |
| // Reset to null — should restore default | |
| setExecutor(null) | |
| def restored = getExecutor() | |
| assert restored != null | |
| // Verify it works | |
| def awaitable = groovy.concurrent.AsyncUtils.async { 42 } | |
| assert groovy.concurrent.AsyncUtils.await(awaitable) == 42 | |
| } finally { | |
| // Restore original | |
| setExecutor(originalExecutor) | |
| customExecutor.shutdown() | |
| } |
| class CustomExecutorService { | ||
| static Executor myPool = Executors.newFixedThreadPool(1, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("my-pool-thread") | ||
| t | ||
| }) | ||
|
|
||
| @Async(executor = "myPool") | ||
| def doWork() { | ||
| return Thread.currentThread().getName() | ||
| } | ||
| } | ||
|
|
||
| def svc = new CustomExecutorService() | ||
| def result = svc.doWork().get() | ||
| assert result.startsWith("my-pool-thread") | ||
| ''' |
There was a problem hiding this comment.
This test defines a static Executor via Executors.newFixedThreadPool(...) but never shuts it down. Because the thread factory does not mark threads as daemon, this can leave non-daemon threads running after the test completes. Add cleanup to shut down myPool (or make its threads daemon) once the assertion is done.
| public void complete() { | ||
| try { | ||
| queue.put(DONE); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Signals that the generator failed with an exception. | ||
| */ | ||
| public void error(Throwable t) { | ||
| try { | ||
| queue.put(new ErrorItem(t)); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } |
There was a problem hiding this comment.
complete()/error() swallow InterruptedException without enqueuing DONE/ErrorItem. If the producer thread is interrupted at this point, consumers blocked in moveNext() can hang indefinitely. Consider retrying the queue.put(...), or recording terminal state and ensuring moveNext() can observe completion/failure even when the producer is interrupted.
| ClassNode originalReturnType = mNode.getReturnType(); | ||
| if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName()) | ||
| || ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName()) | ||
| || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) { | ||
| addError(MY_TYPE_NAME + " cannot be applied to a method that already returns an async type", mNode); | ||
| return; | ||
| } |
There was a problem hiding this comment.
The validation for “already returns an async type” only checks exact type names. This allows methods returning an Awaitable/AsyncStream subtype (e.g. GroovyPromise) to be annotated and then double-wrapped, which contradicts the restriction described in @Async Javadoc. Consider using implementsInterface(AWAITABLE_TYPE) / isDerivedFrom(ASYNC_STREAM_TYPE) (or similar) instead of name equality.
| FALLBACK_EXECUTOR = Executors.newFixedThreadPool(FALLBACK_PARALLELISM, r -> { | ||
| Thread t = new Thread(r); | ||
| t.setDaemon(true); | ||
| t.setName("groovy-async-" + t.threadId()); |
There was a problem hiding this comment.
Thread#threadId() is not available on the project's target Java version (17), so this won't compile. Use Thread#getId() (or a reflection-based fallback) for naming the fallback pool threads.
| t.setName("groovy-async-" + t.threadId()); | |
| t.setName("groovy-async-" + t.getId()); |
99bd278 to
b3db925
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 25 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| + "' must be static for static method '" + mNode.getName() + "'", mNode); | ||
| return; | ||
| } | ||
| executorExpr = varX(executorFieldName); |
There was a problem hiding this comment.
When resolving the custom executor field, varX(executorFieldName) creates an unbound VariableExpression and can be accidentally captured by a same-named local variable/parameter, resulting in using the wrong executor at runtime. Since you already have the FieldNode, bind the expression to that field explicitly (e.g., create a VariableExpression from the FieldNode / set accessed variable) to ensure it always references the intended field.
| executorExpr = varX(executorFieldName); | |
| VariableExpression executorVar = varX(executorFieldName); | |
| executorVar.setAccessedVariable(field); | |
| executorExpr = executorVar; |
| expression | ||
| // must come before postfixExpression to resolve the ambiguities between casting and call on parentheses expression, e.g. (int)(1 / 2) | ||
| : castParExpression castOperandExpression #castExprAlt | ||
|
|
||
| // async closure/lambda must come before postfixExpression to resolve the ambiguities between async and method call, e.g. async { ... } | ||
| | ASYNC nls closureOrLambdaExpression #asyncClosureExprAlt | ||
|
|
||
| // qualified names, array expressions, method invocation, post inc/dec | ||
| | postfixExpression #postfixExprAlt | ||
|
|
||
| | switchExpression #switchExprAlt | ||
|
|
||
| // ~(BNOT)/!(LNOT) (level 1) | ||
| // ~(BNOT)/!(LNOT)/await (level 1) | ||
| | (BITNOT | NOT) nls expression #unaryNotExprAlt | ||
| | AWAIT nls expression #awaitExprAlt | ||
|
|
There was a problem hiding this comment.
Because identifier includes AWAIT and expression tries postfixExpression before awaitExprAlt, input like await foo is likely parsed as a command-chain starting from an identifier named await (or await(foo)), so awaitExprAlt/visitAwaitExprAlt won’t trigger. This breaks the intended keyword-lowering to AsyncSupport.await(...) (especially outside @Async contexts). Consider giving awaitExprAlt higher priority than postfixExprAlt (similar to asyncClosureExprAlt) or adding a predicate-based disambiguation so await <expr> reliably parses as the await operator.
| private final SynchronousQueue<Object> queue = new SynchronousQueue<>(); | ||
| private T current; | ||
|
|
||
| /** | ||
| * Produces the next element. Called from the generator body when | ||
| * a {@code yield return expr} statement is executed. Blocks until | ||
| * the consumer is ready. | ||
| */ | ||
| public void yield(Object value) { | ||
| try { | ||
| queue.put(new Item(value)); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new java.util.concurrent.CancellationException("Interrupted during yield"); | ||
| } | ||
| } |
There was a problem hiding this comment.
AsyncStreamGenerator uses a SynchronousQueue, so the producer thread blocks on every yield(...) until the consumer calls moveNext(). If the consumer exits early (break/return/exception in for await body), the producer can block forever, leaking a thread/virtual-thread and potentially exhausting the fallback fixed thread pool on JDK < 21. Consider adding an explicit cancellation/close signal (e.g., AsyncStream implements AutoCloseable with a no-op default, generator overrides close() to unblock the producer) and ensure the for await lowering closes the stream in a finally block.
| def awaitable = async { Thread.sleep(5000); "never" } | ||
| try { | ||
| awaitable.get(50, TimeUnit.MILLISECONDS) | ||
| assert false : "Should have timed out" | ||
| } catch (TimeoutException e) { | ||
| // expected | ||
| } | ||
| ''' |
There was a problem hiding this comment.
The async task here sleeps for 5 seconds and is never cancelled/cleaned up after the timeout assertion. That leaves background work running and can consume threads (especially on the JDK<21 fixed pool) and slow/flakify the test suite. Prefer a much shorter sleep (still > timeout) and/or cancel the awaitable in a finally block once the timeout is observed.
| // Override with custom executor | ||
| def customPool = Executors.newSingleThreadExecutor { r -> | ||
| def t = new Thread(r, 'my-custom-async') | ||
| t.daemon = true | ||
| t | ||
| } | ||
| try { | ||
| AsyncUtils.setExecutor(customPool) | ||
| assert AsyncUtils.getExecutor().is(customPool) | ||
|
|
||
| def threadName = AsyncUtils.await(async { Thread.currentThread().name }) | ||
| assert threadName == 'my-custom-async' | ||
| } finally { | ||
| // Reset to default | ||
| AsyncUtils.setExecutor(null) | ||
| assert AsyncUtils.getExecutor() != null | ||
| assert !AsyncUtils.getExecutor().is(customPool) | ||
| } |
There was a problem hiding this comment.
customPool is never shut down. Even though the global executor is reset, the created executor service will keep running threads and can leak across the test suite. Please shut down the executor (and optionally await termination) in the finally block after resetting the global executor.
b3db925 to
4619e5a
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 26 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * def awaitable = async { | ||
| * def data = await fetchFromRemote() | ||
| * return process(data) | ||
| * } |
There was a problem hiding this comment.
The example for “use the async keyword” shows async { ... } producing an Awaitable directly, but the implemented async { ... } syntax returns a Closure that must be invoked to start work (e.g., def task = async { ... }; def awaitable = task()). Please adjust the example to match the actual semantics.
| * def awaitable = async { | |
| * def data = await fetchFromRemote() | |
| * return process(data) | |
| * } | |
| * def task = async { | |
| * def data = await fetchFromRemote() | |
| * return process(data) | |
| * } | |
| * def awaitable = task() |
| queue.put(new Item(value)); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new java.util.concurrent.CancellationException("Interrupted during yield"); | ||
| } |
There was a problem hiding this comment.
SynchronousQueue.put(...) in yield() blocks until a consumer calls moveNext(). If a consumer abandons iteration early (e.g., breaks out of a for await loop), the producer thread can block forever in yield()/complete(), leaking an executor thread. Consider adding a cancellation/close mechanism and ensuring compiler-generated for await loops signal early termination.
| def customPool = Executors.newFixedThreadPool(2, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("custom-async-" + t.getId()) | ||
| t | ||
| }) |
There was a problem hiding this comment.
The customPool ExecutorService created here is never shut down and the thread factory creates non-daemon threads. This can leak threads and keep the test JVM alive. Please shut the pool down in the finally block (and consider setting threads daemon as a safety net).
| class CustomExecutorService { | ||
| static Executor myPool = Executors.newFixedThreadPool(1, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("my-pool-thread") |
There was a problem hiding this comment.
myPool is a fixed thread pool whose threads are non-daemon (default) and it is never shut down. Because it is stored in a static field, it can outlive the script and keep the test JVM running. Consider using a daemon thread factory and/or an ExecutorService that is shut down at the end of the script/test.
| t.setName("my-pool-thread") | |
| t.setName("my-pool-thread") | |
| t.setDaemon(true) |
| public static <T> Closure<Awaitable<T>> wrapAsync(Closure<T> closure) { | ||
| return new Closure<Awaitable<T>>(closure.getOwner(), closure.getThisObject()) { | ||
| @SuppressWarnings("unused") | ||
| public Awaitable<T> doCall(Object... args) { | ||
| return GroovyPromise.of(CompletableFuture.supplyAsync(() -> { |
There was a problem hiding this comment.
wrapAsync creates a new Closure wrapper but does not preserve the original closure's delegate/resolveStrategy (and potentially directive). This can change property/method resolution semantics for code that customizes delegation (common in DSLs). Consider copying delegation-related settings from the original closure onto the wrapper closure instance.
| */ | ||
| @SuppressWarnings("unchecked") | ||
| public static <T> Closure<AsyncStream<T>> wrapAsyncGenerator(Closure<?> closure) { | ||
| return new Closure<AsyncStream<T>>(closure.getOwner(), closure.getThisObject()) { |
There was a problem hiding this comment.
wrapAsyncGenerator wraps the original closure without copying delegation settings (delegate/resolveStrategy). This can lead to different behavior between async { ... } and direct closure execution when delegation is used. Copy delegation-related fields from the original closure to the wrapper closure instance.
| return new Closure<AsyncStream<T>>(closure.getOwner(), closure.getThisObject()) { | |
| return new Closure<AsyncStream<T>>(closure.getOwner(), closure.getThisObject()) { | |
| { | |
| setDelegate(closure.getDelegate()); | |
| setResolveStrategy(closure.getResolveStrategy()); | |
| } |
| * <ul> | ||
| * <li>Change the method's return type to {@code Awaitable<T>} (where {@code T} is the original return type)</li> | ||
| * <li>Execute the method body asynchronously via {@link org.apache.groovy.runtime.async.AsyncSupport#executeAsync AsyncSupport.executeAsync}</li> | ||
| * <li>Transform any {@code await(future)} calls within the method to use {@link groovy.concurrent.AsyncUtils#await AsyncUtils.await}</li> |
There was a problem hiding this comment.
Javadoc mismatch: the implementation rewrites await calls to the internal org.apache.groovy.runtime.async.AsyncSupport.await(...) (see AsyncASTTransformation), not groovy.concurrent.AsyncUtils.await(...). Please update this bullet to reflect the actual generated call target, or rephrase it in terms of the public API if that's the intent.
| * <li>Transform any {@code await(future)} calls within the method to use {@link groovy.concurrent.AsyncUtils#await AsyncUtils.await}</li> | |
| * <li>Transform any {@code await(future)} calls within the method to use {@link org.apache.groovy.runtime.async.AsyncSupport#await AsyncSupport.await}</li> |
4619e5a to
03b982a
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 27 changed files in this pull request and generated 11 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public Awaitable<T> exceptionally(Function<Throwable, ? extends T> fn) { | ||
| return new GroovyPromise<>(future.exceptionally(t -> { | ||
| // Unwrap all wrapper layers so handler sees the original exception | ||
| Throwable cause = t; | ||
| while (cause.getCause() != null | ||
| && (cause instanceof CompletionException | ||
| || cause instanceof ExecutionException | ||
| || cause instanceof java.lang.reflect.UndeclaredThrowableException | ||
| || cause instanceof java.lang.reflect.InvocationTargetException)) { | ||
| cause = cause.getCause(); | ||
| } | ||
| return fn.apply(cause); | ||
| })); |
There was a problem hiding this comment.
GroovyPromise.exceptionally reimplements deep-unwrapping logic that also exists in AsyncSupport.deepUnwrap(). Duplicating the wrapper list in multiple places risks the two implementations drifting. Consider delegating to a single shared utility (e.g., AsyncSupport.deepUnwrap) so exception unwrapping behavior stays consistent across await() and exceptionally().
| def customPool = Executors.newFixedThreadPool(2, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("custom-async-" + t.getId()) | ||
| t | ||
| }) | ||
| setExecutor(customPool) |
There was a problem hiding this comment.
This test creates a fixed thread pool (customPool) with non-daemon threads and never shuts it down. Even though setExecutor(savedExecutor) is called, the pool will keep threads alive and can hang the test JVM. Please ensure the pool is shut down (preferably in the finally) and/or use daemon threads for the thread factory.
| static Executor myPool = Executors.newFixedThreadPool(1, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("my-pool-thread") | ||
| t | ||
| }) | ||
|
|
There was a problem hiding this comment.
CustomExecutorService.myPool is a static fixed thread pool with non-daemon threads and is never shut down. This can leak threads across the full test suite and prevent the JVM from exiting. Consider using daemon threads for the factory and adding explicit shutdown/cleanup (e.g., in a cleanupSpec/@AfterEach or via close() in the script).
| import java.util.concurrent.TimeUnit | ||
| import java.util.concurrent.TimeoutException | ||
|
|
||
| def asyncTimeout = async { Thread.sleep(5000); "never" } |
There was a problem hiding this comment.
This test starts an async task that sleeps for 5 seconds but only asserts a 50ms timeout on get(). The underlying task will continue sleeping after the timeout, which can unnecessarily consume executor threads and slow/flakify the overall suite (especially on the fixed-thread fallback). Consider using a much shorter sleep, cancelling the awaitable after the timeout, or using delay(...) with a cancellation-aware mechanism.
| def asyncTimeout = async { Thread.sleep(5000); "never" } | |
| def asyncTimeout = async { Thread.sleep(100); "never" } |
| long start = System.currentTimeMillis() | ||
| def result = await(svc.parallel()) | ||
| long elapsed = System.currentTimeMillis() - start | ||
| assert result == 12 // 2 + 4 + 6 | ||
| assert elapsed < 500 // parallel, not sequential |
There was a problem hiding this comment.
This test asserts a wall-clock threshold (elapsed < 500) to prove parallelism. Timing-based assertions are prone to CI flakiness under load/GC pauses and can fail even when behavior is correct. Consider using a deterministic signal (latches/barriers) or relaxing/removing the hard time bound.
| long start = System.currentTimeMillis() | |
| def result = await(svc.parallel()) | |
| long elapsed = System.currentTimeMillis() - start | |
| assert result == 12 // 2 + 4 + 6 | |
| assert elapsed < 500 // parallel, not sequential | |
| def result = await(svc.parallel()) | |
| assert result == 12 // 2 + 4 + 6 |
| import org.codehaus.groovy.ast.expr.RangeExpression; | ||
| import org.codehaus.groovy.ast.expr.SpreadExpression; | ||
| import org.codehaus.groovy.ast.expr.SpreadMapExpression; | ||
| import org.codehaus.groovy.ast.expr.StaticMethodCallExpression; |
There was a problem hiding this comment.
Unused import org.codehaus.groovy.ast.expr.StaticMethodCallExpression will cause a Java compilation error. Remove it or use it (currently no references in this file).
| import org.codehaus.groovy.ast.expr.StaticMethodCallExpression; |
| ForControlContext forCtrl = ctx.forControl(); | ||
| EnhancedForControlContext enhCtrl = forCtrl.enhancedForControl(); | ||
| if (enhCtrl == null) { | ||
| throw createParsingFailedException("for await requires enhanced for syntax: for await (item in source)", ctx); | ||
| } | ||
|
|
||
| ClassNode varType = this.visitType(enhCtrl.type()); | ||
| String varName = this.visitIdentifier(enhCtrl.identifier()); | ||
| Expression source = (Expression) this.visit(enhCtrl.expression()); |
There was a problem hiding this comment.
for await uses enhancedForControl, which supports (indexVariable COMMA)? variableModifiersOpt ... (see grammar), but this desugaring ignores enhCtrl.indexVariable() and enhCtrl.variableModifiersOpt(). This means for await (i, item in src) and modifiers like final/annotations on the loop variable will compile but be dropped. Please either reject these forms with a parsing error or implement them (e.g., maintain an index counter and apply modifiers to the declared variable).
| | ASYNC | ||
| ) |
There was a problem hiding this comment.
Adding ASYNC to the generic modifier rule makes async a legal modifier on many declarations (fields, classes, etc.), but the implementation only gives it meaning for method declarations (via annotation injection in AstBuilder). As-is, async on non-method constructs will likely be silently accepted but have no effect; consider restricting ASYNC to method modifiers in the grammar, or emitting a clear parsing/semantic error when used on unsupported targets.
| | ASYNC | |
| ) | |
| ) | |
| | { getContext() instanceof GroovyParser.MethodDeclarationContext }? | |
| m=ASYNC |
| long startTime = System.currentTimeMillis() | ||
| def delayed = async { | ||
| await(delay(200)) | ||
| return "delayed" | ||
| } | ||
| def task = delayed() | ||
|
|
||
| long afterStart = System.currentTimeMillis() | ||
| assert (afterStart - startTime) < 100 : "async should return immediately" | ||
|
|
||
| def result = await(task) | ||
| long afterAwait = System.currentTimeMillis() | ||
|
|
||
| assert result == "delayed" | ||
| assert (afterAwait - startTime) >= 180 : "should have waited for delay" |
There was a problem hiding this comment.
This test relies on wall-clock timing (< 100ms to start and >= 180ms total) which is prone to CI flakiness (GC pauses, clock granularity, noisy neighbors). Consider asserting non-blocking behavior via deterministic signals (e.g., a latch that must be reachable before awaiting), or relax/remove the strict time thresholds.
| long startTime = System.currentTimeMillis() | |
| def delayed = async { | |
| await(delay(200)) | |
| return "delayed" | |
| } | |
| def task = delayed() | |
| long afterStart = System.currentTimeMillis() | |
| assert (afterStart - startTime) < 100 : "async should return immediately" | |
| def result = await(task) | |
| long afterAwait = System.currentTimeMillis() | |
| assert result == "delayed" | |
| assert (afterAwait - startTime) >= 180 : "should have waited for delay" | |
| def delayed = async { | |
| await(delay(200)) | |
| return "delayed" | |
| } | |
| def task = delayed() | |
| def result = await(task) | |
| assert result == "delayed" |
| // while (AsyncSupport.await($__asyncStream__N.moveNext())) | ||
| Expression moveNextCall = callX(varX(streamVar), "moveNext"); | ||
| Expression awaitCall = callX(AsyncTransformHelper.ASYNC_SUPPORT_TYPE, | ||
| AsyncTransformHelper.AWAIT_METHOD, new ArgumentListExpression(moveNextCall)); | ||
| BooleanExpression condition = new BooleanExpression(awaitCall); | ||
|
|
||
| // def <varName> = $__asyncStream__N.getCurrent() | ||
| Expression getCurrentCall = callX(varX(streamVar), "getCurrent"); | ||
| ExpressionStatement getItemStmt = new ExpressionStatement(declX(varX(varName, varType), getCurrentCall)); | ||
|
|
||
| BlockStatement whileBody = block(getItemStmt, loopBody); | ||
| WhileStatement whileStmt = new WhileStatement(condition, whileBody); | ||
|
|
||
| return configureAST(block(streamDecl, whileStmt), ctx); |
There was a problem hiding this comment.
The desugaring declares the loop variable via declX(...) inside the while body. In Groovy, the enhanced-for variable is normally declared at the loop statement level (and is visible after the loop in many contexts). This translation changes scoping/visibility and may break code that expects the loop variable to exist after the for await. Consider declaring the variable once before the loop and using an assignment inside the loop body (matching visitEnhancedForControl/ForStatement behavior).
| // while (AsyncSupport.await($__asyncStream__N.moveNext())) | |
| Expression moveNextCall = callX(varX(streamVar), "moveNext"); | |
| Expression awaitCall = callX(AsyncTransformHelper.ASYNC_SUPPORT_TYPE, | |
| AsyncTransformHelper.AWAIT_METHOD, new ArgumentListExpression(moveNextCall)); | |
| BooleanExpression condition = new BooleanExpression(awaitCall); | |
| // def <varName> = $__asyncStream__N.getCurrent() | |
| Expression getCurrentCall = callX(varX(streamVar), "getCurrent"); | |
| ExpressionStatement getItemStmt = new ExpressionStatement(declX(varX(varName, varType), getCurrentCall)); | |
| BlockStatement whileBody = block(getItemStmt, loopBody); | |
| WhileStatement whileStmt = new WhileStatement(condition, whileBody); | |
| return configureAST(block(streamDecl, whileStmt), ctx); | |
| // def <varName> // declare loop variable once, before the while-loop | |
| ExpressionStatement loopVarDecl = new ExpressionStatement( | |
| declX(varX(varName, varType), ConstantExpression.NULL)); | |
| // while (AsyncSupport.await($__asyncStream__N.moveNext())) | |
| Expression moveNextCall = callX(varX(streamVar), "moveNext"); | |
| Expression awaitCall = callX(AsyncTransformHelper.ASYNC_SUPPORT_TYPE, | |
| AsyncTransformHelper.AWAIT_METHOD, new ArgumentListExpression(moveNextCall)); | |
| BooleanExpression condition = new BooleanExpression(awaitCall); | |
| // <varName> = $__asyncStream__N.getCurrent() | |
| Expression getCurrentCall = callX(varX(streamVar), "getCurrent"); | |
| ExpressionStatement getItemStmt = new ExpressionStatement( | |
| assignX(varX(varName, varType), getCurrentCall)); | |
| BlockStatement whileBody = block(getItemStmt, loopBody); | |
| WhileStatement whileStmt = new WhileStatement(condition, whileBody); | |
| return configureAST(block(streamDecl, loopVarDecl, whileStmt), ctx); |
03b982a to
cd259a0
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 26 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def originalExecutor = getExecutor() | ||
| // Set a custom executor | ||
| setExecutor(Executors.newSingleThreadExecutor()) | ||
| assert getExecutor() != originalExecutor | ||
| // Reset to null — should restore default | ||
| setExecutor(null) |
There was a problem hiding this comment.
This test sets a newSingleThreadExecutor() as the global async executor but never shuts that executor down. Even after setExecutor(null), the executor thread remains alive and can keep the JVM running. Please keep a reference to the created executor and shut it down (or create it with daemon threads).
| assertScript ''' | ||
| import static groovy.concurrent.AsyncUtils.* | ||
| import java.util.concurrent.Executors | ||
| import java.util.concurrent.atomic.AtomicReference |
There was a problem hiding this comment.
Unused import: java.util.concurrent.atomic.AtomicReference isn't referenced in this script. Removing it will keep the test focused and avoid lint noise.
| import java.util.concurrent.atomic.AtomicReference |
| // Async generator: wrap body in AsyncSupport.generateAsyncStream { ... } | ||
| Expression genCall = callX(AsyncTransformHelper.ASYNC_SUPPORT_TYPE, | ||
| AsyncTransformHelper.GENERATE_ASYNC_STREAM_METHOD, args(closure)); |
There was a problem hiding this comment.
When the annotated method is an async generator (contains yield return), the custom executor resolved from @Async(executor=...) is ignored and the generator always runs on AsyncSupport's default executor. This makes executor override inconsistent between regular async methods and generator methods; consider adding an executor-aware generateAsyncStream overload and using it here.
| // Async generator: wrap body in AsyncSupport.generateAsyncStream { ... } | |
| Expression genCall = callX(AsyncTransformHelper.ASYNC_SUPPORT_TYPE, | |
| AsyncTransformHelper.GENERATE_ASYNC_STREAM_METHOD, args(closure)); | |
| // Async generator: wrap body in AsyncSupport.generateAsyncStream, honoring a custom executor if present | |
| ArgumentListExpression genArgs = (executorExpr != null) | |
| ? args(executorExpr, closure) | |
| : args(closure); | |
| Expression genCall = callX(AsyncTransformHelper.ASYNC_SUPPORT_TYPE, | |
| AsyncTransformHelper.GENERATE_ASYNC_STREAM_METHOD, genArgs); |
https://issues.apache.org/jira/browse/GROOVY-9381