2222
2323import com .google .common .annotations .VisibleForTesting ;
2424import com .google .common .base .MoreObjects ;
25+ import io .grpc .ChannelLogger .ChannelLogLevel ;
2526import io .grpc .ConnectivityState ;
27+ import io .grpc .LoadBalancer ;
2628import io .grpc .LoadBalancer .Helper ;
2729import io .grpc .LoadBalancer .Subchannel ;
2830import io .grpc .LoadBalancer .SubchannelPicker ;
2931import io .grpc .LoadBalancerProvider ;
3032import io .grpc .LoadBalancerRegistry ;
33+ import io .grpc .NameResolver .ConfigOrError ;
3134import io .grpc .internal .ObjectPool ;
3235import io .grpc .rls .ChildLoadBalancerHelper .ChildLoadBalancerHelperProvider ;
3336import io .grpc .rls .RlsProtoData .RouteLookupConfig ;
@@ -191,33 +194,49 @@ public String toString() {
191194
192195 /** Factory for {@link ChildPolicyWrapper}. */
193196 static final class RefCountedChildPolicyWrapperFactory {
197+ // GuardedBy CachingRlsLbClient.lock
194198 @ VisibleForTesting
195199 final Map <String /* target */ , RefCountedChildPolicyWrapper > childPolicyMap =
196200 new HashMap <>();
197201
198202 private final ChildLoadBalancerHelperProvider childLbHelperProvider ;
199203 private final ChildLbStatusListener childLbStatusListener ;
204+ private final ChildLoadBalancingPolicy childPolicy ;
205+ private final ResolvedAddressFactory childLbResolvedAddressFactory ;
200206
201207 public RefCountedChildPolicyWrapperFactory (
208+ ChildLoadBalancingPolicy childPolicy ,
209+ ResolvedAddressFactory childLbResolvedAddressFactory ,
202210 ChildLoadBalancerHelperProvider childLbHelperProvider ,
203211 ChildLbStatusListener childLbStatusListener ) {
212+ this .childPolicy = checkNotNull (childPolicy , "childPolicy" );
213+ this .childLbResolvedAddressFactory =
214+ checkNotNull (childLbResolvedAddressFactory , "childLbResolvedAddressFactory" );
204215 this .childLbHelperProvider = checkNotNull (childLbHelperProvider , "childLbHelperProvider" );
205216 this .childLbStatusListener = checkNotNull (childLbStatusListener , "childLbStatusListener" );
206217 }
207218
219+ // GuardedBy CachingRlsLbClient.lock
208220 ChildPolicyWrapper createOrGet (String target ) {
209221 // TODO(creamsoup) check if the target is valid or not
210222 RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap .get (target );
211223 if (pooledChildPolicyWrapper == null ) {
212- ChildPolicyWrapper childPolicyWrapper =
213- new ChildPolicyWrapper (target , childLbHelperProvider , childLbStatusListener );
224+ ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper (
225+ target , childPolicy , childLbResolvedAddressFactory , childLbHelperProvider ,
226+ childLbStatusListener );
214227 pooledChildPolicyWrapper = RefCountedChildPolicyWrapper .of (childPolicyWrapper );
215228 childPolicyMap .put (target , pooledChildPolicyWrapper );
229+ return pooledChildPolicyWrapper .getObject ();
230+ } else {
231+ ChildPolicyWrapper childPolicyWrapper = pooledChildPolicyWrapper .getObject ();
232+ if (childPolicyWrapper .getPicker () != null ) {
233+ childPolicyWrapper .refreshState ();
234+ }
235+ return childPolicyWrapper ;
216236 }
217-
218- return pooledChildPolicyWrapper .getObject ();
219237 }
220238
239+ // GuardedBy CachingRlsLbClient.lock
221240 void release (ChildPolicyWrapper childPolicyWrapper ) {
222241 checkNotNull (childPolicyWrapper , "childPolicyWrapper" );
223242 String target = childPolicyWrapper .getTarget ();
@@ -238,16 +257,36 @@ static final class ChildPolicyWrapper {
238257
239258 private final String target ;
240259 private final ChildPolicyReportingHelper helper ;
260+ private final LoadBalancer lb ;
241261 private volatile SubchannelPicker picker ;
242262 private ConnectivityState state ;
243263
244264 public ChildPolicyWrapper (
245265 String target ,
266+ ChildLoadBalancingPolicy childPolicy ,
267+ final ResolvedAddressFactory childLbResolvedAddressFactory ,
246268 ChildLoadBalancerHelperProvider childLbHelperProvider ,
247269 ChildLbStatusListener childLbStatusListener ) {
248270 this .target = target ;
249271 this .helper =
250272 new ChildPolicyReportingHelper (childLbHelperProvider , childLbStatusListener );
273+ LoadBalancerProvider lbProvider = childPolicy .getEffectiveLbProvider ();
274+ final ConfigOrError lbConfig =
275+ lbProvider
276+ .parseLoadBalancingPolicyConfig (
277+ childPolicy .getEffectiveChildPolicy (target ));
278+ this .lb = lbProvider .newLoadBalancer (helper );
279+ helper .getChannelLogger ().log (
280+ ChannelLogLevel .DEBUG , "RLS child lb created. config: {0}" , lbConfig .getConfig ());
281+ helper .getSynchronizationContext ().execute (
282+ new Runnable () {
283+ @ Override
284+ public void run () {
285+ lb .handleResolvedAddresses (
286+ childLbResolvedAddressFactory .create (lbConfig .getConfig ()));
287+ lb .requestConnection ();
288+ }
289+ });
251290 }
252291
253292 String getTarget () {
@@ -263,7 +302,25 @@ ChildPolicyReportingHelper getHelper() {
263302 }
264303
265304 void refreshState () {
266- helper .updateBalancingState (state , picker );
305+ helper .getSynchronizationContext ().execute (
306+ new Runnable () {
307+ @ Override
308+ public void run () {
309+ helper .updateBalancingState (state , picker );
310+ }
311+ }
312+ );
313+ }
314+
315+ void shutdown () {
316+ helper .getSynchronizationContext ().execute (
317+ new Runnable () {
318+ @ Override
319+ public void run () {
320+ lb .shutdown ();
321+ }
322+ }
323+ );
267324 }
268325
269326 @ Override
@@ -346,6 +403,7 @@ public ChildPolicyWrapper returnObject(Object object) {
346403 long newCnt = refCnt .decrementAndGet ();
347404 checkState (newCnt != -1 , "Cannot return never pooled childPolicyWrapper" );
348405 if (newCnt == 0 ) {
406+ childPolicyWrapper .shutdown ();
349407 childPolicyWrapper = null ;
350408 }
351409 return null ;
0 commit comments