diff --git a/CHANGELOG.md b/CHANGELOG.md
index c389ceb017e..d4aecbac2d2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,27 @@
# Changelog
+## [6.81.1](https://github.com/googleapis/java-spanner/compare/v6.81.0...v6.81.1) (2024-11-11)
+
+
+### Bug Fixes
+
+* Client built in metrics. Skip export if instance id is null ([#3447](https://github.com/googleapis/java-spanner/issues/3447)) ([8b2e5ef](https://github.com/googleapis/java-spanner/commit/8b2e5ef5bb391e5a4d4df3cb45d6a3f722a8cfbe))
+* **spanner:** Avoid blocking thread in AsyncResultSet ([#3446](https://github.com/googleapis/java-spanner/issues/3446)) ([7c82f1c](https://github.com/googleapis/java-spanner/commit/7c82f1c7823d4d529a70c0da231d2593f00b638b))
+
+
+### Dependencies
+
+* Update dependency com.google.api.grpc:proto-google-cloud-monitoring-v3 to v3.54.0 ([#3437](https://github.com/googleapis/java-spanner/issues/3437)) ([7e28326](https://github.com/googleapis/java-spanner/commit/7e283261961d6435488ed668133dc3bdd238d402))
+* Update dependency com.google.cloud:google-cloud-monitoring to v3.54.0 ([#3438](https://github.com/googleapis/java-spanner/issues/3438)) ([fa18894](https://github.com/googleapis/java-spanner/commit/fa188942c506c85f4c628a8b442b0ee2e6cb845f))
+* Update dependency com.google.cloud:google-cloud-trace to v2.53.0 ([#3440](https://github.com/googleapis/java-spanner/issues/3440)) ([314eeb8](https://github.com/googleapis/java-spanner/commit/314eeb823e14c386ea6e65caae8c80e908e05600))
+* Update dependency io.opentelemetry:opentelemetry-bom to v1.44.1 ([#3452](https://github.com/googleapis/java-spanner/issues/3452)) ([6518eea](https://github.com/googleapis/java-spanner/commit/6518eea2921006f1aa431e02754118e3d3d3b620))
+* Update opentelemetry.version to v1.44.1 ([#3451](https://github.com/googleapis/java-spanner/issues/3451)) ([d9b0271](https://github.com/googleapis/java-spanner/commit/d9b0271603dd14c51954532054b134419150625a))
+
+
+### Documentation
+
+* Update samples' README.md to ensure given ([#3420](https://github.com/googleapis/java-spanner/issues/3420)) ([663a974](https://github.com/googleapis/java-spanner/commit/663a974dc2a52d773deb620b0bc65f0049f63693))
+
## [6.81.0](https://github.com/googleapis/java-spanner/compare/v6.80.1...v6.81.0) (2024-11-01)
diff --git a/README.md b/README.md
index 9fee399283c..4e1fbea168f 100644
--- a/README.md
+++ b/README.md
@@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies:
com.google.cloud
google-cloud-spanner
- 6.79.0
+ 6.81.0
```
@@ -49,20 +49,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:
```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.49.0')
+implementation platform('com.google.cloud:libraries-bom:26.50.0')
implementation 'com.google.cloud:google-cloud-spanner'
```
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-spanner:6.81.0'
+implementation 'com.google.cloud:google-cloud-spanner:6.81.1'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.81.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.81.1"
```
## Authentication
@@ -719,7 +719,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
-[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.81.0
+[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.81.1
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 1ebfc87bc77..a94cd89a40f 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -24,7 +24,7 @@
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
@@ -34,7 +34,7 @@
UTF-8
UTF-8
2.10.1
- 1.43.0
+ 1.44.1
@@ -85,14 +85,14 @@
io.opentelemetry
opentelemetry-bom
- 1.43.0
+ 1.44.1
pom
import
com.google.cloud
google-cloud-spanner
- 6.79.0
+ 6.81.0
commons-cli
diff --git a/generation_config.yaml b/generation_config.yaml
index 291fc91c39e..c5c16b838c9 100644
--- a/generation_config.yaml
+++ b/generation_config.yaml
@@ -1,6 +1,6 @@
gapic_generator_version: 2.49.0
-googleapis_commitish: 48d30c4966ef9ea31b897e13f75d8f94070cc8e9
-libraries_bom_version: 26.49.0
+googleapis_commitish: c72f219fedbb57d3f83c10550e135c4824b670eb
+libraries_bom_version: 26.50.0
libraries:
- api_shortname: spanner
name_pretty: Cloud Spanner
diff --git a/google-cloud-spanner-bom/pom.xml b/google-cloud-spanner-bom/pom.xml
index 8f2a678f7a0..c07ccbe30a4 100644
--- a/google-cloud-spanner-bom/pom.xml
+++ b/google-cloud-spanner-bom/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-spanner-bom
- 6.81.0
+ 6.81.1
pom
com.google.cloud
@@ -53,43 +53,43 @@
com.google.cloud
google-cloud-spanner
- 6.81.0
+ 6.81.1
com.google.cloud
google-cloud-spanner
test-jar
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-admin-instance-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-admin-database-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
proto-google-cloud-spanner-admin-instance-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
proto-google-cloud-spanner-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
proto-google-cloud-spanner-admin-database-v1
- 6.81.0
+ 6.81.1
diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml
index 6acdd898545..e0113c09aec 100644
--- a/google-cloud-spanner-executor/pom.xml
+++ b/google-cloud-spanner-executor/pom.xml
@@ -5,14 +5,14 @@
4.0.0
com.google.cloud
google-cloud-spanner-executor
- 6.81.0
+ 6.81.1
jar
Google Cloud Spanner Executor
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
@@ -22,10 +22,40 @@
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+ io.opentelemetry
+ opentelemetry-context
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+
+
+ io.opentelemetry
+ opentelemetry-sdk-common
+
+
+ io.opentelemetry
+ opentelemetry-sdk-trace
+
+
+ com.google.cloud.opentelemetry
+ exporter-trace
+ 0.32.0
+
com.google.cloud
google-cloud-spanner
+
+ com.google.cloud
+ google-cloud-trace
+ 2.52.0
+
io.grpc
grpc-api
@@ -94,6 +124,11 @@
com.google.api.grpc
proto-google-cloud-spanner-executor-v1
+
+ com.google.api.grpc
+ proto-google-cloud-trace-v1
+ 2.52.0
+
com.google.api.grpc
grpc-google-cloud-spanner-executor-v1
@@ -219,7 +254,7 @@
org.apache.maven.plugins
maven-failsafe-plugin
- 3.5.1
+ 3.5.2
diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java
index d180f55d06a..714dbc309cc 100644
--- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java
+++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java
@@ -18,6 +18,7 @@
import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;
+import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.RetrySettings;
@@ -70,15 +71,21 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
+import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.encryption.CustomerManagedEncryption;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
+import com.google.cloud.trace.v1.TraceServiceClient;
+import com.google.cloud.trace.v1.TraceServiceSettings;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.devtools.cloudtrace.v1.GetTraceRequest;
+import com.google.devtools.cloudtrace.v1.Trace;
+import com.google.devtools.cloudtrace.v1.TraceSpan;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
@@ -152,6 +159,9 @@
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@@ -166,7 +176,9 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -332,24 +344,28 @@ public void startRWTransaction() throws Exception {
// Try to commit
return null;
};
+ io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
Runnable runnable =
- () -> {
- try {
- runner =
- optimistic
- ? dbClient.readWriteTransaction(Options.optimisticLock())
- : dbClient.readWriteTransaction();
- LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
- runner.run(callable);
- transactionSucceeded(runner.getCommitTimestamp().toProto());
- } catch (SpannerException e) {
- LOGGER.log(
- Level.WARNING,
- String.format("Transaction runnable failed with exception %s\n", e.getMessage()),
- e);
- transactionFailed(e);
- }
- };
+ context.wrap(
+ () -> {
+ try {
+ runner =
+ optimistic
+ ? dbClient.readWriteTransaction(Options.optimisticLock())
+ : dbClient.readWriteTransaction();
+ LOGGER.log(
+ Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
+ runner.run(callable);
+ transactionSucceeded(runner.getCommitTimestamp().toProto());
+ } catch (SpannerException e) {
+ LOGGER.log(
+ Level.WARNING,
+ String.format(
+ "Transaction runnable failed with exception %s\n", e.getMessage()),
+ e);
+ transactionFailed(e);
+ }
+ });
LOGGER.log(
Level.INFO,
String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed));
@@ -753,6 +769,11 @@ public synchronized void closeBatchTxn() throws SpannerException {
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("action-pool-%d").build());
+ // Thread pool to verify end to end traces.
+ private static final ExecutorService endToEndTracesThreadPool =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("end-to-end-traces-pool-%d").build());
+
private synchronized Spanner getClientWithTimeout(
long timeoutSeconds, boolean useMultiplexedSession) throws IOException {
if (clientWithTimeout != null) {
@@ -818,6 +839,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
.setHost(HOST_PREFIX + WorkerProxy.spannerPort)
.setCredentials(credentials)
.setChannelProvider(channelProvider)
+ .setEnableEndToEndTracing(true)
+ .setOpenTelemetry(WorkerProxy.openTelemetrySdk)
.setSessionPoolOption(sessionPoolOptions);
SpannerStubSettings.Builder stubSettingsBuilder =
@@ -841,6 +864,88 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
return optionsBuilder.build().getService();
}
+ private TraceServiceClient traceServiceClient;
+
+ // Return the trace service client, create one if not exists.
+ private synchronized TraceServiceClient getTraceServiceClient() throws IOException {
+ if (traceServiceClient != null) {
+ return traceServiceClient;
+ }
+ // Create a trace service client
+ Credentials credentials;
+ if (WorkerProxy.serviceKeyFile.isEmpty()) {
+ credentials = NoCredentials.getInstance();
+ } else {
+ credentials =
+ GoogleCredentials.fromStream(
+ new ByteArrayInputStream(
+ FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))),
+ HTTP_TRANSPORT_FACTORY);
+ }
+
+ TraceServiceSettings traceServiceSettings =
+ TraceServiceSettings.newBuilder()
+ .setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT)
+ .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
+ .build();
+
+ traceServiceClient = TraceServiceClient.create(traceServiceSettings);
+ return traceServiceClient;
+ }
+
+ public Future getEndToEndTraceVerificationTask(String traceId) {
+ return endToEndTracesThreadPool.submit(
+ () -> {
+ try {
+ // Wait for 10 seconds before verifying to ensure traces are exported.
+ long sleepDuration = TimeUnit.SECONDS.toMillis(10);
+ LOGGER.log(
+ Level.INFO,
+ String.format(
+ "Sleeping for %d milliseconds before verifying end to end trace",
+ sleepDuration));
+ Thread.sleep(sleepDuration);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Handle interruption
+ LOGGER.log(Level.INFO, String.format("Thread interrupted."));
+ return false; // Return false if interrupted
+ }
+ return isExportedEndToEndTraceValid(traceId);
+ });
+ }
+
+ private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
+ private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
+
+ /* Returns whether a exported trace is valid. */
+ public boolean isExportedEndToEndTraceValid(String traceId) {
+ try {
+ GetTraceRequest getTraceRequest =
+ GetTraceRequest.newBuilder()
+ .setProjectId(WorkerProxy.PROJECT_ID)
+ .setTraceId(traceId)
+ .build();
+ Trace trace = getTraceServiceClient().getTrace(getTraceRequest);
+ boolean readWriteOrReadOnlyTxnPresent = false, spannerServerSideSpanPresent = false;
+ for (TraceSpan span : trace.getSpansList()) {
+ if (span.getName().contains(READ_ONLY_TRANSACTION)
+ || span.getName().contains(READ_WRITE_TRANSACTION)) {
+ readWriteOrReadOnlyTxnPresent = true;
+ }
+ if (span.getName().startsWith("Spanner.")) {
+ spannerServerSideSpanPresent = true;
+ }
+ }
+ if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent) {
+ return false;
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed to verify end to end trace.", e);
+ return false;
+ }
+ return true;
+ }
+
/** Handle actions. */
public Status startHandlingRequest(
SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
@@ -865,17 +970,20 @@ public Status startHandlingRequest(
useMultiplexedSession = false;
}
+ io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
actionThreadPool.execute(
- () -> {
- Status status =
- executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
- if (!status.isOk()) {
- LOGGER.log(
- Level.WARNING,
- String.format("Failed to execute action with error: %s\n%s", status, action));
- executionContext.onError(status.getCause());
- }
- });
+ context.wrap(
+ () -> {
+ Status status =
+ executeAction(
+ outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
+ if (!status.isOk()) {
+ LOGGER.log(
+ Level.WARNING,
+ String.format("Failed to execute action with error: %s\n%s", status, action));
+ executionContext.onError(status.getCause());
+ }
+ }));
return Status.OK;
}
@@ -886,7 +994,10 @@ private Status executeAction(
String dbPath,
boolean useMultiplexedSession,
ExecutionFlowContext executionContext) {
-
+ Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName());
+ String actionType = action.getActionCase().toString();
+ Span span = tracer.spanBuilder(String.format("performaction_%s", actionType)).startSpan();
+ Scope scope = span.makeCurrent();
try {
if (action.hasAdmin()) {
return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender);
@@ -959,11 +1070,15 @@ private Status executeAction(
ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action)));
}
} catch (Exception e) {
+ span.recordException(e);
LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage());
return outcomeSender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
+ } finally {
+ scope.close();
+ span.end();
}
}
diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java
index d2e7d9b19d1..6fee10c95b6 100644
--- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java
+++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java
@@ -26,6 +26,11 @@
import com.google.spanner.executor.v1.SpannerOptions;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -40,16 +45,39 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP
// Ratio of operations to use multiplexed sessions.
private final double multiplexedSessionOperationsRatio;
+ // Count of checks performed to verify end to end traces using Cloud Trace APIs.
+ private int cloudTraceCheckCount = 0;
+
+ // Maximum checks allowed to verify end to end traces using Cloud Trace APIs.
+ private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20;
+
public CloudExecutorImpl(
boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
}
+ private synchronized void incrementCloudTraceCheckCount() {
+ cloudTraceCheckCount++;
+ }
+
+ private synchronized int getCloudTraceCheckCount() {
+ return cloudTraceCheckCount;
+ }
+
/** Execute SpannerAsync action requests. */
@Override
public StreamObserver executeActionAsync(
StreamObserver responseObserver) {
+ // Create a top-level OpenTelemetry span for streaming request.
+ Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName());
+ Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan();
+ Scope scope = span.makeCurrent();
+
+ final String traceId = span.getSpanContext().getTraceId();
+ final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled();
+ AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false);
+
CloudClientExecutor.ExecutionFlowContext executionContext =
clientExecutor.new ExecutionFlowContext(responseObserver);
return new StreamObserver() {
@@ -86,6 +114,11 @@ public void onNext(SpannerAsyncActionRequest request) {
Level.INFO,
String.format("Updated request to set multiplexed session flag: \n%s", request));
}
+ String actionName = request.getAction().getActionCase().toString();
+ if (actionName == "READ" || actionName == "QUERY") {
+ requestHasReadOrQueryAction.set(true);
+ }
+
Status status = clientExecutor.startHandlingRequest(request, executionContext);
if (!status.isOk()) {
LOGGER.log(
@@ -104,6 +137,41 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
+ // Close the scope and end the span.
+ scope.close();
+ span.end();
+ if (isSampled
+ && getCloudTraceCheckCount() < MAX_CLOUD_TRACE_CHECK_LIMIT
+ && requestHasReadOrQueryAction.get()) {
+ Future traceVerificationTask =
+ clientExecutor.getEndToEndTraceVerificationTask(traceId);
+ try {
+ LOGGER.log(
+ Level.INFO,
+ String.format("Starting end to end trace verification for trace_id:%s", traceId));
+ Boolean isValidTrace = traceVerificationTask.get();
+ incrementCloudTraceCheckCount();
+ if (!isValidTrace) {
+ executionContext.onError(
+ Status.INTERNAL
+ .withDescription(
+ String.format(
+ "failed to verify end to end trace for trace_id: %s", traceId))
+ .getCause());
+ executionContext.cleanup();
+ return;
+ }
+ } catch (Exception e) {
+ LOGGER.log(
+ Level.WARNING,
+ String.format(
+ "Failed to verify end to end trace with exception: %s\n", e.getMessage()),
+ e);
+ executionContext.onError(e);
+ executionContext.cleanup();
+ return;
+ }
+ }
LOGGER.log(Level.INFO, "Client called Done, half closed");
executionContext.cleanup();
responseObserver.onCompleted();
diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java
index 30a4d98a354..17b98bbdada 100644
--- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java
+++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java
@@ -22,6 +22,7 @@
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.spanner.spi.v1.TraceContextInterceptor;
import com.google.common.net.HostAndPort;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
@@ -91,6 +92,7 @@ public static ManagedChannelBuilder> getChannelBuilderForTestGFE(
return channelBuilder
.overrideAuthority(hostInCert)
.sslContext(sslContext)
+ .intercept(new TraceContextInterceptor(WorkerProxy.openTelemetrySdk))
.negotiationType(NegotiationType.TLS);
} catch (Throwable t) {
throw new RuntimeException(t);
diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
index 61034754f20..2146adb1d47 100644
--- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
+++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
@@ -16,12 +16,29 @@
package com.google.cloud.executor.spanner;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.opentelemetry.trace.TraceConfiguration;
+import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
+import com.google.cloud.spanner.SpannerOptions;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.protobuf.services.ProtoReflectionService;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -30,6 +47,7 @@
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
/**
* Worker proxy for Java API. This is the main entry of the Java client proxy on cloud Spanner Java
@@ -55,13 +73,48 @@ public class WorkerProxy {
public static double multiplexedSessionOperationsRatio = 0.0;
public static boolean usePlainTextChannel = false;
public static boolean enableGrpcFaultInjector = false;
+ public static OpenTelemetrySdk openTelemetrySdk;
public static CommandLine commandLine;
+ public static final String PROJECT_ID = "spanner-cloud-systest";
+ public static final String CLOUD_TRACE_ENDPOINT = "staging-cloudtrace.sandbox.googleapis.com:443";
+
private static final int MIN_PORT = 0, MAX_PORT = 65535;
- private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0;
+ private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0, TRACE_SAMPLING_RATE = 0.01;
+
+ public static OpenTelemetrySdk setupOpenTelemetrySdk() throws Exception {
+ // Read credentials from the serviceKeyFile.
+ HttpTransportFactory HTTP_TRANSPORT_FACTORY = NetHttpTransport::new;
+ Credentials credentials =
+ GoogleCredentials.fromStream(
+ new ByteArrayInputStream(FileUtils.readFileToByteArray(new File(serviceKeyFile))),
+ HTTP_TRANSPORT_FACTORY);
+
+ // OpenTelemetry configuration.
+ SpanExporter spanExporter =
+ TraceExporter.createWithConfiguration(
+ TraceConfiguration.builder()
+ .setProjectId(PROJECT_ID)
+ .setCredentials(credentials)
+ .setTraceServiceEndpoint(CLOUD_TRACE_ENDPOINT)
+ .build());
+ return OpenTelemetrySdk.builder()
+ .setTracerProvider(
+ SdkTracerProvider.builder()
+ .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
+ .setResource(Resource.getDefault())
+ .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(TRACE_SAMPLING_RATE)))
+ .build())
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .build();
+ }
public static void main(String[] args) throws Exception {
+ // Enable OpenTelemetry metrics and traces before injecting Opentelemetry.
+ SpannerOptions.enableOpenTelemetryMetrics();
+ SpannerOptions.enableOpenTelemetryTraces();
+
commandLine = buildOptions(args);
if (!commandLine.hasOption(OPTION_SPANNER_PORT)) {
@@ -117,6 +170,8 @@ public static void main(String[] args) throws Exception {
+ MAX_RATIO);
}
}
+ // Setup the OpenTelemetry for tracing.
+ openTelemetrySdk = setupOpenTelemetrySdk();
Server server;
while (true) {
diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml
index 1379d5a11c0..69d0f82b034 100644
--- a/google-cloud-spanner/pom.xml
+++ b/google-cloud-spanner/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-spanner
- 6.81.0
+ 6.81.1
jar
Google Cloud Spanner
https://github.com/googleapis/java-spanner
@@ -11,7 +11,7 @@
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
google-cloud-spanner
@@ -265,12 +265,12 @@
com.google.cloud
google-cloud-monitoring
- 3.53.0
+ 3.54.0
com.google.api.grpc
proto-google-cloud-monitoring-v3
- 3.53.0
+ 3.54.0
com.google.auth
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index a89090e34d9..cecf462bd25 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -768,9 +768,14 @@ ResultSet executeQueryInternalWithOptions(
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
- CloseableIterator startStream(@Nullable ByteString resumeToken) {
+ CloseableIterator startStream(
+ @Nullable ByteString resumeToken,
+ AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
+ if (streamListener != null) {
+ stream.registerListener(streamListener);
+ }
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
@@ -791,8 +796,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
- call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
+ call.request(prefetchChunks);
return stream;
}
@@ -959,9 +964,14 @@ ResultSet readInternalWithOptions(
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
- CloseableIterator startStream(@Nullable ByteString resumeToken) {
+ CloseableIterator startStream(
+ @Nullable ByteString resumeToken,
+ AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
+ if (streamListener != null) {
+ stream.registerListener(streamListener);
+ }
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
@@ -980,8 +990,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
- call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
+ call.request(prefetchChunks);
return stream;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
index fdc0398d5fe..3dca970f96e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
@@ -150,6 +150,14 @@ interface CloseableIterator extends Iterator {
void close(@Nullable String message);
boolean isWithBeginTransaction();
+
+ /**
+ * @param streamMessageListener A class object which implements StreamMessageListener
+ * @return true if streaming is supported by the iterator, otherwise false
+ */
+ default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
+ return false;
+ }
}
static double valueProtoToFloat64(com.google.protobuf.Value proto) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java
index dfedcc4f8be..2b3225bfc59 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java
@@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.common.base.Function;
+import com.google.spanner.v1.PartialResultSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -223,4 +224,12 @@ interface ReadyCallback {
* @param transformer function which will be used to transform the row. It should not return null.
*/
List toList(Function transformer) throws SpannerException;
+
+ /**
+ * An interface to register the listener for streaming gRPC request. It will be called when a
+ * chunk is received from gRPC streaming call.
+ */
+ interface StreamMessageListener {
+ void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull);
+ }
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
index fa7cc158c19..1161822cd10 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
@@ -18,7 +18,6 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
-import com.google.api.core.ListenableFutureToApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
@@ -29,13 +28,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -45,12 +44,14 @@
import java.util.logging.Logger;
/** Default implementation for {@link AsyncResultSet}. */
-class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
+class AsyncResultSetImpl extends ForwardingStructReader
+ implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());
/** State of an {@link AsyncResultSetImpl}. */
private enum State {
INITIALIZED,
+ STREAMING_INITIALIZED,
/** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
SYNC,
CONSUMING,
@@ -115,12 +116,15 @@ private enum State {
private State state = State.INITIALIZED;
+ /** This variable indicates that produce rows thread is initiated */
+ private volatile boolean produceRowsInitiated;
+
/**
* This variable indicates whether all the results from the underlying result set have been read.
*/
private volatile boolean finished;
- private volatile ApiFuture result;
+ private volatile SettableApiFuture result;
/**
* This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
@@ -329,12 +333,12 @@ public void run() {
private final CallbackRunnable callbackRunnable = new CallbackRunnable();
/**
- * {@link ProduceRowsCallable} reads data from the underlying {@link ResultSet}, places these in
+ * {@link ProduceRowsRunnable} reads data from the underlying {@link ResultSet}, places these in
* the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
*/
- private class ProduceRowsCallable implements Callable {
+ private class ProduceRowsRunnable implements Runnable {
@Override
- public Void call() throws Exception {
+ public void run() {
boolean stop = false;
boolean hasNext = false;
try {
@@ -393,12 +397,17 @@ public Void call() throws Exception {
}
// Call the callback if there are still rows in the buffer that need to be processed.
while (!stop) {
- waitIfPaused();
- startCallbackIfNecessary();
- // Make sure we wait until the callback runner has actually finished.
- consumingLatch.await();
- synchronized (monitor) {
- stop = cursorReturnedDoneOrException;
+ try {
+ waitIfPaused();
+ startCallbackIfNecessary();
+ // Make sure we wait until the callback runner has actually finished.
+ consumingLatch.await();
+ synchronized (monitor) {
+ stop = cursorReturnedDoneOrException;
+ }
+ } catch (Throwable e) {
+ result.setException(e);
+ return;
}
}
} finally {
@@ -410,14 +419,14 @@ public Void call() throws Exception {
}
synchronized (monitor) {
if (executionException != null) {
- throw executionException;
- }
- if (state == State.CANCELLED) {
- throw CANCELLED_EXCEPTION;
+ result.setException(executionException);
+ } else if (state == State.CANCELLED) {
+ result.setException(CANCELLED_EXCEPTION);
+ } else {
+ result.set(null);
}
}
}
- return null;
}
private void waitIfPaused() throws InterruptedException {
@@ -449,6 +458,26 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
}
}
+ private class InitiateStreamingRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ // This method returns true if the underlying result set is a streaming result set (e.g. a
+ // GrpcResultSet).
+ // Those result sets will trigger initiateProduceRows() when the first results are received.
+ // Non-streaming result sets do not trigger this callback, and for those result sets, we
+ // need to eagerly start the ProduceRowsRunnable.
+ if (!initiateStreaming(AsyncResultSetImpl.this)) {
+ initiateProduceRows();
+ }
+ } catch (Throwable exception) {
+ executionException = SpannerExceptionFactory.asSpannerException(exception);
+ initiateProduceRows();
+ }
+ }
+ }
+
/** Sets the callback for this {@link AsyncResultSet}. */
@Override
public ApiFuture setCallback(Executor exec, ReadyCallback cb) {
@@ -458,16 +487,24 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) {
this.state == State.INITIALIZED, "callback may not be set multiple times");
// Start to fetch data and buffer these.
- this.result =
- new ListenableFutureToApiFuture<>(this.service.submit(new ProduceRowsCallable()));
+ this.result = SettableApiFuture.create();
+ this.state = State.STREAMING_INITIALIZED;
+ this.service.execute(new InitiateStreamingRunnable());
this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec));
this.callback = Preconditions.checkNotNull(cb);
- this.state = State.RUNNING;
pausedLatch.countDown();
return result;
}
}
+ private void initiateProduceRows() {
+ if (this.state == State.STREAMING_INITIALIZED) {
+ this.state = State.RUNNING;
+ }
+ produceRowsInitiated = true;
+ this.service.execute(new ProduceRowsRunnable());
+ }
+
Future getResult() {
return result;
}
@@ -578,6 +615,10 @@ public ResultSetMetadata getMetadata() {
return delegateResultSet.get().getMetadata();
}
+ boolean initiateStreaming(StreamMessageListener streamMessageListener) {
+ return StreamingUtil.initiateStreaming(delegateResultSet.get(), streamMessageListener);
+ }
+
@Override
protected void checkValidState() {
synchronized (monitor) {
@@ -593,4 +634,22 @@ public Struct getCurrentRowAsStruct() {
checkValidState();
return currentRow;
}
+
+ @Override
+ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) {
+ synchronized (monitor) {
+ if (produceRowsInitiated) {
+ return;
+ }
+ // if PartialResultSet contains a resume token or buffer size is full, or
+ // we have reached the end of the stream, we can start the thread.
+ boolean startJobThread =
+ !partialResultSet.getResumeToken().isEmpty()
+ || bufferIsFull
+ || partialResultSet == GrpcStreamIterator.END_OF_STREAM;
+ if (startJobThread || state != State.STREAMING_INITIALIZED) {
+ initiateProduceRows();
+ }
+ }
+ }
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
index 91edce79325..d7f16f89524 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
@@ -92,7 +92,7 @@ DatabaseClient getMultiplexedSession() {
@VisibleForTesting
DatabaseClient getMultiplexedSessionForRW() {
- if (this.useMultiplexedSessionForRW) {
+ if (canUseMultiplexedSessionsForRW()) {
return getMultiplexedSession();
}
return getSession();
@@ -107,6 +107,12 @@ private boolean canUseMultiplexedSessions() {
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
}
+ private boolean canUseMultiplexedSessionsForRW() {
+ return this.useMultiplexedSessionForRW
+ && this.multiplexedSessionDatabaseClient != null
+ && this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
+ }
+
@Override
public Dialect getDialect() {
return pool.getDialect();
@@ -129,7 +135,7 @@ public CommitResponse writeWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
- if (this.useMultiplexedSessionForRW && getMultiplexedSessionDatabaseClient() != null) {
+ if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
index babbb310a45..3c4883e6586 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -23,7 +24,8 @@
import com.google.spanner.v1.ResultSetStats;
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
-public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet {
+public class ForwardingResultSet extends ForwardingStructReader
+ implements ProtobufResultSet, StreamingResultSet {
private Supplier extends ResultSet> delegate;
@@ -102,4 +104,10 @@ public ResultSetStats getStats() {
public ResultSetMetadata getMetadata() {
return delegate.get().getMetadata();
}
+
+ @Override
+ @InternalApi
+ public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
+ return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener);
+ }
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java
index 23c9dd7c2d3..c2a4ee5a585 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java
@@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkState;
+import com.google.api.core.InternalApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Value;
import com.google.spanner.v1.PartialResultSet;
@@ -30,7 +31,8 @@
import javax.annotation.Nullable;
@VisibleForTesting
-class GrpcResultSet extends AbstractResultSet> implements ProtobufResultSet {
+class GrpcResultSet extends AbstractResultSet>
+ implements ProtobufResultSet, StreamingResultSet {
private final GrpcValueIterator iterator;
private final Listener listener;
private final DecodeMode decodeMode;
@@ -123,6 +125,12 @@ public ResultSetMetadata getMetadata() {
return metadata;
}
+ @Override
+ @InternalApi
+ public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
+ return iterator.initiateStreaming(streamMessageListener);
+ }
+
@Override
public void close() {
synchronized (this) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java
index af6b5683502..79c02eab58c 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java
@@ -20,9 +20,11 @@
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.spanner.v1.PartialResultSet;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -36,7 +38,8 @@
class GrpcStreamIterator extends AbstractIterator
implements CloseableIterator {
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
- private static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
+ static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
+ private AsyncResultSet.StreamMessageListener streamMessageListener;
private final ConsumerImpl consumer;
private final BlockingQueue stream;
@@ -66,6 +69,10 @@ protected final SpannerRpc.ResultStreamConsumer consumer() {
return consumer;
}
+ void registerListener(AsyncResultSet.StreamMessageListener streamMessageListener) {
+ this.streamMessageListener = Preconditions.checkNotNull(streamMessageListener);
+ }
+
public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) {
this.call = call;
this.withBeginTransaction = withBeginTransaction;
@@ -135,6 +142,7 @@ protected final PartialResultSet computeNext() {
private void addToStream(PartialResultSet results) {
// We assume that nothing from the user will interrupt gRPC event threads.
Uninterruptibles.putUninterruptibly(stream, results);
+ onStreamMessage(results);
}
private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer {
@@ -182,4 +190,9 @@ public boolean cancelQueryWhenClientIsClosed() {
return this.cancelQueryWhenClientIsClosed;
}
}
+
+ private void onStreamMessage(PartialResultSet partialResultSet) {
+ Optional.ofNullable(streamMessageListener)
+ .ifPresent(sl -> sl.onStreamMessage(partialResultSet, stream.remainingCapacity() <= 1));
+ }
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java
index 1a3df8b9123..24c431eec31 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java
@@ -127,6 +127,10 @@ ResultSetStats getStats() {
return statistics;
}
+ boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
+ return stream.initiateStreaming(streamMessageListener);
+ }
+
Type type() {
checkState(type != null, "metadata has not been received");
return type;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java
index 0c20c4cbc76..71e364bde83 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java
@@ -17,6 +17,7 @@
package com.google.cloud.spanner;
import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;
+import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
@@ -27,6 +28,10 @@
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.spanner.v1.BeginTransactionRequest;
+import com.google.spanner.v1.RequestOptions;
+import com.google.spanner.v1.Transaction;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
@@ -92,6 +97,10 @@ void onError(SpannerException spannerException) {
// synchronizing, as it does not really matter exactly which error is set.
this.client.resourceNotFoundException.set((ResourceNotFoundException) spannerException);
}
+ // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if
+ // UNIMPLEMENTED with error message "Transaction type read_write not supported with
+ // multiplexed sessions" is returned.
+ this.client.maybeMarkUnimplementedForRW(spannerException);
}
@Override
@@ -164,6 +173,12 @@ public void close() {
/** The current multiplexed session that is used by this client. */
private final AtomicReference> multiplexedSessionReference;
+ /**
+ * The Transaction response returned by the BeginTransaction request with read-write when a
+ * multiplexed session is created during client initialization.
+ */
+ private final SettableApiFuture readWriteBeginTransactionReferenceFuture;
+
/** The expiration date/time of the current multiplexed session. */
private final AtomicReference expirationDate;
@@ -190,6 +205,12 @@ public void close() {
*/
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
+ /**
+ * This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
+ * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
+ */
+ @VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);
+
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
@@ -217,6 +238,7 @@ public void close() {
this.tracer = sessionClient.getSpanner().getTracer();
final SettableApiFuture initialSessionReferenceFuture =
SettableApiFuture.create();
+ this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
this.sessionClient.asyncCreateMultiplexedSession(
new SessionConsumer() {
@@ -226,6 +248,16 @@ public void onSessionReady(SessionImpl session) {
// only start the maintainer if we actually managed to create a session in the first
// place.
maintainer.start();
+
+ // initiate a begin transaction request to verify if read-write transactions are
+ // supported using multiplexed sessions.
+ if (sessionClient
+ .getSpanner()
+ .getOptions()
+ .getSessionPoolOptions()
+ .getUseMultiplexedSessionForRW()) {
+ verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
+ }
}
@Override
@@ -267,6 +299,70 @@ private void maybeMarkUnimplemented(Throwable t) {
}
}
+ private void maybeMarkUnimplementedForRW(SpannerException spannerException) {
+ if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
+ && verifyErrorMessage(
+ spannerException,
+ "Transaction type read_write not supported with multiplexed sessions")) {
+ unimplementedForRW.set(true);
+ }
+ }
+
+ private boolean verifyErrorMessage(SpannerException spannerException, String message) {
+ if (spannerException.getCause() == null) {
+ return false;
+ }
+ if (spannerException.getCause().getMessage() == null) {
+ return false;
+ }
+ return spannerException.getCause().getMessage().contains(message);
+ }
+
+ private void verifyBeginTransactionWithRWOnMultiplexedSessionAsync(String sessionName) {
+ // TODO: Remove once this is guaranteed to be available.
+ // annotate the explict BeginTransactionRequest with a transaction tag
+ // "multiplexed-rw-background-begin-txn" to avoid storing this request on mock spanner.
+ // this is to safeguard other mock spanner tests whose BeginTransaction request count will
+ // otherwise increase by 1. Modifying the unit tests do not seem valid since this code is
+ // temporary and will be removed once the read-write on multiplexed session looks stable at
+ // backend.
+ BeginTransactionRequest.Builder requestBuilder =
+ BeginTransactionRequest.newBuilder()
+ .setSession(sessionName)
+ .setOptions(
+ SessionImpl.createReadWriteTransactionOptions(
+ Options.fromTransactionOptions(), /* previousTransactionId = */ null))
+ .setRequestOptions(
+ RequestOptions.newBuilder()
+ .setTransactionTag("multiplexed-rw-background-begin-txn")
+ .build());
+ final BeginTransactionRequest request = requestBuilder.build();
+ final ApiFuture requestFuture;
+ requestFuture =
+ sessionClient
+ .getSpanner()
+ .getRpc()
+ .beginTransactionAsync(request, /* options = */ null, /* routeToLeader = */ true);
+ requestFuture.addListener(
+ () -> {
+ try {
+ Transaction txn = requestFuture.get();
+ if (txn.getId().isEmpty()) {
+ throw newSpannerException(
+ ErrorCode.INTERNAL, "Missing id in transaction\n" + sessionName);
+ }
+ readWriteBeginTransactionReferenceFuture.set(txn);
+ } catch (Exception e) {
+ SpannerException spannerException = SpannerExceptionFactory.newSpannerException(e);
+ // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions
+ // if UNIMPLEMENTED is returned.
+ maybeMarkUnimplementedForRW(spannerException);
+ readWriteBeginTransactionReferenceFuture.setException(e);
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
boolean isValid() {
return resourceNotFoundException.get() == null;
}
@@ -283,6 +379,10 @@ boolean isMultiplexedSessionsSupported() {
return !this.unimplemented.get();
}
+ boolean isMultiplexedSessionsForRWSupported() {
+ return !this.unimplementedForRW.get();
+ }
+
void close() {
synchronized (this) {
if (!this.isClosed) {
@@ -308,6 +408,17 @@ SessionReference getCurrentSessionReference() {
}
}
+ @VisibleForTesting
+ Transaction getReadWriteBeginTransactionReference() {
+ try {
+ return this.readWriteBeginTransactionReferenceFuture.get();
+ } catch (ExecutionException executionException) {
+ throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
+ } catch (InterruptedException interruptedException) {
+ throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
+ }
+ }
+
/**
* Returns true if the multiplexed session has been created. This client can be used before the
* session has been created, and will in that case use a delayed transaction that contains a
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
index 3e82ab7d5ff..39165da2d38 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
@@ -23,6 +23,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode.Code;
@@ -58,6 +59,7 @@ abstract class ResumableStreamIterator extends AbstractIterator retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
@@ -196,7 +198,8 @@ public void execute(Runnable command) {
}
}
- abstract CloseableIterator startStream(@Nullable ByteString resumeToken);
+ abstract CloseableIterator startStream(
+ @Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);
/**
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
@@ -220,23 +223,21 @@ public boolean isWithBeginTransaction() {
return stream != null && stream.isWithBeginTransaction();
}
+ @Override
+ @InternalApi
+ public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
+ this.streamMessageListener = streamMessageListener;
+ startGrpcStreaming();
+ return true;
+ }
+
@Override
protected PartialResultSet computeNext() {
int numAttemptsOnOtherChannel = 0;
Context context = Context.current();
while (true) {
// Eagerly start stream before consuming any buffered items.
- if (stream == null) {
- span.addAnnotation(
- "Starting/Resuming stream",
- "ResumeToken",
- resumeToken == null ? "null" : resumeToken.toStringUtf8());
- try (IScope scope = tracer.withSpan(span)) {
- // When start a new stream set the Span as current to make the gRPC Span a child of
- // this Span.
- stream = checkNotNull(startStream(resumeToken));
- }
- }
+ startGrpcStreaming();
// Buffer contains items up to a resume token or has reached capacity: flush.
if (!buffer.isEmpty()
&& (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) {
@@ -315,6 +316,20 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
}
}
+ private void startGrpcStreaming() {
+ if (stream == null) {
+ span.addAnnotation(
+ "Starting/Resuming stream",
+ "ResumeToken",
+ resumeToken == null ? "null" : resumeToken.toStringUtf8());
+ try (IScope scope = tracer.withSpan(span)) {
+ // When start a new stream set the Span as current to make the gRPC Span a child of
+ // this Span.
+ stream = checkNotNull(startStream(resumeToken, streamMessageListener));
+ }
+ }
+ }
+
boolean isRetryable(SpannerException spannerException) {
return spannerException.isRetryable()
|| retryableCodes.contains(
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
index 51dc890902c..3577c9f7b45 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
@@ -138,6 +138,14 @@ private CompletableResultCode exportSpannerClientMetrics(Collection
return CompletableResultCode.ofFailure();
}
+ // Verifies if metrics data has missing instance id.
+ if (spannerMetricData.stream()
+ .flatMap(metricData -> metricData.getData().getPoints().stream())
+ .anyMatch(pd -> SpannerCloudMonitoringExporterUtils.getInstanceId(pd) == null)) {
+ logger.log(Level.WARNING, "Metric data has missing instanceId. Skipping export.");
+ return CompletableResultCode.ofFailure();
+ }
+
List spannerTimeSeries;
try {
spannerTimeSeries =
@@ -166,7 +174,7 @@ public void onFailure(Throwable throwable) {
// TODO: Add the link of public documentation when available in the log message.
msg +=
String.format(
- " Need monitoring metric writer permission on project=%s.",
+ " Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/spanner/docs/view-manage-client-side-metrics#access-client-side-metrics to set up permissions",
projectName.getProject());
}
logger.log(Level.WARNING, msg, throwable);
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
index a6d1e29d587..21fcba8194d 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
@@ -23,6 +23,7 @@
import static com.google.api.MetricDescriptor.ValueType.DOUBLE;
import static com.google.api.MetricDescriptor.ValueType.INT64;
import static com.google.cloud.spanner.BuiltInMetricsConstant.GAX_METER_NAME;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_PROMOTED_RESOURCE_LABELS;
import static com.google.cloud.spanner.BuiltInMetricsConstant.SPANNER_RESOURCE_TYPE;
@@ -66,6 +67,10 @@ static String getProjectId(PointData pointData) {
return pointData.getAttributes().get(PROJECT_ID_KEY);
}
+ static String getInstanceId(PointData pointData) {
+ return pointData.getAttributes().get(INSTANCE_ID_KEY);
+ }
+
static List convertToSpannerTimeSeries(List collection) {
List allTimeSeries = new ArrayList<>();
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java
new file mode 100644
index 00000000000..47b10d852c6
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.InternalApi;
+
+/** Streaming implementation of ResultSet that supports streaming of chunks */
+interface StreamingResultSet extends ResultSet {
+
+ /**
+ * Returns the {@link boolean} for this {@link ResultSet}. This method will be used by
+ * AsyncResultSet internally to initiate gRPC streaming. This method should not be called by the
+ * users.
+ */
+ @InternalApi
+ boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener);
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java
new file mode 100644
index 00000000000..54496d39f96
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+final class StreamingUtil {
+
+ private StreamingUtil() {}
+
+ static boolean initiateStreaming(
+ ResultSet resultSet, AsyncResultSet.StreamMessageListener streamMessageListener) {
+ if (resultSet instanceof StreamingResultSet) {
+ return ((StreamingResultSet) resultSet).initiateStreaming(streamMessageListener);
+ }
+ return false;
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
index 5eaa54cd050..9e9fe62304a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
@@ -409,7 +409,9 @@ ApiFuture commitAsync() {
}
builder.addAllMutations(mutationsProto);
finishOps.addListener(
- new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
+ new CommitRunnable(
+ res, finishOps, builder, /* retryAttemptDueToCommitProtocolExtension = */ false),
+ MoreExecutors.directExecutor());
return res;
}
@@ -418,14 +420,17 @@ private final class CommitRunnable implements Runnable {
private final SettableApiFuture res;
private final ApiFuture prev;
private final CommitRequest.Builder requestBuilder;
+ private final boolean retryAttemptDueToCommitProtocolExtension;
CommitRunnable(
SettableApiFuture res,
ApiFuture prev,
- CommitRequest.Builder requestBuilder) {
+ CommitRequest.Builder requestBuilder,
+ boolean retryAttemptDueToCommitProtocolExtension) {
this.res = res;
this.prev = prev;
this.requestBuilder = requestBuilder;
+ this.retryAttemptDueToCommitProtocolExtension = retryAttemptDueToCommitProtocolExtension;
}
@Override
@@ -459,6 +464,13 @@ public void run() {
// Set the precommit token in the CommitRequest for multiplexed sessions.
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
}
+ if (retryAttemptDueToCommitProtocolExtension) {
+ // When a retry occurs due to the commit protocol extension, clear all mutations because
+ // they were already buffered in SpanFE during the previous attempt.
+ requestBuilder.clearMutations();
+ span.addAnnotation(
+ "Retrying commit operation with a new precommit token obtained from the previous CommitResponse");
+ }
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture commitFuture;
@@ -479,6 +491,29 @@ public void run() {
return;
}
com.google.spanner.v1.CommitResponse proto = commitFuture.get();
+
+ // If the CommitResponse includes a precommit token, the client will retry the
+ // commit RPC once with the new token and clear any existing mutations.
+ // This case is applicable only when the read-write transaction uses multiplexed
+ // session.
+ if (proto.hasPrecommitToken() && !retryAttemptDueToCommitProtocolExtension) {
+ // track the latest pre commit token
+ onPrecommitToken(proto.getPrecommitToken());
+ span.addAnnotation(
+ "Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field");
+ opSpan.end();
+
+ // Retry the commit RPC with the latest precommit token from CommitResponse.
+ new CommitRunnable(
+ res,
+ prev,
+ requestBuilder,
+ /* retryAttemptDueToCommitProtocolExtension = */ true)
+ .run();
+
+ // Exit to prevent further processing in this attempt.
+ return;
+ }
if (!proto.hasCommitTimestamp()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java
index 3b46ba4f880..4280e310355 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java
@@ -33,11 +33,11 @@
* Spanner. This class takes reference from OpenTelemetry's JAVA instrumentation library for gRPC.
* https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/9ecf7965aa455d41ea8cc0761b6c6b6eeb106324/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java#L27
*/
-class TraceContextInterceptor implements ClientInterceptor {
+public class TraceContextInterceptor implements ClientInterceptor {
private final TextMapPropagator textMapPropagator;
- TraceContextInterceptor(OpenTelemetry openTelemetry) {
+ public TraceContextInterceptor(OpenTelemetry openTelemetry) {
this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
index 98497fbf140..0ba924ef740 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
@@ -22,7 +22,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import com.google.api.core.ApiFuture;
@@ -32,6 +34,9 @@
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.common.base.Function;
import com.google.common.collect.Range;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Value;
+import com.google.spanner.v1.PartialResultSet;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
@@ -48,6 +53,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -381,13 +387,20 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
public void testCallbackIsNotCalledWhilePausedAndCanceled()
throws InterruptedException, ExecutionException {
Executor executor = Executors.newSingleThreadExecutor();
- ResultSet delegate = mock(ResultSet.class);
+ StreamingResultSet delegate = mock(StreamingResultSet.class);
final AtomicInteger callbackCounter = new AtomicInteger();
ApiFuture callbackResult;
try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
+
+ when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class)))
+ .thenAnswer(
+ answer -> {
+ rs.onStreamMessage(PartialResultSet.newBuilder().build(), false);
+ return null;
+ });
callbackResult =
rs.setCallback(
executor,
@@ -498,4 +511,60 @@ public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception
rs.getResult().get(10L, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testOnStreamMessageWhenResumeTokenIsPresent() {
+ StreamingResultSet delegate = mock(StreamingResultSet.class);
+ try (AsyncResultSetImpl rs =
+ new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
+ // Marking Streaming as supported
+ Mockito.when(
+ delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class)))
+ .thenReturn(true);
+
+ rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
+ rs.onStreamMessage(
+ PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false);
+
+ rs.onStreamMessage(
+ PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(),
+ false);
+ Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
+ }
+ }
+
+ @Test
+ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() {
+ StreamingResultSet delegate = mock(StreamingResultSet.class);
+ try (AsyncResultSetImpl rs =
+ new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
+ // Marking Streaming as supported
+ Mockito.when(
+ delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class)))
+ .thenReturn(true);
+
+ rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
+ rs.onStreamMessage(
+ PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), true);
+ Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
+ }
+ }
+
+ @Test
+ public void testOnStreamMessageWhenAsyncResultIsCancelled() {
+ StreamingResultSet delegate = mock(StreamingResultSet.class);
+ try (AsyncResultSetImpl rs =
+ new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
+ // Marking Streaming as supported
+ Mockito.when(
+ delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class)))
+ .thenReturn(true);
+
+ rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
+ rs.cancel();
+ rs.onStreamMessage(
+ PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false);
+ Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
+ }
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
index 415bbe6e793..39f1ff180fa 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
@@ -603,6 +603,7 @@ private static void checkStreamException(
private ConcurrentMap isPartitionedDmlTransaction =
new ConcurrentHashMap<>();
private ConcurrentMap abortedTransactions = new ConcurrentHashMap<>();
+ private ConcurrentMap commitRetryTransactions = new ConcurrentHashMap<>();
private final AtomicBoolean abortNextTransaction = new AtomicBoolean();
private final AtomicBoolean abortNextStatement = new AtomicBoolean();
private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean();
@@ -1872,7 +1873,17 @@ private Transaction getTemporaryTransactionOrNull(TransactionSelector tx) {
@Override
public void beginTransaction(
BeginTransactionRequest request, StreamObserver responseObserver) {
- requests.add(request);
+ // TODO: Remove once this is guaranteed to be available.
+ // Skip storing the explicit BeginTransactionRequest used to verify read-write transaction
+ // server availability on multiplexed sessions.
+ // This code will be removed once read-write multiplexed sessions are stable on the backend,
+ // hence the temporary trade-off.
+ if (!request
+ .getRequestOptions()
+ .getTransactionTag()
+ .equals("multiplexed-rw-background-begin-txn")) {
+ requests.add(request);
+ }
Preconditions.checkNotNull(request.getSession());
Session session = getSession(request.getSession());
if (session == null) {
@@ -2035,15 +2046,23 @@ public void commit(CommitRequest request, StreamObserver respons
return;
}
simulateAbort(session, request.getTransactionId());
- commitTransaction(transaction.getId());
- CommitResponse.Builder responseBuilder =
- CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp());
- if (request.getReturnCommitStats()) {
- responseBuilder.setCommitStats(
- com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
- // This is not really always equal, but at least it returns a value.
- .setMutationCount(request.getMutationsCount())
- .build());
+ CommitResponse.Builder responseBuilder = CommitResponse.newBuilder();
+ Optional commitRetry =
+ Optional.fromNullable(commitRetryTransactions.get(request.getTransactionId()));
+ if (commitRetry.or(Boolean.FALSE) && session.getMultiplexed()) {
+ responseBuilder.setPrecommitToken(
+ getCommitResponsePrecommitToken(request.getTransactionId()));
+ commitRetryTransactions.remove(request.getTransactionId());
+ } else {
+ commitTransaction(transaction.getId());
+ responseBuilder.setCommitTimestamp(getCurrentGoogleTimestamp());
+ if (request.getReturnCommitStats()) {
+ responseBuilder.setCommitStats(
+ com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
+ // This is not really always equal, but at least it returns a value.
+ .setMutationCount(request.getMutationsCount())
+ .build());
+ }
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
@@ -2124,6 +2143,14 @@ void markAbortedTransaction(ByteString transactionId) {
transactionSequenceNo.remove(transactionId);
}
+ public void markCommitRetryOnTransaction(ByteString transactionId) {
+ Transaction transaction = transactions.get(transactionId);
+ if (transaction == null || !isReadWriteTransaction(transactionId)) {
+ return;
+ }
+ commitRetryTransactions.putIfAbsent(transactionId, Boolean.TRUE);
+ }
+
@Override
public void partitionQuery(
PartitionQueryRequest request, StreamObserver responseObserver) {
@@ -2517,6 +2544,11 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId);
}
+ static MultiplexedSessionPrecommitToken getCommitResponsePrecommitToken(
+ ByteString transactionId) {
+ return getPrecommitToken("CommitResponsePrecommitToken", transactionId);
+ }
+
static MultiplexedSessionPrecommitToken getPrecommitToken(
String value, ByteString transactionId) {
transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0));
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java
index 36c527f3842..4dc1da62e7b 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java
@@ -50,6 +50,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.Session;
+import com.google.spanner.v1.Transaction;
import io.grpc.Status;
import java.time.Duration;
import java.util.Collections;
@@ -1121,7 +1122,7 @@ public void testPrecommitTokenForTransactionResponse() {
// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
List beginTxnRequest =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
- assertEquals(1L, beginTxnRequest.size());
+ assertEquals(1, beginTxnRequest.size());
assertTrue(mockSpanner.getSession(beginTxnRequest.get(0).getSession()).getMultiplexed());
assertTrue(beginTxnRequest.get(0).hasMutationKey());
assertTrue(beginTxnRequest.get(0).getMutationKey().hasInsert());
@@ -1161,7 +1162,7 @@ public void testMutationOnlyCaseAborted() {
// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
List beginTransactionRequests =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
- assertEquals(2L, beginTransactionRequests.size());
+ assertEquals(2, beginTransactionRequests.size());
// Verify the requests are executed using multiplexed sessions
for (BeginTransactionRequest request : beginTransactionRequests) {
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
@@ -1290,6 +1291,350 @@ public void testMutationOnlyUsingAsyncTransactionManager() {
request.getPrecommitToken().getPrecommitToken());
}
+ // Tests the behavior of the server-side kill switch for read-write multiplexed sessions..
+ @Test
+ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToRegularSession() {
+ mockSpanner.setBeginTransactionExecutionTime(
+ SimulatedExecutionTime.ofException(
+ Status.UNIMPLEMENTED
+ .withDescription(
+ "Transaction type read_write not supported with multiplexed sessions")
+ .asRuntimeException()));
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+
+ // Wait until the client sees that MultiplexedSessions are not supported for read-write.
+ // Get the begin transaction reference. This will block until the BeginTransaction RPC with
+ // read-write has failed.
+ SpannerException spannerException =
+ assertThrows(
+ SpannerException.class,
+ client.multiplexedSessionDatabaseClient::getReadWriteBeginTransactionReference);
+ assertEquals(ErrorCode.UNIMPLEMENTED, spannerException.getErrorCode());
+ assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForRW.get());
+
+ // read-write transaction should fallback to regular sessions
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+ }
+ return null;
+ });
+
+ // Verify that we received one ExecuteSqlRequest, and it uses a regular session due to fallback.
+ List executeSqlRequests =
+ mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(1, executeSqlRequests.size());
+ // Verify the requests are not executed using multiplexed sessions
+ Session session2 = mockSpanner.getSession(executeSqlRequests.get(0).getSession());
+ assertNotNull(session2);
+ assertFalse(session2.getMultiplexed());
+ }
+
+ @Test
+ public void
+ testReadWriteUnimplementedErrorDuringInitialBeginTransactionRPC_firstReceivesError_secondFallsBackToRegularSessions() {
+ // This test simulates the following scenario,
+ // 1. The server-side flag for RW multiplexed sessions is disabled.
+ // 2. Application starts. The initial BeginTransaction RPC during client initialization will
+ // fail with UNIMPLEMENTED error.
+ // 3. Read-write transaction initialized before the BeginTransaction RPC response will fail with
+ // UNIMPLEMENTED error.
+ // 4. Read-write transaction initialized after the BeginTransaction RPC response will fallback
+ // to regular sessions.
+ mockSpanner.setBeginTransactionExecutionTime(
+ SimulatedExecutionTime.ofException(
+ Status.UNIMPLEMENTED
+ .withDescription(
+ "Transaction type read_write not supported with multiplexed sessions")
+ .asRuntimeException()));
+ mockSpanner.setExecuteStreamingSqlExecutionTime(
+ SimulatedExecutionTime.ofException(
+ Status.UNIMPLEMENTED
+ .withDescription(
+ "Transaction type read_write not supported with multiplexed sessions")
+ .asRuntimeException()));
+ // Freeze the mock server to ensure that the BeginTransaction with read-write on multiplexed
+ // session RPC does not return an error or any
+ // other result just yet.
+ mockSpanner.freeze();
+ // Get a database client using multiplexed sessions. The BeginTransaction RPC to validation
+ // read-write on multiplexed session will be blocked as
+ // long as the mock server is frozen.
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ // Get the runner so that the read-write transaction is executed via multiplexed session.
+ TransactionRunner runner = client.readWriteTransaction();
+
+ // Unfreeze the mock server to get the error from the backend. The above read-write transaction
+ // will then fail.
+ mockSpanner.unfreeze();
+
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () ->
+ runner.run(
+ transaction -> {
+ ResultSet resultSet = transaction.executeQuery(STATEMENT);
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+ return null;
+ }));
+ assertEquals(ErrorCode.UNIMPLEMENTED, e.getErrorCode());
+
+ // Wait until the client sees that MultiplexedSessions are not supported for read-write.
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ SpannerException spannerException =
+ assertThrows(
+ SpannerException.class,
+ client.multiplexedSessionDatabaseClient::getReadWriteBeginTransactionReference);
+ assertEquals(ErrorCode.UNIMPLEMENTED, spannerException.getErrorCode());
+ assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForRW.get());
+
+ // The next read-write transaction will fall back to regular sessions and succeed.
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+ }
+ return null;
+ });
+
+ // Verify that two ExecuteSqlRequests were received: the first using a multiplexed session and
+ // the second using a regular session.
+ assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+
+ Session session1 = mockSpanner.getSession(requests.get(0).getSession());
+ assertNotNull(session1);
+ assertTrue(session1.getMultiplexed());
+
+ Session session2 = mockSpanner.getSession(requests.get(1).getSession());
+ assertNotNull(session2);
+ assertFalse(session2.getMultiplexed());
+
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
+ assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
+ }
+
+ @Test
+ public void testReadWriteUnimplemented_firstReceivesError_secondFallsBackToRegularSessions() {
+ // This test simulates the following scenario,
+ // 1. The server side flag for read-write multiplexed session is not disabled. When an
+ // application starts, the initial BeginTransaction RPC with read-write will succeed.
+ // 2. After time t, the server side flag for read-write multiplexed session is disabled. After
+ // this a read-write transaction executed with multiplexed sessions should fail with
+ // UNIMPLEMENTED error.
+ // 3. All read-write transactions in the application after the initial failure should fallback
+ // to using regular sessions.
+ mockSpanner.setExecuteStreamingSqlExecutionTime(
+ SimulatedExecutionTime.ofException(
+ Status.UNIMPLEMENTED
+ .withDescription(
+ "Transaction type read_write not supported with multiplexed sessions")
+ .asRuntimeException()));
+
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ // Wait until the initial BeginTransaction RPC with read-write is complete.
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ Transaction txn =
+ client.multiplexedSessionDatabaseClient.getReadWriteBeginTransactionReference();
+ assertNotNull(txn);
+ assertNotNull(txn.getId());
+ assertFalse(client.multiplexedSessionDatabaseClient.unimplementedForRW.get());
+
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () ->
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ ResultSet resultSet = transaction.executeQuery(STATEMENT);
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+ return null;
+ }));
+ assertEquals(ErrorCode.UNIMPLEMENTED, e.getErrorCode());
+
+ // Verify that the previous failed transaction has marked multiplexed session client to be
+ // unimplemented for read-write.
+ assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForRW.get());
+
+ // The next read-write transaction will fall back to regular sessions and succeed.
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+ }
+ return null;
+ });
+
+ // Verify that two ExecuteSqlRequests were received: the first using a multiplexed session and
+ // the second using a regular session.
+ assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+
+ Session session1 = mockSpanner.getSession(requests.get(0).getSession());
+ assertNotNull(session1);
+ assertTrue(session1.getMultiplexed());
+
+ Session session2 = mockSpanner.getSession(requests.get(1).getSession());
+ assertNotNull(session2);
+ assertFalse(session2.getMultiplexed());
+
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
+ assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
+ }
+
+ @Test
+ public void testOtherUnimplementedError_ReadWriteTransactionStillUsesMultiplexedSession() {
+ mockSpanner.setExecuteStreamingSqlExecutionTime(
+ SimulatedExecutionTime.ofException(
+ Status.UNIMPLEMENTED
+ .withDescription("Multiplexed sessions are not supported.")
+ .asRuntimeException()));
+
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ // Wait until the initial BeginTransaction RPC with read-write is complete.
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ Transaction txn =
+ client.multiplexedSessionDatabaseClient.getReadWriteBeginTransactionReference();
+ assertNotNull(txn);
+ assertNotNull(txn.getId());
+ assertFalse(client.multiplexedSessionDatabaseClient.unimplementedForRW.get());
+
+ // Try to execute a query using single use transaction.
+ try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
+ SpannerException spannerException = assertThrows(SpannerException.class, resultSet::next);
+ assertEquals(ErrorCode.UNIMPLEMENTED, spannerException.getErrorCode());
+ }
+ // Verify other UNIMPLEMENTED errors does not turn off read-write transactions to use
+ // multiplexed sessions.
+ assertFalse(client.multiplexedSessionDatabaseClient.unimplementedForRW.get());
+
+ // The read-write transaction should use multiplexed sessions and succeed.
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken)
+ transaction.executeUpdate(UPDATE_STATEMENT);
+
+ // Verify that a precommit token is received. This guarantees that the read-write
+ // transaction was executed on a multiplexed session.
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ assertNotNull(impl.getLatestPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("ResultSetPrecommitToken"),
+ impl.getLatestPrecommitToken().getPrecommitToken());
+ return null;
+ });
+
+ // Verify that two ExecuteSqlRequests were received and second one uses a multiplexed session.
+ assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+
+ Session session2 = mockSpanner.getSession(requests.get(1).getSession());
+ assertNotNull(session2);
+ assertTrue(session2.getMultiplexed());
+
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
+ assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
+ }
+
+ @Test
+ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
+ // This test simulates the commit retry protocol extension which occurs when a read-write
+ // transaction contains read/query + mutation operations.
+ DatabaseClientImpl client =
+ (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
+
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
+ //noinspection StatementWithEmptyBody
+ while (resultSet.next()) {
+ // ignore
+ }
+ }
+
+ Mutation mutation =
+ Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
+ transaction.buffer(mutation);
+
+ TransactionContextImpl impl = (TransactionContextImpl) transaction;
+ // Force the Commit RPC to return a CommitResponse with MultiplexedSessionRetry field
+ // set.
+ // This scenario is only possible when a read-write transaction contains read/query +
+ // mutation operations.
+ mockSpanner.markCommitRetryOnTransaction(impl.transactionId);
+ return null;
+ });
+
+ List executeSqlRequests =
+ mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(1, executeSqlRequests.size());
+ // Verify the request is executed using multiplexed sessions
+ assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed());
+
+ List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
+ assertEquals(2, commitRequests.size());
+ assertNotNull(commitRequests.get(0).getPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
+ commitRequests.get(0).getPrecommitToken().getPrecommitToken());
+ // Verify that the first request has mutations set
+ assertTrue(commitRequests.get(0).getMutationsCount() > 0);
+
+ // Second CommitRequest should contain the latest precommit token received via the
+ // CommitResponse in previous attempt.
+ assertNotNull(commitRequests.get(1).getPrecommitToken());
+ assertEquals(
+ ByteString.copyFromUtf8("CommitResponsePrecommitToken"),
+ commitRequests.get(1).getPrecommitToken().getPrecommitToken());
+ // Verify that the commit retry request does not have any mutations set
+ assertEquals(0, commitRequests.get(1).getMutationsCount());
+
+ assertNotNull(client.multiplexedSessionDatabaseClient);
+ assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
+ assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
+ }
+
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
assertNotNull(client.multiplexedSessionDatabaseClient);
SessionReference sessionReference =
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
index d126719ebb8..ebe86724678 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
@@ -64,7 +64,8 @@
public class ResumableStreamIteratorTest {
interface Starter {
AbstractResultSet.CloseableIterator startStream(
- @Nullable ByteString resumeToken);
+ @Nullable ByteString resumeToken,
+ AsyncResultSet.StreamMessageListener streamMessageListener);
}
interface ResultSetStream {
@@ -164,8 +165,9 @@ private void initWithLimit(int maxBufferSize) {
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) {
@Override
AbstractResultSet.CloseableIterator startStream(
- @Nullable ByteString resumeToken) {
- return starter.startStream(resumeToken);
+ @Nullable ByteString resumeToken,
+ AsyncResultSet.StreamMessageListener streamMessageListener) {
+ return starter.startStream(resumeToken, null);
}
};
}
@@ -173,7 +175,7 @@ AbstractResultSet.CloseableIterator startStream(
@Test
public void simple() {
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(null, "a"))
.thenReturn(resultSet(null, "b"))
@@ -195,7 +197,7 @@ public void closedOTSpan() {
setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
@@ -218,7 +220,7 @@ public void closedOCSpan() {
setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
@@ -232,14 +234,14 @@ public void closedOCSpan() {
@Test
public void restart() {
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
@@ -251,7 +253,7 @@ public void restart() {
@Test
public void restartWithHoldBack() {
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
@@ -260,7 +262,7 @@ public void restartWithHoldBack() {
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
@@ -272,7 +274,7 @@ public void restartWithHoldBack() {
@Test
public void restartWithHoldBackMidStream() {
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(null, "b"))
@@ -281,7 +283,7 @@ public void restartWithHoldBackMidStream() {
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "e"))
@@ -304,7 +306,7 @@ public void retryableErrorWithoutRetryInfo() throws IOException {
ResumableStreamIterator.class, this.resumableStreamIterator, "backOff", backOff);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenThrow(
@@ -312,7 +314,7 @@ public void retryableErrorWithoutRetryInfo() throws IOException {
ErrorCode.UNAVAILABLE, "failed by test", Status.UNAVAILABLE.asRuntimeException()));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
@@ -324,7 +326,7 @@ public void retryableErrorWithoutRetryInfo() throws IOException {
@Test
public void nonRetryableError() {
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
@@ -343,7 +345,7 @@ public void bufferLimitSimple() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(null, "a"))
.thenReturn(resultSet(null, "b"))
@@ -356,7 +358,7 @@ public void bufferLimitSimpleWithRestartTokens() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
@@ -369,14 +371,14 @@ public void bufferLimitRestart() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
@@ -390,13 +392,13 @@ public void bufferLimitRestartWithinLimitAtStartOfResults() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(null, "XXXXXX"))
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s2));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(null, "a"))
.thenReturn(resultSet(null, "b"))
@@ -409,14 +411,14 @@ public void bufferLimitRestartWithinLimitMidResults() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(null, "XXXXXX"))
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next())
.thenReturn(resultSet(null, "b"))
@@ -430,7 +432,7 @@ public void bufferLimitMissingTokensUnsafeToRetry() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(null, "b"))
@@ -447,7 +449,7 @@ public void bufferLimitMissingTokensSafeToRetry() {
initWithLimit(1);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1));
Mockito.when(s1.next())
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
.thenReturn(resultSet(null, "b"))
@@ -455,7 +457,7 @@ public void bufferLimitMissingTokensSafeToRetry() {
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
- Mockito.when(starter.startStream(ByteString.copyFromUtf8("r3")))
+ Mockito.when(starter.startStream(ByteString.copyFromUtf8("r3"), null))
.thenReturn(new ResultSetIterator(s2));
Mockito.when(s2.next()).thenReturn(resultSet(null, "d")).thenReturn(null);
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
index db245d3af81..acb7ae9fa1e 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import static com.google.cloud.spanner.BuiltInMetricsConstant.ATTEMPT_COUNT_NAME;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
@@ -44,6 +45,7 @@
import com.google.monitoring.v3.TimeSeries;
import com.google.protobuf.Empty;
import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
@@ -340,6 +342,55 @@ public void getAggregationTemporality() throws IOException {
.isEqualTo(AggregationTemporality.CUMULATIVE);
}
+ @Test
+ public void testSkipExportingDataIfMissingInstanceId() throws IOException {
+ Attributes attributesWithoutInstanceId =
+ Attributes.builder().putAll(attributes).remove(INSTANCE_ID_KEY).build();
+
+ SpannerCloudMonitoringExporter actualExporter =
+ SpannerCloudMonitoringExporter.create(projectId, null);
+ assertThat(actualExporter.getAggregationTemporality(InstrumentType.COUNTER))
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ ArgumentCaptor argumentCaptor =
+ ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);
+
+ UnaryCallable mockCallable = Mockito.mock(UnaryCallable.class);
+ Mockito.when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
+ ApiFuture future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
+ Mockito.when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);
+
+ long fakeValue = 11L;
+
+ long startEpoch = 10;
+ long endEpoch = 15;
+ LongPointData longPointData =
+ ImmutableLongPointData.create(startEpoch, endEpoch, attributesWithoutInstanceId, fakeValue);
+
+ MetricData operationLongData =
+ ImmutableMetricData.createLongSum(
+ resource,
+ scope,
+ "spanner.googleapis.com/internal/client/" + OPERATION_COUNT_NAME,
+ "description",
+ "1",
+ ImmutableSumData.create(
+ true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
+
+ MetricData attemptLongData =
+ ImmutableMetricData.createLongSum(
+ resource,
+ scope,
+ "spanner.googleapis.com/internal/client/" + ATTEMPT_COUNT_NAME,
+ "description",
+ "1",
+ ImmutableSumData.create(
+ true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
+
+ CompletableResultCode resultCode =
+ exporter.export(Arrays.asList(operationLongData, attemptLongData));
+ assertThat(resultCode).isEqualTo(CompletableResultCode.ofFailure());
+ }
+
private static class FakeMetricServiceClient extends MetricServiceClient {
protected FakeMetricServiceClient(MetricServiceStub stub) {
diff --git a/grpc-google-cloud-spanner-admin-database-v1/pom.xml b/grpc-google-cloud-spanner-admin-database-v1/pom.xml
index 8871db39e10..92dc688e75f 100644
--- a/grpc-google-cloud-spanner-admin-database-v1/pom.xml
+++ b/grpc-google-cloud-spanner-admin-database-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-spanner-admin-database-v1
- 6.81.0
+ 6.81.1
grpc-google-cloud-spanner-admin-database-v1
GRPC library for grpc-google-cloud-spanner-admin-database-v1
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml
index 814a006e46e..eb834047b1e 100644
--- a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml
+++ b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-spanner-admin-instance-v1
- 6.81.0
+ 6.81.1
grpc-google-cloud-spanner-admin-instance-v1
GRPC library for grpc-google-cloud-spanner-admin-instance-v1
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/grpc-google-cloud-spanner-executor-v1/pom.xml b/grpc-google-cloud-spanner-executor-v1/pom.xml
index 37bbf63c980..86d8ec3fa8c 100644
--- a/grpc-google-cloud-spanner-executor-v1/pom.xml
+++ b/grpc-google-cloud-spanner-executor-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-spanner-executor-v1
- 6.81.0
+ 6.81.1
grpc-google-cloud-spanner-executor-v1
GRPC library for google-cloud-spanner
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/grpc-google-cloud-spanner-v1/pom.xml b/grpc-google-cloud-spanner-v1/pom.xml
index 31aaae87d13..7a5e2be0a06 100644
--- a/grpc-google-cloud-spanner-v1/pom.xml
+++ b/grpc-google-cloud-spanner-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-spanner-v1
- 6.81.0
+ 6.81.1
grpc-google-cloud-spanner-v1
GRPC library for grpc-google-cloud-spanner-v1
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/pom.xml b/pom.xml
index b00bfa5818e..b1783c50cc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-spanner-parent
pom
- 6.81.0
+ 6.81.1
Google Cloud Spanner Parent
https://github.com/googleapis/java-spanner
@@ -61,47 +61,47 @@
com.google.api.grpc
proto-google-cloud-spanner-admin-instance-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
proto-google-cloud-spanner-executor-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-executor-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
proto-google-cloud-spanner-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
proto-google-cloud-spanner-admin-database-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-admin-instance-v1
- 6.81.0
+ 6.81.1
com.google.api.grpc
grpc-google-cloud-spanner-admin-database-v1
- 6.81.0
+ 6.81.1
com.google.cloud
google-cloud-spanner
- 6.81.0
+ 6.81.1
diff --git a/proto-google-cloud-spanner-admin-database-v1/pom.xml b/proto-google-cloud-spanner-admin-database-v1/pom.xml
index 015dca3162b..39f837ed429 100644
--- a/proto-google-cloud-spanner-admin-database-v1/pom.xml
+++ b/proto-google-cloud-spanner-admin-database-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-spanner-admin-database-v1
- 6.81.0
+ 6.81.1
proto-google-cloud-spanner-admin-database-v1
PROTO library for proto-google-cloud-spanner-admin-database-v1
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/proto-google-cloud-spanner-admin-instance-v1/pom.xml b/proto-google-cloud-spanner-admin-instance-v1/pom.xml
index 656521decea..b5ddad74e34 100644
--- a/proto-google-cloud-spanner-admin-instance-v1/pom.xml
+++ b/proto-google-cloud-spanner-admin-instance-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-spanner-admin-instance-v1
- 6.81.0
+ 6.81.1
proto-google-cloud-spanner-admin-instance-v1
PROTO library for proto-google-cloud-spanner-admin-instance-v1
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/proto-google-cloud-spanner-executor-v1/pom.xml b/proto-google-cloud-spanner-executor-v1/pom.xml
index 13ac37c1c03..adcfaaefbdc 100644
--- a/proto-google-cloud-spanner-executor-v1/pom.xml
+++ b/proto-google-cloud-spanner-executor-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-spanner-executor-v1
- 6.81.0
+ 6.81.1
proto-google-cloud-spanner-executor-v1
Proto library for google-cloud-spanner
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/proto-google-cloud-spanner-v1/pom.xml b/proto-google-cloud-spanner-v1/pom.xml
index 00e74eb2eaa..f819a8f9ac2 100644
--- a/proto-google-cloud-spanner-v1/pom.xml
+++ b/proto-google-cloud-spanner-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-spanner-v1
- 6.81.0
+ 6.81.1
proto-google-cloud-spanner-v1
PROTO library for proto-google-cloud-spanner-v1
com.google.cloud
google-cloud-spanner-parent
- 6.81.0
+ 6.81.1
diff --git a/samples/README.md b/samples/README.md
index 914c455b46f..d4ad2ac7035 100644
--- a/samples/README.md
+++ b/samples/README.md
@@ -18,29 +18,23 @@ Install [Maven](http://maven.apache.org/).
Build your project from the root directory (`java-spanner`):
mvn clean package -DskipTests
+ cd samples/snippets
+ mvn package
-Every subsequent command here should be run from a subdirectory (`cd samples/snippets`).
+Every subsequent command here should be run from a subdirectory `samples/snippets`.
-You can run a given `ClassName` via:
+### Running samples
- mvn exec:java -Dexec.mainClass=com.example.spanner.ClassName \
- -DpropertyName=propertyValue \
- -Dexec.args="any arguments to the app"
+Usage:
-### Running a simple query (using the quickstart sample)
+ java -jar target/spanner-snippets/spanner-google-cloud-samples.jar operation my-instance my-database
- mvn exec:java -Dexec.mainClass=com.example.spanner.QuickstartSample -Dexec.args="my-instance my-database"
+#### Examples
-## Tutorial
+Create Database:
-### Running the tutorial
- mvn exec:java -Dexec.mainClass=com.example.spanner.admin.archived.SpannerSample -Dexec.args=" my-instance my-database"
+ java -jar target/spanner-google-cloud-samples-jar-with-dependencies.jar my-instance my-database
-## Tracing sample
-`TracingSample.java` demonstrates how to export traces generated by client library to StackDriver and to /tracez page.
+Listing database operations:
-### Running the tracing sample
- mvn exec:java -Dexec.mainClass=com.example.spanner.TracingSample -Dexec.args="my-instance my-database"
-
-## Test
- mvn verify -Dspanner.test.instance= -Dspanner.sample.database= -Dspanner.quickstart.database=
+ java -jar target/spanner-snippets/spanner-google-cloud-samples.jar listdatabaseoperations my-instance my-database
\ No newline at end of file
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index 3182e45e5b1..e8f4ddaf030 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -23,7 +23,7 @@
1.8
UTF-8
0.31.1
- 2.52.0
+ 2.53.0
3.54.0
@@ -33,7 +33,7 @@
com.google.cloud
google-cloud-spanner
- 6.79.0
+ 6.81.0
@@ -145,7 +145,7 @@
org.apache.maven.plugins
maven-failsafe-plugin
- 3.5.1
+ 3.5.2
java-client-integration-tests
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 3a2c710e35a..a5e65f1a696 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -23,7 +23,7 @@
1.8
UTF-8
0.31.1
- 2.52.0
+ 2.53.0
3.54.0
@@ -32,7 +32,7 @@
com.google.cloud
google-cloud-spanner
- 6.81.0
+ 6.81.1
@@ -144,7 +144,7 @@
org.apache.maven.plugins
maven-failsafe-plugin
- 3.5.1
+ 3.5.2
java-client-integration-tests
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 1ac402301c1..06698627ef9 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -175,7 +175,7 @@
org.apache.maven.plugins
maven-failsafe-plugin
- 3.5.1
+ 3.5.2
java-client-integration-tests
diff --git a/versions.txt b/versions.txt
index 4c3c8df4edd..6fbbc5e2230 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,13 +1,13 @@
# Format:
# module:released-version:current-version
-proto-google-cloud-spanner-admin-instance-v1:6.81.0:6.81.0
-proto-google-cloud-spanner-v1:6.81.0:6.81.0
-proto-google-cloud-spanner-admin-database-v1:6.81.0:6.81.0
-grpc-google-cloud-spanner-v1:6.81.0:6.81.0
-grpc-google-cloud-spanner-admin-instance-v1:6.81.0:6.81.0
-grpc-google-cloud-spanner-admin-database-v1:6.81.0:6.81.0
-google-cloud-spanner:6.81.0:6.81.0
-google-cloud-spanner-executor:6.81.0:6.81.0
-proto-google-cloud-spanner-executor-v1:6.81.0:6.81.0
-grpc-google-cloud-spanner-executor-v1:6.81.0:6.81.0
+proto-google-cloud-spanner-admin-instance-v1:6.81.1:6.81.1
+proto-google-cloud-spanner-v1:6.81.1:6.81.1
+proto-google-cloud-spanner-admin-database-v1:6.81.1:6.81.1
+grpc-google-cloud-spanner-v1:6.81.1:6.81.1
+grpc-google-cloud-spanner-admin-instance-v1:6.81.1:6.81.1
+grpc-google-cloud-spanner-admin-database-v1:6.81.1:6.81.1
+google-cloud-spanner:6.81.1:6.81.1
+google-cloud-spanner-executor:6.81.1:6.81.1
+proto-google-cloud-spanner-executor-v1:6.81.1:6.81.1
+grpc-google-cloud-spanner-executor-v1:6.81.1:6.81.1