Skip to content

Commit a46560e

Browse files
authored
xds: refactor XdsClient in preparation to support federation (#8630)
See go/java-xds-client-api-for-federation for detailed description
1 parent 14eb3b2 commit a46560e

File tree

7 files changed

+363
-236
lines changed

7 files changed

+363
-236
lines changed

xds/src/main/java/io/grpc/xds/AbstractXdsClient.java

Lines changed: 58 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
2929
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
3030
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31+
import io.grpc.Channel;
3132
import io.grpc.Context;
3233
import io.grpc.InternalLogId;
3334
import io.grpc.ManagedChannel;
@@ -36,6 +37,11 @@
3637
import io.grpc.SynchronizationContext.ScheduledHandle;
3738
import io.grpc.internal.BackoffPolicy;
3839
import io.grpc.stub.StreamObserver;
40+
import io.grpc.xds.Bootstrapper.ServerInfo;
41+
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
42+
import io.grpc.xds.EnvoyProtoData.Node;
43+
import io.grpc.xds.XdsClient.ResourceStore;
44+
import io.grpc.xds.XdsClient.XdsResponseHandler;
3945
import io.grpc.xds.XdsLogger.XdsLogLevel;
4046
import java.util.Collection;
4147
import java.util.Collections;
@@ -48,7 +54,7 @@
4854
* Common base type for XdsClient implementations, which encapsulates the layer abstraction of
4955
* the xDS RPC stream.
5056
*/
51-
abstract class AbstractXdsClient extends XdsClient {
57+
final class AbstractXdsClient {
5258

5359
private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
5460
private static final String ADS_TYPE_URL_LDS =
@@ -66,26 +72,18 @@ abstract class AbstractXdsClient extends XdsClient {
6672
private static final String ADS_TYPE_URL_EDS =
6773
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
6874

69-
private final SynchronizationContext syncContext = new SynchronizationContext(
70-
new Thread.UncaughtExceptionHandler() {
71-
@Override
72-
public void uncaughtException(Thread t, Throwable e) {
73-
getLogger().log(
74-
XdsLogLevel.ERROR,
75-
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
76-
e);
77-
// TODO(chengyuanzhang): better error handling.
78-
throw new AssertionError(e);
79-
}
80-
});
75+
private final SynchronizationContext syncContext;
8176
private final InternalLogId logId;
8277
private final XdsLogger logger;
78+
private final ServerInfo serverInfo;
8379
private final ManagedChannel channel;
80+
private final XdsResponseHandler xdsResponseHandler;
81+
private final ResourceStore resourceStore;
8482
private final Context context;
8583
private final ScheduledExecutorService timeService;
8684
private final BackoffPolicy.Provider backoffPolicyProvider;
8785
private final Stopwatch stopwatch;
88-
private final Bootstrapper.BootstrapInfo bootstrapInfo;
86+
private final Node bootstrapNode;
8987

9088
// Last successfully applied version_info for each resource type. Starts with empty string.
9189
// A version_info is used to update management server with client's most recent knowledge of
@@ -103,71 +101,42 @@ public void uncaughtException(Thread t, Throwable e) {
103101
@Nullable
104102
private ScheduledHandle rpcRetryTimer;
105103

106-
AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo,
107-
Context context, ScheduledExecutorService timeService,
108-
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
109-
this.channel = checkNotNull(channel, "channel");
110-
this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo");
104+
/** An entity that manages ADS RPCs over a single channel. */
105+
// TODO: rename to XdsChannel
106+
AbstractXdsClient(
107+
XdsChannelFactory xdsChannelFactory,
108+
ServerInfo serverInfo,
109+
Node bootstrapNode,
110+
XdsResponseHandler xdsResponseHandler,
111+
ResourceStore resourceStore,
112+
Context context,
113+
ScheduledExecutorService
114+
timeService,
115+
SynchronizationContext syncContext,
116+
BackoffPolicy.Provider backoffPolicyProvider,
117+
Supplier<Stopwatch> stopwatchSupplier) {
118+
this.serverInfo = checkNotNull(serverInfo, "serverInfo");
119+
this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
120+
this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
121+
this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
122+
this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
111123
this.context = checkNotNull(context, "context");
112124
this.timeService = checkNotNull(timeService, "timeService");
125+
this.syncContext = checkNotNull(syncContext, "syncContext");
113126
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
114127
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
115-
logId = InternalLogId.allocate("xds-client", null);
128+
logId = InternalLogId.allocate("xds-client", serverInfo.target());
116129
logger = XdsLogger.withLogId(logId);
117130
logger.log(XdsLogLevel.INFO, "Created");
118131
}
119132

120-
/**
121-
* Called when an LDS response is received.
122-
*/
123-
// Must be synchronized.
124-
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
125-
}
126-
127-
/**
128-
* Called when a RDS response is received.
129-
*/
130-
// Must be synchronized.
131-
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
132-
}
133-
134-
/**
135-
* Called when a CDS response is received.
136-
*/
137-
// Must be synchronized.
138-
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
139-
}
140-
141-
/**
142-
* Called when an EDS response is received.
143-
*/
144-
// Must be synchronized.
145-
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
146-
}
147-
148-
/**
149-
* Called when the ADS stream is closed passively.
150-
*/
151-
// Must be synchronized.
152-
protected void handleStreamClosed(Status error) {
153-
}
154-
155-
/**
156-
* Called when the ADS stream has been recreated.
157-
*/
158-
// Must be synchronized.
159-
protected void handleStreamRestarted() {
160-
}
161-
162-
/**
163-
* Called when being shut down.
164-
*/
165-
// Must be synchronized.
166-
protected void handleShutdown() {
133+
/** The underlying channel. */
134+
// Currently, only externally used for LrsClient.
135+
Channel channel() {
136+
return channel;
167137
}
168138

169-
@Override
170-
final void shutdown() {
139+
void shutdown() {
171140
syncContext.execute(new Runnable() {
172141
@Override
173142
public void run() {
@@ -179,49 +148,28 @@ public void run() {
179148
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
180149
rpcRetryTimer.cancel();
181150
}
182-
handleShutdown();
151+
channel.shutdown();
183152
}
184153
});
185154
}
186155

187-
@Override
188-
boolean isShutDown() {
189-
return shutdown;
190-
}
191-
192-
@Override
193-
Bootstrapper.BootstrapInfo getBootstrapInfo() {
194-
return bootstrapInfo;
195-
}
196-
197156
@Override
198157
public String toString() {
199158
return logId.toString();
200159
}
201160

202-
/**
203-
* Returns the collection of resources currently subscribing to or {@code null} if not
204-
* subscribing to any resources for the given type.
205-
*
206-
* <p>Note an empty collection indicates subscribing to resources of the given type with
207-
* wildcard mode.
208-
*/
209-
// Must be synchronized.
210-
@Nullable
211-
abstract Collection<String> getSubscribedResources(ResourceType type);
212-
213161
/**
214162
* Updates the resource subscription for the given resource type.
215163
*/
216164
// Must be synchronized.
217-
protected final void adjustResourceSubscription(ResourceType type) {
165+
void adjustResourceSubscription(ResourceType type) {
218166
if (isInBackoff()) {
219167
return;
220168
}
221169
if (adsStream == null) {
222170
startRpcStream();
223171
}
224-
Collection<String> resources = getSubscribedResources(type);
172+
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
225173
if (resources != null) {
226174
adsStream.sendDiscoveryRequest(type, resources);
227175
}
@@ -232,7 +180,7 @@ protected final void adjustResourceSubscription(ResourceType type) {
232180
* and sends an ACK request to the management server.
233181
*/
234182
// Must be synchronized.
235-
protected final void ackResponse(ResourceType type, String versionInfo, String nonce) {
183+
void ackResponse(ResourceType type, String versionInfo, String nonce) {
236184
switch (type) {
237185
case LDS:
238186
ldsVersion = versionInfo;
@@ -252,7 +200,7 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n
252200
}
253201
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
254202
type, nonce, versionInfo);
255-
Collection<String> resources = getSubscribedResources(type);
203+
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
256204
if (resources == null) {
257205
resources = Collections.emptyList();
258206
}
@@ -264,34 +212,22 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n
264212
* accepted version) to the management server.
265213
*/
266214
// Must be synchronized.
267-
protected final void nackResponse(ResourceType type, String nonce, String errorDetail) {
215+
void nackResponse(ResourceType type, String nonce, String errorDetail) {
268216
String versionInfo = getCurrentVersion(type);
269217
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
270218
type, nonce, versionInfo);
271-
Collection<String> resources = getSubscribedResources(type);
219+
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
272220
if (resources == null) {
273221
resources = Collections.emptyList();
274222
}
275223
adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
276224
}
277225

278-
protected final SynchronizationContext getSyncContext() {
279-
return syncContext;
280-
}
281-
282-
protected final ScheduledExecutorService getTimeService() {
283-
return timeService;
284-
}
285-
286-
protected final XdsLogger getLogger() {
287-
return logger;
288-
}
289-
290226
/**
291227
* Returns {@code true} if the resource discovery is currently in backoff.
292228
*/
293229
// Must be synchronized.
294-
protected final boolean isInBackoff() {
230+
boolean isInBackoff() {
295231
return rpcRetryTimer != null && rpcRetryTimer.isPending();
296232
}
297233

@@ -302,7 +238,7 @@ protected final boolean isInBackoff() {
302238
// Must be synchronized.
303239
private void startRpcStream() {
304240
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
305-
if (bootstrapInfo.servers().get(0).useProtocolV3()) {
241+
if (serverInfo.useProtocolV3()) {
306242
adsStream = new AdsStreamV3();
307243
} else {
308244
adsStream = new AdsStreamV2();
@@ -317,8 +253,8 @@ private void startRpcStream() {
317253
stopwatch.reset().start();
318254
}
319255

256+
/** Returns the latest accepted version of the given resource type. */
320257
// Must be synchronized.
321-
@Override
322258
String getCurrentVersion(ResourceType type) {
323259
String version;
324260
switch (type) {
@@ -353,16 +289,16 @@ public void run() {
353289
if (type == ResourceType.UNKNOWN) {
354290
continue;
355291
}
356-
Collection<String> resources = getSubscribedResources(type);
292+
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
357293
if (resources != null) {
358294
adsStream.sendDiscoveryRequest(type, resources);
359295
}
360296
}
361-
handleStreamRestarted();
297+
xdsResponseHandler.handleStreamRestarted(serverInfo);
362298
}
363299
}
364300

365-
protected enum ResourceType {
301+
enum ResourceType {
366302
UNKNOWN, LDS, RDS, CDS, EDS;
367303

368304
String typeUrl() {
@@ -488,19 +424,19 @@ final void handleRpcResponse(
488424
switch (type) {
489425
case LDS:
490426
ldsRespNonce = nonce;
491-
handleLdsResponse(versionInfo, resources, nonce);
427+
xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce);
492428
break;
493429
case RDS:
494430
rdsRespNonce = nonce;
495-
handleRdsResponse(versionInfo, resources, nonce);
431+
xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce);
496432
break;
497433
case CDS:
498434
cdsRespNonce = nonce;
499-
handleCdsResponse(versionInfo, resources, nonce);
435+
xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce);
500436
break;
501437
case EDS:
502438
edsRespNonce = nonce;
503-
handleEdsResponse(versionInfo, resources, nonce);
439+
xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce);
504440
break;
505441
case UNKNOWN:
506442
default:
@@ -526,7 +462,7 @@ private void handleRpcStreamClosed(Status error) {
526462
"ADS stream closed with status {0}: {1}. Cause: {2}",
527463
error.getCode(), error.getDescription(), error.getCause());
528464
closed = true;
529-
handleStreamClosed(error);
465+
xdsResponseHandler.handleStreamClosed(error);
530466
cleanUp();
531467
if (responseReceived || retryBackoffPolicy == null) {
532468
// Reset the backoff sequence if had received a response, or backoff sequence
@@ -619,7 +555,7 @@ void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection<Stri
619555
io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder =
620556
io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder()
621557
.setVersionInfo(versionInfo)
622-
.setNode(bootstrapInfo.node().toEnvoyProtoNodeV2())
558+
.setNode(bootstrapNode.toEnvoyProtoNodeV2())
623559
.addAllResourceNames(resources)
624560
.setTypeUrl(type.typeUrlV2())
625561
.setResponseNonce(nonce);
@@ -699,7 +635,7 @@ void sendDiscoveryRequest(ResourceType type, String versionInfo, Collection<Stri
699635
DiscoveryRequest.Builder builder =
700636
DiscoveryRequest.newBuilder()
701637
.setVersionInfo(versionInfo)
702-
.setNode(bootstrapInfo.node().toEnvoyProtoNode())
638+
.setNode(bootstrapNode.toEnvoyProtoNode())
703639
.addAllResourceNames(resources)
704640
.setTypeUrl(type.typeUrl())
705641
.setResponseNonce(nonce);

0 commit comments

Comments
 (0)