Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.76.0</version>
<version>26.78.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.110.0</version>
<version>6.112.0</version>
</dependency>

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
import com.google.api.core.InternalApi;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CacheUpdate;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.Mutation;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RoutingHint;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -95,31 +100,59 @@ public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) {
if (!reqBuilder.hasMutationKey()) {
return null;
}
TargetRange target = recipeCache.mutationToTargetRange(reqBuilder.getMutationKey());
if (target == null) {
return routeMutation(
reqBuilder.getMutationKey(),
preferLeader(reqBuilder.getOptions()),
reqBuilder.getRoutingHintBuilder());
}

public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) {
Mutation mutation = selectMutationForRouting(reqBuilder.getMutationsList());
if (mutation == null) {
return null;
}
RoutingHint.Builder hintBuilder = RoutingHint.newBuilder();
hintBuilder.setKey(target.start);
if (!target.limit.isEmpty()) {
hintBuilder.setLimitKey(target.limit);
return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder());
}

private static Mutation selectMutationForRouting(List<Mutation> mutations) {
if (mutations.isEmpty()) {
return null;
}
List<Mutation> mutationsExcludingInsert = new ArrayList<>();
Mutation largestInsertMutation = null;
for (Mutation mutation : mutations) {
if (!mutation.hasInsert()) {
mutationsExcludingInsert.add(mutation);
continue;
}
if (largestInsertMutation == null
|| mutation.getInsert().getValuesCount()
> largestInsertMutation.getInsert().getValuesCount()) {
largestInsertMutation = mutation;
}
}
if (!mutationsExcludingInsert.isEmpty()) {
return mutationsExcludingInsert.get(
ThreadLocalRandom.current().nextInt(mutationsExcludingInsert.size()));
}
return largestInsertMutation;
}

private ChannelEndpoint routeMutation(
Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) {
recipeCache.applySchemaGeneration(hintBuilder);
TargetRange target = recipeCache.mutationToTargetRange(mutation);
if (target == null) {
return null;
}
recipeCache.applyTargetRange(hintBuilder, target);
return fillRoutingHint(
preferLeader(reqBuilder.getOptions()),
preferLeader,
KeyRangeCache.RangeMode.COVERING_SPLIT,
DirectedReadOptions.getDefaultInstance(),
hintBuilder);
}

private ChannelEndpoint fillRoutingHint(
TransactionSelector transactionSelector,
DirectedReadOptions directedReadOptions,
KeyRangeCache.RangeMode rangeMode,
RoutingHint.Builder hintBuilder) {
return fillRoutingHint(
preferLeader(transactionSelector), rangeMode, directedReadOptions, hintBuilder);
}

private ChannelEndpoint fillRoutingHint(
boolean preferLeader,
KeyRangeCache.RangeMode rangeMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
Expand All @@ -47,9 +48,10 @@
/**
* ManagedChannel that routes eligible requests using location-aware routing hints.
*
* <p>Routing hints are applied to streaming read/query and unary ExecuteSql. Commit/Rollback use
* transaction affinity when available. BeginTransaction is routed only when a mutation key is
* provided.
* <p>Routing hints are applied to streaming read/query and unary ExecuteSql. Mutation-based
* BeginTransaction and Commit requests also carry routing hints when recipes are available.
* Commit/Rollback use transaction affinity when available. BeginTransaction is routed only when a
* mutation key is provided.
*/
@InternalApi
final class KeyAwareChannel extends ManagedChannel {
Expand Down Expand Up @@ -355,8 +357,10 @@ public void sendMessage(RequestT message) {
BeginTransactionRequest.Builder reqBuilder =
((BeginTransactionRequest) message).toBuilder();
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
if (databaseId != null && reqBuilder.hasMutationKey()) {
if (databaseId != null) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
}
if (finder != null && reqBuilder.hasMutationKey()) {
endpoint = finder.findServer(reqBuilder);
}
if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) {
Expand All @@ -368,10 +372,27 @@ public void sendMessage(RequestT message) {
message = (RequestT) reqBuilder.build();
} else if (message instanceof CommitRequest) {
CommitRequest request = (CommitRequest) message;
String databaseId = parentChannel.extractDatabaseIdFromSession(request.getSession());
if (databaseId != null) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
}
CommitRequest.Builder reqBuilder = null;
if (finder != null && request.getMutationsCount() > 0) {
reqBuilder = request.toBuilder();
endpoint = finder.fillRoutingHint(reqBuilder);
request = reqBuilder.build();
}
Comment on lines +376 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since getOrCreateChannelFinder never returns null, the null check for finder is redundant. You can combine these two if statements for better readability and conciseness.

          if (databaseId != null) {
            finder = parentChannel.getOrCreateChannelFinder(databaseId);
            finder.fillRoutingHint(reqBuilder);
          }

if (!request.getTransactionId().isEmpty()) {
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
ChannelEndpoint affinityEndpoint =
parentChannel.affinityEndpoint(request.getTransactionId());
if (affinityEndpoint != null) {
endpoint = affinityEndpoint;
}
transactionIdToClear = request.getTransactionId();
}
if (reqBuilder != null) {
message = (RequestT) request;
}
} else if (message instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) message;
if (!request.getTransactionId().isEmpty()) {
Expand Down Expand Up @@ -610,7 +631,15 @@ public void onMessage(ResponseT message) {
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof Transaction) {
Transaction response = (Transaction) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
}
transactionId = transactionIdFromTransaction(response);
} else if (message instanceof CommitResponse) {
CommitResponse response = (CommitResponse) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
}
}
if (transactionId != null) {
if (call.isReadOnlyBegin) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ public void computeKeys(ReadRequest.Builder reqBuilder) {
long reqFp = fingerprint(reqBuilder.buildPartial());

RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder();
if (!schemaGeneration.isEmpty()) {
hintBuilder.setSchemaGeneration(schemaGeneration);
}
applySchemaGeneration(hintBuilder);

PreparedRead preparedRead = getIfPresent(preparedReads, reqFp);
if (preparedRead == null) {
Expand All @@ -186,10 +184,7 @@ public void computeKeys(ReadRequest.Builder reqBuilder) {

try {
TargetRange target = recipe.keySetToTargetRange(reqBuilder.getKeySet());
hintBuilder.setKey(target.start);
if (!target.limit.isEmpty()) {
hintBuilder.setLimitKey(target.limit);
}
applyTargetRange(hintBuilder, target);
} catch (IllegalArgumentException e) {
logger.fine("Failed key encoding: " + e.getMessage());
}
Expand All @@ -199,9 +194,7 @@ public void computeKeys(ExecuteSqlRequest.Builder reqBuilder) {
long reqFp = fingerprint(reqBuilder.buildPartial());

RoutingHint.Builder hintBuilder = reqBuilder.getRoutingHintBuilder();
if (!schemaGeneration.isEmpty()) {
hintBuilder.setSchemaGeneration(schemaGeneration);
}
applySchemaGeneration(hintBuilder);

PreparedQuery preparedQuery = getIfPresent(preparedQueries, reqFp);
if (preparedQuery == null) {
Expand All @@ -221,15 +214,25 @@ public void computeKeys(ExecuteSqlRequest.Builder reqBuilder) {

try {
TargetRange target = recipe.queryParamsToTargetRange(reqBuilder.getParams());
hintBuilder.setKey(target.start);
if (!target.limit.isEmpty()) {
hintBuilder.setLimitKey(target.limit);
}
applyTargetRange(hintBuilder, target);
} catch (IllegalArgumentException e) {
logger.fine("Failed query param encoding: " + e.getMessage());
}
}

void applySchemaGeneration(RoutingHint.Builder hintBuilder) {
if (!schemaGeneration.isEmpty()) {
hintBuilder.setSchemaGeneration(schemaGeneration);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This updates the hint builder, but the method might still return false if the TargetRange below is null. That seems weird. I would expect this method only to modify the hint builder if it also returns true. Can we otherwise at least add some documentation to the method that explains when it returns true/false, and when it modifies the hint builder?

}
}

void applyTargetRange(RoutingHint.Builder hintBuilder, TargetRange target) {
hintBuilder.setKey(target.start);
if (!target.limit.isEmpty()) {
hintBuilder.setLimitKey(target.limit);
}
}

public TargetRange mutationToTargetRange(Mutation mutation) {
if (mutation == null) {
return null;
Expand Down
Loading
Loading