Skip to content

Commit df47c13

Browse files
authored
perf: use streaming RPC for PDML (#287)
* perf: use streaming RPC for PDML * fix: reset resume token for each tx * cleanup: remove test code * fix: retry depending on resume token * fix: remove unused attempt param * fix: fix check for resume token * fix: keep track of total timeout * fix: clirr build failure * cleanup: add comments + remove unused code * tests: add missing exec time * chore: run formatter * chore: remove unnecessary null check * tests: add missing exec time
1 parent 9483925 commit df47c13

File tree

8 files changed

+177
-43
lines changed

8 files changed

+177
-43
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,10 @@
176176
<method>com.google.api.gax.retrying.RetrySettings getPartitionedDmlRetrySettings()</method>
177177
</difference>
178178

179+
<!-- Streaming PDML -->
180+
<difference>
181+
<differenceType>7012</differenceType>
182+
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
183+
<method>com.google.api.gax.rpc.ServerStream executeStreamingPartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
184+
</difference>
179185
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java

+89-30
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,32 @@
1818

1919
import static com.google.common.base.Preconditions.checkState;
2020

21+
import com.google.api.gax.grpc.GrpcStatusCode;
22+
import com.google.api.gax.rpc.ServerStream;
23+
import com.google.api.gax.rpc.UnavailableException;
2124
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2225
import com.google.cloud.spanner.spi.v1.SpannerRpc;
26+
import com.google.common.base.Stopwatch;
2327
import com.google.protobuf.ByteString;
2428
import com.google.spanner.v1.BeginTransactionRequest;
2529
import com.google.spanner.v1.ExecuteSqlRequest;
2630
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
31+
import com.google.spanner.v1.PartialResultSet;
2732
import com.google.spanner.v1.Transaction;
2833
import com.google.spanner.v1.TransactionOptions;
2934
import com.google.spanner.v1.TransactionSelector;
35+
import io.grpc.Status.Code;
3036
import java.util.Map;
31-
import java.util.concurrent.Callable;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.logging.Level;
39+
import java.util.logging.Logger;
40+
import org.threeten.bp.Duration;
41+
import org.threeten.bp.temporal.ChronoUnit;
3242

3343
/** Partitioned DML transaction for bulk updates and deletes. */
3444
class PartitionedDMLTransaction implements SessionTransaction {
45+
private static final Logger log = Logger.getLogger(PartitionedDMLTransaction.class.getName());
46+
3547
private final SessionImpl session;
3648
private final SpannerRpc rpc;
3749
private volatile boolean isValid = true;
@@ -60,41 +72,88 @@ private ByteString initTransaction() {
6072

6173
/**
6274
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
63-
* transaction was aborted.
75+
* transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the
76+
* statement, and will retry the stream if an {@link UnavailableException} is thrown, using the
77+
* last seen resume token if the server returns any.
6478
*/
65-
long executePartitionedUpdate(final Statement statement) {
79+
long executeStreamingPartitionedUpdate(final Statement statement, Duration timeout) {
6680
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
67-
Callable<com.google.spanner.v1.ResultSet> callable =
68-
new Callable<com.google.spanner.v1.ResultSet>() {
69-
@Override
70-
public com.google.spanner.v1.ResultSet call() throws Exception {
71-
ByteString transactionId = initTransaction();
72-
final ExecuteSqlRequest.Builder builder =
73-
ExecuteSqlRequest.newBuilder()
74-
.setSql(statement.getSql())
75-
.setQueryMode(QueryMode.NORMAL)
76-
.setSession(session.getName())
77-
.setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
78-
Map<String, Value> stmtParameters = statement.getParameters();
79-
if (!stmtParameters.isEmpty()) {
80-
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
81-
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
82-
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
83-
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
81+
log.log(Level.FINER, "Starting PartitionedUpdate statement");
82+
boolean foundStats = false;
83+
long updateCount = 0L;
84+
Duration remainingTimeout = timeout;
85+
Stopwatch stopWatch = Stopwatch.createStarted();
86+
try {
87+
// Loop to catch AbortedExceptions.
88+
while (true) {
89+
ByteString resumeToken = ByteString.EMPTY;
90+
try {
91+
ByteString transactionId = initTransaction();
92+
final ExecuteSqlRequest.Builder builder =
93+
ExecuteSqlRequest.newBuilder()
94+
.setSql(statement.getSql())
95+
.setQueryMode(QueryMode.NORMAL)
96+
.setSession(session.getName())
97+
.setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
98+
Map<String, Value> stmtParameters = statement.getParameters();
99+
if (!stmtParameters.isEmpty()) {
100+
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
101+
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
102+
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
103+
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
104+
}
105+
}
106+
while (true) {
107+
// Derive the remaining budget from the ORIGINAL timeout. The stopwatch is started once and
// never reset, so its reading is the total time spent since this method began; subtracting
// it from the previously reduced value would count earlier attempts twice on every retry.
remainingTimeout =
108+
timeout.minus(stopWatch.elapsed(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS);
// Once the overall budget is used up, fail with DEADLINE_EXCEEDED rather than issuing another
// RPC with a zero or negative stream wait timeout.
// NOTE(review): gax appears to reject negative stream wait timeouts and to treat zero as
// "no timeout" -- confirm against GrpcCallContext#withStreamWaitTimeout.
if (remainingTimeout.isZero() || remainingTimeout.isNegative()) {
  throw SpannerExceptionFactory.newSpannerException(
      ErrorCode.DEADLINE_EXCEEDED,
      "Partitioned DML timeout exceeded while executing or retrying the statement");
}
109+
try {
110+
builder.setResumeToken(resumeToken);
111+
ServerStream<PartialResultSet> stream =
112+
rpc.executeStreamingPartitionedDml(
113+
builder.build(), session.getOptions(), remainingTimeout);
114+
for (PartialResultSet rs : stream) {
115+
if (rs.getResumeToken() != null && !ByteString.EMPTY.equals(rs.getResumeToken())) {
116+
resumeToken = rs.getResumeToken();
117+
}
118+
if (rs.hasStats()) {
119+
foundStats = true;
120+
updateCount += rs.getStats().getRowCountLowerBound();
121+
}
122+
}
123+
break;
124+
} catch (UnavailableException e) {
125+
// Retry the stream in the same transaction if the stream breaks with
126+
// UnavailableException and we have a resume token. Otherwise, we just retry the
127+
// entire transaction.
128+
if (!ByteString.EMPTY.equals(resumeToken)) {
129+
log.log(
130+
Level.FINER,
131+
"Retrying PartitionedDml stream using resume token '"
132+
+ resumeToken.toStringUtf8()
133+
+ "' because of broken stream",
134+
e);
135+
} else {
136+
throw new com.google.api.gax.rpc.AbortedException(
137+
e, GrpcStatusCode.of(Code.ABORTED), true);
84138
}
85139
}
86-
return rpc.executePartitionedDml(builder.build(), session.getOptions());
87140
}
88-
};
89-
com.google.spanner.v1.ResultSet resultSet =
90-
SpannerRetryHelper.runTxWithRetriesOnAborted(
91-
callable, rpc.getPartitionedDmlRetrySettings());
92-
if (!resultSet.hasStats()) {
93-
throw new IllegalArgumentException(
94-
"Partitioned DML response missing stats possibly due to non-DML statement as input");
141+
break;
142+
} catch (com.google.api.gax.rpc.AbortedException e) {
143+
// Retry using a new transaction but with the same session if the transaction is aborted.
144+
log.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
145+
}
146+
}
147+
if (!foundStats) {
148+
throw SpannerExceptionFactory.newSpannerException(
149+
ErrorCode.INVALID_ARGUMENT,
150+
"Partitioned DML response missing stats possibly due to non-DML statement as input");
151+
}
152+
log.log(Level.FINER, "Finished PartitionedUpdate statement");
153+
return updateCount;
154+
} catch (Exception e) {
155+
throw SpannerExceptionFactory.newSpannerException(e);
95156
}
96-
// For partitioned DML, using the row count lower bound.
97-
return resultSet.getStats().getRowCountLowerBound();
98157
}
99158

100159
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public String getName() {
105105
public long executePartitionedUpdate(Statement stmt) {
106106
setActive(null);
107107
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
108-
return txn.executePartitionedUpdate(stmt);
108+
return txn.executeStreamingPartitionedUpdate(
109+
stmt, spanner.getOptions().getPartitionedDmlTimeout());
109110
}
110111

111112
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.gax.grpc.GrpcStatusCode;
2020
import com.google.api.gax.rpc.ApiException;
21+
import com.google.api.gax.rpc.WatchdogTimeoutException;
2122
import com.google.cloud.spanner.SpannerException.DoNotConstructDirectly;
2223
import com.google.common.base.MoreObjects;
2324
import com.google.common.base.Predicate;
@@ -212,7 +213,14 @@ private static SpannerException newSpannerExceptionPreformatted(
212213
}
213214

214215
private static SpannerException fromApiException(ApiException exception) {
215-
Status.Code code = ((GrpcStatusCode) exception.getStatusCode()).getTransportCode();
216+
Status.Code code;
217+
if (exception.getStatusCode() instanceof GrpcStatusCode) {
218+
code = ((GrpcStatusCode) exception.getStatusCode()).getTransportCode();
219+
} else if (exception instanceof WatchdogTimeoutException) {
220+
code = Status.Code.DEADLINE_EXCEEDED;
221+
} else {
222+
code = Status.Code.UNKNOWN;
223+
}
216224
ErrorCode errorCode = ErrorCode.fromGrpcStatus(Status.fromCode(code));
217225
if (exception.getCause() != null) {
218226
return SpannerExceptionFactory.newSpannerException(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+25
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
3939
import com.google.api.gax.rpc.OperationCallable;
4040
import com.google.api.gax.rpc.ResponseObserver;
41+
import com.google.api.gax.rpc.ServerStream;
4142
import com.google.api.gax.rpc.StatusCode;
4243
import com.google.api.gax.rpc.StreamController;
4344
import com.google.api.gax.rpc.TransportChannelProvider;
@@ -54,6 +55,7 @@
5455
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
5556
import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
5657
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
58+
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
5759
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
5860
import com.google.cloud.spanner.v1.stub.SpannerStub;
5961
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
@@ -359,6 +361,21 @@ public GapicSpannerRpc(final SpannerOptions options) {
359361
.setStreamWatchdogProvider(watchdogProvider)
360362
.executeSqlSettings()
361363
.setRetrySettings(partitionedDmlRetrySettings);
364+
// The stream watchdog will by default only check for a timeout every 10 seconds, so if the
365+
// timeout is less than 10 seconds, it would be ignored for the first 10 seconds unless we
366+
// also change the StreamWatchdogCheckInterval.
367+
if (options
368+
.getPartitionedDmlTimeout()
369+
.dividedBy(10L)
370+
.compareTo(pdmlSettings.getStreamWatchdogCheckInterval())
371+
< 0) {
372+
pdmlSettings.setStreamWatchdogCheckInterval(
373+
options.getPartitionedDmlTimeout().dividedBy(10));
374+
pdmlSettings.setStreamWatchdogProvider(
375+
pdmlSettings
376+
.getStreamWatchdogProvider()
377+
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
378+
}
362379
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());
363380

364381
this.instanceAdminStub =
@@ -1073,6 +1090,14 @@ public RetrySettings getPartitionedDmlRetrySettings() {
10731090
return partitionedDmlRetrySettings;
10741091
}
10751092

1093+
@Override
1094+
public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
1095+
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
1096+
GrpcCallContext context = newCallContext(options, request.getSession());
1097+
context = context.withStreamWaitTimeout(timeout);
1098+
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
1099+
}
1100+
10761101
@Override
10771102
public StreamingCall executeQuery(
10781103
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.core.InternalApi;
2121
import com.google.api.gax.longrunning.OperationFuture;
2222
import com.google.api.gax.retrying.RetrySettings;
23+
import com.google.api.gax.rpc.ServerStream;
2324
import com.google.cloud.ServiceRpc;
2425
import com.google.cloud.spanner.SpannerException;
2526
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
@@ -58,6 +59,7 @@
5859
import java.util.List;
5960
import java.util.Map;
6061
import javax.annotation.Nullable;
62+
import org.threeten.bp.Duration;
6163

6264
/**
6365
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
@@ -286,6 +288,9 @@ StreamingCall read(
286288

287289
RetrySettings getPartitionedDmlRetrySettings();
288290

291+
ServerStream<PartialResultSet> executeStreamingPartitionedDml(
292+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
293+
289294
StreamingCall executeQuery(
290295
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
291296

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void testExecutePartitionedDmlAborted() {
169169
* A valid query that returns a {@link ResultSet} should not be accepted by a partitioned dml
170170
* transaction.
171171
*/
172-
@Test(expected = IllegalArgumentException.class)
172+
@Test(expected = SpannerException.class)
173173
public void testExecutePartitionedDmlWithQuery() {
174174
DatabaseClient client =
175175
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
@@ -234,20 +234,22 @@ public Void run(TransactionContext transaction) {
234234

235235
@Test
236236
public void testPartitionedDmlWithLowerTimeout() {
237-
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
237+
mockSpanner.setExecuteStreamingSqlExecutionTime(
238+
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
238239
SpannerOptions.Builder builder =
239240
SpannerOptions.newBuilder()
240241
.setProjectId(TEST_PROJECT)
241242
.setChannelProvider(channelProvider)
242243
.setCredentials(NoCredentials.getInstance());
243244
// Set PDML timeout value.
244-
builder.setPartitionedDmlTimeout(Duration.ofMillis(100L));
245+
builder.setPartitionedDmlTimeout(Duration.ofMillis(10L));
245246
try (Spanner spanner = builder.build().getService()) {
246247
DatabaseClient client =
247248
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
248-
assertThat(spanner.getOptions().getPartitionedDmlTimeout())
249-
.isEqualTo(Duration.ofMillis(100L));
249+
assertThat(spanner.getOptions().getPartitionedDmlTimeout()).isEqualTo(Duration.ofMillis(10L));
250250
// PDML should timeout with these settings.
251+
mockSpanner.setExecuteSqlExecutionTime(
252+
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
251253
try {
252254
client.executePartitionedUpdate(UPDATE_STATEMENT);
253255
fail("expected DEADLINE_EXCEEDED");
@@ -275,7 +277,8 @@ public Long run(TransactionContext transaction) {
275277

276278
@Test
277279
public void testPartitionedDmlWithHigherTimeout() {
278-
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
280+
mockSpanner.setExecuteStreamingSqlExecutionTime(
281+
SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
279282
SpannerOptions.Builder builder =
280283
SpannerOptions.newBuilder()
281284
.setProjectId(TEST_PROJECT)
@@ -307,6 +310,7 @@ public void testPartitionedDmlWithHigherTimeout() {
307310
long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT);
308311

309312
// Normal DML should timeout as it should use the ExecuteSQL RPC settings.
313+
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
310314
try {
311315
client
312316
.readWriteTransaction()

0 commit comments

Comments
 (0)