Skip to content

chore(spanner): handle commit retry protocol extension for mux rw #3449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ ApiFuture<CommitResponse> commitAsync() {
}
builder.addAllMutations(mutationsProto);
finishOps.addListener(
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
new CommitRunnable(
res, finishOps, builder, /* retryAttemptDueToCommitProtocolExtension = */ false),
MoreExecutors.directExecutor());
return res;
}

Expand All @@ -418,14 +420,17 @@ private final class CommitRunnable implements Runnable {
private final SettableApiFuture<CommitResponse> res;
private final ApiFuture<Void> prev;
private final CommitRequest.Builder requestBuilder;
private final boolean retryAttemptDueToCommitProtocolExtension;

CommitRunnable(
SettableApiFuture<CommitResponse> res,
ApiFuture<Void> prev,
CommitRequest.Builder requestBuilder) {
CommitRequest.Builder requestBuilder,
boolean retryAttemptDueToCommitProtocolExtension) {
this.res = res;
this.prev = prev;
this.requestBuilder = requestBuilder;
this.retryAttemptDueToCommitProtocolExtension = retryAttemptDueToCommitProtocolExtension;
}

@Override
Expand Down Expand Up @@ -459,6 +464,13 @@ public void run() {
// Set the precommit token in the CommitRequest for multiplexed sessions.
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
}
if (retryAttemptDueToCommitProtocolExtension) {
// When a retry occurs due to the commit protocol extension, clear all mutations because
// they were already buffered in SpanFE during the previous attempt.
requestBuilder.clearMutations();
span.addAnnotation(
"Retrying commit operation with a new precommit token obtained from the previous CommitResponse");
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
Expand All @@ -479,6 +491,29 @@ public void run() {
return;
}
com.google.spanner.v1.CommitResponse proto = commitFuture.get();

// If the CommitResponse includes a precommit token, the client will retry the
// commit RPC once with the new token and clear any existing mutations.
// This case is applicable only when the read-write transaction uses multiplexed
// session.
if (proto.hasPrecommitToken() && !retryAttemptDueToCommitProtocolExtension) {
// track the latest pre commit token
onPrecommitToken(proto.getPrecommitToken());
span.addAnnotation(
"Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field");
opSpan.end();

// Retry the commit RPC with the latest precommit token from CommitResponse.
new CommitRunnable(
res,
prev,
requestBuilder,
/* retryAttemptDueToCommitProtocolExtension = */ true)
.run();

// Exit to prevent further processing in this attempt.
return;
}
if (!proto.hasCommitTimestamp()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ private static void checkStreamException(
private ConcurrentMap<ByteString, Boolean> isPartitionedDmlTransaction =
new ConcurrentHashMap<>();
private ConcurrentMap<ByteString, Boolean> abortedTransactions = new ConcurrentHashMap<>();
private ConcurrentMap<ByteString, Boolean> commitRetryTransactions = new ConcurrentHashMap<>();
private final AtomicBoolean abortNextTransaction = new AtomicBoolean();
private final AtomicBoolean abortNextStatement = new AtomicBoolean();
private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean();
Expand Down Expand Up @@ -2045,15 +2046,23 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
return;
}
simulateAbort(session, request.getTransactionId());
commitTransaction(transaction.getId());
CommitResponse.Builder responseBuilder =
CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp());
if (request.getReturnCommitStats()) {
responseBuilder.setCommitStats(
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
// This is not really always equal, but at least it returns a value.
.setMutationCount(request.getMutationsCount())
.build());
CommitResponse.Builder responseBuilder = CommitResponse.newBuilder();
Optional<Boolean> commitRetry =
Optional.fromNullable(commitRetryTransactions.get(request.getTransactionId()));
if (commitRetry.or(Boolean.FALSE) && session.getMultiplexed()) {
responseBuilder.setPrecommitToken(
getCommitResponsePrecommitToken(request.getTransactionId()));
commitRetryTransactions.remove(request.getTransactionId());
} else {
commitTransaction(transaction.getId());
responseBuilder.setCommitTimestamp(getCurrentGoogleTimestamp());
if (request.getReturnCommitStats()) {
responseBuilder.setCommitStats(
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
// This is not really always equal, but at least it returns a value.
.setMutationCount(request.getMutationsCount())
.build());
}
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -2134,6 +2143,14 @@ void markAbortedTransaction(ByteString transactionId) {
transactionSequenceNo.remove(transactionId);
}

public void markCommitRetryOnTransaction(ByteString transactionId) {
Transaction transaction = transactions.get(transactionId);
if (transaction == null || !isReadWriteTransaction(transactionId)) {
return;
}
commitRetryTransactions.putIfAbsent(transactionId, Boolean.TRUE);
}

@Override
public void partitionQuery(
PartitionQueryRequest request, StreamObserver<PartitionResponse> responseObserver) {
Expand Down Expand Up @@ -2527,6 +2544,11 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getCommitResponsePrecommitToken(
ByteString transactionId) {
return getPrecommitToken("CommitResponsePrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getPrecommitToken(
String value, ByteString transactionId) {
transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,66 @@ public void testOtherUnimplementedError_ReadWriteTransactionStillUsesMultiplexed
assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
// This test simulates the commit retry protocol extension which occurs when a read-write
// transaction contains read/query + mutation operations.
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// ignore
}
}

Mutation mutation =
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
transaction.buffer(mutation);

TransactionContextImpl impl = (TransactionContextImpl) transaction;
// Force the Commit RPC to return a CommitResponse with MultiplexedSessionRetry field
// set.
// This scenario is only possible when a read-write transaction contains read/query +
// mutation operations.
mockSpanner.markCommitRetryOnTransaction(impl.transactionId);
return null;
});

List<ExecuteSqlRequest> executeSqlRequests =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
assertEquals(1, executeSqlRequests.size());
// Verify the request is executed using multiplexed sessions
assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed());

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertEquals(2, commitRequests.size());
assertNotNull(commitRequests.get(0).getPrecommitToken());
assertEquals(
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
// Verify that the first request has mutations set
assertTrue(commitRequests.get(0).getMutationsCount() > 0);

// Second CommitRequest should contain the latest precommit token received via the
// CommitResponse in previous attempt.
assertNotNull(commitRequests.get(1).getPrecommitToken());
assertEquals(
ByteString.copyFromUtf8("CommitResponsePrecommitToken"),
commitRequests.get(1).getPrecommitToken().getPrecommitToken());
// Verify that the commit retry request does not have any mutations set
assertEquals(0, commitRequests.get(1).getMutationsCount());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
assertNotNull(client.multiplexedSessionDatabaseClient);
SessionReference sessionReference =
Expand Down
Loading