Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -66,6 +67,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
private List<String> priorityNames;
// Config for each priority.
private Map<String, PriorityChildConfig> priorityConfigs;
@Nullable private String currentPriority;
private ConnectivityState currentConnectivityState;
private SubchannelPicker currentPicker;

Expand Down Expand Up @@ -113,7 +115,7 @@ public void handleNameResolutionError(Status error) {
}
}
if (gotoTransientFailure) {
updateOverallState(TRANSIENT_FAILURE, new ErrorPicker(error));
updateOverallState(null, TRANSIENT_FAILURE, new ErrorPicker(error));
}
}

Expand All @@ -134,14 +136,14 @@ private void tryNextPriority(boolean reportConnecting) {
new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
children.put(priority, child);
child.updateResolvedAddresses();
updateOverallState(CONNECTING, BUFFER_PICKER);
updateOverallState(priority, CONNECTING, BUFFER_PICKER);
return; // Give priority i time to connect.
}
ChildLbState child = children.get(priority);
child.reactivate();
if (child.connectivityState.equals(READY) || child.connectivityState.equals(IDLE)) {
logger.log(XdsLogLevel.DEBUG, "Shifted to priority {0}", priority);
updateOverallState(child.connectivityState, child.picker);
updateOverallState(priority, child.connectivityState, child.picker);
for (int j = i + 1; j < priorityNames.size(); j++) {
String p = priorityNames.get(j);
if (children.containsKey(p)) {
Expand All @@ -152,20 +154,28 @@ private void tryNextPriority(boolean reportConnecting) {
}
if (child.failOverTimer != null && child.failOverTimer.isPending()) {
if (reportConnecting) {
updateOverallState(CONNECTING, BUFFER_PICKER);
updateOverallState(priority, CONNECTING, BUFFER_PICKER);
}
return; // Give priority i time to connect.
}
if (priority.equals(currentPriority) && child.connectivityState != TRANSIENT_FAILURE) {
// If the current priority is not changed into TRANSIENT_FAILURE, keep using it.
updateOverallState(priority, child.connectivityState, child.picker);
return;
}
}
// TODO(zdapeng): Include error details of each priority.
logger.log(XdsLogLevel.DEBUG, "All priority failed");
String lastPriority = priorityNames.get(priorityNames.size() - 1);
SubchannelPicker errorPicker = children.get(lastPriority).picker;
updateOverallState(TRANSIENT_FAILURE, errorPicker);
updateOverallState(lastPriority, TRANSIENT_FAILURE, errorPicker);
}

private void updateOverallState(ConnectivityState state, SubchannelPicker picker) {
if (!state.equals(currentConnectivityState) || !picker.equals(currentPicker)) {
private void updateOverallState(
@Nullable String priority, ConnectivityState state, SubchannelPicker picker) {
if (!Objects.equals(priority, currentPriority) || !state.equals(currentConnectivityState)
|| !picker.equals(currentPicker)) {
currentPriority = priority;
currentConnectivityState = state;
currentPicker = picker;
helper.updateBalancingState(state, picker);
Expand Down Expand Up @@ -201,6 +211,7 @@ public void run() {
picker = new ErrorPicker(
Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority));
logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
currentPriority = null; // reset currentPriority to guarantee failover happen
tryNextPriority(true);
}
}
Expand Down
151 changes: 107 additions & 44 deletions xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,99 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
verify(balancer3).shutdown();
}

@Test
public void idleToConnectingDoesNotTriggerFailOver() {
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true);
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(1);
assertThat(fooHelpers).hasSize(1);
Helper helper0 = Iterables.getOnlyElement(fooHelpers);

// p0 gets IDLE.
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// p0 goes to CONNECTING
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// no failover happened
assertThat(fooBalancers).hasSize(1);
assertThat(fooHelpers).hasSize(1);
}

@Test
public void readyToConnectDoesNotFailOverButUpdatesPicker() {
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true);
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(1);
assertThat(fooHelpers).hasSize(1);
Helper helper0 = Iterables.getOnlyElement(fooHelpers);

// p0 gets READY.
final Subchannel subchannel0 = mock(Subchannel.class);
helper0.updateBalancingState(
READY,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel0);
}
});
assertCurrentPickerPicksSubchannel(subchannel0);

// p0 goes to CONNECTING
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// no failover happened
assertThat(fooBalancers).hasSize(1);
assertThat(fooHelpers).hasSize(1);

// resolution update without priority change does not trigger failover
Attributes.Key<String> fooKey = Attributes.Key.create("fooKey");
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.setAttributes(Attributes.newBuilder().set(fooKey, "barVal").build())
.build());

assertCurrentPickerIsBufferPicker();

// no failover happened
assertThat(fooBalancers).hasSize(1);
assertThat(fooHelpers).hasSize(1);
}

@Test
public void typicalPriorityFailOverFlowWithIdleUpdate() {
PriorityChildConfig priorityChildConfig0 =
Expand All @@ -425,16 +518,10 @@ public void typicalPriorityFailOverFlowWithIdleUpdate() {
Helper helper0 = Iterables.getOnlyElement(fooHelpers);

// p0 gets IDLE.
final Subchannel subchannel0 = mock(Subchannel.class);
helper0.updateBalancingState(
IDLE,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel0);
}
});
assertCurrentPickerPicksIdleSubchannel(subchannel0);
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// p0 fails over to p1 immediately.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED));
Expand All @@ -452,32 +539,20 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
Helper helper2 = Iterables.getLast(fooHelpers);

// p2 gets IDLE
final Subchannel subchannel1 = mock(Subchannel.class);
helper2.updateBalancingState(
IDLE,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel1);
}
});
assertCurrentPickerPicksIdleSubchannel(subchannel1);
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// p0 gets back to IDLE
final Subchannel subchannel2 = mock(Subchannel.class);
helper0.updateBalancingState(
IDLE,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel2);
}
});
assertCurrentPickerPicksIdleSubchannel(subchannel2);
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// p2 fails but does not affect overall picker
helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
assertCurrentPickerPicksIdleSubchannel(subchannel2);
assertCurrentPickerIsBufferPicker();

// p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
Expand All @@ -497,32 +572,20 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
assertCurrentPickerReturnsError(Status.Code.DATA_LOSS, "foo");

// p2 gets back to IDLE
final Subchannel subchannel3 = mock(Subchannel.class);
helper2.updateBalancingState(
IDLE,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel3);
}
});
assertCurrentPickerPicksIdleSubchannel(subchannel3);
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// p0 gets back to IDLE
final Subchannel subchannel4 = mock(Subchannel.class);
helper0.updateBalancingState(
IDLE,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel4);
}
});
assertCurrentPickerPicksIdleSubchannel(subchannel4);
BUFFER_PICKER);
assertCurrentPickerIsBufferPicker();

// p0 fails over to p2 and picker is updated to p2's existing picker.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
assertCurrentPickerPicksIdleSubchannel(subchannel3);
assertCurrentPickerIsBufferPicker();

// Deactivate child balancer get deleted.
fakeClock.forwardTime(15, TimeUnit.MINUTES);
Expand Down Expand Up @@ -607,9 +670,9 @@ private void assertCurrentPickerPicksSubchannel(Subchannel expectedSubchannelToP
assertThat(pickResult.getSubchannel()).isEqualTo(expectedSubchannelToPick);
}

private void assertCurrentPickerPicksIdleSubchannel(Subchannel expectedSubchannelToPick) {
private void assertCurrentPickerIsBufferPicker() {
assertLatestConnectivityState(IDLE);
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(pickResult.getSubchannel()).isEqualTo(expectedSubchannelToPick);
assertThat(pickResult).isEqualTo(PickResult.withNoResult());
}
}