Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -95,13 +99,27 @@ private StreamObserver<Message> createStream(StreamObserver<Event> recv) {
return in;
}

// Exposes the name of the currently running test, used for logging in startContext().
@Rule
public TestName currentTest = new TestName();

// Set to true by the TestWatcher rule below when the current test fails;
// the @After cleanup reads it to decide whether closing the context is safe.
private boolean testFailed;

// Records test failure so that cleanup can skip closing the context
// (closing it after a failure may deadlock if not all submitted messages
// have Results scheduled).
@Rule
public TestWatcher __ = new TestWatcher() {
@Override
protected void failed(Throwable e, Description description) {
testFailed = true;
}
};

/**
 * Create a new unstarted context with the default maxSizeBytes,
 * collection descriptor, and collection handle.
 */
@Before
public void startContext() throws InterruptedException {
log.debug("===================startContext==================");
log.debug(currentTest.getMethodName());

assert !Thread.currentThread().isInterrupted() : "main thread interrupted";
assert REQUEST_QUEUE.isEmpty() : "stream contains incoming message " + REQUEST_QUEUE.peek();
Expand All @@ -117,12 +135,21 @@ public void startContext() throws InterruptedException {
context.start();

in.expectMessage(START);
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);
}

@After
public void reset() throws Exception {
if (!contextClosed) {
log.atDebug()
.addKeyValue("contextClosed", contextClosed)
.addKeyValue("testFailed", testFailed)
.log("Begin test cleanup");

// Do not attempt to close the context if it has previously been closed
// by the test or if the test has failed. In the latter case, closing the
// context may lead to a deadlock if the test hasn't scheduled Results
// for all submitted messages.
if (!contextClosed && !testFailed) {
closeContext();
}

Expand Down Expand Up @@ -159,7 +186,7 @@ private void closeContext() throws Exception {

try {
context.close();
eof.get();
eof.get(5, TimeUnit.SECONDS);
} finally {
contextClosed = true;
}
Expand All @@ -175,17 +202,21 @@ public void test_sendOneBatch() throws Exception {
// BatchContext should flush the current batch once it hits its limit.
// We will ack all items in the batch and send successful result for each one.
List<String> received = recvDataAndAck();
out.beforeEof(new Event.Results(received, Collections.emptyMap()));

Assertions.assertThat(tasks)
.extracting(TaskHandle::id).containsExactlyInAnyOrderElementsOf(received);
Assertions.assertThat(tasks)
.extracting(TaskHandle::isAcked).allMatch(CompletableFuture::isDone);

out.beforeEof(new Event.Results(received, Collections.emptyMap()));
CompletableFuture<?>[] tasksAcked = tasks.stream()
.map(TaskHandle::isAcked).toArray(CompletableFuture[]::new);
Assertions.assertThat(CompletableFuture.allOf(tasksAcked))
.succeedsWithin(5, TimeUnit.SECONDS);

// Since MockServer runs in the same thread as this test,
// the context will be updated before the last emitEvent returns.
closeContext();

// By the time context.close() returns all tasks MUST have results set.
Assertions.assertThat(tasks).extracting(TaskHandle::result)
.allMatch(CompletableFuture::isDone)
.extracting(CompletableFuture::get).extracting(TaskHandle.Result::error)
Expand All @@ -207,8 +238,11 @@ public void test_drainOnClose() throws Exception {
List<String> received = recvDataAndAck();
Assertions.assertThat(tasks).extracting(TaskHandle::id)
.containsExactlyInAnyOrderElementsOf(received);
Assertions.assertThat(tasks).extracting(TaskHandle::isAcked)
.allMatch(CompletableFuture::isDone);

CompletableFuture<?>[] tasksAcked = tasks.stream()
.map(TaskHandle::isAcked).toArray(CompletableFuture[]::new);
Assertions.assertThat(CompletableFuture.allOf(tasksAcked))
.succeedsWithin(5, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -273,6 +307,7 @@ public void test_backoffBacklog() throws Exception {
int batchSizeNew = BATCH_SIZE / 2;

// Force the last BATCH_SIZE / 2 - 1 items to be transferred to the backlog.
// Wait for this event to be processed before moving forward.
out.emitEvent(new Event.Backoff(batchSizeNew));

// The next item will go on the backlog and the trigger a flush,
Expand All @@ -298,14 +333,14 @@ public void test_backoffBacklog() throws Exception {

@Test
public void test_reconnect_onShutdown() throws Exception {
out.emitEvent(Event.SHUTTING_DOWN);
out.emitEventAsync(Event.SHUTTING_DOWN);
in.expectMessage(STOP);
out.eof(true);
in.expectMessage(START);

// Not strictly necessary -- we can close the context
// before a new connection is established.
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);
}

@Test
Expand All @@ -319,18 +354,18 @@ public void test_reconnect_onOom() throws Exception {

// Respond with OOM and wait for the client to close its end of the stream.
in.expectMessage(DATA);
out.emitEvent(new Event.Oom(0));
out.emitEventAsync(new Event.Oom(0));

// Close the server's end of the stream.
in.expectMessage(STOP);

// Allow the client to reconnect to another "instance" and Ack the batch.
in.expectMessage(START);
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);
recvDataAndAck();

List<String> submitted = tasks.stream().map(TaskHandle::id).toList();
out.emitEvent(new Event.Results(submitted, Collections.emptyMap()));
out.emitEventAsync(new Event.Results(submitted, Collections.emptyMap()));
}

@Test
Expand All @@ -347,7 +382,7 @@ public void test_reconnect_onStreamHangup() throws Exception {

// The client should try to reconnect, because the context is still open.
in.expectMessage(START);
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);

// The previous batch hasn't been acked, so we should expect to receive it
// again.
Expand All @@ -358,7 +393,7 @@ public void test_reconnect_onStreamHangup() throws Exception {
// in the queue to wake the sender up.
out.hangup();
in.expectMessage(START);
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);
tasks.add(context.add(WeaviateObject.of()));
recvDataAndAck();

Expand Down Expand Up @@ -399,7 +434,7 @@ public void test_reconnect_DrainAfterStreamHangup() throws Exception {
// When the server starts accepting connections again, the client should
// drain the remaining BATCH_SIZE+1 objects as we close the context.
in.expectMessage(START);
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);
Future<?> backgroundAcks = backgroundThread.submit(() -> {
try {
recvDataAndAck();
Expand All @@ -426,7 +461,7 @@ public void test_reconnect_DrainAfterStreamHangup() throws Exception {
public void test_closeAfterStreamHangup() throws Exception {
out.hangup();
in.expectMessage(START);
out.emitEvent(Event.STARTED);
out.emitEventAsync(Event.STARTED);
}

@Test
Expand Down Expand Up @@ -464,7 +499,7 @@ public void test_startAfterClose() throws Exception {
*/
private List<String> recvDataAndAck() throws InterruptedException {
List<String> received = recvData();
out.emitEvent(new Event.Acks(received));
out.emitEventAsync(new Event.Acks(received));
return received;
}

Expand Down Expand Up @@ -492,7 +527,16 @@ private static final class OutboundStream {
this.eventThread = eventThread;
}

CompletableFuture<Void> emitEvent(Event event) {
/**
 * Emit {@code event} on the current thread by invoking {@code stream.onNext}
 * directly (contrast with {@code emitEventAsync}, which schedules the emission
 * on the event thread).
 *
 * <p>Only real protocol events are allowed here: the synthetic {@code EOF} and
 * {@code StreamHangup} markers are rejected by the assertions below, since they
 * must be produced via their dedicated helpers.
 */
void emitEvent(Event event) {
assert event != Event.EOF : "must not use synthetic EOF event";
assert !(event instanceof Event.StreamHangup) : "must not use synthetic StreamHangup event";

stream.onNext(event);
}

/** Emit event on the {@link #eventThread}. */
CompletableFuture<Void> emitEventAsync(Event event) {
assert event != Event.EOF : "must not use synthetic EOF event";
assert !(event instanceof Event.StreamHangup) : "must not use synthetic StreamHangup event";

Expand Down Expand Up @@ -522,7 +566,7 @@ CompletableFuture<Void> eof(boolean ok) {
if (ok) {
// These are guaranteed to finish before onCompleted,
// as eventThread is just 1 thread.
pendingEvents.forEach(this::emitEvent);
pendingEvents.forEach(this::emitEventAsync);
}
return CompletableFuture.runAsync(stream::onCompleted, eventThread);
}
Expand Down
Loading