Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ final class ClientXdsClient extends AbstractXdsClient {
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
@VisibleForTesting
static final long DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE = 1024L;
@VisibleForTesting
static final long DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE = 8 * 1024 * 1024L;
@VisibleForTesting
static final long MAX_RING_HASH_LB_POLICY_RING_SIZE = 8 * 1024 * 1024L;
@VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
@VisibleForTesting
static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id";
Expand Down Expand Up @@ -790,7 +796,7 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
// Process Cluster into CdsUpdate.
CdsUpdate cdsUpdate;
try {
cdsUpdate = processCluster(cluster, retainedEdsResources);
cdsUpdate = parseCluster(cluster, retainedEdsResources);
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
Expand Down Expand Up @@ -818,7 +824,8 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
}
}

private static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEdsResources)
@VisibleForTesting
static CdsUpdate parseCluster(Cluster cluster, Set<String> retainedEdsResources)
throws ResourceInvalidException {
StructOrError<CdsUpdate.Builder> structOrError;
switch (cluster.getClusterDiscoveryTypeCase()) {
Expand All @@ -830,26 +837,36 @@ private static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEds
break;
case CLUSTERDISCOVERYTYPE_NOT_SET:
default:
throw new ResourceInvalidException("Unspecified cluster discovery type");
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": unspecified cluster discovery type");
}
if (structOrError.getErrorDetail() != null) {
throw new ResourceInvalidException(structOrError.getErrorDetail());
}

CdsUpdate.Builder updateBuilder = structOrError.getStruct();

if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) {
long minRingSize =
lbConfig.hasMinimumRingSize()
? lbConfig.getMinimumRingSize().getValue()
: DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE;
long maxRingSize =
lbConfig.hasMaximumRingSize()
? lbConfig.getMaximumRingSize().getValue()
: DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE;
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH
|| minRingSize > maxRingSize
|| maxRingSize > MAX_RING_HASH_LB_POLICY_RING_SIZE) {
throw new ResourceInvalidException(
"Unsupported ring hash function: " + lbConfig.getHashFunction());
"Cluster " + cluster.getName() + ": invalid ring_hash_lb_config: " + lbConfig);
}
updateBuilder.lbPolicy(CdsUpdate.LbPolicy.RING_HASH,
lbConfig.getMinimumRingSize().getValue(), lbConfig.getMaximumRingSize().getValue());
updateBuilder.ringHashLbPolicy(minRingSize, maxRingSize);
} else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) {
updateBuilder.lbPolicy(CdsUpdate.LbPolicy.ROUND_ROBIN);
updateBuilder.roundRobinLbPolicy();
} else {
throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy());
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy());
}

return updateBuilder.build();
Expand Down Expand Up @@ -1610,14 +1627,15 @@ private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update) {
}
}

private static final class ResourceInvalidException extends Exception {
@VisibleForTesting
static final class ResourceInvalidException extends Exception {
private static final long serialVersionUID = 0L;

public ResourceInvalidException(String message) {
private ResourceInvalidException(String message) {
super(message, null, false, false);
}

public ResourceInvalidException(String message, Throwable cause) {
private ResourceInvalidException(String message, Throwable cause) {
super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
}
}
Expand Down
23 changes: 19 additions & 4 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
Expand All @@ -32,6 +33,17 @@
@Internal
public final class RingHashLoadBalancerProvider extends LoadBalancerProvider {

// Same as ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE
@VisibleForTesting
static final long DEFAULT_MIN_RING_SIZE = 1024L;
// Same as ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE
@VisibleForTesting
static final long DEFAULT_MAX_RING_SIZE = 8 * 1024 * 1024L;
// Maximum number of ring entries allowed. Setting this too large can result in slow
// ring construction and OOM error.
// Same as ClientXdsClient.MAX_RING_HASH_LB_POLICY_RING_SIZE
static final long MAX_RING_SIZE = 8 * 1024 * 1024L;

private static final boolean enableRingHash =
Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"));

Expand Down Expand Up @@ -59,11 +71,14 @@ public String getPolicyName() {
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawLoadBalancingPolicyConfig) {
Long minRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "minRingSize");
Long maxRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "maxRingSize");
if (minRingSize == null || maxRingSize == null) {
return ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription(
"Missing 'mingRingSize'/'maxRingSize'"));
if (minRingSize == null) {
minRingSize = DEFAULT_MIN_RING_SIZE;
}
if (maxRingSize == null) {
maxRingSize = DEFAULT_MAX_RING_SIZE;
}
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize
|| maxRingSize > MAX_RING_SIZE) {
return ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription(
"Invalid 'mingRingSize'/'maxRingSize'"));
}
Expand Down
15 changes: 10 additions & 5 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,21 @@ abstract static class Builder {
// Private, use one of the static factory methods instead.
protected abstract Builder clusterType(ClusterType clusterType);

abstract Builder lbPolicy(LbPolicy lbPolicy);
// Private, use roundRobinLbPolicy() or ringHashLbPolicy(long, long).
protected abstract Builder lbPolicy(LbPolicy lbPolicy);

Builder lbPolicy(LbPolicy lbPolicy, long minRingSize, long maxRingSize) {
return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize);
Builder roundRobinLbPolicy() {
return this.lbPolicy(LbPolicy.ROUND_ROBIN);
}

// Private, use lbPolicy(LbPolicy, long, long).
Builder ringHashLbPolicy(long minRingSize, long maxRingSize) {
return this.lbPolicy(LbPolicy.RING_HASH).minRingSize(minRingSize).maxRingSize(maxRingSize);
}

// Private, use ringHashLbPolicy(long, long).
protected abstract Builder minRingSize(long minRingSize);

// Private, use lbPolicy(.LbPolicy, long, long)
// Private, use ringHashLbPolicy(long, long).
protected abstract Builder maxRingSize(long maxRingSize);

// Private, use CdsUpdate.forEds() instead.
Expand Down
51 changes: 25 additions & 26 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -148,7 +147,7 @@ public void tearDown() {
public void discoverTopLevelEdsCluster() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -165,7 +164,7 @@ public void discoverTopLevelEdsCluster() {
public void discoverTopLevelLogicalDnsCluster() {
CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -192,7 +191,7 @@ public void nonAggregateCluster_resourceNotExist_returnErrorPicker() {
public void nonAggregateCluster_resourceUpdate() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -202,7 +201,7 @@ public void nonAggregateCluster_resourceUpdate() {
100L, upstreamTlsContext);

update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
childLbConfig = (ClusterResolverConfig) childBalancer.config;
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
Expand All @@ -214,7 +213,7 @@ public void nonAggregateCluster_resourceUpdate() {
public void nonAggregateCluster_resourceRevoked() {
CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -240,7 +239,7 @@ public void discoverAggregateCluster() {
// CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.RING_HASH, 100L, 1000L).build();
.ringHashLbPolicy(100L, 1000L).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
assertThat(childBalancers).isEmpty();
Expand All @@ -249,24 +248,24 @@ public void discoverAggregateCluster() {
// cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)]
CdsUpdate update1 =
CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
assertThat(xdsClient.watchers.keySet()).containsExactly(
CLUSTER, cluster1, cluster2, cluster3, cluster4);
assertThat(childBalancers).isEmpty();
CdsUpdate update3 =
CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
assertThat(childBalancers).isEmpty();
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(childBalancers).isEmpty();
CdsUpdate update4 =
CdsUpdate.forEds(cluster4, null, LRS_SERVER_NAME, 300L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster4, update4);
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -293,7 +292,7 @@ public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() {
// CLUSTER (aggr.) -> [cluster1 (EDS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
xdsClient.deliverResourceNotExist(cluster1);
Expand All @@ -311,16 +310,16 @@ public void aggregateCluster_descendantClustersRevoked() {
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
CdsUpdate update1 =
CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_NAME, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
Expand Down Expand Up @@ -358,16 +357,16 @@ public void aggregateCluster_rootClusterRevoked() {
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
CdsUpdate update1 =
CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_NAME, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
Expand Down Expand Up @@ -395,28 +394,28 @@ public void aggregateCluster_intermediateClusterChanges() {
// CLUSTER (aggr.) -> [cluster1]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);

// CLUSTER (aggr.) -> [cluster2 (aggr.)]
String cluster2 = "cluster-02.googleapis.com";
update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2);

// cluster2 (aggr.) -> [cluster3 (EDS)]
String cluster3 = "cluster-03.googleapis.com";
CdsUpdate update2 =
CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
CdsUpdate update3 =
CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
Expand All @@ -443,7 +442,7 @@ public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicke
// CLUSTER (aggr.) -> [cluster1]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM");
Expand All @@ -460,11 +459,11 @@ public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildL
// CLUSTER (aggr.) -> [cluster1 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
CdsUpdate update1 =
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_NAME, 200L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config;
Expand All @@ -489,7 +488,7 @@ public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErr
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.shutdown).isFalse();
Expand Down
Loading