Skip to content

feat: add option to wait on session pool creation #2329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,30 @@ class SessionPool {
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
* for the creation of at least {@link SessionPoolOptions#getMinSessions()} in the pool using the
* given duration. If the waiting times out, a {@link SpannerException} with the {@link
* ErrorCode#DEADLINE_EXCEEDED} is thrown.
*/
void maybeWaitOnMinSessions() {
final long timeoutNanos = options.getWaitForMinSessions().toNanos();
if (timeoutNanos <= 0) {
return;
}

try {
if (!waitOnMinSessionsLatch.await(timeoutNanos, TimeUnit.NANOSECONDS)) {
final long timeoutMillis = options.getWaitForMinSessions().toMillis();
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timed out after waiting " + timeoutMillis + "ms for session pool creation");
}
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
* Clock.
Expand Down Expand Up @@ -1855,6 +1879,8 @@ private enum Position {

@VisibleForTesting Function<PooledSession, Void> idleSessionRemovedListener;

private final CountDownLatch waitOnMinSessionsLatch;

/**
* Create a session pool with the given options and for the given database. It will also start
* eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.
Expand Down Expand Up @@ -1934,6 +1960,8 @@ private SessionPool(
this.clock = clock;
this.poolMaintainer = new PoolMaintainer();
this.initMetricsCollection(metricRegistry, labelValues);
this.waitOnMinSessionsLatch =
options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0);
}

/**
Expand Down Expand Up @@ -2399,13 +2427,17 @@ public void onSessionReady(SessionImpl session) {
PooledSession pooledSession = null;
boolean closeSession = false;
synchronized (lock) {
int minSessions = options.getMinSessions();
pooledSession = new PooledSession(session);
numSessionsBeingCreated--;
if (closureFuture != null) {
closeSession = true;
} else {
Preconditions.checkState(totalSessions() <= options.getMaxSessions() - 1);
allSessions.add(pooledSession);
if (allSessions.size() >= minSessions) {
waitOnMinSessionsLatch.countDown();
}
if (options.isAutoDetectDialect() && !detectDialectStarted) {
// Get the dialect of the underlying database if that has not yet been done. Note that
// this method will release the session into the pool once it is done.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class SessionPoolOptions {
private final ActionOnSessionLeak actionOnSessionLeak;
private final long initialWaitForSessionTimeoutMillis;
private final boolean autoDetectDialect;
private final Duration waitForMinSessions;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
Expand All @@ -69,6 +70,7 @@ private SessionPoolOptions(Builder builder) {
this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
}

@Override
Expand All @@ -90,7 +92,8 @@ public boolean equals(Object o) {
&& Objects.equals(this.loopFrequency, other.loopFrequency)
&& Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes)
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect);
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions);
}

@Override
Expand All @@ -108,7 +111,8 @@ public int hashCode() {
this.loopFrequency,
this.keepAliveIntervalMinutes,
this.removeInactiveSessionAfter,
this.autoDetectDialect);
this.autoDetectDialect,
this.waitForMinSessions);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -186,6 +190,11 @@ boolean isFailOnSessionLeak() {
return actionOnSessionLeak == ActionOnSessionLeak.FAIL;
}

@VisibleForTesting
Duration getWaitForMinSessions() {
return waitForMinSessions;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -229,6 +238,7 @@ public static class Builder {
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;

public Builder() {}

Expand All @@ -247,6 +257,7 @@ private Builder(SessionPoolOptions options) {
this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
}

/**
Expand Down Expand Up @@ -394,6 +405,21 @@ public Builder setWriteSessionsFraction(float writeSessionsFraction) {
return this;
}

/**
* If greater than zero, waits for the session pool to have at least {@link
* SessionPoolOptions#minSessions} before returning the database client to the caller. Note that
* this check is only done during the session pool creation. This is usually done asynchronously
* in order to provide the client back to the caller as soon as possible. We don't recommend
* using this option unless you are executing benchmarks and want to guarantee the session pool
* has min sessions in the pool before continuing.
*
* <p>Defaults to zero (initialization is done asynchronously).
*/
public Builder setWaitForMinSessions(Duration waitForMinSessions) {
this.waitForMinSessions = waitForMinSessions;
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
SessionPool pool =
SessionPool.createPool(
getOptions(), SpannerImpl.this.getSessionClient(db), labelValues);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient = createDatabaseClient(clientId, pool);
dbClients.put(db, dbClient);
return dbClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

/** Tests for SessionPool that mock out the underlying stub. */
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -1188,6 +1189,47 @@ public void testGetDatabaseRole() throws Exception {
assertEquals(TEST_DATABASE_ROLE, pool.getDatabaseRole());
}

@Test
public void testWaitOnMinSessionsWhenSessionsAreCreatedBeforeTimeout() {
doAnswer(
invocation ->
executor.submit(
() -> {
SessionConsumerImpl consumer =
invocation.getArgument(2, SessionConsumerImpl.class);
consumer.onSessionReady(mockSession());
}))
.when(sessionClient)
.asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class));

options =
SessionPoolOptions.newBuilder()
.setMinSessions(minSessions)
.setMaxSessions(minSessions + 1)
.setWaitForMinSessions(Duration.ofSeconds(5))
.build();
pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES);
pool.maybeWaitOnMinSessions();
assertTrue(pool.getNumberOfSessionsInPool() >= minSessions);
}

@Test(expected = SpannerException.class)
public void testWaitOnMinSessionsThrowsExceptionWhenTimeoutIsReached() {
// Does not call onSessionReady, so session pool is never populated
doAnswer(invocation -> null)
.when(sessionClient)
.asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class));

options =
SessionPoolOptions.newBuilder()
.setMinSessions(minSessions + 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 minSessions always succeed, so we increment it by one

.setMaxSessions(minSessions + 1)
.setWaitForMinSessions(Duration.ofMillis(100))
.build();
pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES);
pool.maybeWaitOnMinSessions();
}

private void mockKeepAlive(Session session) {
ReadContext context = mock(ReadContext.class);
ResultSet resultSet = mock(ResultSet.class);
Expand Down