diff --git a/.github/workflows/hermetic_library_generation.yaml b/.github/workflows/hermetic_library_generation.yaml index 7146cc3dc1c..ab23b9fec09 100644 --- a/.github/workflows/hermetic_library_generation.yaml +++ b/.github/workflows/hermetic_library_generation.yaml @@ -17,10 +17,14 @@ name: Hermetic library generation upon generation config change through pull req on: pull_request: +env: + HEAD_REF: ${{ github.head_ref }} + REPO_FULL_NAME: ${{ github.event.pull_request.head.repo.full_name }} + jobs: library_generation: # skip pull requests coming from a forked repository - if: github.event.pull_request.head.repo.full_name == github.repository + if: github.env.REPO_FULL_NAME == github.repository runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -30,11 +34,11 @@ jobs: - name: Generate changed libraries shell: bash run: | - set -x + set -ex [ -z "$(git config user.email)" ] && git config --global user.email "cloud-java-bot@google.com" [ -z "$(git config user.name)" ] && git config --global user.name "cloud-java-bot" bash .github/scripts/hermetic_library_generation.sh \ --target_branch ${{ github.base_ref }} \ - --current_branch ${{ github.head_ref }} + --current_branch $HEAD_REF env: GH_TOKEN: ${{ secrets.CLOUD_JAVA_BOT_TOKEN }} diff --git a/.github/workflows/unmanaged_dependency_check.yaml b/.github/workflows/unmanaged_dependency_check.yaml index 2e6ec1a12bf..e3d689d03c3 100644 --- a/.github/workflows/unmanaged_dependency_check.yaml +++ b/.github/workflows/unmanaged_dependency_check.yaml @@ -17,6 +17,6 @@ jobs: # repository .kokoro/build.sh - name: Unmanaged dependency check - uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.33.0 + uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.34.0 with: bom-path: google-cloud-spanner-bom/pom.xml diff --git a/.kokoro/presubmit/graalvm-native-17.cfg b/.kokoro/presubmit/graalvm-native-17.cfg index 7008a721567..53cd15405a6 100644 --- a/.kokoro/presubmit/graalvm-native-17.cfg +++ b/.kokoro/presubmit/graalvm-native-17.cfg @@ -3,7 +3,7 @@ # Configure the docker image for kokoro-trampoline. env_vars: { key: "TRAMPOLINE_IMAGE" - value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.33.0" + value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.34.0" } env_vars: { diff --git a/.kokoro/presubmit/graalvm-native.cfg b/.kokoro/presubmit/graalvm-native.cfg index 931f9bb0052..e211e47fc69 100644 --- a/.kokoro/presubmit/graalvm-native.cfg +++ b/.kokoro/presubmit/graalvm-native.cfg @@ -3,7 +3,7 @@ # Configure the docker image for kokoro-trampoline. env_vars: { key: "TRAMPOLINE_IMAGE" - value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.33.0" + value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.34.0" } env_vars: { diff --git a/CHANGELOG.md b/CHANGELOG.md index fc0757052a6..26f3fdf2fff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,34 @@ # Changelog +## [6.73.0](https://github.com/googleapis/java-spanner/compare/v6.72.0...v6.73.0) (2024-08-22) + + +### Features + +* Add option for cancelling queries when closing client ([#3276](https://github.com/googleapis/java-spanner/issues/3276)) ([95da1ed](https://github.com/googleapis/java-spanner/commit/95da1eddbc979f4ce78c9d1ac15bc4c1faba6dca)) + + +### Bug Fixes + +* Github workflow vulnerable to script injection ([#3232](https://github.com/googleapis/java-spanner/issues/3232)) ([599255c](https://github.com/googleapis/java-spanner/commit/599255c36d1fbe8317705a7eeb2a9e400c3efd15)) +* Make DecodeMode.DIRECT the deafult ([#3280](https://github.com/googleapis/java-spanner/issues/3280)) ([f31a95a](https://github.com/googleapis/java-spanner/commit/f31a95ab105407305e988e86c8f7b0d8654995e0)) +* Synchronize lazy ResultSet decoding ([#3267](https://github.com/googleapis/java-spanner/issues/3267)) ([4219cf8](https://github.com/googleapis/java-spanner/commit/4219cf86dba5e44d55f13ab118113f119c92b9e9)) + + +### Dependencies + +* Update dependency com.google.cloud:sdk-platform-java-config to v3.34.0 ([#3277](https://github.com/googleapis/java-spanner/issues/3277)) ([c449a91](https://github.com/googleapis/java-spanner/commit/c449a91628b005481996bce5ab449d62496a4d2d)) +* Update dependency commons-cli:commons-cli to v1.9.0 ([#3275](https://github.com/googleapis/java-spanner/issues/3275)) ([84790f7](https://github.com/googleapis/java-spanner/commit/84790f7d437e88739487b148bf963f0ac9dc3f96)) +* Update dependency io.opentelemetry:opentelemetry-bom to v1.41.0 ([#3269](https://github.com/googleapis/java-spanner/issues/3269)) ([a7458e9](https://github.com/googleapis/java-spanner/commit/a7458e970e4ca55ff3e312b2129e890576145db1)) +* Update dependency org.hamcrest:hamcrest to v3 ([#3271](https://github.com/googleapis/java-spanner/issues/3271)) ([fc2e343](https://github.com/googleapis/java-spanner/commit/fc2e343dc06f80617a2cd6f2bea59b0631e70678)) +* Update dependency org.junit.vintage:junit-vintage-engine to v5.11.0 ([#3272](https://github.com/googleapis/java-spanner/issues/3272)) ([1bc0c46](https://github.com/googleapis/java-spanner/commit/1bc0c469b99ebf3778592b04dbf175b00bf5b06e)) +* Update opentelemetry.version to v1.41.0 ([#3270](https://github.com/googleapis/java-spanner/issues/3270)) ([88f6b56](https://github.com/googleapis/java-spanner/commit/88f6b56fb243bb17b814a7ae150c8f38dced119a)) + + +### Documentation + +* Create a few code snippets as examples for using Spanner Graph using Java ([#3234](https://github.com/googleapis/java-spanner/issues/3234)) ([61f0ab7](https://github.com/googleapis/java-spanner/commit/61f0ab7a48bc3e51b830534b1cfa70e40166ec91)) + ## [6.72.0](https://github.com/googleapis/java-spanner/compare/v6.71.0...v6.72.0) (2024-08-07) diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index ae700bd3372..23427597b8d 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -24,7 +24,7 @@ com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 @@ -34,7 +34,7 @@ UTF-8 UTF-8 2.10.0 - 1.40.0 + 1.41.0 @@ -85,19 +85,19 @@ io.opentelemetry opentelemetry-bom - 1.40.0 + 1.41.0 pom import com.google.cloud google-cloud-spanner - 6.71.0 + 6.72.0 commons-cli commons-cli - 1.8.0 + 1.9.0 com.google.auto.value @@ -118,7 +118,7 @@ commons-cli commons-cli - 1.8.0 + 1.9.0 @@ -133,7 +133,7 @@ org.codehaus.mojo exec-maven-plugin - 3.4.0 + 3.4.1 com.google.cloud.spanner.benchmark.LatencyBenchmark false diff --git a/google-cloud-spanner-bom/pom.xml b/google-cloud-spanner-bom/pom.xml index fa9e6207a2c..a05a10d5a4d 100644 --- a/google-cloud-spanner-bom/pom.xml +++ b/google-cloud-spanner-bom/pom.xml @@ -3,12 +3,12 @@ 4.0.0 com.google.cloud google-cloud-spanner-bom - 6.72.0 + 6.73.0 pom com.google.cloud sdk-platform-java-config - 3.33.0 + 3.34.0 Google Cloud Spanner BOM @@ -53,43 +53,43 @@ com.google.cloud google-cloud-spanner - 6.72.0 + 6.73.0 com.google.cloud google-cloud-spanner test-jar - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.72.0 + 6.73.0 com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.72.0 + 6.73.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.72.0 + 6.73.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.72.0 + 6.73.0 diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index 3a99cd00624..35fc9e7c99d 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -5,14 +5,14 @@ 4.0.0 com.google.cloud google-cloud-spanner-executor - 6.72.0 + 6.73.0 jar Google Cloud Spanner Executor com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 @@ -129,7 +129,7 @@ commons-cli commons-cli - 1.8.0 + 1.9.0 commons-io @@ -188,7 +188,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 3.3.1 + 3.4.0 diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index 4de2e277ffa..443a8faf238 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -75,6 +75,7 @@ import com.google.cloud.spanner.encryption.CustomerManagedEncryption; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -191,6 +192,16 @@ public CloudClientExecutor(boolean enableGrpcFaultInjector) { this.enableGrpcFaultInjector = enableGrpcFaultInjector; } + // Helper for unexpected results. + public static String unexpectedExceptionResponse(Exception e) { + return "Unexpected error in Github Cloud Java Client Executor: " + + e + + " Msg: " + + e.getMessage() + + " Stack: " + + Joiner.on("\n").join(e.getStackTrace()); + } + /** * Implementation of a ReadWriteTransaction, which is a wrapper of the cloud TransactionRunner. It * stores all the status and related variables from the start to finish, and control the running @@ -1083,7 +1094,7 @@ private Status executeCreateCloudInstance( return sender.finishWithError( toStatus( SpannerExceptionFactory.newSpannerException( - ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage()))); + ErrorCode.INVALID_ARGUMENT, CloudClientExecutor.unexpectedExceptionResponse(e)))); } return sender.finishWithOK(); } diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 5f42b91fd7e..59e7ed4a485 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-spanner - 6.72.0 + 6.73.0 jar Google Cloud Spanner https://github.com/googleapis/java-spanner @@ -11,7 +11,7 @@ com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 google-cloud-spanner @@ -359,6 +359,12 @@ ${graal-sdk.version} provided + + org.graalvm.sdk + nativeimage + ${graal-sdk.version} + provided + @@ -411,7 +417,7 @@ org.hamcrest hamcrest - 2.2 + 3.0 test diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 92ebf006d29..caf0e06379e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -36,6 +36,7 @@ import com.google.cloud.spanner.SessionClient.SessionOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -45,7 +46,6 @@ import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; -import com.google.spanner.v1.ExecuteSqlRequest.Builder; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import com.google.spanner.v1.PartialResultSet; @@ -69,6 +69,7 @@ abstract class AbstractReadContext abstract static class Builder, T extends AbstractReadContext> { private SessionImpl session; + private boolean cancelQueryWhenClientIsClosed; private SpannerRpc rpc; private ISpan span; private TraceWrapper tracer; @@ -91,6 +92,11 @@ B setSession(SessionImpl session) { return self(); } + B setCancelQueryWhenClientIsClosed(boolean cancelQueryWhenClientIsClosed) { + this.cancelQueryWhenClientIsClosed = cancelQueryWhenClientIsClosed; + return self(); + } + B setRpc(SpannerRpc rpc) { this.rpc = rpc; return self(); @@ -184,7 +190,7 @@ static Builder newBuilder() { @GuardedBy("lock") private boolean used; - private final Map channelHint; + private Map channelHint; private SingleReadContext(Builder builder) { super(builder); @@ -227,6 +233,16 @@ TransactionSelector getTransactionSelector() { Map getTransactionChannelHint() { return channelHint; } + + @Override + boolean prepareRetryOnDifferentGrpcChannel() { + if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) { + long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L; + channelHint = optionMap(SessionOption.channelHint(channelHintForTransaction)); + return true; + } + return super.prepareRetryOnDifferentGrpcChannel(); + } } private static void assertTimestampAvailable(boolean available) { @@ -440,6 +456,7 @@ void initTransaction() { final Object lock = new Object(); final SessionImpl session; + final boolean cancelQueryWhenClientIsClosed; final SpannerRpc rpc; final ExecutorProvider executorProvider; ISpan span; @@ -469,6 +486,7 @@ void initTransaction() { AbstractReadContext(Builder builder) { this.session = builder.session; + this.cancelQueryWhenClientIsClosed = builder.cancelQueryWhenClientIsClosed; this.rpc = builder.rpc; this.defaultPrefetchChunks = builder.defaultPrefetchChunks; this.defaultQueryOptions = builder.defaultQueryOptions; @@ -745,11 +763,13 @@ ResultSet executeQueryInternalWithOptions( span, tracer, tracer.createStatementAttributes(statement, options), + session.getErrorHandler(), rpc.getExecuteQueryRetrySettings(), rpc.getExecuteQueryRetryableCodes()) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { - GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks); + GrpcStreamIterator stream = + new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed); if (partitionToken != null) { request.setPartitionToken(partitionToken); } @@ -774,6 +794,11 @@ CloseableIterator startStream(@Nullable ByteString resumeToken stream.setCall(call, request.getTransaction().hasBegin()); return stream; } + + @Override + boolean prepareIteratorForRetryOnDifferentGrpcChannel() { + return AbstractReadContext.this.prepareRetryOnDifferentGrpcChannel(); + } }; return new GrpcResultSet( stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode); @@ -840,6 +865,10 @@ public void close() { */ abstract Map getTransactionChannelHint(); + boolean prepareRetryOnDifferentGrpcChannel() { + return false; + } + /** * Returns the transaction tag for this {@link AbstractReadContext} or null if this * {@link AbstractReadContext} does not have a transaction tag. @@ -918,11 +947,13 @@ ResultSet readInternalWithOptions( SpannerImpl.READ, span, tracer, + session.getErrorHandler(), rpc.getReadRetrySettings(), rpc.getReadRetryableCodes()) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { - GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); + GrpcStreamIterator stream = + new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed); TransactionSelector selector = null; if (resumeToken != null) { builder.setResumeToken(resumeToken); @@ -945,6 +976,11 @@ CloseableIterator startStream(@Nullable ByteString resumeToken stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); return stream; } + + @Override + boolean prepareIteratorForRetryOnDifferentGrpcChannel() { + return AbstractReadContext.this.prepareRetryOnDifferentGrpcChannel(); + } }; return new GrpcResultSet( stream, this, readOptions.hasDecodeMode() ? readOptions.decodeMode() : defaultDecodeMode); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index 22fb9f710c1..3d886dd383b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -54,6 +54,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { return new BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.newBuilder() .setSession(session) + .setCancelQueryWhenClientIsClosed(true) .setRpc(sessionClient.getSpanner().getRpc()) .setTimestampBound(bound) .setDefaultQueryOptions( @@ -75,6 +76,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc return new BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.newBuilder() .setSession(session) + .setCancelQueryWhenClientIsClosed(true) .setRpc(sessionClient.getSpanner().getRpc()) .setTransactionId(batchTransactionId.getTransactionId()) .setTimestamp(batchTransactionId.getTimestamp()) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorHandler.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorHandler.java new file mode 100644 index 00000000000..cf2465d7ade --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.core.BetaApi; +import javax.annotation.Nonnull; + +/** + * The {@link ErrorHandler} interface can be used to implement custom error and retry handling for + * specific cases. The default implementation does nothing and falls back to the standard error and + * retry handling in Gax and the Spanner client. + */ +@BetaApi +interface ErrorHandler { + @Nonnull + Throwable translateException(@Nonnull Throwable exception); + + int getMaxAttempts(); + + class DefaultErrorHandler implements ErrorHandler { + static final DefaultErrorHandler INSTANCE = new DefaultErrorHandler(); + + private DefaultErrorHandler() {} + + @Nonnull + @Override + public Throwable translateException(@Nonnull Throwable exception) { + return exception; + } + + @Override + public int getMaxAttempts() { + return 0; + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index 7b61901a60e..be75c1e5c4e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -25,6 +25,7 @@ import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.ResultSetStats; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.annotation.Nullable; @@ -35,6 +36,7 @@ class GrpcResultSet extends AbstractResultSet> implements ProtobufR private final DecodeMode decodeMode; private ResultSetMetadata metadata; private GrpcStruct currRow; + private List rowData; private SpannerException error; private ResultSetStats statistics; private boolean closed; @@ -85,7 +87,15 @@ public boolean next() throws SpannerException { throw SpannerExceptionFactory.newSpannerException( ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG); } - currRow = new GrpcStruct(iterator.type(), new ArrayList<>(), decodeMode); + if (rowData == null) { + rowData = new ArrayList<>(metadata.getRowType().getFieldsCount()); + if (decodeMode != DecodeMode.DIRECT) { + rowData = Collections.synchronizedList(rowData); + } + } else { + rowData.clear(); + } + currRow = new GrpcStruct(iterator.type(), rowData, decodeMode); } boolean hasNext = currRow.consumeRow(iterator); if (!hasNext) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index dde6b69c461..af6b5683502 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -38,7 +38,7 @@ class GrpcStreamIterator extends AbstractIterator private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName()); private static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); - private final ConsumerImpl consumer = new ConsumerImpl(); + private final ConsumerImpl consumer; private final BlockingQueue stream; private final Statement statement; @@ -49,13 +49,15 @@ class GrpcStreamIterator extends AbstractIterator private SpannerException error; @VisibleForTesting - GrpcStreamIterator(int prefetchChunks) { - this(null, prefetchChunks); + GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { + this(null, prefetchChunks, cancelQueryWhenClientIsClosed); } @VisibleForTesting - GrpcStreamIterator(Statement statement, int prefetchChunks) { + GrpcStreamIterator( + Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { this.statement = statement; + this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed); // One extra to allow for END_OF_STREAM message. this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1); } @@ -136,6 +138,12 @@ private void addToStream(PartialResultSet results) { } private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer { + private final boolean cancelQueryWhenClientIsClosed; + + ConsumerImpl(boolean cancelQueryWhenClientIsClosed) { + this.cancelQueryWhenClientIsClosed = cancelQueryWhenClientIsClosed; + } + @Override public void onPartialResultSet(PartialResultSet results) { addToStream(results); @@ -168,5 +176,10 @@ public void onError(SpannerException e) { error = e; addToStream(END_OF_STREAM); } + + @Override + public boolean cancelQueryWhenClientIsClosed() { + return this.cancelQueryWhenClientIsClosed; + } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java index 852b9ed61a3..4d07a12880c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -60,7 +61,7 @@ class GrpcStruct extends Struct implements Serializable { private final List rowData; private final DecodeMode decodeMode; private final BitSet colDecoded; - private boolean rowDecoded; + private final AtomicBoolean rowDecoded; /** * Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as a @@ -224,7 +225,7 @@ private GrpcStruct( this.type = type; this.rowData = rowData; this.decodeMode = decodeMode; - this.rowDecoded = rowDecoded; + this.rowDecoded = new AtomicBoolean(rowDecoded); this.colDecoded = colDecoded; } @@ -234,29 +235,31 @@ public String toString() { } boolean consumeRow(Iterator iterator) { - rowData.clear(); - if (decodeMode == DecodeMode.LAZY_PER_ROW) { - rowDecoded = false; - } else if (decodeMode == DecodeMode.LAZY_PER_COL) { - colDecoded.clear(); - } - if (!iterator.hasNext()) { - return false; - } - for (Type.StructField fieldType : getType().getStructFields()) { + synchronized (rowData) { + rowData.clear(); + if (decodeMode == DecodeMode.LAZY_PER_ROW) { + rowDecoded.set(false); + } else if (decodeMode == DecodeMode.LAZY_PER_COL) { + colDecoded.clear(); + } if (!iterator.hasNext()) { - throw newSpannerException( - ErrorCode.INTERNAL, - "Invalid value stream: end of stream reached before row is complete"); + return false; } - com.google.protobuf.Value value = iterator.next(); - if (decodeMode == DecodeMode.DIRECT) { - rowData.add(decodeValue(fieldType.getType(), value)); - } else { - rowData.add(value); + for (Type.StructField fieldType : getType().getStructFields()) { + if (!iterator.hasNext()) { + throw newSpannerException( + ErrorCode.INTERNAL, + "Invalid value stream: end of stream reached before row is complete"); + } + com.google.protobuf.Value value = iterator.next(); + if (decodeMode == DecodeMode.DIRECT) { + rowData.add(decodeValue(fieldType.getType(), value)); + } else { + rowData.add(value); + } } + return true; } - return true; } private static Object decodeValue(Type fieldType, com.google.protobuf.Value proto) { @@ -367,12 +370,16 @@ private static void checkType( } Struct immutableCopy() { - return new GrpcStruct( - type, - new ArrayList<>(rowData), - this.decodeMode, - this.rowDecoded, - this.colDecoded == null ? null : (BitSet) this.colDecoded.clone()); + synchronized (rowData) { + return new GrpcStruct( + type, + this.decodeMode == DecodeMode.DIRECT + ? new ArrayList<>(rowData) + : Collections.synchronizedList(new ArrayList<>(rowData)), + this.decodeMode, + this.rowDecoded.get(), + this.colDecoded == null ? null : (BitSet) this.colDecoded.clone()); + } } @Override @@ -382,9 +389,14 @@ public Type getType() { @Override public boolean isNull(int columnIndex) { - if ((decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) - || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex))) { - return ((com.google.protobuf.Value) rowData.get(columnIndex)).hasNullValue(); + if (decodeMode == DecodeMode.LAZY_PER_ROW || decodeMode == DecodeMode.LAZY_PER_COL) { + synchronized (rowData) { + if ((decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded.get()) + || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex))) { + return ((com.google.protobuf.Value) rowData.get(columnIndex)).hasNullValue(); + } + return rowData.get(columnIndex) == null; + } } return rowData.get(columnIndex) == null; } @@ -496,14 +508,18 @@ private boolean isUnrecognizedType(int columnIndex) { } boolean canGetProtoValue(int columnIndex) { - return isUnrecognizedType(columnIndex) - || (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) - || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)); + synchronized (rowData) { + return isUnrecognizedType(columnIndex) + || (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded.get()) + || (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)); + } } protected com.google.protobuf.Value getProtoValueInternal(int columnIndex) { - checkProtoValueSupported(columnIndex); - return (com.google.protobuf.Value) rowData.get(columnIndex); + synchronized (rowData) { + checkProtoValueSupported(columnIndex); + return (com.google.protobuf.Value) rowData.get(columnIndex); + } } private void checkProtoValueSupported(int columnIndex) { @@ -515,7 +531,7 @@ private void checkProtoValueSupported(int columnIndex) { decodeMode != DecodeMode.DIRECT, "Getting proto value is not supported when DecodeMode#DIRECT is used."); Preconditions.checkState( - !(decodeMode == DecodeMode.LAZY_PER_ROW && rowDecoded), + !(decodeMode == DecodeMode.LAZY_PER_ROW && rowDecoded.get()), "Getting proto value after the row has been decoded is not supported."); Preconditions.checkState( !(decodeMode == DecodeMode.LAZY_PER_COL && colDecoded.get(columnIndex)), @@ -523,22 +539,48 @@ private void checkProtoValueSupported(int columnIndex) { } private void ensureDecoded(int columnIndex) { - if (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) { - for (int i = 0; i < rowData.size(); i++) { - rowData.set( - i, - decodeValue( - type.getStructFields().get(i).getType(), - (com.google.protobuf.Value) rowData.get(i))); + if (decodeMode == DecodeMode.LAZY_PER_ROW) { + synchronized (rowData) { + if (!rowDecoded.get()) { + for (int i = 0; i < rowData.size(); i++) { + rowData.set( + i, + decodeValue( + type.getStructFields().get(i).getType(), + (com.google.protobuf.Value) rowData.get(i))); + } + } + rowDecoded.set(true); + } + } else if (decodeMode == DecodeMode.LAZY_PER_COL) { + boolean decoded; + Object value; + synchronized (rowData) { + decoded = colDecoded.get(columnIndex); + value = rowData.get(columnIndex); + } + if (!decoded) { + // Use the column as a lock during decoding to ensure that we decode once (mostly), but also + // that multiple different columns can be decoded in parallel if requested. + synchronized (type.getStructFields().get(columnIndex)) { + // Note: It can be that we decode the value twice if two threads request this at the same + // time, but the synchronization on rowData above and below makes sure that we always get + // and set a consistent value (and only set it once). + if (!colDecoded.get(columnIndex)) { + value = + decodeValue( + type.getStructFields().get(columnIndex).getType(), + (com.google.protobuf.Value) value); + decoded = true; + } + } + if (decoded) { + synchronized (rowData) { + rowData.set(columnIndex, value); + colDecoded.set(columnIndex); + } + } } - rowDecoded = true; - } else if (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)) { - rowData.set( - columnIndex, - decodeValue( - type.getStructFields().get(columnIndex).getType(), - (com.google.protobuf.Value) rowData.get(columnIndex))); - colDecoded.set(columnIndex); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsChannelShutdownException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsChannelShutdownException.java new file mode 100644 index 00000000000..367d75a13cb --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsChannelShutdownException.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.rpc.UnavailableException; +import com.google.common.base.Predicate; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; + +/** + * Predicate that checks whether an exception is a ChannelShutdownException. This exception is + * thrown by gRPC if the underlying gRPC stub has been shut down and uses the UNAVAILABLE error + * code. This means that it would normally be retried by the Spanner client, but this specific + * UNAVAILABLE error should not be retried, as it would otherwise directly return the same error. + */ +class IsChannelShutdownException implements Predicate { + + @Override + public boolean apply(Throwable input) { + Throwable cause = input; + do { + if (isUnavailableError(cause) + && (cause.getMessage().contains("Channel shutdown invoked") + || cause.getMessage().contains("Channel shutdownNow invoked"))) { + return true; + } + } while ((cause = cause.getCause()) != null); + return false; + } + + private boolean isUnavailableError(Throwable cause) { + return (cause instanceof UnavailableException) + || (cause instanceof StatusRuntimeException + && ((StatusRuntimeException) cause).getStatus().getCode() == Code.UNAVAILABLE); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index d6d72aac33c..3e82ab7d5ff 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -57,6 +57,7 @@ abstract class ResumableStreamIterator extends AbstractIterator { private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS = SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(); + private final ErrorHandler errorHandler; private final RetrySettings streamingRetrySettings; private final Set retryableCodes; private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName()); @@ -80,6 +81,7 @@ protected ResumableStreamIterator( String streamName, ISpan parent, TraceWrapper tracer, + ErrorHandler errorHandler, RetrySettings streamingRetrySettings, Set retryableCodes) { this( @@ -88,6 +90,7 @@ protected ResumableStreamIterator( parent, tracer, Attributes.empty(), + errorHandler, streamingRetrySettings, retryableCodes); } @@ -98,12 +101,14 @@ protected ResumableStreamIterator( ISpan parent, TraceWrapper tracer, Attributes attributes, + ErrorHandler errorHandler, RetrySettings streamingRetrySettings, Set retryableCodes) { checkArgument(maxBufferSize >= 0); this.maxBufferSize = maxBufferSize; this.tracer = tracer; this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes); + this.errorHandler = errorHandler; this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings); this.retryableCodes = Preconditions.checkNotNull(retryableCodes); } @@ -193,6 +198,14 @@ public void execute(Runnable command) { abstract CloseableIterator startStream(@Nullable ByteString resumeToken); + /** + * Prepares the iterator for a retry on a different gRPC channel. Returns true if that is + * possible, and false otherwise. A retry should only be attempted if the method returns true. + */ + boolean prepareIteratorForRetryOnDifferentGrpcChannel() { + return false; + } + @Override public void close(@Nullable String message) { if (stream != null) { @@ -209,6 +222,7 @@ public boolean isWithBeginTransaction() { @Override protected PartialResultSet computeNext() { + int numAttemptsOnOtherChannel = 0; Context context = Context.current(); while (true) { // Eagerly start stream before consuming any buffered items. @@ -279,6 +293,17 @@ protected PartialResultSet computeNext() { continue; } + // Check if we should retry the request on a different gRPC channel. + if (resumeToken == null && buffer.isEmpty()) { + Throwable translated = errorHandler.translateException(spannerException); + if (translated instanceof RetryOnDifferentGrpcChannelException) { + if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts() + && prepareIteratorForRetryOnDifferentGrpcChannel()) { + stream = null; + continue; + } + } + } span.addAnnotation("Stream broken. Not safe to retry", spannerException); span.setStatus(spannerException); throw spannerException; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelErrorHandler.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelErrorHandler.java new file mode 100644 index 00000000000..46607d33a88 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelErrorHandler.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT; + +import com.google.api.core.BetaApi; +import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; +import javax.annotation.Nonnull; + +/** + * An experimental error handler that allows DEADLINE_EXCEEDED errors to be retried on a different + * gRPC channel. This handler is only used if the system property + * 'spanner.retry_deadline_exceeded_on_different_channel' has been set to true, and it is only used + * in the following specific cases: + * + *
    + *
  1. A DEADLINE_EXCEEDED error during a read/write transaction. The error is translated to a + * {@link RetryOnDifferentGrpcChannelException}, which is caught by the session pool and + * causes a retry of the entire transaction on a different session and different gRPC channel. + *
  2. A DEADLINE_EXCEEDED error during a single-use read-only transaction using a multiplexed + * session. Note that errors for the same using a regular session are not retried. + *
+ */ +@BetaApi +class RetryOnDifferentGrpcChannelErrorHandler implements ErrorHandler { + private final int maxAttempts; + + private final SessionImpl session; + + static boolean isEnabled() { + return Boolean.parseBoolean( + System.getProperty("spanner.retry_deadline_exceeded_on_different_channel", "false")); + } + + RetryOnDifferentGrpcChannelErrorHandler(int maxAttempts, SessionImpl session) { + this.maxAttempts = maxAttempts; + this.session = session; + } + + @Override + @Nonnull + public Throwable translateException(@Nonnull Throwable exception) { + if (session == null || !(exception instanceof SpannerException)) { + return exception; + } + SpannerException spannerException = (SpannerException) exception; + if (spannerException.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED) { + if (session.getIsMultiplexed() + || (session.getOptions() != null + && session.getOptions().containsKey(Option.CHANNEL_HINT))) { + int channel = NO_CHANNEL_HINT; + if (session.getOptions() != null && session.getOptions().containsKey(Option.CHANNEL_HINT)) { + channel = Option.CHANNEL_HINT.getLong(session.getOptions()).intValue(); + } + return SpannerExceptionFactory.newRetryOnDifferentGrpcChannelException( + "Retrying on a new gRPC channel due to a DEADLINE_EXCEEDED error", + channel, + spannerException); + } + } + return spannerException; + } + + @Override + public int getMaxAttempts() { + return maxAttempts; + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelException.java new file mode 100644 index 00000000000..59e50ef6a1e --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelException.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import javax.annotation.Nullable; + +class RetryOnDifferentGrpcChannelException extends SpannerException { + private final int channel; + + RetryOnDifferentGrpcChannelException( + @Nullable String message, int channel, @Nullable Throwable cause) { + // Note: We set retryable=false, as the exception is not retryable in the standard way. + super(DoNotConstructDirectly.ALLOWED, ErrorCode.INTERNAL, /*retryable=*/ false, message, cause); + this.channel = channel; + } + + int getChannel() { + return this.channel; + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index d06385050f5..6c53689dd7d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -26,6 +26,7 @@ import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; +import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionOption; @@ -116,6 +117,7 @@ interface SessionTransaction { private ISpan currentSpan; private final Clock clock; private final Map options; + private final ErrorHandler errorHandler; SessionImpl(SpannerImpl spanner, SessionReference sessionReference) { this(spanner, sessionReference, NO_CHANNEL_HINT); @@ -127,6 +129,7 @@ interface SessionTransaction { this.sessionReference = sessionReference; this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); this.options = createOptions(sessionReference, channelHint); + this.errorHandler = createErrorHandler(spanner.getOptions()); } static Map createOptions( @@ -137,6 +140,13 @@ interface SessionTransaction { return CHANNEL_HINT_OPTIONS[channelHint % CHANNEL_HINT_OPTIONS.length]; } + private ErrorHandler createErrorHandler(SpannerOptions options) { + if (RetryOnDifferentGrpcChannelErrorHandler.isEnabled()) { + return new RetryOnDifferentGrpcChannelErrorHandler(options.getNumChannels(), this); + } + return DefaultErrorHandler.INSTANCE; + } + @Override public String getName() { return sessionReference.getName(); @@ -146,6 +156,10 @@ public String getName() { return options; } + ErrorHandler getErrorHandler() { + return this.errorHandler; + } + void setCurrentSpan(ISpan span) { currentSpan = span; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 1819224495d..0a7c42477e2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -67,6 +67,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; @@ -560,6 +563,8 @@ public Timestamp getReadTimestamp() { interface SessionReplacementHandler { T replaceSession(SessionNotFoundException notFound, T sessionFuture); + + T denyListSession(RetryOnDifferentGrpcChannelException retryException, T sessionFuture); } class PooledSessionReplacementHandler implements SessionReplacementHandler { @@ -580,6 +585,37 @@ public PooledSessionFuture replaceSession( throw e; } } + + @Override + public PooledSessionFuture denyListSession( + RetryOnDifferentGrpcChannelException retryException, PooledSessionFuture session) { + // The feature was not enabled when the session pool was created. + if (denyListedChannels == null) { + throw SpannerExceptionFactory.asSpannerException(retryException.getCause()); + } + + int channel = session.get().getChannel(); + synchronized (lock) { + // Calculate the size manually by iterating over the possible keys. We do this because the + // size of a cache can be stale, and manually checking for each possible key will make sure + // we get the correct value, and it will update the cache. + int currentSize = 0; + for (int i = 0; i < numChannels; i++) { + if (denyListedChannels.getIfPresent(i) != null) { + currentSize++; + } + } + if (currentSize < numChannels - 1) { + denyListedChannels.put(channel, DENY_LISTED); + } else { + // We have now deny-listed all channels. Give up and just throw the original error. + throw SpannerExceptionFactory.asSpannerException(retryException.getCause()); + } + } + session.get().releaseToPosition = Position.LAST; + session.close(); + return getSession(); + } } interface SessionNotFoundHandler { @@ -1005,6 +1041,14 @@ public T run(TransactionCallable callable) { session = sessionReplacementHandler.replaceSession(e, session); CachedSession cachedSession = session.get(); runner = cachedSession.getDelegate().readWriteTransaction(); + } catch (RetryOnDifferentGrpcChannelException retryException) { + // This error is thrown by the RetryOnDifferentGrpcChannelErrorHandler in the specific + // case that a transaction failed with a DEADLINE_EXCEEDED error. This is an + // experimental feature that is disabled by default, and that can be removed in a + // future version. + session = sessionReplacementHandler.denyListSession(retryException, session); + CachedSession cachedSession = session.get(); + runner = cachedSession.getDelegate().readWriteTransaction(); } } session.get().markUsed(); @@ -2300,6 +2344,9 @@ enum Position { private final PooledSessionReplacementHandler pooledSessionReplacementHandler = new PooledSessionReplacementHandler(); + private static final Object DENY_LISTED = new Object(); + private final Cache denyListedChannels; + /** * Create a session pool with the given options and for the given database. It will also start * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0. @@ -2442,6 +2489,22 @@ private SessionPool( openTelemetry, attributes, numMultiplexedSessionsAcquired, numMultiplexedSessionsReleased); this.waitOnMinSessionsLatch = options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0); + this.denyListedChannels = + RetryOnDifferentGrpcChannelErrorHandler.isEnabled() + ? CacheBuilder.newBuilder() + .expireAfterWrite(java.time.Duration.ofMinutes(1)) + .maximumSize(this.numChannels) + .concurrencyLevel(1) + .ticker( + new Ticker() { + @Override + public long read() { + return TimeUnit.NANOSECONDS.convert( + clock.instant().toEpochMilli(), TimeUnit.MILLISECONDS); + } + }) + .build() + : null; } /** @@ -2671,7 +2734,26 @@ PooledSessionFuture getSession() throws SpannerException { resourceNotFoundException.getMessage()), resourceNotFoundException); } - sess = sessions.poll(); + if (denyListedChannels != null + && denyListedChannels.size() > 0 + && denyListedChannels.size() < numChannels) { + // There are deny-listed channels. Get a session that is not affiliated with a deny-listed + // channel. + for (PooledSession session : sessions) { + if (denyListedChannels.getIfPresent(session.getChannel()) == null) { + sessions.remove(session); + sess = session; + break; + } + // Size is cached and can change after calling getIfPresent. + if (denyListedChannels.size() == 0) { + break; + } + } + } + if (sess == null) { + sess = sessions.poll(); + } if (sess == null) { span.addAnnotation("No session available"); maybeCreateSession(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java index 2c52192d214..39b254fe997 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java @@ -181,6 +181,17 @@ public static SpannerException newSpannerException(@Nullable Context context, Th return newSpannerException(ErrorCode.fromGrpcStatus(status), cause.getMessage(), cause); } + /** + * Creates a new SpannerException that indicates that the RPC or transaction should be retried on + * a different gRPC channel. This is an experimental feature that can be removed in the future. + * The exception should not be surfaced to the client application, and should instead be caught + * and handled in the client library. + */ + static SpannerException newRetryOnDifferentGrpcChannelException( + String message, int channel, Throwable cause) { + return new RetryOnDifferentGrpcChannelException(message, channel, cause); + } + static SpannerException newSpannerExceptionForCancellation( @Nullable Context context, @Nullable Throwable cause) { if (context != null && context.isCancelled()) { @@ -322,7 +333,9 @@ private static boolean isRetryable(ErrorCode code, @Nullable Throwable cause) { case UNAVAILABLE: // SSLHandshakeException is (probably) not retryable, as it is an indication that the server // certificate was not accepted by the client. - return !hasCauseMatching(cause, Matchers.isSSLHandshakeException); + // Channel shutdown is also not a retryable exception. + return !(hasCauseMatching(cause, Matchers.isSSLHandshakeException) + || hasCauseMatching(cause, Matchers.IS_CHANNEL_SHUTDOWN_EXCEPTION)); case RESOURCE_EXHAUSTED: return SpannerException.extractRetryDelay(cause) > 0; default: @@ -345,5 +358,8 @@ private static class Matchers { static final Predicate isRetryableInternalError = new IsRetryableInternalError(); static final Predicate isSSLHandshakeException = new IsSslHandshakeException(); + + static final Predicate IS_CHANNEL_SHUTDOWN_EXCEPTION = + new IsChannelShutdownException(); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index e6f60090acb..3a8632e2ebe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -741,9 +741,7 @@ public static class Builder extends ServiceOptions.Builder { static final int DEFAULT_PREFETCH_CHUNKS = 4; static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.getDefaultInstance(); - // TODO: Set the default to DecodeMode.DIRECT before merging to keep the current default. - // It is currently set to LAZY_PER_COL so it is used in all tests. - static final DecodeMode DEFAULT_DECODE_MODE = DecodeMode.LAZY_PER_COL; + static final DecodeMode DEFAULT_DECODE_MODE = DecodeMode.DIRECT; static final RetrySettings DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofSeconds(5L)) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRetryHelper.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRetryHelper.java index 9c49efe2f11..a25c706e8a7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRetryHelper.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRetryHelper.java @@ -23,6 +23,7 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; +import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler; import com.google.cloud.spanner.v1.stub.SpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.annotations.VisibleForTesting; @@ -65,7 +66,12 @@ class SpannerRetryHelper { /** Executes the {@link Callable} and retries if it fails with an {@link AbortedException}. */ static T runTxWithRetriesOnAborted(Callable callable) { - return runTxWithRetriesOnAborted(callable, txRetrySettings, NanoClock.getDefaultClock()); + return runTxWithRetriesOnAborted(callable, DefaultErrorHandler.INSTANCE); + } + + static T runTxWithRetriesOnAborted(Callable callable, ErrorHandler errorHandler) { + return runTxWithRetriesOnAborted( + callable, errorHandler, txRetrySettings, NanoClock.getDefaultClock()); } /** @@ -75,11 +81,20 @@ static T runTxWithRetriesOnAborted(Callable callable) { @VisibleForTesting static T runTxWithRetriesOnAborted( Callable callable, RetrySettings retrySettings, ApiClock clock) { + return runTxWithRetriesOnAborted(callable, DefaultErrorHandler.INSTANCE, retrySettings, clock); + } + + @VisibleForTesting + static T runTxWithRetriesOnAborted( + Callable callable, + ErrorHandler errorHandler, + RetrySettings retrySettings, + ApiClock clock) { try { return RetryHelper.runWithRetries(callable, retrySettings, new TxRetryAlgorithm<>(), clock); } catch (RetryHelperException e) { if (e.getCause() != null) { - Throwables.throwIfUnchecked(e.getCause()); + Throwables.throwIfUnchecked(errorHandler.translateException(e.getCause())); } throw e; } @@ -107,9 +122,8 @@ public boolean shouldRetry(Throwable prevThrowable, T prevResponse) if (Context.current().isCancelled()) { throw SpannerExceptionFactory.newSpannerExceptionForCancellation(Context.current(), null); } - return prevThrowable != null - && (prevThrowable instanceof AbortedException - || prevThrowable instanceof com.google.api.gax.rpc.AbortedException); + return prevThrowable instanceof AbortedException + || prevThrowable instanceof com.google.api.gax.rpc.AbortedException; } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 7219389e775..40929aeb3a8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -1180,7 +1180,7 @@ private T runInternal(final TransactionCallable txCallable) { throw e; } }; - return SpannerRetryHelper.runTxWithRetriesOnAborted(retryCallable); + return SpannerRetryHelper.runTxWithRetriesOnAborted(retryCallable, session.getErrorHandler()); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 00ae72f169a..e1e15b851b4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -201,6 +201,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -262,6 +263,9 @@ public class GapicSpannerRpc implements SpannerRpc { private final ScheduledExecutorService spannerWatchdog; + private final ConcurrentLinkedDeque responseObservers = + new ConcurrentLinkedDeque<>(); + private final boolean throttleAdministrativeRequests; private final RetrySettings retryAdministrativeRequestsSettings; private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D; @@ -2004,9 +2008,29 @@ GrpcCallContext newCallContext( return (GrpcCallContext) context.merge(apiCallContextFromContext); } + void registerResponseObserver(SpannerResponseObserver responseObserver) { + responseObservers.add(responseObserver); + } + + void unregisterResponseObserver(SpannerResponseObserver responseObserver) { + responseObservers.remove(responseObserver); + } + + void closeResponseObservers() { + responseObservers.forEach(SpannerResponseObserver::close); + responseObservers.clear(); + } + + @InternalApi + @VisibleForTesting + public int getNumActiveResponseObservers() { + return responseObservers.size(); + } + @Override public void shutdown() { this.rpcIsClosed = true; + closeResponseObservers(); if (this.spannerStub != null) { this.spannerStub.close(); this.partitionedDmlStub.close(); @@ -2028,6 +2052,7 @@ public void shutdown() { public void shutdownNow() { this.rpcIsClosed = true; + closeResponseObservers(); this.spannerStub.close(); this.partitionedDmlStub.close(); this.instanceAdminStub.close(); @@ -2085,7 +2110,7 @@ public void cancel(@Nullable String message) { * A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to * the {@link ResultStreamConsumer}. */ - private static class SpannerResponseObserver implements ResponseObserver { + private class SpannerResponseObserver implements ResponseObserver { private StreamController controller; private final ResultStreamConsumer consumer; @@ -2094,13 +2119,21 @@ public SpannerResponseObserver(ResultStreamConsumer consumer) { this.consumer = consumer; } + void close() { + if (this.controller != null) { + this.controller.cancel(); + } + } + @Override public void onStart(StreamController controller) { - // Disable the auto flow control to allow client library // set the number of messages it prefers to request controller.disableAutoInboundFlowControl(); this.controller = controller; + if (this.consumer.cancelQueryWhenClientIsClosed()) { + registerResponseObserver(this); + } } @Override @@ -2110,11 +2143,19 @@ public void onResponse(PartialResultSet response) { @Override public void onError(Throwable t) { + // Unregister the response observer when the query has completed with an error. + if (this.consumer.cancelQueryWhenClientIsClosed()) { + unregisterResponseObserver(this); + } consumer.onError(newSpannerException(t)); } @Override public void onComplete() { + // Unregister the response observer when the query has completed normally. + if (this.consumer.cancelQueryWhenClientIsClosed()) { + unregisterResponseObserver(this); + } consumer.onCompleted(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index f07a28fb918..0b040df4197 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -94,7 +94,8 @@ T get(@Nullable Map options) { return (T) options.get(this); } - Long getLong(@Nullable Map options) { + @InternalApi + public Long getLong(@Nullable Map options) { return get(options); } @@ -152,6 +153,15 @@ interface ResultStreamConsumer { void onCompleted(); void onError(SpannerException e); + + /** + * Returns true if the stream should be cancelled when the Spanner client is closed. This + * returns true for {@link com.google.cloud.spanner.BatchReadOnlyTransaction}, as these use a + * non-pooled session. Pooled sessions are deleted when the Spanner client is closed, and this + * automatically also cancels any query that uses the session, which means that we don't need to + * explicitly cancel those queries when the Spanner client is closed. + */ + boolean cancelQueryWhenClientIsClosed(); } /** Handle for cancellation of a streaming read or query call. */ diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index 7318e6e2c31..9b05a18d714 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -46,6 +46,7 @@ import com.google.common.collect.Range; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.GeneratedMessageV3; import com.google.spanner.v1.BatchCreateSessionsRequest; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; @@ -345,33 +346,22 @@ public void asyncTransactionManagerFireAndForgetInvalidUpdate() throws Exception } } } + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, + // The first update that fails. This will cause a transaction retry. + ExecuteSqlRequest.class, + // The retry will use an explicit BeginTransaction call. + BeginTransactionRequest.class, + // The first update will again fail, but now there is a transaction id, so the + // transaction can continue. + ExecuteSqlRequest.class, + ExecuteSqlRequest.class, + CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - // The first update that fails. This will cause a transaction retry. - ExecuteSqlRequest.class, - // The retry will use an explicit BeginTransaction call. - BeginTransactionRequest.class, - // The first update will again fail, but now there is a transaction id, so the - // transaction can continue. - ExecuteSqlRequest.class, - ExecuteSqlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, - // The first update that fails. This will cause a transaction retry. - ExecuteSqlRequest.class, - // The retry will use an explicit BeginTransaction call. - BeginTransactionRequest.class, - // The first update will again fail, but now there is a transaction id, so the - // transaction can continue. - ExecuteSqlRequest.class, - ExecuteSqlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } @@ -681,21 +671,16 @@ public void asyncTransactionManagerFireAndForgetInvalidBatchUpdate() throws Exce } } } + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, + ExecuteBatchDmlRequest.class, + ExecuteBatchDmlRequest.class, + CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } @@ -729,23 +714,17 @@ public void asyncTransactionManagerBatchUpdateAborted() throws Exception { assertThat(attempt.get()).isEqualTo(2); // There should only be 1 CommitRequest, as the first attempt should abort already after the // ExecuteBatchDmlRequest. + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, + ExecuteBatchDmlRequest.class, + BeginTransactionRequest.class, + ExecuteBatchDmlRequest.class, + CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - BeginTransactionRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - BeginTransactionRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } @@ -777,23 +756,17 @@ public void asyncTransactionManagerBatchUpdateAbortedBeforeFirstStatement() thro assertThat(attempt.get()).isEqualTo(2); // There should only be 1 CommitRequest, as the first attempt should abort already after the // ExecuteBatchDmlRequest. + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, + ExecuteBatchDmlRequest.class, + BeginTransactionRequest.class, + ExecuteBatchDmlRequest.class, + CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - BeginTransactionRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - BeginTransactionRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } @@ -843,25 +816,18 @@ public void asyncTransactionManagerWithBatchUpdateCommitAborted() throws Excepti } finally { mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); } + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, + ExecuteBatchDmlRequest.class, + CommitRequest.class, + BeginTransactionRequest.class, + ExecuteBatchDmlRequest.class, + CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class, - BeginTransactionRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class, - BeginTransactionRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } @@ -925,7 +891,7 @@ public void asyncTransactionManagerBatchUpdateAbortedWithoutGettingResult() thro } @Test - public void asyncTransactionManagerWithBatchUpdateCommitFails() throws Exception { + public void asyncTransactionManagerWithBatchUpdateCommitFails() { mockSpanner.setCommitExecutionTime( SimulatedExecutionTime.ofException( Status.INVALID_ARGUMENT @@ -948,17 +914,13 @@ public void asyncTransactionManagerWithBatchUpdateCommitFails() throws Exception assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); assertThat(e.getMessage()).contains("mutation limit exceeded"); } + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } @@ -983,17 +945,13 @@ public void asyncTransactionManagerWaitsUntilAsyncBatchUpdateHasFinished() throw } } } + ImmutableList> expectedRequests = + ImmutableList.of( + BatchCreateSessionsRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); if (isMultiplexedSessionsEnabled()) { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - CreateSessionRequest.class, - BatchCreateSessionsRequest.class, - ExecuteBatchDmlRequest.class, - CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); } else { - assertThat(mockSpanner.getRequestTypes()) - .containsExactly( - BatchCreateSessionsRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); + assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/CloseSpannerWithOpenResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/CloseSpannerWithOpenResultSetTest.java new file mode 100644 index 00000000000..67b14f60a4e --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/CloseSpannerWithOpenResultSetTest.java @@ -0,0 +1,164 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.connection.AbstractMockServerTest; +import com.google.cloud.spanner.spi.v1.GapicSpannerRpc; +import com.google.spanner.v1.DeleteSessionRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class CloseSpannerWithOpenResultSetTest extends AbstractMockServerTest { + + Spanner createSpanner() { + return SpannerOptions.newBuilder() + .setProjectId("p") + .setHost(String.format("http://localhost:%d", getPort())) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build()) + .build() + .getService(); + } + + @After + public void cleanup() { + mockSpanner.unfreeze(); + mockSpanner.clearRequests(); + } + + @Test + public void testBatchClient_closedSpannerWithOpenResultSet_streamsAreCancelled() { + Spanner spanner = createSpanner(); + assumeFalse(spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()); + + BatchClient client = spanner.getBatchClient(DatabaseId.of("p", "i", "d")); + try (BatchReadOnlyTransaction transaction = + client.batchReadOnlyTransaction(TimestampBound.strong()); + ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + mockSpanner.freezeAfterReturningNumRows(1); + assertTrue(resultSet.next()); + ((SpannerImpl) spanner).close(1, TimeUnit.MILLISECONDS); + // This should return an error as the stream is cancelled. + SpannerException exception = assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.CANCELLED, exception.getErrorCode()); + } + } + + @Test + public void testNormalDatabaseClient_closedSpannerWithOpenResultSet_sessionsAreDeleted() + throws Exception { + Spanner spanner = createSpanner(); + assumeFalse(spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()); + + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + try (ReadOnlyTransaction transaction = client.readOnlyTransaction(TimestampBound.strong()); + ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + mockSpanner.freezeAfterReturningNumRows(1); + assertTrue(resultSet.next()); + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).stream() + .filter(request -> request.getSql().equals(SELECT_RANDOM_STATEMENT.getSql())) + .collect(Collectors.toList()); + assertEquals(1, executeSqlRequests.size()); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(spanner::close); + // Verify that the session that is used by this transaction is deleted. + // That will automatically cancel the query. + mockSpanner.waitForRequestsToContain( + request -> + request instanceof DeleteSessionRequest + && ((DeleteSessionRequest) request) + .getName() + .equals(executeSqlRequests.get(0).getSession()), + /*timeoutMillis=*/ 1000L); + service.shutdownNow(); + } + } + + @Test + public void testStreamsAreCleanedUp() throws Exception { + String invalidSql = "select * from foo"; + Statement invalidStatement = Statement.of(invalidSql); + mockSpanner.putStatementResult( + StatementResult.exception( + invalidStatement, + Status.NOT_FOUND.withDescription("Table not found: foo").asRuntimeException())); + int numThreads = 16; + int numQueries = 32; + try (Spanner spanner = createSpanner()) { + BatchClient client = spanner.getBatchClient(DatabaseId.of("p", "i", "d")); + ExecutorService service = Executors.newFixedThreadPool(numThreads); + List> futures = new ArrayList<>(numQueries); + for (int n = 0; n < numQueries; n++) { + futures.add( + service.submit( + () -> { + try (BatchReadOnlyTransaction transaction = + client.batchReadOnlyTransaction(TimestampBound.strong())) { + if (ThreadLocalRandom.current().nextInt(10) < 2) { + try (ResultSet resultSet = transaction.executeQuery(invalidStatement)) { + SpannerException exception = + assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.NOT_FOUND, exception.getErrorCode()); + } + } else { + try (ResultSet resultSet = + transaction.executeQuery(SELECT_RANDOM_STATEMENT)) { + while (resultSet.next()) { + assertNotNull(resultSet.getCurrentRowAsStruct()); + } + } + } + } + })); + } + service.shutdown(); + for (Future fut : futures) { + fut.get(); + } + assertTrue(service.awaitTermination(1L, TimeUnit.MINUTES)); + // Verify that all response observers have been unregistered. + assertEquals( + 0, ((GapicSpannerRpc) ((SpannerImpl) spanner).getRpc()).getNumActiveResponseObservers()); + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 2051e006d81..62336163eaf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -81,7 +81,7 @@ public void onDone(boolean withBeginTransaction) {} @Before public void setUp() { - stream = new GrpcStreamIterator(10); + stream = new GrpcStreamIterator(10, /*cancelQueryWhenClientIsClosed=*/ false); stream.setCall( new SpannerRpc.StreamingCall() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 5266ecad7c8..9f0a2822d87 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -578,6 +578,7 @@ private static void checkStreamException( private final Object lock = new Object(); private Deque requests = new ConcurrentLinkedDeque<>(); private volatile CountDownLatch freezeLock = new CountDownLatch(0); + private final AtomicInteger freezeAfterReturningNumRows = new AtomicInteger(); private Queue exceptions = new ConcurrentLinkedQueue<>(); private boolean stickyGlobalExceptions = false; private ConcurrentMap statementResults = new ConcurrentHashMap<>(); @@ -784,6 +785,10 @@ public void unfreeze() { freezeLock.countDown(); } + public void freezeAfterReturningNumRows(int numRows) { + freezeAfterReturningNumRows.set(numRows); + } + public void setMaxSessionsInOneBatch(int max) { this.maxNumSessionsInOneBatch = max; } @@ -1678,7 +1683,8 @@ private void returnPartialResultSet( ByteString transactionId, TransactionSelector transactionSelector, StreamObserver responseObserver, - SimulatedExecutionTime executionTime) { + SimulatedExecutionTime executionTime) + throws Exception { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId == null) { Transaction transaction = getTemporaryTransactionOrNull(transactionSelector); @@ -1700,6 +1706,12 @@ private void returnPartialResultSet( SimulatedExecutionTime.checkStreamException( index, executionTime.exceptions, executionTime.streamIndices); responseObserver.onNext(iterator.next()); + if (freezeAfterReturningNumRows.get() > 0) { + if (freezeAfterReturningNumRows.decrementAndGet() == 0) { + freeze(); + freezeLock.await(); + } + } index++; } responseObserver.onCompleted(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java index c309932f0b2..a2aeb887733 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java @@ -572,22 +572,13 @@ public void transactionRunnerWithError() { @Test public void transactionRunnerWithFailedAndBeginTransaction() { List expectedReadWriteTransactionWithCommitAndBeginTransactionSpans = - isMultiplexedSessionsEnabled() - ? ImmutableList.of( - "CloudSpannerOperation.CreateMultiplexedSession", - "CloudSpannerOperation.BeginTransaction", - "CloudSpannerOperation.BatchCreateSessionsRequest", - "CloudSpannerOperation.ExecuteUpdate", - "CloudSpannerOperation.Commit", - "CloudSpannerOperation.BatchCreateSessions", - "CloudSpanner.ReadWriteTransaction") - : ImmutableList.of( - "CloudSpannerOperation.BeginTransaction", - "CloudSpannerOperation.BatchCreateSessionsRequest", - "CloudSpannerOperation.ExecuteUpdate", - "CloudSpannerOperation.Commit", - "CloudSpannerOperation.BatchCreateSessions", - "CloudSpanner.ReadWriteTransaction"); + ImmutableList.of( + "CloudSpannerOperation.BeginTransaction", + "CloudSpannerOperation.BatchCreateSessionsRequest", + "CloudSpannerOperation.ExecuteUpdate", + "CloudSpannerOperation.Commit", + "CloudSpannerOperation.BatchCreateSessions", + "CloudSpanner.ReadWriteTransaction"); DatabaseClient client = getClient(); assertEquals( Long.valueOf(1L), @@ -612,7 +603,7 @@ public void transactionRunnerWithFailedAndBeginTransaction() { Stopwatch stopwatch = Stopwatch.createStarted(); while (spanExporter.getFinishedSpanItems().size() < expectedReadWriteTransactionWithCommitAndBeginTransactionSpans.size() - && stopwatch.elapsed().compareTo(java.time.Duration.ofMillis(1000)) < 0) { + && stopwatch.elapsed(TimeUnit.MILLISECONDS) < 2000) { Thread.yield(); } @@ -621,7 +612,11 @@ public void transactionRunnerWithFailedAndBeginTransaction() { .getFinishedSpanItems() .forEach( spanItem -> { - actualSpanItems.add(spanItem.getName()); + // Ignore multiplexed sessions, as they are not used by this test and can therefore + // best be ignored, as it is not 100% certain that it has already been created. + if (!"CloudSpannerOperation.CreateMultiplexedSession".equals(spanItem.getName())) { + actualSpanItems.add(spanItem.getName()); + } switch (spanItem.getName()) { case "CloudSpannerOperation.CreateMultiplexedSession": verifyRequestEvents( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index 8d97d9d894b..c973b7e471e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -114,7 +114,7 @@ private static class TestCaseRunner { } private void run() throws Exception { - stream = new GrpcStreamIterator(10); + stream = new GrpcStreamIterator(10, /*cancelQueryWhenClientIsClosed=*/ false); stream.setCall( new SpannerRpc.StreamingCall() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index 88080a39ef5..d126719ebb8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import com.google.api.client.util.BackOff; +import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; @@ -158,6 +159,7 @@ private void initWithLimit(int maxBufferSize) { "", new OpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)), new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false), + DefaultErrorHandler.INSTANCE, SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(), SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java new file mode 100644 index 00000000000..b5e3e2e54cf --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java @@ -0,0 +1,362 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.connection.AbstractMockServerTest; +import com.google.common.collect.ImmutableSet; +import com.google.spanner.v1.BatchCreateSessionsRequest; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import io.grpc.Attributes; +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class RetryOnDifferentGrpcChannelMockServerTest extends AbstractMockServerTest { + private static final Map> SERVER_ADDRESSES = new HashMap<>(); + + @BeforeClass + public static void startStaticServer() throws IOException { + System.setProperty("spanner.retry_deadline_exceeded_on_different_channel", "true"); + startStaticServer(createServerInterceptor()); + } + + @AfterClass + public static void removeSystemProperty() { + System.clearProperty("spanner.retry_deadline_exceeded_on_different_channel"); + } + + @After + public void clearRequests() { + SERVER_ADDRESSES.clear(); + mockSpanner.clearRequests(); + mockSpanner.removeAllExecutionTimes(); + } + + static ServerInterceptor createServerInterceptor() { + return new ServerInterceptor() { + @Override + public Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + Attributes attributes = serverCall.getAttributes(); + //noinspection unchecked,deprecation + Attributes.Key key = + (Attributes.Key) + attributes.keys().stream() + .filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR)) + .findFirst() + .orElse(null); + if (key != null) { + InetSocketAddress address = attributes.get(key); + synchronized (SERVER_ADDRESSES) { + Set addresses = + SERVER_ADDRESSES.getOrDefault( + serverCall.getMethodDescriptor().getFullMethodName(), new HashSet<>()); + addresses.add(address); + SERVER_ADDRESSES.putIfAbsent( + serverCall.getMethodDescriptor().getFullMethodName(), addresses); + } + } + return serverCallHandler.startCall(serverCall, metadata); + } + }; + } + + SpannerOptions.Builder createSpannerOptionsBuilder() { + return SpannerOptions.newBuilder() + .setProjectId("my-project") + .setHost(String.format("http://localhost:%d", getPort())) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .setCredentials(NoCredentials.getInstance()); + } + + @Test + public void testReadWriteTransaction_retriesOnNewChannel() { + SpannerOptions.Builder builder = createSpannerOptionsBuilder(); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build()); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofStickyException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + AtomicInteger attempts = new AtomicInteger(); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + client + .readWriteTransaction() + .run( + transaction -> { + if (attempts.incrementAndGet() > 1) { + mockSpanner.setBeginTransactionExecutionTime( + MockSpannerServiceImpl.NO_EXECUTION_TIME); + } + transaction.buffer(Mutation.newInsertBuilder("foo").set("id").to(1L).build()); + return null; + }); + } + assertEquals(2, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + List requests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertNotEquals(requests.get(0).getSession(), requests.get(1).getSession()); + assertEquals( + 2, + SERVER_ADDRESSES + .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", ImmutableSet.of()) + .size()); + } + + @Test + public void testReadWriteTransaction_stopsRetrying() { + SpannerOptions.Builder builder = createSpannerOptionsBuilder(); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build()); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofStickyException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + SpannerException exception = + assertThrows( + SpannerException.class, + () -> + client + .readWriteTransaction() + .run( + transaction -> { + transaction.buffer( + Mutation.newInsertBuilder("foo").set("id").to(1L).build()); + return null; + })); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + + int numChannels = spanner.getOptions().getNumChannels(); + assertEquals(numChannels, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + List requests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + Set sessions = + requests.stream().map(BeginTransactionRequest::getSession).collect(Collectors.toSet()); + assertEquals(numChannels, sessions.size()); + assertEquals( + numChannels, + SERVER_ADDRESSES + .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", ImmutableSet.of()) + .size()); + } + } + + @Test + public void testDenyListedChannelIsCleared() { + FakeClock clock = new FakeClock(); + SpannerOptions.Builder builder = createSpannerOptionsBuilder(); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessions(Duration.ofSeconds(5)) + .setPoolMaintainerClock(clock) + .build()); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofStickyException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // Retry until all channels have been deny-listed. + SpannerException exception = + assertThrows( + SpannerException.class, + () -> + client + .readWriteTransaction() + .run( + transaction -> { + transaction.buffer( + Mutation.newInsertBuilder("foo").set("id").to(1L).build()); + return null; + })); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + + // Now advance the clock by 1 minute. This should clear all deny-listed channels. + clock.currentTimeMillis.addAndGet(TimeUnit.MILLISECONDS.convert(2L, TimeUnit.MINUTES)); + AtomicInteger attempts = new AtomicInteger(); + client + .readWriteTransaction() + .run( + transaction -> { + if (attempts.incrementAndGet() > 1) { + mockSpanner.setBeginTransactionExecutionTime(SimulatedExecutionTime.none()); + } + transaction.buffer(Mutation.newInsertBuilder("foo").set("id").to(1L).build()); + return null; + }); + + int numChannels = spanner.getOptions().getNumChannels(); + // We should have numChannels BeginTransactionRequests from the first transaction, and 2 from + // the second transaction. + assertEquals(numChannels + 2, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + List requests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + // The requests should all use different sessions, as deny-listing a session will bring it to + // the back of the session pool. + Set sessions = + requests.stream().map(BeginTransactionRequest::getSession).collect(Collectors.toSet()); + // We should have used numChannels+1==5 sessions. The reason for that is that first 3 attempts + // of the first transaction used 3 different sessions, that were then all deny-listed. The + // 4th attempt also failed, but as it would be the last channel to be deny-listed, it was not + // deny-listed and instead added to the front of the pool. + // The first attempt of the second transaction then uses the same session as the last attempt + // of the first transaction. That fails, the session is deny-listed, the transaction is + // retried on yet another session and succeeds. + assertEquals(numChannels + 1, sessions.size()); + assertEquals( + numChannels, + SERVER_ADDRESSES + .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", ImmutableSet.of()) + .size()); + assertEquals(numChannels, mockSpanner.countRequestsOfType(BatchCreateSessionsRequest.class)); + } + } + + @Test + public void testSingleUseQuery_retriesOnNewChannel() { + SpannerOptions.Builder builder = createSpannerOptionsBuilder(); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setUseMultiplexedSession(true).build()); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1_STATEMENT)) { + assertTrue(resultSet.next()); + assertEquals(1L, resultSet.getLong(0)); + assertFalse(resultSet.next()); + } + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + // The requests use the same multiplexed session. + assertEquals(requests.get(0).getSession(), requests.get(1).getSession()); + // The requests use two different gRPC channels. + assertEquals( + 2, + SERVER_ADDRESSES + .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", ImmutableSet.of()) + .size()); + } + + @Test + public void testSingleUseQuery_stopsRetrying() { + SpannerOptions.Builder builder = createSpannerOptionsBuilder(); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setUseMultiplexedSession(true).build()); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofStickyException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1_STATEMENT)) { + SpannerException exception = assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + } + int numChannels = spanner.getOptions().getNumChannels(); + assertEquals(numChannels, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + // The requests use the same multiplexed session. + String session = requests.get(0).getSession(); + for (ExecuteSqlRequest request : requests) { + assertEquals(session, request.getSession()); + } + // The requests use all gRPC channels. + assertEquals( + numChannels, + SERVER_ADDRESSES + .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", ImmutableSet.of()) + .size()); + } + } + + @Test + public void testReadWriteTransaction_withGrpcContextDeadline_doesNotRetry() { + SpannerOptions.Builder builder = createSpannerOptionsBuilder(); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessions(Duration.ofSeconds(5L)).build()); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime(500, 500)); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + ScheduledExecutorService service = Executors.newScheduledThreadPool(1); + Context context = + Context.current().withDeadline(Deadline.after(50L, TimeUnit.MILLISECONDS), service); + SpannerException exception = + assertThrows( + SpannerException.class, + () -> + context.run( + () -> + client + .readWriteTransaction() + .run( + transaction -> { + transaction.buffer( + Mutation.newInsertBuilder("foo").set("id").to(1L).build()); + return null; + }))); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + } + // A gRPC context deadline will still cause the underlying error handler to try to retry the + // transaction on a new channel, but as the deadline has been exceeded even before those RPCs + // are being executed, the RPC invocation will be skipped, and the error will eventually bubble + // up. + assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 58b0280cf08..1bf58fa7ecf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -59,6 +59,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler; import com.google.cloud.spanner.MetricRegistryTestUtils.FakeMetricRegistry; import com.google.cloud.spanner.MetricRegistryTestUtils.MetricsRecord; import com.google.cloud.spanner.MetricRegistryTestUtils.PointWithFunction; @@ -1294,7 +1295,7 @@ public void blockAndTimeoutOnPoolExhaustion_withAcquireSessionTimeout() throws E .setMinSessions(minSessions) .setMaxSessions(1) .setInitialWaitForSessionTimeoutMillis(20L) - .setAcquireSessionTimeout(Duration.ofMillis(20L)) + .setAcquireSessionTimeout(null) .build(); setupMockSessionCreation(); pool = createPool(); @@ -1307,18 +1308,18 @@ public void blockAndTimeoutOnPoolExhaustion_withAcquireSessionTimeout() throws E Future fut = executor.submit( () -> { - latch.countDown(); PooledSessionFuture session = pool.getSession(); + latch.countDown(); + session.get(); session.close(); return null; }); // Wait until the background thread is actually waiting for a session. latch.await(); // Wait until the request has timed out. - int waitCount = 0; - while (pool.getNumWaiterTimeouts() == 0L && waitCount < 5000) { - Thread.sleep(1L); - waitCount++; + Stopwatch watch = Stopwatch.createStarted(); + while (pool.getNumWaiterTimeouts() == 0L && watch.elapsed(TimeUnit.MILLISECONDS) < 1000) { + Thread.yield(); } // Return the checked out session to the pool so the async request will get a session and // finish. @@ -1326,10 +1327,11 @@ public void blockAndTimeoutOnPoolExhaustion_withAcquireSessionTimeout() throws E // Verify that the async request also succeeds. fut.get(10L, TimeUnit.SECONDS); executor.shutdown(); + assertTrue(executor.awaitTermination(10L, TimeUnit.SECONDS)); // Verify that the session was returned to the pool and that we can get it again. - Session session = pool.getSession(); - assertThat(session).isNotNull(); + PooledSessionFuture session = pool.getSession(); + assertThat(session.get()).isNotNull(); session.close(); assertThat(pool.getNumWaiterTimeouts()).isAtLeast(1L); } @@ -1477,6 +1479,7 @@ public void testSessionNotFoundReadWriteTransaction() { final SessionImpl closedSession = mock(SessionImpl.class); when(closedSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-closed"); + when(closedSession.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); Span oTspan = mock(Span.class); ISpan span = new OpenTelemetrySpan(oTspan); @@ -1501,6 +1504,7 @@ public void testSessionNotFoundReadWriteTransaction() { when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); final SessionImpl openSession = mock(SessionImpl.class); + when(openSession.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); when(openSession.asyncClose()) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(openSession.getName()) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 522cd0147a3..73455f06688 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -104,16 +104,21 @@ public static void resetLogging() { @Test public void defaultBuilder() { - // We need to set the project id since in test environment we cannot obtain a default project - // id. - SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build(); + // We need to set the project id and credentials since in test environments we cannot guarantee + // that a default project id and credentials are available. + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .build(); if (Strings.isNullOrEmpty(System.getenv("SPANNER_EMULATOR_HOST"))) { - assertThat(options.getHost()).isEqualTo("https://spanner.googleapis.com"); + assertEquals("https://spanner.googleapis.com", options.getHost()); } else { - assertThat(options.getHost()).isEqualTo("https://" + System.getenv("SPANNER_EMULATOR_HOST")); + assertEquals("https://" + System.getenv("SPANNER_EMULATOR_HOST"), options.getHost()); } - assertThat(options.getPrefetchChunks()).isEqualTo(4); + assertEquals(4, options.getPrefetchChunks()); assertNull(options.getSessionLabels()); + assertEquals(DecodeMode.DIRECT, options.getDecodeMode()); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index f5d9f1841a2..c647bb3642a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -32,6 +32,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; +import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -115,6 +116,7 @@ public void setUp() { MockitoAnnotations.initMocks(this); tracer = new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false); firstRun = true; + when(session.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); when(session.getTracer()).thenReturn(tracer); when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java index 1a5cfdf73e4..3e08eafed07 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java @@ -43,7 +43,12 @@ import com.google.spanner.v1.StructType.Field; import com.google.spanner.v1.Type; import com.google.spanner.v1.TypeCode; +import io.grpc.Metadata; import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.internal.LogExceptionRunnable; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; @@ -151,6 +156,10 @@ public abstract class AbstractMockServerTest { @BeforeClass public static void startStaticServer() throws IOException { + startStaticServer(createServerInterceptor()); + } + + public static void startStaticServer(ServerInterceptor interceptor) throws IOException { mockSpanner = new MockSpannerServiceImpl(); mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. mockInstanceAdmin = new MockInstanceAdminImpl(); @@ -176,6 +185,7 @@ public void getOperation( .addService(mockInstanceAdmin) .addService(mockDatabaseAdmin) .addService(mockOperations) + .intercept(interceptor) .build() .start(); mockSpanner.putStatementResult( @@ -205,6 +215,16 @@ public void getOperation( Logger.getLogger("io.grpc.internal.AbstractClientStream").setUseParentHandlers(false); } + static ServerInterceptor createServerInterceptor() { + return new ServerInterceptor() { + @Override + public Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + return next.startCall(call, headers); + } + }; + } + @AfterClass public static void stopServer() { try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java index 6a6125e1dda..b64a05b2ef4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java @@ -27,6 +27,12 @@ import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; @@ -41,7 +47,7 @@ public void clearRequests() { } @Test - public void testAllDecodeModes() { + public void testAllDecodeModes() throws Exception { int numRows = 10; RandomResultSetGenerator generator = new RandomResultSetGenerator(numRows); String sql = "select * from random"; @@ -50,57 +56,85 @@ public void testAllDecodeModes() { MockSpannerServiceImpl.StatementResult.query(statement, generator.generate())); try (Connection connection = createConnection()) { - for (boolean readonly : new boolean[] {true, false}) { - for (boolean autocommit : new boolean[] {true, false}) { - connection.setReadOnly(readonly); - connection.setAutocommit(autocommit); + for (boolean multiThreaded : new boolean[] {true, false}) { + for (boolean readonly : new boolean[] {true, false}) { + for (boolean autocommit : new boolean[] {true, false}) { + connection.setReadOnly(readonly); + connection.setAutocommit(autocommit); - int receivedRows = 0; - // DecodeMode#DIRECT is not supported in read/write transactions, as the protobuf value is - // used for checksum calculation. - try (ResultSet direct = - connection.executeQuery( - statement, - !readonly && !autocommit - ? Options.decodeMode(DecodeMode.LAZY_PER_ROW) - : Options.decodeMode(DecodeMode.DIRECT)); - ResultSet lazyPerRow = - connection.executeQuery(statement, Options.decodeMode(DecodeMode.LAZY_PER_ROW)); - ResultSet lazyPerCol = - connection.executeQuery(statement, Options.decodeMode(DecodeMode.LAZY_PER_COL))) { - while (direct.next() && lazyPerRow.next() && lazyPerCol.next()) { - assertEquals(direct.getColumnCount(), lazyPerRow.getColumnCount()); - assertEquals(direct.getColumnCount(), lazyPerCol.getColumnCount()); - for (int col = 0; col < direct.getColumnCount(); col++) { - // Test getting the entire row as a struct both as the first thing we do, and as the - // last thing we do. This ensures that the method works as expected both when a row - // is lazily decoded by this method, and when it has already been decoded by another - // method. - if (col % 2 == 0) { - assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); - assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); - } - assertEquals(direct.isNull(col), lazyPerRow.isNull(col)); - assertEquals(direct.isNull(col), lazyPerCol.isNull(col)); - assertEquals(direct.getValue(col), lazyPerRow.getValue(col)); - assertEquals(direct.getValue(col), lazyPerCol.getValue(col)); - if (col % 2 == 1) { - assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); - assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); + int receivedRows = 0; + // DecodeMode#DIRECT is not supported in read/write transactions, as the protobuf value + // is + // used for checksum calculation. + try (ResultSet direct = + connection.executeQuery( + statement, + !readonly && !autocommit + ? Options.decodeMode(DecodeMode.LAZY_PER_ROW) + : Options.decodeMode(DecodeMode.DIRECT)); + ResultSet lazyPerRow = + connection.executeQuery( + statement, Options.decodeMode(DecodeMode.LAZY_PER_ROW)); + ResultSet lazyPerCol = + connection.executeQuery( + statement, Options.decodeMode(DecodeMode.LAZY_PER_COL))) { + while (direct.next() && lazyPerRow.next() && lazyPerCol.next()) { + assertEquals(direct.getColumnCount(), lazyPerRow.getColumnCount()); + assertEquals(direct.getColumnCount(), lazyPerCol.getColumnCount()); + if (multiThreaded) { + ExecutorService service = Executors.newFixedThreadPool(direct.getColumnCount()); + List> futures = new ArrayList<>(direct.getColumnCount()); + for (int col = 0; col < direct.getColumnCount(); col++) { + final int colNumber = col; + futures.add( + service.submit( + () -> checkRowValues(colNumber, direct, lazyPerRow, lazyPerCol))); + } + service.shutdown(); + for (Future future : futures) { + future.get(); + } + } else { + for (int col = 0; col < direct.getColumnCount(); col++) { + checkRowValues(col, direct, lazyPerRow, lazyPerCol); + } } + receivedRows++; } - receivedRows++; + assertEquals(numRows, receivedRows); + } + if (!autocommit) { + connection.commit(); } - assertEquals(numRows, receivedRows); - } - if (!autocommit) { - connection.commit(); } } } } } + private void checkRowValues( + int col, ResultSet direct, ResultSet lazyPerRow, ResultSet lazyPerCol) { + // Randomly decode and get a column to trigger parallel decoding of one column. + lazyPerCol.getValue(ThreadLocalRandom.current().nextInt(lazyPerCol.getColumnCount())); + + // Test getting the entire row as a struct both as the first thing we do, and as the + // last thing we do. This ensures that the method works as expected both when a row + // is lazily decoded by this method, and when it has already been decoded by another + // method. + if (col % 2 == 0) { + assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); + assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); + } + assertEquals(direct.isNull(col), lazyPerRow.isNull(col)); + assertEquals(direct.isNull(col), lazyPerCol.isNull(col)); + assertEquals(direct.getValue(col), lazyPerRow.getValue(col)); + assertEquals(direct.getValue(col), lazyPerCol.getValue(col)); + if (col % 2 == 1) { + assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct()); + assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct()); + } + } + @Test public void testDecodeModeDirect_failsInReadWriteTransaction() { int numRows = 1; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java index 1e408fadf33..1ca69fe975d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java @@ -58,6 +58,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -237,17 +238,24 @@ public void invalidDatabase() throws Exception { RemoteSpannerHelper helper = env.getTestHelper(); DatabaseClient invalidClient = helper.getClient().getDatabaseClient(DatabaseId.of(helper.getInstanceId(), "invalid")); - ApiFuture row = - invalidClient - .singleUse(TimestampBound.strong()) - .readRowAsync(TABLE_NAME, Key.of("k99"), ALL_COLUMNS); + Thread.sleep(ThreadLocalRandom.current().nextLong(100L)); try { + // The NOT_FOUND error can come from both the call to invalidClient.singleUse() as well as + // from the call to row.get(), which is why both need to be inside the try block. + ApiFuture row = + invalidClient + .singleUse(TimestampBound.strong()) + .readRowAsync(TABLE_NAME, Key.of("k99"), ALL_COLUMNS); row.get(); fail("missing expected exception"); - } catch (ExecutionException e) { - assertThat(e.getCause()).isInstanceOf(SpannerException.class); - SpannerException se = (SpannerException) e.getCause(); - assertThat(se.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } catch (ExecutionException | SpannerException thrownException) { + SpannerException spannerException; + if (thrownException instanceof ExecutionException) { + spannerException = (SpannerException) thrownException.getCause(); + } else { + spannerException = (SpannerException) thrownException; + } + assertEquals(ErrorCode.NOT_FOUND, spannerException.getErrorCode()); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAutogeneratedAdminClientTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAutogeneratedAdminClientTest.java index a1c6a35819f..de5597da4f9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAutogeneratedAdminClientTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAutogeneratedAdminClientTest.java @@ -266,9 +266,9 @@ private Database createAndUpdateDatabase( private String getCreateTableStatement() { if (dialect == DatabaseDialect.POSTGRESQL) { - return "CREATE TABLE T (" + " \"K\" VARCHAR PRIMARY KEY" + ")"; + return "CREATE TABLE IF NOT EXISTS T (" + " \"K\" VARCHAR PRIMARY KEY" + ")"; } else { - return "CREATE TABLE T (" + " K STRING(MAX)" + ") PRIMARY KEY (K)"; + return "CREATE TABLE IF NOT EXISTS T (" + " K STRING(MAX)" + ") PRIMARY KEY (K)"; } } diff --git a/grpc-google-cloud-spanner-admin-database-v1/pom.xml b/grpc-google-cloud-spanner-admin-database-v1/pom.xml index d316ece2b6c..2dff3c6bcb1 100644 --- a/grpc-google-cloud-spanner-admin-database-v1/pom.xml +++ b/grpc-google-cloud-spanner-admin-database-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.72.0 + 6.73.0 grpc-google-cloud-spanner-admin-database-v1 GRPC library for grpc-google-cloud-spanner-admin-database-v1 com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml index d971f88fdce..86b53bb1225 100644 --- a/grpc-google-cloud-spanner-admin-instance-v1/pom.xml +++ b/grpc-google-cloud-spanner-admin-instance-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.72.0 + 6.73.0 grpc-google-cloud-spanner-admin-instance-v1 GRPC library for grpc-google-cloud-spanner-admin-instance-v1 com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/grpc-google-cloud-spanner-executor-v1/pom.xml b/grpc-google-cloud-spanner-executor-v1/pom.xml index ac9f792bd3d..719d280a34b 100644 --- a/grpc-google-cloud-spanner-executor-v1/pom.xml +++ b/grpc-google-cloud-spanner-executor-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-executor-v1 - 6.72.0 + 6.73.0 grpc-google-cloud-spanner-executor-v1 GRPC library for google-cloud-spanner com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/grpc-google-cloud-spanner-v1/pom.xml b/grpc-google-cloud-spanner-v1/pom.xml index d7723c6e774..154179923d6 100644 --- a/grpc-google-cloud-spanner-v1/pom.xml +++ b/grpc-google-cloud-spanner-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.72.0 + 6.73.0 grpc-google-cloud-spanner-v1 GRPC library for grpc-google-cloud-spanner-v1 com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/pom.xml b/pom.xml index 955e456f999..d6fa38a61f8 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-spanner-parent pom - 6.72.0 + 6.73.0 Google Cloud Spanner Parent https://github.com/googleapis/java-spanner @@ -14,7 +14,7 @@ com.google.cloud sdk-platform-java-config - 3.33.0 + 3.34.0 @@ -61,47 +61,47 @@ com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.72.0 + 6.73.0 com.google.api.grpc proto-google-cloud-spanner-executor-v1 - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-executor-v1 - 6.72.0 + 6.73.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.72.0 + 6.73.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-v1 - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-admin-instance-v1 - 6.72.0 + 6.73.0 com.google.api.grpc grpc-google-cloud-spanner-admin-database-v1 - 6.72.0 + 6.73.0 com.google.cloud google-cloud-spanner - 6.72.0 + 6.73.0 diff --git a/proto-google-cloud-spanner-admin-database-v1/pom.xml b/proto-google-cloud-spanner-admin-database-v1/pom.xml index e9cc23ac3ab..ce1b16d9072 100644 --- a/proto-google-cloud-spanner-admin-database-v1/pom.xml +++ b/proto-google-cloud-spanner-admin-database-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-admin-database-v1 - 6.72.0 + 6.73.0 proto-google-cloud-spanner-admin-database-v1 PROTO library for proto-google-cloud-spanner-admin-database-v1 com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/proto-google-cloud-spanner-admin-instance-v1/pom.xml b/proto-google-cloud-spanner-admin-instance-v1/pom.xml index a7d13f01a46..5a412970bea 100644 --- a/proto-google-cloud-spanner-admin-instance-v1/pom.xml +++ b/proto-google-cloud-spanner-admin-instance-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-admin-instance-v1 - 6.72.0 + 6.73.0 proto-google-cloud-spanner-admin-instance-v1 PROTO library for proto-google-cloud-spanner-admin-instance-v1 com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/proto-google-cloud-spanner-executor-v1/pom.xml b/proto-google-cloud-spanner-executor-v1/pom.xml index fe38be68ce6..478793fa0ea 100644 --- a/proto-google-cloud-spanner-executor-v1/pom.xml +++ b/proto-google-cloud-spanner-executor-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-executor-v1 - 6.72.0 + 6.73.0 proto-google-cloud-spanner-executor-v1 Proto library for google-cloud-spanner com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/proto-google-cloud-spanner-v1/pom.xml b/proto-google-cloud-spanner-v1/pom.xml index fce72e60709..8540ba0c93f 100644 --- a/proto-google-cloud-spanner-v1/pom.xml +++ b/proto-google-cloud-spanner-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-spanner-v1 - 6.72.0 + 6.73.0 proto-google-cloud-spanner-v1 PROTO library for proto-google-cloud-spanner-v1 com.google.cloud google-cloud-spanner-parent - 6.72.0 + 6.73.0 diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index 56b1c708c8c..23595b69744 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -33,7 +33,7 @@ com.google.cloud google-cloud-spanner - 6.71.0 + 6.72.0 @@ -145,7 +145,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 3.3.1 + 3.4.0 java-client-integration-tests diff --git a/samples/native-image/pom.xml b/samples/native-image/pom.xml index b34fc048486..85df5b61636 100644 --- a/samples/native-image/pom.xml +++ b/samples/native-image/pom.xml @@ -29,7 +29,7 @@ com.google.cloud libraries-bom - 26.43.0 + 26.44.0 pom import @@ -104,7 +104,7 @@ org.junit.vintage junit-vintage-engine - 5.10.3 + 5.11.0 test @@ -121,7 +121,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.3.1 + 3.4.0 **/*IT diff --git a/samples/pom.xml b/samples/pom.xml index 07c91c62ad0..b12b188d6cd 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -40,7 +40,7 @@ org.apache.maven.plugins maven-deploy-plugin - 3.1.2 + 3.1.3 true diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 86832f71b9d..1f55b81ef1d 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -32,7 +32,7 @@ com.google.cloud google-cloud-spanner - 6.72.0 + 6.73.0 @@ -144,7 +144,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 3.3.1 + 3.4.0 java-client-integration-tests diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index c96c572d509..bac633aaae4 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -34,7 +34,7 @@ com.google.cloud libraries-bom - 26.43.0 + 26.44.0 pom import @@ -175,7 +175,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 3.3.1 + 3.4.0 java-client-integration-tests diff --git a/samples/snippets/src/main/java/com/example/spanner/SpannerGraphSample.java b/samples/snippets/src/main/java/com/example/spanner/SpannerGraphSample.java new file mode 100644 index 00000000000..fa23b6401b5 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/spanner/SpannerGraphSample.java @@ -0,0 +1,588 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spanner; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeyRange; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient; +import com.google.spanner.admin.database.v1.CreateDatabaseRequest; +import com.google.spanner.admin.database.v1.InstanceName; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** + * Example code for using the Cloud Spanner API. This example demonstrates all the common property + * graph operations that can be done on Cloud Spanner. These are: + * + *

+ * + *

    + *
  • Creating a Cloud Spanner database with a property graph. + *
  • Inserting data, updating and deleting data. + *
  • Executing graph queries. + *
+ */ +public class SpannerGraphSample { + + // [START spanner_insert_graph_data] + /** Class to contain sample Person data. */ + static class Person { + + final long id; + final String name; + final Timestamp birthday; + final String country; + final String city; + + Person(long id, String name, Timestamp birthday, String country, String city) { + this.id = id; + this.name = name; + this.birthday = birthday; + this.country = country; + this.city = city; + } + } + + /** Class to contain sample Account data. */ + static class Account { + + final long id; + final Timestamp createTime; + final boolean isBlocked; + final String nickName; + + Account(long id, Timestamp createTime, boolean isBlocked, String nickName) { + this.id = id; + this.createTime = createTime; + this.isBlocked = isBlocked; + this.nickName = nickName; + } + } + + /** Class to contain sample Transfer data. */ + static class Transfer { + + final long id; + final long toId; + final double amount; + final Timestamp createTime; + final String orderNumber; + + Transfer(long id, long toId, double amount, Timestamp createTime, String orderNumber) { + this.id = id; + this.toId = toId; + this.amount = amount; + this.createTime = createTime; + this.orderNumber = orderNumber; + } + } + + /** Class to contain sample Ownership data. */ + static class Own { + + final long id; + final long accountId; + final Timestamp createTime; + + Own(long id, long accountId, Timestamp createTime) { + this.id = id; + this.accountId = accountId; + this.createTime = createTime; + } + } + + // [END spanner_insert_graph_data] + + // [START spanner_create_database_with_property_graph] + static void createDatabaseWithPropertyGraph( + DatabaseAdminClient dbAdminClient, InstanceName instanceName, String databaseId) { + CreateDatabaseRequest createDatabaseRequest = + CreateDatabaseRequest.newBuilder() + .setCreateStatement("CREATE DATABASE `" + databaseId + "`") + .setParent(instanceName.toString()) + .addAllExtraStatements( + Arrays.asList( + "CREATE TABLE Person (" + + " id INT64 NOT NULL," + + " name STRING(MAX)," + + " birthday TIMESTAMP," + + " country STRING(MAX)," + + " city STRING(MAX)," + + ") PRIMARY KEY (id)", + "CREATE TABLE Account (" + + " id INT64 NOT NULL," + + " create_time TIMESTAMP," + + " is_blocked BOOL," + + " nick_name STRING(MAX)," + + ") PRIMARY KEY (id)", + "CREATE TABLE PersonOwnAccount (" + + " id INT64 NOT NULL," + + " account_id INT64 NOT NULL," + + " create_time TIMESTAMP," + + " FOREIGN KEY (account_id)" + + " REFERENCES Account (id)" + + ") PRIMARY KEY (id, account_id)," + + "INTERLEAVE IN PARENT Person ON DELETE CASCADE", + "CREATE TABLE AccountTransferAccount (" + + " id INT64 NOT NULL," + + " to_id INT64 NOT NULL," + + " amount FLOAT64," + + " create_time TIMESTAMP NOT NULL OPTIONS" + + " (allow_commit_timestamp=true)," + + " order_number STRING(MAX)," + + " FOREIGN KEY (to_id) REFERENCES Account (id)" + + ") PRIMARY KEY (id, to_id, create_time)," + + "INTERLEAVE IN PARENT Account ON DELETE CASCADE", + "CREATE OR REPLACE PROPERTY GRAPH FinGraph " + + "NODE TABLES (Account, Person)" + + "EDGE TABLES (" + + " PersonOwnAccount" + + " SOURCE KEY(id) REFERENCES Person(id)" + + " DESTINATION KEY(account_id) REFERENCES Account(id)" + + " LABEL Owns," + + " AccountTransferAccount" + + " SOURCE KEY(id) REFERENCES Account(id)" + + " DESTINATION KEY(to_id) REFERENCES Account(id)" + + " LABEL Transfers)")) + .build(); + try { + // Initiate the request which returns an OperationFuture. + com.google.spanner.admin.database.v1.Database db = + dbAdminClient.createDatabaseAsync(createDatabaseRequest).get(); + System.out.println("Created database [" + db.getName() + "]"); + } catch (ExecutionException e) { + // If the operation failed during execution, expose the cause. + System.out.println("Encountered exception" + e.getCause()); + throw (SpannerException) e.getCause(); + } catch (InterruptedException e) { + // Throw when a thread is waiting, sleeping, or otherwise occupied, + // and the thread is interrupted, either before or during the activity. + throw SpannerExceptionFactory.propagateInterrupt(e); + } + } + + // [END spanner_create_database_with_property_graph] + + // [START spanner_insert_graph_data] + static final List ACCOUNTS = + Arrays.asList( + new Account( + 7, Timestamp.parseTimestamp("2020-01-10T06:22:20.12Z"), false, "Vacation Fund"), + new Account( + 16, Timestamp.parseTimestamp("2020-01-27T17:55:09.12Z"), true, "Vacation Fund"), + new Account( + 20, Timestamp.parseTimestamp("2020-02-18T05:44:20.12Z"), false, "Rainy Day Fund")); + + static final List PERSONS = + Arrays.asList( + new Person( + 1, + "Alex", + Timestamp.parseTimestamp("1991-12-21T00:00:00.12Z"), + "Australia", + " Adelaide"), + new Person( + 2, + "Dana", + Timestamp.parseTimestamp("1980-10-31T00:00:00.12Z"), + "Czech_Republic", + "Moravia"), + new Person( + 3, "Lee", Timestamp.parseTimestamp("1986-12-07T00:00:00.12Z"), "India", "Kollam")); + + static final List TRANSFERS = + Arrays.asList( + new Transfer( + 7, 16, 300.0, Timestamp.parseTimestamp("2020-08-29T15:28:58.12Z"), "304330008004315"), + new Transfer( + 7, 16, 100.0, Timestamp.parseTimestamp("2020-10-04T16:55:05.12Z"), "304120005529714"), + new Transfer( + 16, + 20, + 300.0, + Timestamp.parseTimestamp("2020-09-25T02:36:14.12Z"), + "103650009791820"), + new Transfer( + 20, 7, 500.0, Timestamp.parseTimestamp("2020-10-04T16:55:05.12Z"), "304120005529714"), + new Transfer( + 20, + 16, + 200.0, + Timestamp.parseTimestamp("2020-10-17T03:59:40.12Z"), + "302290001255747")); + + static final List OWNERSHIPS = + Arrays.asList( + new Own(1, 7, Timestamp.parseTimestamp("2020-01-10T06:22:20.12Z")), + new Own(2, 20, Timestamp.parseTimestamp("2020-01-27T17:55:09.12Z")), + new Own(3, 16, Timestamp.parseTimestamp("2020-02-18T05:44:20.12Z"))); + + static void insertData(DatabaseClient dbClient) { + List mutations = new ArrayList<>(); + for (Account account : ACCOUNTS) { + mutations.add( + Mutation.newInsertBuilder("Account") + .set("id") + .to(account.id) + .set("create_time") + .to(account.createTime) + .set("is_blocked") + .to(account.isBlocked) + .set("nick_name") + .to(account.nickName) + .build()); + } + for (Person person : PERSONS) { + mutations.add( + Mutation.newInsertBuilder("Person") + .set("id") + .to(person.id) + .set("name") + .to(person.name) + .set("birthday") + .to(person.birthday) + .set("country") + .to(person.country) + .set("city") + .to(person.city) + .build()); + } + for (Transfer transfer : TRANSFERS) { + mutations.add( + Mutation.newInsertBuilder("AccountTransferAccount") + .set("id") + .to(transfer.id) + .set("to_id") + .to(transfer.toId) + .set("amount") + .to(transfer.amount) + .set("create_time") + .to(transfer.createTime) + .set("order_number") + .to(transfer.orderNumber) + .build()); + } + for (Own own : OWNERSHIPS) { + mutations.add( + Mutation.newInsertBuilder("PersonOwnAccount") + .set("id") + .to(own.id) + .set("account_id") + .to(own.accountId) + .set("create_time") + .to(own.createTime) + .build()); + } + + dbClient.write(mutations); + } + + // [END spanner_insert_graph_data] + + // [START spanner_insert_graph_data_with_dml] + static void insertUsingDml(DatabaseClient dbClient) { + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = + "INSERT INTO Account (id, create_time, is_blocked) " + + " VALUES" + + " (1, CAST('2000-08-10 08:18:48.463959-07:52' AS TIMESTAMP), false)," + + " (2, CAST('2000-08-12 08:18:48.463959-07:52' AS TIMESTAMP), true)"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record(s) inserted into Account.\n", rowCount); + return null; + }); + + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = + "INSERT INTO AccountTransferAccount (id, to_id, create_time, amount) " + + " VALUES" + + " (1, 2, PENDING_COMMIT_TIMESTAMP(), 100)," + + " (1, 1, PENDING_COMMIT_TIMESTAMP(), 200) "; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record(s) inserted into AccountTransferAccount.\n", rowCount); + return null; + }); + } + + // [END spanner_insert_graph_data_with_dml] + + // [START spanner_update_graph_data_with_dml] + static void updateUsingDml(DatabaseClient dbClient) { + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = "UPDATE Account SET is_blocked = false WHERE id = 2"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d Account record(s) updated.\n", rowCount); + return null; + }); + + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = + "UPDATE AccountTransferAccount SET amount = 300 WHERE id = 1 AND to_id = 2"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d AccountTransferAccount record(s) updated.\n", rowCount); + return null; + }); + } + + // [END spanner_update_graph_data_with_dml] + + // [START spanner_update_graph_data_with_graph_query_in_dml] + static void updateUsingGraphQueryInDml(DatabaseClient dbClient) { + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = + "UPDATE Account SET is_blocked = true " + + "WHERE id IN {" + + " GRAPH FinGraph" + + " MATCH (a:Account WHERE a.id = 1)-[:TRANSFERS]->{1,2}(b:Account)" + + " RETURN b.id}"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d Account record(s) updated.\n", rowCount); + return null; + }); + } + + // [END spanner_update_graph_data_with_graph_query_in_dml] + + // [START spanner_query_graph_data] + static void query(DatabaseClient dbClient) { + try (ResultSet resultSet = + dbClient + .singleUse() // Execute a single query against Cloud Spanner. + .executeQuery( + Statement.of( + "Graph FinGraph MATCH" + + " (a:Person)-[o:Owns]->()-[t:Transfers]->()<-[p:Owns]-(b:Person)RETURN" + + " a.name AS sender, b.name AS receiver, t.amount, t.create_time AS" + + " transfer_at"))) { + while (resultSet.next()) { + System.out.printf( + "%s %s %f %s\n", + resultSet.getString(0), + resultSet.getString(1), + resultSet.getDouble(2), + resultSet.getTimestamp(3)); + } + } + } + + // [END spanner_query_graph_data] + + // [START spanner_query_graph_data_with_parameter] + static void queryWithParameter(DatabaseClient dbClient) { + Statement statement = + Statement.newBuilder( + "Graph FinGraph MATCH" + + " (a:Person)-[o:Owns]->()-[t:Transfers]->()<-[p:Owns]-(b:Person) WHERE" + + " t.amount >= @min RETURN a.name AS sender, b.name AS receiver, t.amount," + + " t.create_time AS transfer_at") + .bind("min") + .to(500) + .build(); + try (ResultSet resultSet = dbClient.singleUse().executeQuery(statement)) { + while (resultSet.next()) { + System.out.printf( + "%s %s %f %s\n", + resultSet.getString("sender"), + resultSet.getString("receiver"), + resultSet.getDouble("amount"), + resultSet.getTimestamp("transfer_at")); + } + } + } + + // [END spanner_query_graph_data_with_parameter] + + // [START spanner_delete_graph_data_with_dml] + static void deleteUsingDml(DatabaseClient dbClient) { + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = "DELETE FROM AccountTransferAccount WHERE id = 1 AND to_id = 2"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d AccountTransferAccount record(s) deleted.\n", rowCount); + return null; + }); + + dbClient + .readWriteTransaction() + .run( + transaction -> { + String sql = "DELETE FROM Account WHERE id = 2"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d Account record(s) deleted.\n", rowCount); + return null; + }); + } + + // [END spanner_delete_graph_data_with_dml] + + // [START spanner_delete_graph_data] + static void deleteData(DatabaseClient dbClient) { + List mutations = new ArrayList<>(); + + // KeySet.Builder can be used to delete a specific set of rows. + // Delete the PersonOwnAccount rows with the key values (1,7) and (2,20). + mutations.add( + Mutation.delete( + "PersonOwnAccount", + KeySet.newBuilder().addKey(Key.of(1, 7)).addKey(Key.of(2, 20)).build())); + + // KeyRange can be used to delete rows with a key in a specific range. + // Delete a range of rows where the key prefix is >=1 and <8 + mutations.add( + Mutation.delete( + "AccountTransferAccount", KeySet.range(KeyRange.closedOpen(Key.of(1), Key.of(8))))); + + // KeySet.all() can be used to delete all the rows in a table. + // Delete all Account rows, which will also delete the remaining + // AccountTransferAccount rows since it was defined with ON DELETE CASCADE. + mutations.add(Mutation.delete("Account", KeySet.all())); + + // KeySet.all() can be used to delete all the rows in a table. + // Delete all Person rows, which will also delete the remaining + // PersonOwnAccount rows since it was defined with ON DELETE CASCADE. + mutations.add(Mutation.delete("Person", KeySet.all())); + + dbClient.write(mutations); + System.out.printf("Records deleted.\n"); + } + + // [END spanner_delete_graph_data] + + static void run( + DatabaseClient dbClient, + DatabaseAdminClient dbAdminClient, + String command, + DatabaseId database) { + switch (command) { + case "createdatabase": + createDatabaseWithPropertyGraph( + dbAdminClient, + InstanceName.of( + database.getInstanceId().getProject(), database.getInstanceId().getInstance()), + database.getDatabase()); + break; + case "insert": + insertData(dbClient); + break; + case "insertusingdml": + insertUsingDml(dbClient); + break; + case "updateusingdml": + updateUsingDml(dbClient); + break; + case "updateusinggraphqueryindml": + updateUsingGraphQueryInDml(dbClient); + break; + case "query": + query(dbClient); + break; + case "querywithparameter": + queryWithParameter(dbClient); + break; + case "deleteusingdml": + deleteUsingDml(dbClient); + break; + case "delete": + deleteData(dbClient); + break; + default: + printUsageAndExit(); + } + } + + static void printUsageAndExit() { + System.err.println("Usage:"); + System.err.println(" SpannerGraphExample "); + System.err.println(""); + System.err.println("Examples:"); + System.err.println(" SpannerGraphExample createdatabase my-instance example-db"); + System.err.println(" SpannerGraphExample insert my-instance example-db"); + System.err.println(" SpannerGraphExample insertusingdml my-instance example-db"); + System.err.println(" SpannerGraphExample updateusingdml my-instance example-db"); + System.err.println(" SpannerGraphExample updateusinggraphqueryindml my-instance example-db"); + System.err.println(" SpannerGraphExample query my-instance example-db"); + System.err.println(" SpannerGraphExample querywithparameter my-instance example-db"); + System.err.println(" SpannerGraphExample deleteusingdml my-instance example-db"); + System.err.println(" SpannerGraphExample delete my-instance example-db"); + System.exit(1); + } + + public static void main(String[] args) { + if (args.length != 3 && args.length != 4) { + printUsageAndExit(); + } + SpannerOptions options = SpannerOptions.newBuilder().build(); + Spanner spanner = options.getService(); + DatabaseAdminClient dbAdminClient = null; + try { + final String command = args[0]; + DatabaseId db = DatabaseId.of(options.getProjectId(), args[1], args[2]); + // This will return the default project id based on the environment. + String clientProject = spanner.getOptions().getProjectId(); + if (!db.getInstanceId().getProject().equals(clientProject)) { + System.err.println( + "Invalid project specified. Project in the database id should match the" + + "project name set in the environment variable GOOGLE_CLOUD_PROJECT. Expected: " + + clientProject); + printUsageAndExit(); + } + + DatabaseClient dbClient = spanner.getDatabaseClient(db); + dbAdminClient = spanner.createDatabaseAdminClient(); + + run(dbClient, dbAdminClient, command, db); + } finally { + if (dbAdminClient != null) { + if (!dbAdminClient.isShutdown() || !dbAdminClient.isTerminated()) { + dbAdminClient.close(); + } + } + spanner.close(); + } + System.out.println("Closed client"); + } +} diff --git a/samples/snippets/src/test/java/com/example/spanner/SpannerGraphSampleIT.java b/samples/snippets/src/test/java/com/example/spanner/SpannerGraphSampleIT.java new file mode 100644 index 00000000000..6f778de49ab --- /dev/null +++ b/samples/snippets/src/test/java/com/example/spanner/SpannerGraphSampleIT.java @@ -0,0 +1,109 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@code SpannerGraphSample} */ +@RunWith(JUnit4.class) +@SuppressWarnings("checkstyle:abbreviationaswordinname") +public class SpannerGraphSampleIT extends SampleTestBaseV2 { + + private static final int DBID_LENGTH = 20; + // The instance needs to exist for tests to pass. + private static final String instanceId = System.getProperty("spanner.test.instance"); + private static final String baseDbId = System.getProperty("spanner.sample.database"); + static Spanner spanner; + static DatabaseAdminClient databaseAdminClient; + + private String runSample(String command, String databaseId) throws Exception { + System.out.println("Running " + command + " on " + instanceId + ":" + databaseId); + PrintStream stdOut = System.out; + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bout); + System.setOut(out); + SpannerGraphSample.main(new String[] {command, instanceId, databaseId}); + System.setOut(stdOut); + return bout.toString(); + } + + @Test + public void testSample() throws Exception { + String databaseId = idGenerator.generateDatabaseId(); + assertThat(instanceId).isNotNull(); + assertThat(databaseId).isNotNull(); + + System.out.println("Create database with property graph ..."); + String out = runSample("createdatabase", databaseId); + + DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId); + assertThat(out).contains("Created database"); + assertThat(out).contains(dbId.getName()); + + System.out.println("Insert some data ..."); + out = runSample("insert", databaseId); + + System.out.println("Insert more data using DML ..."); + out = runSample("insertusingdml", databaseId); + assertThat(out).contains("2 record(s) inserted into Account."); + assertThat(out).contains("2 record(s) inserted into AccountTransferAccount."); + + System.out.println("Update some data using DML ..."); + out = runSample("updateusingdml", databaseId); + assertThat(out).contains("1 Account record(s) updated."); + assertThat(out).contains("1 AccountTransferAccount record(s) updated."); + + System.out.println("Update some data using a graph query in DML ..."); + out = runSample("updateusinggraphqueryindml", databaseId); + assertThat(out).contains("2 Account record(s) updated."); + + System.out.println("Query the property graph ..."); + out = runSample("query", databaseId); + assertThat(out).contains("Dana Alex 500.0"); + assertThat(out).contains("Lee Dana 300.0"); + assertThat(out).contains("Alex Lee 300.0"); + assertThat(out).contains("Alex Lee 100.0"); + assertThat(out).contains("Dana Lee 200.0"); + + System.out.println("Query the property graph with a parameter ..."); + out = runSample("querywithparameter", databaseId); + assertThat(out).contains("Dana Alex 500.0"); + + System.out.println("Delete some data using DML ..."); + out = runSample("deleteusingdml", databaseId); + assertThat(out).contains("1 Account record(s) deleted."); + + System.out.println("Delete the remaining data in the database ..."); + out = runSample("delete", databaseId); + assertThat(out).contains("Records deleted."); + + System.out.println("Query the property graph ..."); + out = runSample("query", databaseId); + assertThat(out).doesNotContain("Dana"); + assertThat(out).doesNotContain("Alex"); + assertThat(out).doesNotContain("Lee"); + } +} diff --git a/versions.txt b/versions.txt index 6cf9f7b626e..fe78ba6cadf 100644 --- a/versions.txt +++ b/versions.txt @@ -1,13 +1,13 @@ # Format: # module:released-version:current-version -proto-google-cloud-spanner-admin-instance-v1:6.72.0:6.72.0 -proto-google-cloud-spanner-v1:6.72.0:6.72.0 -proto-google-cloud-spanner-admin-database-v1:6.72.0:6.72.0 -grpc-google-cloud-spanner-v1:6.72.0:6.72.0 -grpc-google-cloud-spanner-admin-instance-v1:6.72.0:6.72.0 -grpc-google-cloud-spanner-admin-database-v1:6.72.0:6.72.0 -google-cloud-spanner:6.72.0:6.72.0 -google-cloud-spanner-executor:6.72.0:6.72.0 -proto-google-cloud-spanner-executor-v1:6.72.0:6.72.0 -grpc-google-cloud-spanner-executor-v1:6.72.0:6.72.0 +proto-google-cloud-spanner-admin-instance-v1:6.73.0:6.73.0 +proto-google-cloud-spanner-v1:6.73.0:6.73.0 +proto-google-cloud-spanner-admin-database-v1:6.73.0:6.73.0 +grpc-google-cloud-spanner-v1:6.73.0:6.73.0 +grpc-google-cloud-spanner-admin-instance-v1:6.73.0:6.73.0 +grpc-google-cloud-spanner-admin-database-v1:6.73.0:6.73.0 +google-cloud-spanner:6.73.0:6.73.0 +google-cloud-spanner-executor:6.73.0:6.73.0 +proto-google-cloud-spanner-executor-v1:6.73.0:6.73.0 +grpc-google-cloud-spanner-executor-v1:6.73.0:6.73.0