Skip to content

Add query cancellation support via _tasks/_cancel API for PPL queries#5254

Open
sunil9977 wants to merge 2 commits intoopensearch-project:mainfrom
sunil9977:issue-4887
Open

Add query cancellation support via _tasks/_cancel API for PPL queries#5254
sunil9977 wants to merge 2 commits intoopensearch-project:mainfrom
sunil9977:issue-4887

Conversation

@sunil9977
Copy link

Fixes #4887

  1. PPL queries now register as CancellableTask, making them visible in GET /_tasks and cancellable via POST /_tasks/{task_id}/_cancel
  2. Supports optional queryId in request body for task identification

Signed-off-by: Sunil Ramchandra Pawar <pawar_sr@apple.com>
@github-actions
Copy link
Contributor

github-actions bot commented Mar 23, 2026

PR Reviewer Guide 🔍

(Review updated until commit fd7642c)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add queryId field to PPL request pipeline

Relevant files:

  • ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java
  • plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java
  • plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java

Sub-PR theme: Implement CancellableTask registration and cooperative cancellation

Relevant files:

  • plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryTask.java
  • plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java
  • plugin/src/test/java/org/opensearch/sql/plugin/transport/PPLQueryTaskTest.java

⚡ Recommended focus areas for review

ThreadLocal Leak

In submit(), cancellableTask.get() is called and then cancellableTask.remove() is called before scheduling. However, inside the scheduled wrappedTask, setCancellableTask(cancelTask) is called again on the worker thread, and clearCancellableTask() is only called in the finally block. If the task throws an exception that bypasses the finally (e.g., Error), or if the thread pool reuses threads and an exception path is missed, the ThreadLocal may not be cleared. More critically, setCancellableTask is a static method that sets a ThreadLocal on whatever thread calls it — if doExecute is called on a thread that is later reused for something else before submit() is called, the stale value could be picked up. The flow relies on setCancellableTask being called in doExecute and then read in submit() on the same thread, which is fragile.

CancellableTask cancelTask = cancellableTask.get();
cancellableTask.remove();
schedule(nodeClient, queryPlan::execute, timeout, cancelTask);
Race Condition

The CancellableTask is passed from doExecute to submit via a static ThreadLocal. If submit is not called synchronously on the same thread as doExecute (e.g., if any intermediate async dispatch occurs), the ThreadLocal value will not be present on the worker thread, and cancelTask will be null. This makes the cancellation silently non-functional without any error or warning.

  }
}
Stale Task Reference

cancellableTask is captured in the constructor via OpenSearchQueryManager.getCancellableTask(). At construction time, the ThreadLocal on the worker thread may not yet be set (it is set inside the wrappedTask lambda just before task.run()). If OpenSearchIndexEnumerator is constructed before setCancellableTask(cancelTask) is called in the scheduled task, this.cancellableTask will be null and cancellation checks in moveNext() will never fire.

this.cancellableTask = OpenSearchQueryManager.getCancellableTask();

@github-actions
Copy link
Contributor

github-actions bot commented Mar 23, 2026

PR Code Suggestions ✨

Latest suggestions up to fd7642c
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Verify enumerator construction timing relative to ThreadLocal set

setCancellableTask is called inside the scheduled runnable, which sets the
ThreadLocal on the worker thread. However, OpenSearchIndexEnumerator reads
getCancellableTask() in its constructor, which is called during task.run(). This
ordering is correct only if the enumerator is constructed inside task.run(). If the
enumerator is constructed before task.run() is invoked, the cancellableTask will be
null. Verify that the enumerator is always constructed within the execution of
task.run().

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java [81-84]

+setCancellableTask(cancelTask);
 
+try {
+  task.run();
+  timeoutTask.cancel();
Suggestion importance[1-10]: 5

__

Why: This is a valid concern about ordering — setCancellableTask must be called before the enumerator constructor runs. The suggestion correctly identifies that if the enumerator is constructed inside task.run(), the ordering is correct. However, the existing_code and improved_code are identical, making this more of a verification note than an actionable fix.

Low
Add null-check logging for missing cancellable task

submit is called on the transport thread, and cancellableTask.get() reads the
ThreadLocal of that thread. But setCancellableTask is called in doExecute on the
same transport thread, so this part is actually consistent. However, if submit is
ever called from a different thread than where setCancellableTask was called, the
value will be null. Consider adding a null-check or logging when cancelTask is null
to aid debugging.

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java [54-56]

 CancellableTask cancelTask = cancellableTask.get();
 cancellableTask.remove();
+if (cancelTask == null) {
+  LOG.debug("No cancellable task found in ThreadLocal for query submission.");
+}
 schedule(nodeClient, queryPlan::execute, timeout, cancelTask);
Suggestion importance[1-10]: 2

__

Why: Adding a debug log for a null cancelTask is a minor improvement with low impact. The code already handles null gracefully since cancelTask is passed to schedule and checked in the enumerator with a null guard.

Low
Possible issue
Ensure cancellation exception is unchecked in enumerator

TaskCancelledException is thrown inside moveNext(), but Calcite's Enumerator
interface does not declare checked exceptions. If TaskCancelledException is a
checked exception, this will cause a compilation error. Verify that
TaskCancelledException extends RuntimeException; if it does not, wrap it in an
unchecked exception.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java [121-123]

 if (cancellableTask != null && cancellableTask.isCancelled()) {
-  throw new TaskCancelledException("The task is cancelled.");
+  throw new NonFallbackCalciteException(new TaskCancelledException("The task is cancelled."));
 }
Suggestion importance[1-10]: 4

__

Why: This is a valid concern — if TaskCancelledException is a checked exception, it would cause a compilation error in moveNext(). However, org.opensearch.core.tasks.TaskCancelledException in OpenSearch extends RuntimeException, so this is not actually a problem in practice, making the suggestion less critical.

Low
ThreadLocal misused for cross-thread task passing

The setCancellableTask stores the task in a ThreadLocal, but doExecute runs on the
transport thread while submit (which reads and clears it) runs on a different thread
pool thread. This means the ThreadLocal value set here will never be visible to the
thread that calls cancellableTask.get() in submit. The task should be passed
directly rather than relying on ThreadLocal for cross-thread communication.

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java [113-115]

-if (task instanceof PPLQueryTask pplQueryTask) {
-  OpenSearchQueryManager.setCancellableTask(pplQueryTask);
-}
+// Pass pplQueryTask directly to the plan or service layer instead of using ThreadLocal
+// e.g., pplService.execute(request, pplQueryTask, listener);
Suggestion importance[1-10]: 2

__

Why: The suggestion claims setCancellableTask and submit run on different threads, but looking at the code, setCancellableTask is called in doExecute on the transport thread, and submit is also called synchronously from that same thread before scheduling. The ThreadLocal is read in submit on the same thread it was set, so the concern is largely unfounded. The improved_code is also not actionable code.

Low

Previous suggestions

Suggestions up to commit 024a8ca
CategorySuggestion                                                                                                                                    Impact
Possible issue
Guard against pre-execution cancellation race condition

There is a race condition: if the task is cancelled between
onExecutionThreadAvailable and the start of actual query execution, the interrupt
may be missed. Additionally, if the task is already cancelled before
onExecutionThreadAvailable is called, the query will still proceed. You should check
callBack.isCancelled() immediately after registering the thread and throw early if
already cancelled.

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java [71-78]

 if (callBack != null) {
     callBack.onExecutionThreadAvailable(executionThread);
+    if (callBack.isCancelled()) {
+        LOG.info("Query was cancelled before execution started");
+        throw new OpenSearchException("Query was cancelled.");
+    }
 }
 
 Scheduler.ScheduledCancellable timeoutTask =
     threadPool.schedule(
         () -> {
           LOG.warn(
Suggestion importance[1-10]: 6

__

Why: The suggestion identifies a valid race condition where a task cancelled before execution starts would not be detected. Adding an isCancelled() check after onExecutionThreadAvailable is a reasonable defensive measure, though the window is small in practice.

Low
Ensure ThreadLocal cleanup on exception paths

If an exception is thrown between setCancellationCallback and the point where
cancellationCallBackThreadLocal.remove() is called in submit(), the ThreadLocal may
not be cleaned up, causing a memory leak or stale callback in the thread pool. A
try-finally block should be used to ensure cleanup, or the removal should be
guaranteed even on failure paths.

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java [113-131]

 if (task instanceof SQLQueryTask sqlQueryTask) {
-
     OpenSearchQueryManager.setCancellationCallback(new OpenSearchQueryManager.CancellationCallBack() {
         @Override
         public void onExecutionThreadAvailable(Thread thread) {
             sqlQueryTask.setExecutionThread(thread);
         }
 
         @Override
         public void onExecutionComplete() {
             sqlQueryTask.clearExecutionThread();
         }
 
         @Override
         public boolean isCancelled() {
             return sqlQueryTask.isCancelled();
         }
     });
 }
+try {
Suggestion importance[1-10]: 3

__

Why: The concern about ThreadLocal cleanup is valid in theory, but the improved_code is incomplete (ends with an unclosed try {) and doesn't show the full solution. The suggestion is partially correct but the implementation is not actionable as provided.

Low
General
Restrict direct access to ThreadLocal field

The cancellationCallBackThreadLocal field is public and mutable, which allows
external code to directly manipulate the ThreadLocal bypassing the provided
setter/cleaner methods. It should be private to enforce encapsulation and prevent
misuse.

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java [43]

-public static ThreadLocal<CancellationCallBack> cancellationCallBackThreadLocal = new ThreadLocal<>();
+private static final ThreadLocal<CancellationCallBack> cancellationCallBackThreadLocal = new ThreadLocal<>();
Suggestion importance[1-10]: 5

__

Why: Making cancellationCallBackThreadLocal public and non-final is a valid encapsulation concern. Making it private static final enforces proper access through the provided setter/cleaner methods and prevents external misuse.

Low
Handle null query in description method

If pplQuery is null, the method returns prefix + null (i.e., the string "PPL:
null"), which is misleading. A null check should be added to handle this case
gracefully.

plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java [167-176]

 public String getDescription()
 {
     String prefix = (queryId != null) ? "PPL [queryId=" + queryId + "]: " : "PPL: ";
 
-    if (pplQuery != null && pplQuery.length() > 512) {
-        return prefix + pplQuery.substring(0,512) + "...";
+    if (pplQuery == null) {
+        return prefix;
+    }
+
+    if (pplQuery.length() > 512) {
+        return prefix + pplQuery.substring(0, 512) + "...";
     }
 
     return prefix + pplQuery;
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that a null pplQuery would result in the string "PPL: null" being returned. Adding a null check improves robustness, though pplQuery being null is an edge case given the existing constructors.

Low

@ahkcs ahkcs added PPL Piped processing language enhancement New feature or request labels Mar 23, 2026
@ahkcs
Copy link
Collaborator

ahkcs commented Mar 23, 2026

Hi @sunil9977, thanks for the changes! Please take a look at the CI failures

Copy link
Collaborator

@ahkcs ahkcs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the approach looks good — aligns well with the plan from the issue discussion. Left a couple of minor suggestions inline.

executionThread.set(thread);
}

public void clearExecutionThread() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: there's a small window between task registration and setExecutionThread() where a cancel would see a null thread and be a no-op. In practice this is nearly impossible to hit (cancellation requires separate HTTP roundtrips), but it could matter if the sql-worker pool is saturated.

Since the fix is small, might be worth hardening:

private final AtomicBoolean cancelled = new AtomicBoolean(false);

public void setExecutionThread(Thread thread) {
    executionThread.set(thread);
    if (cancelled.get()) {
        thread.interrupt();
    }
}

@Override
public void onCancelled() {
    cancelled.set(true);
    Thread thread = executionThread.get();
    if (thread != null) {
        thread.interrupt();
    }
}

Copy link
Collaborator

@Swiddis Swiddis Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any precedent for this in OS core, I wonder if we're missing something? Only 2 hits:

% rg -w 'onCancelled' -B2 -A10
server/src/main/java/org/opensearch/index/reindex/BulkByScrollTask.java
203-
204-    @Override
205:    public void onCancelled() {
206-        /*
207-         * If this task is a leader, we don't need to do anything extra because the cancel action cancels child tasks for us
208-         * If it's is a worker, we know how to cancel it here
209-         * If we don't know whether it's a leader or worker yet, we do nothing here. If the task is later set to be a worker, we cancel the
210-         * worker at that time.
211-         */
212-        if (isWorker()) {
213-            workerState.handleCancel();
214-        }
215-    }

server/src/main/java/org/opensearch/tasks/CancellableTask.java
93-        assert reason != null;
94-        if (cancelledInfo.trySet(new CancelledInfo(reason))) {
95:            onCancelled();
96-        }
97-    }
98-
99-    public boolean isCancelled() {
100-        return cancelledInfo.get() != null;
101-    }
102-
103-    /**
104-     * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled.
105-     */
--
113-     * Called after the task is cancelled so that it can take any actions that it has to take.
114-     */
115:    protected void onCancelled() {}
116-
117-    /**
118-     * Returns true if this task should be automatically cancelled if the coordinating node that
119-     * requested this task left the cluster.
120-     */
121-    public boolean cancelOnParentLeaving() {
122-        return true;
123-    }
124-
125-    @Nullable

@@ -33,26 +34,48 @@
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io";

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could be private static since the accessor methods already exist.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These consts are directly referenced in a few places iirc, I originally refactored to this from a lot of places duplicating a "sql-worker" string.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I wasn't referring to the thread pool name constants — those should stay public static. I meant the cancellationCallBackThreadLocal field added in this PR (line 45). It's declared public static but has setCancellationCallback/clearCancellationCallback accessor methods, so the field itself could be private static to prevent direct access.

@@ -0,0 +1,44 @@
/*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rename this file to PPLQueryTask

@ahkcs
Copy link
Collaborator

ahkcs commented Mar 23, 2026

CI failure seems to be caused by spotless check, please run ./gradlew spotlessApply to fix this

@ahkcs
Copy link
Collaborator

ahkcs commented Mar 23, 2026

During e2e testing with the OSD cancel button, we noticed that Thread.interrupt() only takes effect when the execution thread hits a blocking call (I/O, Future.get(), etc.). For CPU-bound queries like large joins, this can mean a 50+ second delay between clicking cancel and the query actually stopping.

Adding an interrupt check in OpenSearchIndexEnumerator.moveNext() would make cancellation near-instant, since moveNext() is called for every row:

// opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java

@Override
public boolean moveNext() {
    if (Thread.interrupted()) {
        throw new NonFallbackCalciteException(
            String.format("Query was cancelled after processing %d rows.", queryCount));
    }

    if (queryCount >= maxResponseSize) {
        return false;
    }
    // ... rest of method
}

This is outside the scope of this PR but would be a nice follow-up to make the cancel experience snappy. In our testing it brought cancel response time from ~50s down to <5s.

String prefix = (queryId != null) ? "PPL [queryId=" + queryId + "]: " : "PPL: ";

if (pplQuery != null && pplQuery.length() > 512) {
return prefix + pplQuery.substring(0,512) + "...";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: I don't think this is worth truncating.

The only way to access this description is to directly check the task by ID. Seeing the full query might be useful debug context. Similar debug logic in OSD core exists and doesn't truncate.

if (Thread.interrupted() || e.getCause() instanceof InterruptedException) {
if (callBack != null && callBack.isCancelled()) {
LOG.info("Query was cancelled");
throw new OpenSearchException("Query was cancelled.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: This exception will propagate as a 500, which is bad for people monitoring errors.

OpenSearchException is the generic top-level exception class, it defaults as internal server error when the status isn't set. You want to use one of its subclasses.

TaskCancelledException seems like a better choice.

@Swiddis
Copy link
Collaborator

Swiddis commented Mar 23, 2026

During e2e testing with the OSD cancel button, we noticed that Thread.interrupt() only takes effect when the execution thread hits a blocking call (I/O, Future.get(), etc.). For CPU-bound queries like large joins, this can mean a 50+ second delay between clicking cancel and the query actually stopping.

That's on me lol, I thought when I implemented it that the CPU part of joins would be fast (the bulk of the time on most joins is waiting for batches), so it'd be overkill to check every iteration and we could just throw when we request new batches. Happy to take it up if there's a followup task. Do you have more precise repro steps? Since I haven't noticed this issue before.

I'm also interested in if doing this on every row would have a negative effect on benchmarks.


@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be false? Query tasks tend to spawn background IO tasks, but I don't know if those count as children.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, updated it to true.

@ahkcs
Copy link
Collaborator

ahkcs commented Mar 23, 2026

Do you have more precise repro steps?

I was doing a join query from OSD using POC OSD side change for the cancel button. And after query cancelled, the stacktrace showed up 50 seconds later

@ahkcs
Copy link
Collaborator

ahkcs commented Mar 23, 2026

Based on @Swiddis's observation that almost no core code overrides onCancelled() — it looks like core's cancellation model is cooperative: cancel() flips isCancelled() to true, and the task is expected to poll that flag and stop itself (e.g., search tasks check task.isCancelled() per Lucene segment).

I think a simpler approach might work here: pass the CancellableTask reference to the worker thread, and check task.isCancelled() in OpenSearchIndexEnumerator.moveNext(). This would remove the need for the CancellationCallBack interface, the ThreadLocal callback, onCancelled() override, thread interruption, and the AtomicReference<Thread> — all replaced by one isCancelled() check per row.

@Swiddis
Copy link
Collaborator

Swiddis commented Mar 23, 2026

Oh, interesting. If I'm understanding right they implemented their own thread interruption mechanism? I wonder why not use regular interrupt(). Given we already implemented timeouts via interrupt, I wonder if it's worthwhile to refactor everything or we should just have cancellation point to interrupt. Open to either but given the current impl it'd be less work to stick with interrupts in the core places.

@Swiddis
Copy link
Collaborator

Swiddis commented Mar 23, 2026

Ok, talked with the core folks, task cancellation is the better approach. Since opensearch uses a thread pool system, interrupting the thread has potential to poison the whole pool. It works for us because we're careful to uninterrupt the thread when handling interruption errors, but there might theoretically be a path that leaves a perpetually-interrupted thread (either currently or in the future). Cancellation also has the benefit of propagating across clusters better (which is more relevant to Core which is fanning out requests to many data nodes regularly).

So the correct approach is change our thread interrupts to use the native task cancellation feature. If all that understanding is correct, it means we can also clean up the logic that un-interrupts the thread, but I think it's only like 2 lines anyway so not a massive win.

@github-actions
Copy link
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit fd7642c.

PathLineSeverityDescription
plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java161lowUser-controlled 'queryId' field (read from request payload in PPLQueryRequestFactory) is embedded unsanitized into task descriptions via getDescription(). While task descriptions are internal OpenSearch metadata, a malicious user could inject newlines or special characters to corrupt task listing output or logs. This appears to be an oversight rather than intentional malice — the rest of the feature (task cancellation propagation via ThreadLocal) is a legitimate and well-structured implementation.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Copy link
Contributor

Persistent review updated to latest commit fd7642c

@sunil9977
Copy link
Author

Thank you all for reviews.
I've refactored it to use existing methods rather than using thread.interrupt().
Please let me know if my understanding is correct.

LOG.warn(
"Query execution timed out after {}. Interrupting execution thread.",
timeout);
executionThread.interrupt();
Copy link
Collaborator

@Swiddis Swiddis Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future PR: this interrupt needs to be changed to cancellation as well (and all code paths that depend on it)

@Swiddis
Copy link
Collaborator

Swiddis commented Mar 24, 2026

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request PPL Piped processing language

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Support task cancellation for long-running PPL queries.

3 participants