Skip to content

Commit 25531d6

Browse files
authored
binder: Invoke onTransportReady() in a round-robin fashion. (#8835)
Also call onTransportReady() only if isReady() still holds by the time we get to a given Inbound. This dramatically reduces timeouts and improves throughput when flow control has kicked in. This approach is still not completely fair since each ongoing call might consume a different amount of window on its turn, but because of the way Outbound#writeMessageData() and BlockPool already work, everyone gets to send at least 16kb.
1 parent e279479 commit 25531d6

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

binder/src/main/java/io/grpc/binder/internal/BinderTransport.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import io.grpc.internal.StatsTraceContext;
6161
import io.grpc.internal.TimeProvider;
6262
import java.util.ArrayList;
63+
import java.util.Iterator;
64+
import java.util.LinkedHashSet;
6365
import java.util.List;
6466
import java.util.Map;
6567
import java.util.NoSuchElementException;
@@ -186,6 +188,9 @@ protected enum TransportState {
186188

187189
protected final ConcurrentHashMap<Integer, Inbound<?>> ongoingCalls;
188190

191+
@GuardedBy("this")
192+
private final LinkedHashSet<Integer> callIdsToNotifyWhenReady = new LinkedHashSet<>();
193+
189194
@GuardedBy("this")
190195
protected Attributes attributes;
191196

@@ -529,9 +534,18 @@ final void handleAcknowledgedBytes(long numBytes) {
529534
logger.log(
530535
Level.FINE,
531536
"handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this);
532-
// We're ready again, and need to poke any waiting transactions.
533-
for (Inbound<?> inbound : ongoingCalls.values()) {
534-
inbound.onTransportReady();
537+
538+
// The LinkedHashSet contract guarantees that an id already present in this collection will
539+
// not lose its priority if we re-insert it here.
540+
callIdsToNotifyWhenReady.addAll(ongoingCalls.keySet());
541+
542+
Iterator<Integer> i = callIdsToNotifyWhenReady.iterator();
543+
while (isReady() && i.hasNext()) {
544+
Inbound<?> inbound = ongoingCalls.get(i.next());
545+
i.remove();
546+
if (inbound != null) { // Calls can be removed out from under us.
547+
inbound.onTransportReady();
548+
}
535549
}
536550
}
537551
}

0 commit comments

Comments
 (0)