From ecf37ee76133905ff36edc8810458cafc9c8c19a Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Thu, 12 Mar 2026 22:08:37 +0100 Subject: [PATCH 1/3] fix: await until tasks are acked --- .../api/collections/batch/BatchContextTest.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java index 80ab367ce..17c7d7b15 100644 --- a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java +++ b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java @@ -159,7 +159,7 @@ private void closeContext() throws Exception { try { context.close(); - eof.get(); + eof.get(5, TimeUnit.SECONDS); } finally { contextClosed = true; } @@ -177,8 +177,11 @@ public void test_sendOneBatch() throws Exception { List received = recvDataAndAck(); Assertions.assertThat(tasks) .extracting(TaskHandle::id).containsExactlyInAnyOrderElementsOf(received); - Assertions.assertThat(tasks) - .extracting(TaskHandle::isAcked).allMatch(CompletableFuture::isDone); + + CompletableFuture[] tasksAcked = tasks.stream() + .map(TaskHandle::isAcked).toArray(CompletableFuture[]::new); + Assertions.assertThat(CompletableFuture.allOf(tasksAcked)) + .succeedsWithin(5, TimeUnit.SECONDS); out.beforeEof(new Event.Results(received, Collections.emptyMap())); @@ -186,6 +189,7 @@ public void test_sendOneBatch() throws Exception { // the context will be updated before the last emitEvent returns. closeContext(); + // By the time context.close() returns all tasks MUST have results set. Assertions.assertThat(tasks).extracting(TaskHandle::result) .allMatch(CompletableFuture::isDone) .extracting(CompletableFuture::get).extracting(TaskHandle.Result::error) @@ -207,8 +211,11 @@ public void test_drainOnClose() throws Exception { List received = recvDataAndAck(); Assertions.assertThat(tasks).extracting(TaskHandle::id) .containsExactlyInAnyOrderElementsOf(received); - Assertions.assertThat(tasks).extracting(TaskHandle::isAcked) - .allMatch(CompletableFuture::isDone); + + CompletableFuture[] tasksAcked = tasks.stream() + .map(TaskHandle::isAcked).toArray(CompletableFuture[]::new); + Assertions.assertThat(CompletableFuture.allOf(tasksAcked)) + .succeedsWithin(5, TimeUnit.SECONDS); } catch (Exception e) { throw new RuntimeException(e); } From eb95f77b6cec2dfa75d6c7a85d43bf5d89369783 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 13 Mar 2026 09:06:41 +0100 Subject: [PATCH 2/3] test(batch): do not attempt to close context for a failed test --- .../collections/batch/BatchContextTest.java | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java index 17c7d7b15..f68d6712b 100644 --- a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java +++ b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java @@ -20,7 +20,11 @@ import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +99,19 @@ private StreamObserver createStream(StreamObserver recv) { return in; } + @Rule + public TestName currentTest = new TestName(); + + private boolean testFailed; + + @Rule + public TestWatcher __ = new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + testFailed = true; + } + }; + /** * Create new unstarted context with default maxSizeBytes, collection * descriptor, and collection handle defaults. @@ -102,6 +119,7 @@ private StreamObserver createStream(StreamObserver recv) { @Before public void startContext() throws InterruptedException { log.debug("===================startContext=================="); + log.debug(currentTest.getMethodName()); assert !Thread.currentThread().isInterrupted() : "main thread interrupted"; assert REQUEST_QUEUE.isEmpty() : "stream contains incoming message " + REQUEST_QUEUE.peek(); @@ -122,7 +140,11 @@ public void startContext() throws InterruptedException { @After public void reset() throws Exception { - if (!contextClosed) { + // Do not attempt to close the context if it has been previously closed + // by the test or the test has failed. In the latter case closing the + // context may lead to a deadlock if the case hasn't scheduled Results + // for all submitted messages. + if (!contextClosed && !testFailed) { closeContext(); } @@ -175,6 +197,8 @@ public void test_sendOneBatch() throws Exception { // BatchContext should flush the current batch once it hits its limit. // We will ack all items in the batch and send successful result for each one. List received = recvDataAndAck(); + out.beforeEof(new Event.Results(received, Collections.emptyMap())); + Assertions.assertThat(tasks) .extracting(TaskHandle::id).containsExactlyInAnyOrderElementsOf(received); @@ -183,8 +207,6 @@ public void test_sendOneBatch() throws Exception { Assertions.assertThat(CompletableFuture.allOf(tasksAcked)) .succeedsWithin(5, TimeUnit.SECONDS); - out.beforeEof(new Event.Results(received, Collections.emptyMap())); - // Since MockServer runs in the same thread as this test, // the context will be updated before the last emitEvent returns. closeContext(); From 86f198c30f107098e3a059901abc0226bbc950fe Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 13 Mar 2026 10:00:23 +0100 Subject: [PATCH 3/3] test(batch): emit Backoff events synchronously --- .../collections/batch/BatchContextTest.java | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java index f68d6712b..1a7ec0968 100644 --- a/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java +++ b/src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java @@ -135,11 +135,16 @@ public void startContext() throws InterruptedException { context.start(); in.expectMessage(START); - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); } @After public void reset() throws Exception { + log.atDebug() + .addKeyValue("contextClosed", contextClosed) + .addKeyValue("testFailed", testFailed) + .log("Begin test cleanup"); + // Do not attempt to close the context if it has been previously closed // by the test or the test has failed. In the latter case closing the // context may lead to a deadlock if the case hasn't scheduled Results @@ -302,6 +307,7 @@ public void test_backoffBacklog() throws Exception { int batchSizeNew = BATCH_SIZE / 2; // Force the last BATCH_SIZE / 2 - 1 items to be transferred to the backlog. + // Await for this event to be processed before moving forward. out.emitEvent(new Event.Backoff(batchSizeNew)); // The next item will go on the backlog and the trigger a flush, @@ -327,14 +333,14 @@ public void test_backoffBacklog() throws Exception { @Test public void test_reconnect_onShutdown() throws Exception { - out.emitEvent(Event.SHUTTING_DOWN); + out.emitEventAsync(Event.SHUTTING_DOWN); in.expectMessage(STOP); out.eof(true); in.expectMessage(START); // Not strictly necessary -- we can close the context // before a new connection is established. - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); } @Test @@ -348,18 +354,18 @@ public void test_reconnect_onOom() throws Exception { // Respond with OOM and wait for the client to close its end of the stream. in.expectMessage(DATA); - out.emitEvent(new Event.Oom(0)); + out.emitEventAsync(new Event.Oom(0)); // Close the server's end of the stream. in.expectMessage(STOP); // Allow the client to reconnect to another "instance" and Ack the batch. in.expectMessage(START); - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); recvDataAndAck(); List submitted = tasks.stream().map(TaskHandle::id).toList(); - out.emitEvent(new Event.Results(submitted, Collections.emptyMap())); + out.emitEventAsync(new Event.Results(submitted, Collections.emptyMap())); } @Test @@ -376,7 +382,7 @@ public void test_reconnect_onStreamHangup() throws Exception { // The client should try to reconnect, because the context is still open. in.expectMessage(START); - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); // The previous batch hasn't been acked, so we should expect to receive it // again. @@ -387,7 +393,7 @@ public void test_reconnect_onStreamHangup() throws Exception { // in the queue to wake the sender up. out.hangup(); in.expectMessage(START); - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); tasks.add(context.add(WeaviateObject.of())); recvDataAndAck(); @@ -428,7 +434,7 @@ public void test_reconnect_DrainAfterStreamHangup() throws Exception { // When the server starts accepting connections again, the client should // drain the remaining BATCH_SIZE+1 objects as we close the context. in.expectMessage(START); - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); Future backgroundAcks = backgroundThread.submit(() -> { try { recvDataAndAck(); @@ -455,7 +461,7 @@ public void test_reconnect_DrainAfterStreamHangup() throws Exception { public void test_closeAfterStreamHangup() throws Exception { out.hangup(); in.expectMessage(START); - out.emitEvent(Event.STARTED); + out.emitEventAsync(Event.STARTED); } @Test @@ -493,7 +499,7 @@ public void test_startAfterClose() throws Exception { */ private List recvDataAndAck() throws InterruptedException { List received = recvData(); - out.emitEvent(new Event.Acks(received)); + out.emitEventAsync(new Event.Acks(received)); return received; } @@ -521,7 +527,16 @@ private static final class OutboundStream { this.eventThread = eventThread; } - CompletableFuture emitEvent(Event event) { + /** Emit event on the current thread. */ + void emitEvent(Event event) { + assert event != Event.EOF : "must not use synthetic EOF event"; + assert !(event instanceof Event.StreamHangup) : "must not use synthetic StreamHangup event"; + + stream.onNext(event); + } + + /** Emit event on the {@link #eventThread}. */ + CompletableFuture emitEventAsync(Event event) { assert event != Event.EOF : "must not use synthetic EOF event"; assert !(event instanceof Event.StreamHangup) : "must not use synthetic StreamHangup event"; @@ -551,7 +566,7 @@ CompletableFuture eof(boolean ok) { if (ok) { // These are guaranteed to finish before onCompleted, // as eventThread is just 1 thread. - pendingEvents.forEach(this::emitEvent); + pendingEvents.forEach(this::emitEventAsync); } return CompletableFuture.runAsync(stream::onCompleted, eventThread); }