diff --git a/README.md b/README.md index 0176ad2033..a8a99436fc 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file: com.google.cloud libraries-bom - 26.76.0 + 26.78.0 pom import @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-spanner - 6.110.0 + 6.112.0 ``` diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 6c4554ea77..4ced18eb92 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -19,13 +19,18 @@ import com.google.api.core.InternalApi; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CacheUpdate; +import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.Mutation; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RoutingHint; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; /** @@ -95,31 +100,59 @@ public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) { if (!reqBuilder.hasMutationKey()) { return null; } - TargetRange target = recipeCache.mutationToTargetRange(reqBuilder.getMutationKey()); - if (target == null) { + return routeMutation( + reqBuilder.getMutationKey(), + preferLeader(reqBuilder.getOptions()), + reqBuilder.getRoutingHintBuilder()); + } + + public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) { + Mutation mutation = selectMutationForRouting(reqBuilder.getMutationsList()); + if (mutation == null) { return null; } - RoutingHint.Builder hintBuilder = RoutingHint.newBuilder(); - hintBuilder.setKey(target.start); - if (!target.limit.isEmpty()) { - hintBuilder.setLimitKey(target.limit); + return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder()); + } + + private static Mutation selectMutationForRouting(List mutations) { + if (mutations.isEmpty()) { + return null; + } + List mutationsExcludingInsert = new ArrayList<>(); + Mutation largestInsertMutation = null; + for (Mutation mutation : mutations) { + if (!mutation.hasInsert()) { + mutationsExcludingInsert.add(mutation); + continue; + } + if (largestInsertMutation == null + || mutation.getInsert().getValuesCount() + > largestInsertMutation.getInsert().getValuesCount()) { + largestInsertMutation = mutation; + } + } + if (!mutationsExcludingInsert.isEmpty()) { + return mutationsExcludingInsert.get( + ThreadLocalRandom.current().nextInt(mutationsExcludingInsert.size())); + } + return largestInsertMutation; + } + + private ChannelEndpoint routeMutation( + Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) { + recipeCache.applySchemaGeneration(hintBuilder); + TargetRange target = recipeCache.mutationToTargetRange(mutation); + if (target == null) { + return null; } + recipeCache.applyTargetRange(hintBuilder, target); return fillRoutingHint( - preferLeader(reqBuilder.getOptions()), + preferLeader, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), hintBuilder); } - private ChannelEndpoint fillRoutingHint( - TransactionSelector transactionSelector, - DirectedReadOptions directedReadOptions, - KeyRangeCache.RangeMode rangeMode, - RoutingHint.Builder hintBuilder) { - return fillRoutingHint( - preferLeader(transactionSelector), rangeMode, directedReadOptions, hintBuilder); - } - private ChannelEndpoint fillRoutingHint( boolean preferLeader, KeyRangeCache.RangeMode rangeMode, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 382ccead71..59fc03dfd8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -23,6 +23,7 @@ import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; @@ -47,9 +48,10 @@ /** * ManagedChannel that routes eligible requests using location-aware routing hints. * - *

Routing hints are applied to streaming read/query and unary ExecuteSql. Commit/Rollback use - * transaction affinity when available. BeginTransaction is routed only when a mutation key is - * provided. + *

Routing hints are applied to streaming read/query and unary ExecuteSql. Mutation-based + * BeginTransaction and Commit requests also carry routing hints when recipes are available. + * Commit/Rollback use transaction affinity when available. BeginTransaction is routed only when a + * mutation key is provided. */ @InternalApi final class KeyAwareChannel extends ManagedChannel { @@ -355,8 +357,10 @@ public void sendMessage(RequestT message) { BeginTransactionRequest.Builder reqBuilder = ((BeginTransactionRequest) message).toBuilder(); String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession()); - if (databaseId != null && reqBuilder.hasMutationKey()) { + if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); + } + if (finder != null && reqBuilder.hasMutationKey()) { endpoint = finder.findServer(reqBuilder); } if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) { @@ -368,10 +372,27 @@ public void sendMessage(RequestT message) { message = (RequestT) reqBuilder.build(); } else if (message instanceof CommitRequest) { CommitRequest request = (CommitRequest) message; + String databaseId = parentChannel.extractDatabaseIdFromSession(request.getSession()); + if (databaseId != null) { + finder = parentChannel.getOrCreateChannelFinder(databaseId); + } + CommitRequest.Builder reqBuilder = null; + if (finder != null && request.getMutationsCount() > 0) { + reqBuilder = request.toBuilder(); + endpoint = finder.fillRoutingHint(reqBuilder); + request = reqBuilder.build(); + } if (!request.getTransactionId().isEmpty()) { - endpoint = parentChannel.affinityEndpoint(request.getTransactionId()); + ChannelEndpoint affinityEndpoint = + parentChannel.affinityEndpoint(request.getTransactionId()); + if (affinityEndpoint != null) { + endpoint = affinityEndpoint; + } transactionIdToClear = request.getTransactionId(); } + if (reqBuilder != null) { + message = (RequestT) request; + } } else if (message instanceof RollbackRequest) { RollbackRequest request = (RollbackRequest) message; if (!request.getTransactionId().isEmpty()) { @@ -610,7 +631,15 @@ public void onMessage(ResponseT message) { transactionId = transactionIdFromMetadata(response); } else if (message instanceof Transaction) { Transaction response = (Transaction) message; + if (response.hasCacheUpdate() && call.channelFinder != null) { + call.channelFinder.update(response.getCacheUpdate()); + } transactionId = transactionIdFromTransaction(response); + } else if (message instanceof CommitResponse) { + CommitResponse response = (CommitResponse) message; + if (response.hasCacheUpdate() && call.channelFinder != null) { + call.channelFinder.update(response.getCacheUpdate()); + } } if (transactionId != null) { if (call.isReadOnlyBegin) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRecipeCache.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRecipeCache.java index eff3aeacb6..1e0857108b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRecipeCache.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRecipeCache.java @@ -158,9 +158,7 @@ public void computeKeys(ReadRequest.Builder reqBuilder) { long reqFp = fingerprint(reqBuilder.buildPartial()); RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder(); - if (!schemaGeneration.isEmpty()) { - hintBuilder.setSchemaGeneration(schemaGeneration); - } + applySchemaGeneration(hintBuilder); PreparedRead preparedRead = getIfPresent(preparedReads, reqFp); if (preparedRead == null) { @@ -186,10 +184,7 @@ public void computeKeys(ReadRequest.Builder reqBuilder) { try { TargetRange target = recipe.keySetToTargetRange(reqBuilder.getKeySet()); - hintBuilder.setKey(target.start); - if (!target.limit.isEmpty()) { - hintBuilder.setLimitKey(target.limit); - } + applyTargetRange(hintBuilder, target); } catch (IllegalArgumentException e) { logger.fine("Failed key encoding: " + e.getMessage()); } @@ -199,9 +194,7 @@ public void computeKeys(ExecuteSqlRequest.Builder reqBuilder) { long reqFp = fingerprint(reqBuilder.buildPartial()); RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder(); - if (!schemaGeneration.isEmpty()) { - hintBuilder.setSchemaGeneration(schemaGeneration); - } + applySchemaGeneration(hintBuilder); PreparedQuery preparedQuery = getIfPresent(preparedQueries, reqFp); if (preparedQuery == null) { @@ -221,15 +214,25 @@ public void computeKeys(ExecuteSqlRequest.Builder reqBuilder) { try { TargetRange target = recipe.queryParamsToTargetRange(reqBuilder.getParams()); - hintBuilder.setKey(target.start); - if (!target.limit.isEmpty()) { - hintBuilder.setLimitKey(target.limit); - } + applyTargetRange(hintBuilder, target); } catch (IllegalArgumentException e) { logger.fine("Failed query param encoding: " + e.getMessage()); } } + void applySchemaGeneration(RoutingHint.Builder hintBuilder) { + if (!schemaGeneration.isEmpty()) { + hintBuilder.setSchemaGeneration(schemaGeneration); + } + } + + void applyTargetRange(RoutingHint.Builder hintBuilder, TargetRange target) { + hintBuilder.setKey(target.start); + if (!target.limit.isEmpty()) { + hintBuilder.setLimitKey(target.limit); + } + } + public TargetRange mutationToTargetRange(Mutation mutation) { if (mutation == null) { return null; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index 123ffba1d4..a4919389a8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -17,20 +17,28 @@ package com.google.cloud.spanner.spi.v1; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.ListValue; +import com.google.protobuf.TextFormat; +import com.google.protobuf.Value; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CacheUpdate; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.Group; +import com.google.spanner.v1.Mutation; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Range; import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.RecipeList; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.RollbackRequest; @@ -274,6 +282,277 @@ public void resultSetCacheUpdateRoutesSubsequentRequest() throws Exception { assertThat(harness.endpointCache.callCountForAddress("routed:1234")).isEqualTo(1); } + @Test + public void beginTransactionWithMutationKeyAddsRoutingHint() throws Exception { + TestHarness harness = createHarness(); + seedCache(harness, createMutationRoutingCacheUpdate()); + + Mutation mutation = createInsertMutation("b"); + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage( + BeginTransactionRequest.newBuilder().setSession(SESSION).setMutationKey(mutation).build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(beginDelegate.lastMessage); + assertEquals(7L, beginDelegate.lastMessage.getRoutingHint().getDatabaseId()); + assertEquals( + "1", beginDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8()); + assertFalse(beginDelegate.lastMessage.getRoutingHint().getKey().isEmpty()); + } + + @Test + public void transactionCacheUpdateEnablesCommitRoutingHint() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("tx-with-cache-update"); + + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage(BeginTransactionRequest.newBuilder().setSession(SESSION).build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + beginDelegate.emitOnMessage( + Transaction.newBuilder() + .setId(transactionId) + .setCacheUpdate(createMutationRoutingCacheUpdate()) + .build()); + beginDelegate.emitOnClose(Status.OK, new Metadata()); + + ClientCall commitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + commitCall.start(new CapturingListener(), new Metadata()); + commitCall.sendMessage( + CommitRequest.newBuilder() + .setSession(SESSION) + .setTransactionId(transactionId) + .addMutations(createInsertMutation("b")) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall commitDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(commitDelegate.lastMessage); + assertEquals(7L, commitDelegate.lastMessage.getRoutingHint().getDatabaseId()); + assertEquals( + "1", commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8()); + assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty()); + } + + @Test + public void singleUseCommitWithMutationsRoutesUsingRoutingHint() throws Exception { + TestHarness harness = createHarness(); + seedCache(harness, createMutationRecipeCacheUpdate()); + + ClientCall firstCommitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + firstCommitCall.start(new CapturingListener(), new Metadata()); + firstCommitCall.sendMessage( + CommitRequest.newBuilder() + .setSession(SESSION) + .setSingleUseTransaction( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .addMutations(createInsertMutation("b")) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall firstCommitDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(firstCommitDelegate.lastMessage); + RoutingHint routingHint = firstCommitDelegate.lastMessage.getRoutingHint(); + assertFalse(routingHint.getKey().isEmpty()); + + seedCache(harness, createRangeCacheUpdateForHint(routingHint)); + + ClientCall secondCommitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + secondCommitCall.start(new CapturingListener(), new Metadata()); + secondCommitCall.sendMessage( + CommitRequest.newBuilder() + .setSession(SESSION) + .setSingleUseTransaction( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .addMutations(createInsertMutation("b")) + .build()); + + assertThat(harness.endpointCache.callCountForAddress(DEFAULT_ADDRESS)).isEqualTo(3); + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + + @SuppressWarnings("unchecked") + RecordingClientCall commitDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + + assertNotNull(commitDelegate.lastMessage); + assertEquals(7L, commitDelegate.lastMessage.getRoutingHint().getDatabaseId()); + assertEquals( + "1", commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8()); + assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty()); + } + + @Test + public void singleUseCommitUsesSameMutationSelectionHeuristicAsBeginTransaction() + throws Exception { + TestHarness harness = createHarness(); + seedCache(harness, createMutationRecipeCacheUpdate()); + + Mutation deleteMutation = createDeleteMutation("b"); + + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage( + BeginTransactionRequest.newBuilder() + .setSession(SESSION) + .setMutationKey(deleteMutation) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(beginDelegate.lastMessage); + RoutingHint expectedRoutingHint = beginDelegate.lastMessage.getRoutingHint(); + + ClientCall commitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + commitCall.start(new CapturingListener(), new Metadata()); + commitCall.sendMessage( + CommitRequest.newBuilder() + .setSession(SESSION) + .setSingleUseTransaction( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .addMutations(createInsertMutation("a")) + .addMutations(deleteMutation) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall commitDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(commitDelegate.lastMessage); + assertEquals(expectedRoutingHint, commitDelegate.lastMessage.getRoutingHint()); + } + + @Test + public void commitWithTransactionIdRoutesUsingRoutingHintWhenAffinityMissing() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("tx-without-affinity"); + seedCache(harness, createMutationRecipeCacheUpdate()); + + ClientCall firstCommitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + firstCommitCall.start(new CapturingListener(), new Metadata()); + firstCommitCall.sendMessage( + CommitRequest.newBuilder() + .setSession(SESSION) + .setTransactionId(transactionId) + .addMutations(createInsertMutation("b")) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall firstCommitDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(firstCommitDelegate.lastMessage); + RoutingHint routingHint = firstCommitDelegate.lastMessage.getRoutingHint(); + assertFalse(routingHint.getKey().isEmpty()); + + seedCache(harness, createRangeCacheUpdateForHint(routingHint)); + + ClientCall secondCommitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + secondCommitCall.start(new CapturingListener(), new Metadata()); + secondCommitCall.sendMessage( + CommitRequest.newBuilder() + .setSession(SESSION) + .setTransactionId(transactionId) + .addMutations(createInsertMutation("b")) + .build()); + + assertThat(harness.endpointCache.callCountForAddress(DEFAULT_ADDRESS)).isEqualTo(3); + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + + @SuppressWarnings("unchecked") + RecordingClientCall commitDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + + assertNotNull(commitDelegate.lastMessage); + assertEquals(7L, commitDelegate.lastMessage.getRoutingHint().getDatabaseId()); + assertEquals( + "1", commitDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8()); + assertFalse(commitDelegate.lastMessage.getRoutingHint().getKey().isEmpty()); + } + + @Test + public void commitResponseCacheUpdateEnablesSubsequentBeginRoutingHint() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("tx-before-commit-cache-update"); + + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage(BeginTransactionRequest.newBuilder().setSession(SESSION).build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build()); + beginDelegate.emitOnClose(Status.OK, new Metadata()); + + ClientCall commitCall = + harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); + commitCall.start(new CapturingListener(), new Metadata()); + commitCall.sendMessage( + CommitRequest.newBuilder().setSession(SESSION).setTransactionId(transactionId).build()); + + @SuppressWarnings("unchecked") + RecordingClientCall commitDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + commitDelegate.emitOnMessage( + CommitResponse.newBuilder().setCacheUpdate(createMutationRoutingCacheUpdate()).build()); + commitDelegate.emitOnClose(Status.OK, new Metadata()); + + Mutation mutation = createInsertMutation("b"); + ClientCall secondBeginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + secondBeginCall.start(new CapturingListener(), new Metadata()); + secondBeginCall.sendMessage( + BeginTransactionRequest.newBuilder().setSession(SESSION).setMutationKey(mutation).build()); + + @SuppressWarnings("unchecked") + RecordingClientCall routedBeginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + + assertNotNull(routedBeginDelegate.lastMessage); + assertEquals(7L, routedBeginDelegate.lastMessage.getRoutingHint().getDatabaseId()); + assertEquals( + "1", routedBeginDelegate.lastMessage.getRoutingHint().getSchemaGeneration().toStringUtf8()); + assertFalse(routedBeginDelegate.lastMessage.getRoutingHint().getKey().isEmpty()); + } + @Test public void readOnlyTransactionRoutesEachReadIndependently() throws Exception { TestHarness harness = createHarness(); @@ -635,6 +914,58 @@ private static CacheUpdate createTwoRangeCacheUpdate() { .build(); } + private static CacheUpdate createMutationRoutingCacheUpdate() throws TextFormat.ParseException { + return createMutationRecipeCacheUpdate().toBuilder() + .mergeFrom( + createRangeCacheUpdateForHint(RoutingHint.newBuilder().setKey(bytes("a")).build())) + .build(); + } + + private static CacheUpdate createMutationRecipeCacheUpdate() throws TextFormat.ParseException { + RecipeList keyRecipes = + parseRecipeList( + "schema_generation: \"1\"\n" + + "recipe {\n" + + " table_name: \"T\"\n" + + " part { tag: 1 }\n" + + " part {\n" + + " order: ASCENDING\n" + + " null_order: NULLS_FIRST\n" + + " type { code: STRING }\n" + + " identifier: \"k\"\n" + + " }\n" + + "}\n"); + return CacheUpdate.newBuilder().setDatabaseId(7L).setKeyRecipes(keyRecipes).build(); + } + + private static CacheUpdate createRangeCacheUpdateForHint(RoutingHint hint) { + ByteString key = hint.getKey(); + ByteString limitKey = + hint.getLimitKey().isEmpty() + ? key.concat(ByteString.copyFrom(new byte[] {0})) + : hint.getLimitKey(); + return CacheUpdate.newBuilder() + .setDatabaseId(7L) + .addRange( + Range.newBuilder() + .setStartKey(key) + .setLimitKey(limitKey) + .setGroupUid(1L) + .setSplitId(1L) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(1L) + .setGeneration(bytes("1")) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1L) + .setServerAddress("server-a:1234") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + private static void seedCache(TestHarness harness, CacheUpdate cacheUpdate) { ClientCall seedCall = harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); @@ -652,6 +983,40 @@ private static void seedCache(TestHarness harness, CacheUpdate cacheUpdate) { seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build()); } + private static Mutation createInsertMutation(String keyValue) { + return Mutation.newBuilder() + .setInsert( + Mutation.Write.newBuilder() + .setTable("T") + .addColumns("k") + .addValues( + ListValue.newBuilder() + .addValues(Value.newBuilder().setStringValue(keyValue).build()) + .build())) + .build(); + } + + private static Mutation createDeleteMutation(String keyValue) { + return Mutation.newBuilder() + .setDelete( + Mutation.Delete.newBuilder() + .setTable("T") + .setKeySet( + com.google.spanner.v1.KeySet.newBuilder() + .addKeys( + ListValue.newBuilder() + .addValues(Value.newBuilder().setStringValue(keyValue).build()) + .build()) + .build())) + .build(); + } + + private static RecipeList parseRecipeList(String text) throws TextFormat.ParseException { + RecipeList.Builder builder = RecipeList.newBuilder(); + TextFormat.merge(text, builder); + return builder.build(); + } + private static TestHarness createHarness() throws IOException { FakeEndpointCache endpointCache = new FakeEndpointCache(DEFAULT_ADDRESS); InstantiatingGrpcChannelProvider provider = @@ -841,6 +1206,7 @@ int callCount() { private static final class RecordingClientCall extends ClientCall { @Nullable private ClientCall.Listener listener; + @Nullable private RequestT lastMessage; private boolean cancelCalled; @Nullable private String cancelMessage; @Nullable private Throwable cancelCause; @@ -864,7 +1230,9 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { public void halfClose() {} @Override - public void sendMessage(RequestT message) {} + public void sendMessage(RequestT message) { + this.lastMessage = message; + } void emitOnMessage(ResponseT response) { if (listener != null) {