|
18 | 18 |
|
19 | 19 | import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
|
20 | 20 |
|
| 21 | +import com.google.api.core.ApiFunction; |
21 | 22 | import com.google.api.core.ApiFuture;
|
22 | 23 | import com.google.api.core.InternalApi;
|
23 | 24 | import com.google.api.core.NanoClock;
|
|
54 | 55 | import com.google.api.pathtemplate.PathTemplate;
|
55 | 56 | import com.google.cloud.RetryHelper;
|
56 | 57 | import com.google.cloud.RetryHelper.RetryHelperException;
|
| 58 | +import com.google.cloud.grpc.GcpManagedChannelBuilder; |
| 59 | +import com.google.cloud.grpc.GcpManagedChannelOptions; |
| 60 | +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; |
57 | 61 | import com.google.cloud.grpc.GrpcTransportOptions;
|
58 | 62 | import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException;
|
59 | 63 | import com.google.cloud.spanner.ErrorCode;
|
|
80 | 84 | import com.google.common.base.Preconditions;
|
81 | 85 | import com.google.common.collect.ImmutableList;
|
82 | 86 | import com.google.common.collect.ImmutableSet;
|
| 87 | +import com.google.common.io.Resources; |
83 | 88 | import com.google.common.util.concurrent.RateLimiter;
|
84 | 89 | import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
85 | 90 | import com.google.iam.v1.GetIamPolicyRequest;
|
|
156 | 161 | import com.google.spanner.v1.Transaction;
|
157 | 162 | import io.grpc.CallCredentials;
|
158 | 163 | import io.grpc.Context;
|
| 164 | +import io.grpc.ManagedChannelBuilder; |
159 | 165 | import io.grpc.MethodDescriptor;
|
| 166 | +import io.opencensus.metrics.Metrics; |
160 | 167 | import java.io.IOException;
|
161 | 168 | import java.io.UnsupportedEncodingException;
|
162 | 169 | import java.net.URLDecoder;
|
| 170 | +import java.nio.charset.Charset; |
163 | 171 | import java.nio.charset.StandardCharsets;
|
164 | 172 | import java.util.Comparator;
|
165 | 173 | import java.util.HashMap;
|
@@ -249,6 +257,7 @@ private void awaitTermination() throws InterruptedException {
|
249 | 257 | private static final String CLIENT_LIBRARY_LANGUAGE = "spanner-java";
|
250 | 258 | public static final String DEFAULT_USER_AGENT =
|
251 | 259 | CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class);
|
| 260 | + private static final String API_FILE = "grpc-gcp-apiconfig.json"; |
252 | 261 |
|
253 | 262 | private final ManagedInstantiatingExecutorProvider executorProvider;
|
254 | 263 | private boolean rpcIsClosed;
|
@@ -368,6 +377,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
|
368 | 377 | // whether the attempt is allowed is totally controlled by service owner.
|
369 | 378 | .setAttemptDirectPath(true);
|
370 | 379 |
|
| 380 | + // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. |
| 381 | + maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); |
| 382 | + |
371 | 383 | TransportChannelProvider channelProvider =
|
372 | 384 | MoreObjects.firstNonNull(
|
373 | 385 | options.getChannelProvider(), defaultChannelProviderBuilder.build());
|
@@ -509,6 +521,62 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
|
509 | 521 | }
|
510 | 522 | }
|
511 | 523 |
|
| 524 | + private static String parseGrpcGcpApiConfig() { |
| 525 | + try { |
| 526 | + return Resources.toString( |
| 527 | + GapicSpannerRpc.class.getResource(API_FILE), Charset.forName("UTF8")); |
| 528 | + } catch (IOException e) { |
| 529 | + throw newSpannerException(e); |
| 530 | + } |
| 531 | + } |
| 532 | + |
| 533 | + // Enhance metric options for gRPC-GCP extension. Adds metric registry if not specified. |
| 534 | + private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) { |
| 535 | + GcpManagedChannelOptions grpcGcpOptions = |
| 536 | + MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions()); |
| 537 | + GcpMetricsOptions metricsOptions = |
| 538 | + MoreObjects.firstNonNull( |
| 539 | + grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build()); |
| 540 | + GcpMetricsOptions.Builder metricsOptionsBuilder = GcpMetricsOptions.newBuilder(metricsOptions); |
| 541 | + if (metricsOptions.getMetricRegistry() == null) { |
| 542 | + metricsOptionsBuilder.withMetricRegistry(Metrics.getMetricRegistry()); |
| 543 | + } |
| 544 | + // TODO: Add default labels with values: client_id, database, instance_id. |
| 545 | + if (metricsOptions.getNamePrefix().equals("")) { |
| 546 | + metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/"); |
| 547 | + } |
| 548 | + return GcpManagedChannelOptions.newBuilder(grpcGcpOptions) |
| 549 | + .withMetricsOptions(metricsOptionsBuilder.build()) |
| 550 | + .build(); |
| 551 | + } |
| 552 | + |
| 553 | + @SuppressWarnings("rawtypes") |
| 554 | + private static void maybeEnableGrpcGcpExtension( |
| 555 | + InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder, |
| 556 | + final SpannerOptions options) { |
| 557 | + if (!options.isGrpcGcpExtensionEnabled()) { |
| 558 | + return; |
| 559 | + } |
| 560 | + |
| 561 | + final String jsonApiConfig = parseGrpcGcpApiConfig(); |
| 562 | + final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options); |
| 563 | + |
| 564 | + ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction = |
| 565 | + channelBuilder -> { |
| 566 | + if (options.getChannelConfigurator() != null) { |
| 567 | + channelBuilder = options.getChannelConfigurator().apply(channelBuilder); |
| 568 | + } |
| 569 | + return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder) |
| 570 | + .withApiConfigJsonString(jsonApiConfig) |
| 571 | + .withOptions(grpcGcpOptions) |
| 572 | + .setPoolSize(options.getNumChannels()); |
| 573 | + }; |
| 574 | + |
| 575 | + // Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1. |
| 576 | + // Enable gRPC-GCP channel pool via the channel configurator. |
| 577 | + defaultChannelProviderBuilder.setPoolSize(1).setChannelConfigurator(apiFunction); |
| 578 | + } |
| 579 | + |
512 | 580 | private static HeaderProvider headerProviderWithUserAgentFrom(HeaderProvider headerProvider) {
|
513 | 581 | final Map<String, String> headersWithUserAgent = new HashMap<>(headerProvider.getHeaders());
|
514 | 582 | String userAgent = null;
|
|
0 commit comments