Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 14c25cd

Browse files
authored
feat: add batch throttled time to tracer (#1463)
Add `ApiTracer#batchRequestThrottled` to track total throttled time of a batch. In `BatcherImpl`, add throttled time to `ApiCallContext` so it can be accessed from callable chains and recorded in `ApiTracer`.
1 parent 27bf265 commit 14c25cd

File tree

3 files changed

+82
-7
lines changed

3 files changed

+82
-7
lines changed

gax/src/main/java/com/google/api/gax/batching/Batcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.api.core.ApiFuture;
3333
import com.google.api.core.BetaApi;
3434
import com.google.api.core.InternalExtensionOnly;
35+
import com.google.api.gax.rpc.ApiCallContext;
3536

3637
/**
3738
* Represents a batching context where individual elements will be accumulated and flushed in a
@@ -49,6 +50,9 @@
4950
@InternalExtensionOnly
5051
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
5152

53+
/** {@link ApiCallContext.Key} for tracking batch total throttled time */
54+
ApiCallContext.Key<Long> THROTTLED_TIME_KEY = ApiCallContext.Key.create("total_throttled_time");
55+
5256
/**
5357
* Queues the passed in element to be sent at some point in the future.
5458
*

gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
import com.google.api.gax.batching.FlowController.FlowControlException;
4141
import com.google.api.gax.batching.FlowController.FlowControlRuntimeException;
4242
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
43+
import com.google.api.gax.rpc.ApiCallContext;
4344
import com.google.api.gax.rpc.UnaryCallable;
4445
import com.google.common.annotations.VisibleForTesting;
4546
import com.google.common.base.Preconditions;
47+
import com.google.common.base.Stopwatch;
4648
import com.google.common.util.concurrent.Futures;
4749
import java.lang.ref.Reference;
4850
import java.lang.ref.ReferenceQueue;
@@ -93,22 +95,26 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
9395
private SettableApiFuture<Void> closeFuture;
9496
private final BatcherStats batcherStats = new BatcherStats();
9597
private final FlowController flowController;
98+
private final ApiCallContext callContext;
9699

97100
/**
98101
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
99102
* into wrappers request and response
100103
* @param unaryCallable a {@link UnaryCallable} object
101104
* @param prototype a {@link RequestT} object
102105
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
106+
* @deprecated Please instantiate the Batcher with {@link FlowController} and {@link
107+
* ApiCallContext}
103108
*/
109+
@Deprecated
104110
public BatcherImpl(
105111
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
106112
UnaryCallable<RequestT, ResponseT> unaryCallable,
107113
RequestT prototype,
108114
BatchingSettings batchingSettings,
109115
ScheduledExecutorService executor) {
110116

111-
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null);
117+
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null);
112118
}
113119

114120
/**
@@ -119,7 +125,9 @@ public BatcherImpl(
119125
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
120126
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
121127
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
128+
* @deprecated Please instantiate the Batcher with {@link ApiCallContext}
122129
*/
130+
@Deprecated
123131
public BatcherImpl(
124132
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
125133
UnaryCallable<RequestT, ResponseT> unaryCallable,
@@ -128,6 +136,35 @@ public BatcherImpl(
128136
ScheduledExecutorService executor,
129137
@Nullable FlowController flowController) {
130138

139+
this(
140+
batchingDescriptor,
141+
unaryCallable,
142+
prototype,
143+
batchingSettings,
144+
executor,
145+
flowController,
146+
null);
147+
}
148+
149+
/**
150+
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
151+
* into wrappers request and response
152+
* @param unaryCallable a {@link UnaryCallable} object
153+
* @param prototype a {@link RequestT} object
154+
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
155+
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
156+
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
157+
* @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable
158+
*/
159+
public BatcherImpl(
160+
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
161+
UnaryCallable<RequestT, ResponseT> unaryCallable,
162+
RequestT prototype,
163+
BatchingSettings batchingSettings,
164+
ScheduledExecutorService executor,
165+
@Nullable FlowController flowController,
166+
@Nullable ApiCallContext callContext) {
167+
131168
this.batchingDescriptor =
132169
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
133170
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
@@ -168,6 +205,7 @@ public BatcherImpl(
168205
scheduledFuture = Futures.immediateCancelledFuture();
169206
}
170207
currentBatcherReference = new BatcherReference(this);
208+
this.callContext = callContext;
171209
}
172210

173211
/** {@inheritDoc} */
@@ -192,16 +230,18 @@ public ApiFuture<ElementResultT> add(ElementT element) {
192230
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
193231
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
194232
// defer it till we decide on if refactoring FlowController is necessary.
233+
Stopwatch stopwatch = Stopwatch.createStarted();
195234
try {
196235
flowController.reserve(1, batchingDescriptor.countBytes(element));
197236
} catch (FlowControlException e) {
198237
// This exception will only be thrown if the FlowController is set to ThrowException behavior
199238
throw FlowControlRuntimeException.fromFlowControlException(e);
200239
}
240+
long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
201241

202242
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
203243
synchronized (elementLock) {
204-
currentOpenBatch.add(element, result);
244+
currentOpenBatch.add(element, result, throttledTimeMs);
205245
}
206246

207247
if (currentOpenBatch.hasAnyThresholdReached()) {
@@ -230,8 +270,14 @@ public void sendOutstanding() {
230270
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
231271
}
232272

273+
// This check is for old clients that instantiated the batcher without ApiCallContext
274+
ApiCallContext callContextWithOption = null;
275+
if (callContext != null) {
276+
callContextWithOption =
277+
callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs);
278+
}
233279
final ApiFuture<ResponseT> batchResponse =
234-
unaryCallable.futureCall(accumulatedBatch.builder.build());
280+
unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption);
235281

236282
numOfOutstandingBatches.incrementAndGet();
237283
ApiFutures.addCallback(
@@ -367,6 +413,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
367413

368414
private long elementCounter = 0;
369415
private long byteCounter = 0;
416+
private long totalThrottledTimeMs = 0;
370417

371418
private Batch(
372419
RequestT prototype,
@@ -383,11 +430,12 @@ private Batch(
383430
this.batcherStats = batcherStats;
384431
}
385432

386-
void add(ElementT element, SettableApiFuture<ElementResultT> result) {
433+
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
387434
builder.add(element);
388435
entries.add(BatchEntry.create(element, result));
389436
elementCounter++;
390437
byteCounter += descriptor.countBytes(element);
438+
totalThrottledTimeMs += throttledTimeMs;
391439
}
392440

393441
void onBatchSuccess(ResponseT response) {

gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
3434
import static com.google.common.truth.Truth.assertThat;
3535
import static com.google.common.truth.Truth.assertWithMessage;
36+
import static org.mockito.Mockito.when;
3637

3738
import com.google.api.core.ApiFuture;
3839
import com.google.api.core.ApiFutures;
@@ -75,6 +76,8 @@
7576
import org.junit.function.ThrowingRunnable;
7677
import org.junit.runner.RunWith;
7778
import org.junit.runners.JUnit4;
79+
import org.mockito.ArgumentCaptor;
80+
import org.mockito.Mockito;
7881
import org.threeten.bp.Duration;
7982

8083
@RunWith(JUnit4.class)
@@ -132,9 +135,10 @@ public void testSendOutstanding() {
132135
SQUARER_BATCHING_DESC_V2,
133136
new LabeledIntSquarerCallable() {
134137
@Override
135-
public ApiFuture<List<Integer>> futureCall(LabeledIntList request) {
138+
public ApiFuture<List<Integer>> futureCall(
139+
LabeledIntList request, ApiCallContext context) {
136140
callableCounter.incrementAndGet();
137-
return super.futureCall(request);
141+
return super.futureCall(request, context);
138142
}
139143
},
140144
labeledIntList,
@@ -838,8 +842,23 @@ public void testThrottlingBlocking() throws Exception {
838842
.setMaxOutstandingElementCount(1L)
839843
.build());
840844
ExecutorService executor = Executors.newSingleThreadExecutor();
845+
846+
ApiCallContext callContext = Mockito.mock(ApiCallContext.class);
847+
ArgumentCaptor<ApiCallContext.Key<Long>> key =
848+
ArgumentCaptor.forClass(ApiCallContext.Key.class);
849+
ArgumentCaptor<Long> value = ArgumentCaptor.forClass(Long.class);
850+
when(callContext.withOption(key.capture(), value.capture())).thenReturn(callContext);
851+
long throttledTime = 10;
852+
841853
try (final Batcher<Integer, Integer> batcher =
842-
createDefaultBatcherImpl(settings, flowController)) {
854+
new BatcherImpl<>(
855+
SQUARER_BATCHING_DESC_V2,
856+
callLabeledIntSquarer,
857+
labeledIntList,
858+
settings,
859+
EXECUTOR,
860+
flowController,
861+
callContext)) {
843862
flowController.reserve(1, 1);
844863
Future future =
845864
executor.submit(
@@ -851,6 +870,7 @@ public void run() {
851870
});
852871
try {
853872
future.get(10, TimeUnit.MILLISECONDS);
873+
Thread.sleep(throttledTime);
854874
assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail();
855875
} catch (TimeoutException e) {
856876
// expected
@@ -861,6 +881,9 @@ public void run() {
861881
} catch (TimeoutException e) {
862882
assertWithMessage("adding elements to batcher should not be blocked").fail();
863883
}
884+
// Verify that throttled time is recorded in ApiCallContext
885+
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
886+
assertThat(value.getValue()).isAtLeast(throttledTime);
864887
} finally {
865888
executor.shutdownNow();
866889
}

0 commit comments

Comments
 (0)