Skip to content

Commit 3dd0675

Browse files
authored
feat!: async connection API (#392)
* feat: support setting timeout per RPC The Spanner client allows a user to set custom timeouts while creating a SpannerOptions instance, but these timeouts are static and are applied to all invocations of the RPCs. This change introduces the possibility to set custom timeouts and other call options on a per-RPC basis. Fixes #378 * fix: change grpc deps from test to compile scope * feat: add async api for connection * fix: fix test failures * fix: move state handling from callback to callable * fix: fix integration tests with emulator * fix: fix timeout integration test on emulator * fix: prevent flakiness in DDL tests * fix: fix clirr build failures * fix: do not set transaction state for Aborted err * fix: set transaction state after retry * cleanup: remove sync methods and use async instead * cleanup: remove unused code * feat: make ddl async * fix: reduce timeout and remove debug info * feat: make runBatch async * test: set forkCount to 1 to investigate test failure * fix: linting + clirr * fix: prevent deadlock in DmlBatch * fix: fix DMLBatch state handling * tests: add tests for aborted async transactions * test: add aborted tests * fix: add change to clirr + more tests * fix: require a rollback after a tx has aborted * docs: add javadoc for new methods * tests: add integration tests * fix: wait for commit before select * fix: fix handling aborted commit * docs: document behavior -Async methods * fix: iterating without callback could cause exception * fix: remove todos and commented code * feat: keep track of caller to include in stacktrace * docs: explain why Aborted is active * fix: use ticker for better testability * test: increase coverage and remove unused code * test: add additional tests * docs: add missing @OverRide * docs: fix comment
1 parent 691a23c commit 3dd0675

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+5690
-1762
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,4 +319,56 @@
319319
<className>com/google/cloud/spanner/Value</className>
320320
<method>java.util.List getNumericArray()</method>
321321
</difference>
322+
323+
<!-- Async Connection API -->
324+
<difference>
325+
<differenceType>7012</differenceType>
326+
<className>com/google/cloud/spanner/connection/Connection</className>
327+
<method>com.google.api.core.ApiFuture beginTransactionAsync()</method>
328+
</difference>
329+
<difference>
330+
<differenceType>7012</differenceType>
331+
<className>com/google/cloud/spanner/connection/Connection</className>
332+
<method>com.google.api.core.ApiFuture commitAsync()</method>
333+
</difference>
334+
<difference>
335+
<differenceType>7012</differenceType>
336+
<className>com/google/cloud/spanner/connection/Connection</className>
337+
<method>com.google.cloud.spanner.connection.AsyncStatementResult executeAsync(com.google.cloud.spanner.Statement)</method>
338+
</difference>
339+
<difference>
340+
<differenceType>7012</differenceType>
341+
<className>com/google/cloud/spanner/connection/Connection</className>
342+
<method>com.google.api.core.ApiFuture executeBatchUpdateAsync(java.lang.Iterable)</method>
343+
</difference>
344+
<difference>
345+
<differenceType>7012</differenceType>
346+
<className>com/google/cloud/spanner/connection/Connection</className>
347+
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
348+
</difference>
349+
<difference>
350+
<differenceType>7012</differenceType>
351+
<className>com/google/cloud/spanner/connection/Connection</className>
352+
<method>com.google.api.core.ApiFuture rollbackAsync()</method>
353+
</difference>
354+
<difference>
355+
<differenceType>7012</differenceType>
356+
<className>com/google/cloud/spanner/connection/Connection</className>
357+
<method>com.google.api.core.ApiFuture runBatchAsync()</method>
358+
</difference>
359+
<difference>
360+
<differenceType>7012</differenceType>
361+
<className>com/google/cloud/spanner/connection/Connection</className>
362+
<method>com.google.api.core.ApiFuture writeAsync(com.google.cloud.spanner.Mutation)</method>
363+
</difference>
364+
<difference>
365+
<differenceType>7012</differenceType>
366+
<className>com/google/cloud/spanner/connection/Connection</className>
367+
<method>com.google.api.core.ApiFuture writeAsync(java.lang.Iterable)</method>
368+
</difference>
369+
<difference>
370+
<differenceType>7004</differenceType>
371+
<className>com/google/cloud/spanner/ResultSets</className>
372+
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
373+
</difference>
322374
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
2626
import com.google.common.base.Function;
2727
import com.google.common.base.Preconditions;
28+
import com.google.common.base.Supplier;
29+
import com.google.common.base.Suppliers;
2830
import com.google.common.collect.ImmutableList;
2931
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
3032
import com.google.common.util.concurrent.MoreExecutors;
@@ -88,8 +90,8 @@ private State(boolean shouldStop) {
8890

8991
private final BlockingDeque<Struct> buffer;
9092
private Struct currentRow;
91-
/** The underlying synchronous {@link ResultSet} that is producing the rows. */
92-
private final ResultSet delegateResultSet;
93+
/** Supplies the underlying synchronous {@link ResultSet} that will be producing the rows. */
94+
private final Supplier<ResultSet> delegateResultSet;
9395

9496
/**
9597
* Any exception that occurs while executing the query and iterating over the result set will be
@@ -144,6 +146,11 @@ private State(boolean shouldStop) {
144146
private volatile CountDownLatch consumingLatch = new CountDownLatch(0);
145147

146148
AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) {
149+
this(executorProvider, Suppliers.ofInstance(Preconditions.checkNotNull(delegate)), bufferSize);
150+
}
151+
152+
AsyncResultSetImpl(
153+
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
147154
super(delegate);
148155
this.executorProvider = Preconditions.checkNotNull(executorProvider);
149156
this.delegateResultSet = Preconditions.checkNotNull(delegate);
@@ -165,7 +172,7 @@ public void close() {
165172
return;
166173
}
167174
if (state == State.INITIALIZED || state == State.SYNC) {
168-
delegateResultSet.close();
175+
delegateResultSet.get().close();
169176
}
170177
this.closed = true;
171178
}
@@ -228,7 +235,7 @@ public CursorState tryNext() throws SpannerException {
228235

229236
private void closeDelegateResultSet() {
230237
try {
231-
delegateResultSet.close();
238+
delegateResultSet.get().close();
232239
} catch (Throwable t) {
233240
log.log(Level.FINE, "Ignoring error from closing delegate result set", t);
234241
}
@@ -261,7 +268,7 @@ public void run() {
261268
// we'll keep the cancelled state.
262269
return;
263270
}
264-
executionException = SpannerExceptionFactory.newSpannerException(e);
271+
executionException = SpannerExceptionFactory.asSpannerException(e);
265272
cursorReturnedDoneOrException = true;
266273
}
267274
return;
@@ -325,10 +332,10 @@ public Void call() throws Exception {
325332
boolean stop = false;
326333
boolean hasNext = false;
327334
try {
328-
hasNext = delegateResultSet.next();
335+
hasNext = delegateResultSet.get().next();
329336
} catch (Throwable e) {
330337
synchronized (monitor) {
331-
executionException = SpannerExceptionFactory.newSpannerException(e);
338+
executionException = SpannerExceptionFactory.asSpannerException(e);
332339
}
333340
}
334341
try {
@@ -357,13 +364,13 @@ public Void call() throws Exception {
357364
}
358365
}
359366
if (!stop) {
360-
buffer.put(delegateResultSet.getCurrentRowAsStruct());
367+
buffer.put(delegateResultSet.get().getCurrentRowAsStruct());
361368
startCallbackIfNecessary();
362-
hasNext = delegateResultSet.next();
369+
hasNext = delegateResultSet.get().next();
363370
}
364371
} catch (Throwable e) {
365372
synchronized (monitor) {
366-
executionException = SpannerExceptionFactory.newSpannerException(e);
373+
executionException = SpannerExceptionFactory.asSpannerException(e);
367374
stop = true;
368375
}
369376
}
@@ -544,9 +551,9 @@ public <T> List<T> toList(Function<StructReader, T> transformer) throws SpannerE
544551
try {
545552
return future.get();
546553
} catch (ExecutionException e) {
547-
throw SpannerExceptionFactory.newSpannerException(e.getCause());
554+
throw SpannerExceptionFactory.asSpannerException(e.getCause());
548555
} catch (Throwable e) {
549-
throw SpannerExceptionFactory.newSpannerException(e);
556+
throw SpannerExceptionFactory.asSpannerException(e);
550557
}
551558
}
552559

@@ -558,14 +565,14 @@ public boolean next() throws SpannerException {
558565
"Cannot call next() on a result set with a callback.");
559566
this.state = State.SYNC;
560567
}
561-
boolean res = delegateResultSet.next();
562-
currentRow = res ? delegateResultSet.getCurrentRowAsStruct() : null;
568+
boolean res = delegateResultSet.get().next();
569+
currentRow = res ? delegateResultSet.get().getCurrentRowAsStruct() : null;
563570
return res;
564571
}
565572

566573
@Override
567574
public ResultSetStats getStats() {
568-
return delegateResultSet.getStats();
575+
return delegateResultSet.get().getStats();
569576
}
570577

571578
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ static ErrorCode valueOf(String name, ErrorCode defaultValue) {
8989
/**
9090
* Returns the error code corresponding to a gRPC status, or {@code UNKNOWN} if not recognized.
9191
*/
92-
static ErrorCode fromGrpcStatus(Status status) {
92+
public static ErrorCode fromGrpcStatus(Status status) {
9393
ErrorCode code = errorByRpcCode.get(status.getCode().value());
9494
return code == null ? UNKNOWN : code;
9595
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.core.ApiFuture;
1920
import com.google.api.gax.core.ExecutorProvider;
2021
import com.google.api.gax.core.InstantiatingExecutorProvider;
2122
import com.google.cloud.ByteArray;
2223
import com.google.cloud.Date;
2324
import com.google.cloud.Timestamp;
25+
import com.google.cloud.spanner.Options.QueryOption;
2426
import com.google.cloud.spanner.Type.Code;
2527
import com.google.cloud.spanner.Type.StructField;
2628
import com.google.common.base.Preconditions;
29+
import com.google.common.base.Supplier;
2730
import com.google.common.collect.Lists;
2831
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2932
import com.google.spanner.v1.ResultSetStats;
@@ -65,8 +68,41 @@ public static AsyncResultSet toAsyncResultSet(ResultSet delegate) {
6568
* ExecutorProvider}.
6669
*/
6770
public static AsyncResultSet toAsyncResultSet(
68-
ResultSet delegate, ExecutorProvider executorProvider) {
69-
return new AsyncResultSetImpl(executorProvider, delegate, 100);
71+
ResultSet delegate, ExecutorProvider executorProvider, QueryOption... options) {
72+
Options readOptions = Options.fromQueryOptions(options);
73+
final int bufferRows =
74+
readOptions.hasBufferRows()
75+
? readOptions.bufferRows()
76+
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
77+
return new AsyncResultSetImpl(executorProvider, delegate, bufferRows);
78+
}
79+
80+
/**
81+
* Converts the {@link ResultSet} that will be returned by the given {@link ApiFuture} to an
82+
* {@link AsyncResultSet} using the given {@link ExecutorProvider}.
83+
*/
84+
public static AsyncResultSet toAsyncResultSet(
85+
ApiFuture<ResultSet> delegate, ExecutorProvider executorProvider, QueryOption... options) {
86+
Options readOptions = Options.fromQueryOptions(options);
87+
final int bufferRows =
88+
readOptions.hasBufferRows()
89+
? readOptions.bufferRows()
90+
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
91+
return new AsyncResultSetImpl(
92+
executorProvider, new FutureResultSetSupplier(delegate), bufferRows);
93+
}
94+
95+
private static class FutureResultSetSupplier implements Supplier<ResultSet> {
96+
final ApiFuture<ResultSet> delegate;
97+
98+
FutureResultSetSupplier(ApiFuture<ResultSet> delegate) {
99+
this.delegate = Preconditions.checkNotNull(delegate);
100+
}
101+
102+
@Override
103+
public ResultSet get() {
104+
return SpannerApiFutures.get(delegate);
105+
}
70106
}
71107

72108
private static class PrePopulatedResultSet implements ResultSet {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.common.base.Preconditions;
21+
import java.util.concurrent.CancellationException;
22+
import java.util.concurrent.ExecutionException;
23+
24+
public class SpannerApiFutures {
25+
public static <T> T get(ApiFuture<T> future) throws SpannerException {
26+
return getOrNull(Preconditions.checkNotNull(future));
27+
}
28+
29+
public static <T> T getOrNull(ApiFuture<T> future) throws SpannerException {
30+
try {
31+
return future == null ? null : future.get();
32+
} catch (ExecutionException e) {
33+
if (e.getCause() instanceof SpannerException) {
34+
throw (SpannerException) e.getCause();
35+
}
36+
throw SpannerExceptionFactory.newSpannerException(e.getCause());
37+
} catch (InterruptedException e) {
38+
throw SpannerExceptionFactory.propagateInterrupt(e);
39+
} catch (CancellationException e) {
40+
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
41+
}
42+
}
43+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ public static SpannerException propagateTimeout(TimeoutException e) {
8383
ErrorCode.DEADLINE_EXCEEDED, "Operation did not complete in the given time", e);
8484
}
8585

86+
/**
87+
* Converts the given {@link Throwable} to a {@link SpannerException}. If <code>t</code> is
88+
* already a (subclass of a) {@link SpannerException}, <code>t</code> is returned unaltered.
89+
* Otherwise, a new {@link SpannerException} is created with <code>t</code> as its cause.
90+
*/
91+
public static SpannerException asSpannerException(Throwable t) {
92+
if (t instanceof SpannerException) {
93+
return (SpannerException) t;
94+
}
95+
return newSpannerException(t);
96+
}
97+
8698
/**
8799
* Creates a new exception based on {@code cause}.
88100
*
@@ -126,6 +138,20 @@ public static SpannerBatchUpdateException newSpannerBatchUpdateException(
126138
databaseError);
127139
}
128140

141+
/**
142+
* Constructs a new {@link AbortedDueToConcurrentModificationException} that can be re-thrown for
143+
* a transaction that had already been aborted, but that the client application tried to use for
144+
* additional statements.
145+
*/
146+
public static AbortedDueToConcurrentModificationException
147+
newAbortedDueToConcurrentModificationException(
148+
AbortedDueToConcurrentModificationException cause) {
149+
return new AbortedDueToConcurrentModificationException(
150+
DoNotConstructDirectly.ALLOWED,
151+
"This transaction has already been aborted and could not be retried due to a concurrent modification. Rollback this transaction to start a new one.",
152+
cause);
153+
}
154+
129155
/**
130156
* Creates a new exception based on {@code cause}. If {@code cause} indicates cancellation, {@code
131157
* context} will be inspected to establish the type of cancellation.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void removeListener(Runnable listener) {
150150
@GuardedBy("lock")
151151
private long retryDelayInMillis = -1L;
152152

153-
private ByteString transactionId;
153+
private volatile ByteString transactionId;
154154
private Timestamp commitTimestamp;
155155

156156
private TransactionContextImpl(Builder builder) {
@@ -238,12 +238,17 @@ void commit() {
238238
try {
239239
commitTimestamp = commitAsync().get();
240240
} catch (InterruptedException e) {
241+
if (commitFuture != null) {
242+
commitFuture.cancel(true);
243+
}
241244
throw SpannerExceptionFactory.propagateInterrupt(e);
242245
} catch (ExecutionException e) {
243246
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
244247
}
245248
}
246249

250+
volatile ApiFuture<CommitResponse> commitFuture;
251+
247252
ApiFuture<Timestamp> commitAsync() {
248253
final SettableApiFuture<Timestamp> res = SettableApiFuture.create();
249254
final SettableApiFuture<Void> latch;
@@ -273,8 +278,7 @@ public void run() {
273278
span.addAnnotation("Starting Commit");
274279
final Span opSpan =
275280
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
276-
final ApiFuture<CommitResponse> commitFuture =
277-
rpc.commitAsync(commitRequest, session.getOptions());
281+
commitFuture = rpc.commitAsync(commitRequest, session.getOptions());
278282
commitFuture.addListener(
279283
tracer.withSpan(
280284
opSpan,

0 commit comments

Comments
 (0)