Skip to content

Commit c580df8

Browse files
authored
fix: AsyncTransactionManager should rollback on close (#505)
* fix: AsyncTransactionManager should rollback on close AsyncTransctionManager should rollback the transaction if close is called while the transaction is still in state STARTED. Failing to do so, will keep the transaction open on the backend for longer than necessary, holding on to locks until it is garbage collected after 10 seconds. Fixes #504 * feat: return rollback result from close * fix: add ignored diff to clirr
1 parent 3914577 commit c580df8

File tree

6 files changed

+101
-7
lines changed

6 files changed

+101
-7
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -371,4 +371,10 @@
371371
<className>com/google/cloud/spanner/ResultSets</className>
372372
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
373373
</difference>
374+
375+
<difference>
376+
<differenceType>7012</differenceType>
377+
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
378+
<method>com.google.api.core.ApiFuture closeAsync()</method>
379+
</difference>
374380
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.cloud.Timestamp;
21-
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
22-
import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
23-
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
2421
import com.google.cloud.spanner.TransactionManager.TransactionState;
2522
import com.google.common.util.concurrent.ListenableFuture;
2623
import com.google.common.util.concurrent.MoreExecutors;
@@ -200,4 +197,11 @@ public interface AsyncTransactionFunction<I, O> {
200197
*/
201198
@Override
202199
void close();
200+
201+
/**
202+
* Closes the transaction manager. If there is an active transaction, it will be rolled back. The
203+
* underlying session will be released back to the session pool. The returned {@link ApiFuture} is
204+
* done when the transaction (if any) has been rolled back.
205+
*/
206+
ApiFuture<Void> closeAsync();
203207
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

+11
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2525
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
2626
import com.google.cloud.spanner.TransactionManager.TransactionState;
27+
import com.google.common.base.MoreObjects;
2728
import com.google.common.base.Preconditions;
2829
import com.google.common.util.concurrent.MoreExecutors;
2930
import io.opencensus.trace.Span;
@@ -54,7 +55,17 @@ public void setSpan(Span span) {
5455

5556
@Override
5657
public void close() {
58+
closeAsync();
59+
}
60+
61+
@Override
62+
public ApiFuture<Void> closeAsync() {
63+
ApiFuture<Void> res = null;
64+
if (txnState == TransactionState.STARTED) {
65+
res = rollbackAsync();
66+
}
5767
txn.close();
68+
return MoreObjects.firstNonNull(res, ApiFutures.<Void>immediateFuture(null));
5869
}
5970

6071
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java

+30-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.core.SettableApiFuture;
2424
import com.google.cloud.Timestamp;
25-
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
2625
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
2726
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
2827
import com.google.cloud.spanner.TransactionManager.TransactionState;
@@ -59,14 +58,41 @@ public void run() {
5958

6059
@Override
6160
public void close() {
62-
delegate.addListener(
63-
new Runnable() {
61+
SpannerApiFutures.get(closeAsync());
62+
}
63+
64+
@Override
65+
public ApiFuture<Void> closeAsync() {
66+
final SettableApiFuture<Void> res = SettableApiFuture.create();
67+
ApiFutures.addCallback(
68+
delegate,
69+
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
6470
@Override
65-
public void run() {
71+
public void onFailure(Throwable t) {
6672
session.close();
6773
}
74+
75+
@Override
76+
public void onSuccess(AsyncTransactionManagerImpl result) {
77+
ApiFutures.addCallback(
78+
result.closeAsync(),
79+
new ApiFutureCallback<Void>() {
80+
@Override
81+
public void onFailure(Throwable t) {
82+
res.setException(t);
83+
}
84+
85+
@Override
86+
public void onSuccess(Void result) {
87+
session.close();
88+
res.set(result);
89+
}
90+
},
91+
MoreExecutors.directExecutor());
92+
}
6893
},
6994
MoreExecutors.directExecutor());
95+
return res;
7096
}
7197

7298
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
3737
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
3838
import com.google.cloud.spanner.Options.ReadOption;
39+
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
3940
import com.google.common.base.Function;
41+
import com.google.common.base.Predicate;
4042
import com.google.common.collect.ImmutableList;
4143
import com.google.common.collect.Iterables;
4244
import com.google.common.collect.Range;
@@ -47,6 +49,8 @@
4749
import com.google.spanner.v1.CommitRequest;
4850
import com.google.spanner.v1.ExecuteBatchDmlRequest;
4951
import com.google.spanner.v1.ExecuteSqlRequest;
52+
import com.google.spanner.v1.RollbackRequest;
53+
import com.google.spanner.v1.TransactionSelector;
5054
import io.grpc.Status;
5155
import java.util.Arrays;
5256
import java.util.Collection;
@@ -181,6 +185,30 @@ public void onSuccess(long[] input) {
181185
}
182186
}
183187

188+
@Test
189+
public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exception {
190+
AsyncTransactionManager manager = client().transactionManagerAsync();
191+
TransactionContext txn = manager.beginAsync().get();
192+
txn.executeUpdateAsync(UPDATE_STATEMENT).get();
193+
final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector();
194+
195+
SpannerApiFutures.get(manager.closeAsync());
196+
// The mock server should already have the Rollback request, as we are waiting for the returned
197+
// ApiFuture to be done.
198+
mockSpanner.waitForRequestsToContain(
199+
new Predicate<AbstractMessage>() {
200+
@Override
201+
public boolean apply(AbstractMessage input) {
202+
if (input instanceof RollbackRequest) {
203+
RollbackRequest request = (RollbackRequest) input;
204+
return request.getTransactionId().equals(selector.getId());
205+
}
206+
return false;
207+
}
208+
},
209+
0L);
210+
}
211+
184212
@Test
185213
public void asyncTransactionManagerUpdate() throws Exception {
186214
final SettableApiFuture<Long> updateCount = SettableApiFuture.create();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

+19
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
2424
import com.google.common.base.Optional;
2525
import com.google.common.base.Preconditions;
26+
import com.google.common.base.Predicate;
2627
import com.google.common.base.Stopwatch;
2728
import com.google.common.base.Throwables;
29+
import com.google.common.collect.Iterables;
2830
import com.google.common.util.concurrent.Uninterruptibles;
2931
import com.google.protobuf.AbstractMessage;
3032
import com.google.protobuf.ByteString;
@@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
19271929
}
19281930
}
19291931

1932+
public void waitForRequestsToContain(
1933+
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
1934+
throws InterruptedException, TimeoutException {
1935+
Stopwatch watch = Stopwatch.createStarted();
1936+
while (true) {
1937+
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
1938+
if (msg.iterator().hasNext()) {
1939+
break;
1940+
}
1941+
Thread.sleep(10L);
1942+
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
1943+
throw new TimeoutException(
1944+
"Timeout while waiting for requests to contain the wanted request");
1945+
}
1946+
}
1947+
}
1948+
19301949
@Override
19311950
public void addResponse(AbstractMessage response) {
19321951
throw new UnsupportedOperationException();

0 commit comments

Comments
 (0)