Skip to content

Commit 1fad80a

Browse files
authored
[grid] Purge Nodes if health check fails consistently
1 parent c498dad commit 1fad80a

File tree

3 files changed

+149
-2
lines changed

3 files changed

+149
-2
lines changed

java/src/org/openqa/selenium/grid/distributor/GridModel.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@ public class GridModel {
5757
private static final Logger LOG = Logger.getLogger(GridModel.class.getName());
5858
// How many times a node's heartbeat duration needs to be exceeded before the node is considered purgeable.
5959
private static final int PURGE_TIMEOUT_MULTIPLIER = 4;
60+
private static final int UNHEALTHY_THRESHOLD = 4;
6061
private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
6162
private final Set<NodeStatus> nodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
6263
private final Map<NodeId, Instant> nodePurgeTimes = new ConcurrentHashMap<>();
64+
private final Map<NodeId, Integer> nodeHealthCount = new ConcurrentHashMap<>();
6365
private final EventBus events;
6466

6567
public GridModel(EventBus events) {
@@ -96,6 +98,7 @@ public void add(NodeStatus node) {
9698
NodeStatus refreshed = rewrite(node, next.getAvailability());
9799
nodes.add(refreshed);
98100
nodePurgeTimes.put(refreshed.getNodeId(), Instant.now());
101+
updateHealthCheckCount(refreshed.getNodeId(), refreshed.getAvailability());
99102

100103
return;
101104
}
@@ -117,6 +120,7 @@ public void add(NodeStatus node) {
117120
NodeStatus refreshed = rewrite(node, DOWN);
118121
nodes.add(refreshed);
119122
nodePurgeTimes.put(refreshed.getNodeId(), Instant.now());
123+
updateHealthCheckCount(refreshed.getNodeId(), refreshed.getAvailability());
120124
} finally {
121125
writeLock.unlock();
122126
}
@@ -177,6 +181,7 @@ public void remove(NodeId id) {
177181
try {
178182
nodes.removeIf(n -> n.getNodeId().equals(id));
179183
nodePurgeTimes.remove(id);
184+
nodeHealthCount.remove(id);
180185
} finally {
181186
writeLock.unlock();
182187
}
@@ -190,8 +195,13 @@ public void purgeDeadNodes() {
190195
Set<NodeStatus> toRemove = new HashSet<>();
191196

192197
for (NodeStatus node : nodes) {
198+
NodeId id = node.getNodeId();
199+
if (nodeHealthCount.getOrDefault(id, 0) > UNHEALTHY_THRESHOLD) {
200+
toRemove.add(node);
201+
}
202+
193203
Instant now = Instant.now();
194-
Instant lastTouched = nodePurgeTimes.getOrDefault(node.getNodeId(), Instant.now());
204+
Instant lastTouched = nodePurgeTimes.getOrDefault(id, Instant.now());
195205
Instant lostTime = lastTouched.plus(node.getHeartbeatPeriod().multipliedBy(PURGE_TIMEOUT_MULTIPLIER / 2));
196206
Instant deadTime = lastTouched.plus(node.getHeartbeatPeriod().multipliedBy(PURGE_TIMEOUT_MULTIPLIER));
197207

@@ -212,6 +222,7 @@ public void purgeDeadNodes() {
212222
toRemove.forEach(node -> {
213223
nodes.remove(node);
214224
nodePurgeTimes.remove(node.getNodeId());
225+
nodeHealthCount.remove(node.getNodeId());
215226
});
216227
} finally {
217228
writeLock.unlock();
@@ -423,6 +434,29 @@ public void setSession(SlotId slotId, Session session) {
423434
}
424435
}
425436

437+
public void updateHealthCheckCount(NodeId id, Availability availability) {
438+
Require.nonNull("Node ID", id);
439+
Require.nonNull("Availability", availability);
440+
441+
Lock writeLock = lock.writeLock();
442+
writeLock.lock();
443+
try {
444+
int unhealthyCount = nodeHealthCount.getOrDefault(id, 0);
445+
446+
// Keep track of consecutive number of times the Node health check fails
447+
if (availability.equals(DOWN)) {
448+
nodeHealthCount.put(id, unhealthyCount + 1);
449+
}
450+
451+
// If the Node is healthy again before crossing the threshold, then reset the count.
452+
if (unhealthyCount <= UNHEALTHY_THRESHOLD && availability.equals(UP)) {
453+
nodeHealthCount.put(id, 0);
454+
}
455+
} finally {
456+
writeLock.unlock();
457+
}
458+
}
459+
426460
private void amend(Availability availability, NodeStatus status, Slot slot) {
427461
Set<Slot> newSlots = new HashSet<>(status.getSlots());
428462
newSlots.removeIf(s -> s.getId().equals(slot.getId()));

java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ private Runnable asRunnableHealthCheck(Node node) {
290290
getDebugLogLevel(),
291291
String.format("Health check result for %s was %s", node.getId(), result.getAvailability()));
292292
model.setAvailability(id, result.getAvailability());
293+
model.updateHealthCheckCount(id, result.getAvailability());
293294
} finally {
294295
writeLock.unlock();
295296
}

java/test/org/openqa/selenium/grid/distributor/DistributorTest.java

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import java.util.concurrent.CountDownLatch;
9191
import java.util.concurrent.TimeUnit;
9292
import java.util.concurrent.atomic.AtomicBoolean;
93+
import java.util.concurrent.atomic.AtomicInteger;
9394
import java.util.concurrent.atomic.AtomicReference;
9495
import java.util.logging.Logger;
9596
import java.util.stream.Collectors;
@@ -941,6 +942,117 @@ public void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthChe
941942
assertThatEither(result).isRight();
942943
}
943944

945+
@Test
946+
public void shouldRemoveNodeWhoseHealthCheckFailsConsistently() {
947+
CombinedHandler handler = new CombinedHandler();
948+
949+
AtomicReference<Availability> availability = new AtomicReference<>(UP);
950+
951+
SessionMap sessions = new LocalSessionMap(tracer, bus);
952+
handler.addHandler(sessions);
953+
NewSessionQueue queue = new LocalNewSessionQueue(
954+
tracer,
955+
bus,
956+
new DefaultSlotMatcher(),
957+
Duration.ofSeconds(2),
958+
Duration.ofSeconds(2),
959+
registrationSecret);
960+
961+
URI uri = createUri();
962+
Node node = LocalNode.builder(tracer, bus, uri, uri, registrationSecret)
963+
.add(
964+
caps,
965+
new TestSessionFactory((id, caps) -> new Session(id, uri, stereotype, caps, Instant.now())))
966+
.advanced()
967+
.healthCheck(() -> new HealthCheck.Result(availability.get(), "TL;DR"))
968+
.build();
969+
handler.addHandler(node);
970+
971+
LocalDistributor distributor = new LocalDistributor(
972+
tracer,
973+
bus,
974+
new PassthroughHttpClient.Factory(handler),
975+
sessions,
976+
queue,
977+
new DefaultSlotSelector(),
978+
registrationSecret,
979+
Duration.ofSeconds(1),
980+
false);
981+
handler.addHandler(distributor);
982+
distributor.add(node);
983+
984+
waitToHaveCapacity(distributor);
985+
986+
Either<SessionNotCreatedException, CreateSessionResponse> result =
987+
distributor.newSession(createRequest(caps));
988+
assertThatEither(result).isRight();
989+
990+
availability.set(DOWN);
991+
992+
waitTillNodesAreRemoved(distributor);
993+
994+
result =
995+
distributor.newSession(createRequest(caps));
996+
assertThatEither(result).isLeft();
997+
}
998+
999+
@Test
1000+
public void shouldNotRemoveNodeWhoseHealthCheckPassesBeforeThreshold()
1001+
throws InterruptedException {
1002+
CombinedHandler handler = new CombinedHandler();
1003+
1004+
AtomicInteger count = new AtomicInteger(0);
1005+
CountDownLatch latch = new CountDownLatch(1);
1006+
1007+
SessionMap sessions = new LocalSessionMap(tracer, bus);
1008+
handler.addHandler(sessions);
1009+
NewSessionQueue queue = new LocalNewSessionQueue(
1010+
tracer,
1011+
bus,
1012+
new DefaultSlotMatcher(),
1013+
Duration.ofSeconds(2),
1014+
Duration.ofSeconds(2),
1015+
registrationSecret);
1016+
1017+
URI uri = createUri();
1018+
Node node = LocalNode.builder(tracer, bus, uri, uri, registrationSecret)
1019+
.add(
1020+
caps,
1021+
new TestSessionFactory((id, caps) -> new Session(id, uri, stereotype, caps, Instant.now())))
1022+
.advanced()
1023+
.healthCheck(() -> {
1024+
if (count.get() <= 4) {
1025+
count.incrementAndGet();
1026+
return new HealthCheck.Result(DOWN, "Down");
1027+
}
1028+
latch.countDown();
1029+
return new HealthCheck.Result(UP, "Up");
1030+
})
1031+
.build();
1032+
handler.addHandler(node);
1033+
1034+
LocalDistributor distributor = new LocalDistributor(
1035+
tracer,
1036+
bus,
1037+
new PassthroughHttpClient.Factory(handler),
1038+
sessions,
1039+
queue,
1040+
new DefaultSlotSelector(),
1041+
registrationSecret,
1042+
Duration.ofSeconds(1),
1043+
false);
1044+
handler.addHandler(distributor);
1045+
distributor.add(node);
1046+
1047+
latch.await(60, TimeUnit.SECONDS);
1048+
1049+
waitToHaveCapacity(distributor);
1050+
1051+
Either<SessionNotCreatedException, CreateSessionResponse> result =
1052+
distributor.newSession(createRequest(caps));
1053+
assertThatEither(result).isRight();
1054+
}
1055+
9441056
private Set<Node> createNodeSet(CombinedHandler handler, Distributor distributor, int count, Capabilities...capabilities) {
9451057
Set<Node> nodeSet = new HashSet<>();
9461058
for (int i=0; i<count; i++) {
@@ -1253,7 +1365,7 @@ private void waitToHaveCapacity(Distributor distributor) {
12531365

12541366
private void waitTillNodesAreRemoved(Distributor distributor) {
12551367
new FluentWait<>(distributor)
1256-
.withTimeout(Duration.ofSeconds(5))
1368+
.withTimeout(Duration.ofSeconds(60))
12571369
.pollingEvery(Duration.ofMillis(100))
12581370
.until(d -> {
12591371
Set<NodeStatus> nodes = d.getStatus().getNodes();

0 commit comments

Comments
 (0)