69
69
import io .opencensus .trace .Status ;
70
70
import io .opencensus .trace .Tracer ;
71
71
import io .opencensus .trace .Tracing ;
72
+ import java .util .Arrays ;
72
73
import java .util .HashSet ;
73
74
import java .util .Iterator ;
74
75
import java .util .LinkedList ;
@@ -848,7 +849,7 @@ private void keepAlive() {
848
849
}
849
850
}
850
851
851
- private void markUsed () {
852
+ void markUsed () {
852
853
lastUseTime = clock .instant ();
853
854
}
854
855
@@ -929,24 +930,30 @@ private SessionOrError pollUninterruptiblyWithTimeout(long timeoutMillis) {
929
930
}
930
931
}
931
932
932
- // Background task to maintain the pool. It closes idle sessions, keeps alive sessions that have
933
- // not been used for a user configured time and creates session if needed to bring pool up to
934
- // minimum required sessions. We keep track of the number of concurrent sessions being used.
935
- // The maximum value of that over a window (10 minutes) tells us how many sessions we need in the
936
- // pool. We close the remaining sessions. To prevent bursty traffic, we smear this out over the
937
- // window length. We also smear out the keep alive traffic over the keep alive period.
933
+ /**
934
+ * Background task to maintain the pool. Tasks:
935
+ *
936
+ * <ul>
937
+ * <li>Removes idle sessions from the pool. Sessions that go above MinSessions that have not
938
+ * been used for the last 55 minutes will be removed from the pool. These will automatically
939
+ * be garbage collected by the backend.
940
+ * <li>Keeps alive sessions that have not been used for a user configured time in order to keep
941
+ * MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out
942
+ * over a window of 10 minutes to avoid bursty traffic.
943
+ * </ul>
944
+ */
938
945
final class PoolMaintainer {
939
946
// Length of the window in millis over which we keep track of maximum number of concurrent
940
947
// sessions in use.
941
948
private final Duration windowLength = Duration .ofMillis (TimeUnit .MINUTES .toMillis (10 ));
942
949
// Frequency of the timer loop.
943
- @ VisibleForTesting static final long LOOP_FREQUENCY = 10 * 1000L ;
950
+ @ VisibleForTesting final long loopFrequency = options . getLoopFrequency () ;
944
951
// Number of loop iterations in which we need to to close all the sessions waiting for closure.
945
- @ VisibleForTesting final long numClosureCycles = windowLength .toMillis () / LOOP_FREQUENCY ;
952
+ @ VisibleForTesting final long numClosureCycles = windowLength .toMillis () / loopFrequency ;
946
953
private final Duration keepAliveMilis =
947
954
Duration .ofMillis (TimeUnit .MINUTES .toMillis (options .getKeepAliveIntervalMinutes ()));
948
955
// Number of loop iterations in which we need to keep alive all the sessions
949
- @ VisibleForTesting final long numKeepAliveCycles = keepAliveMilis .toMillis () / LOOP_FREQUENCY ;
956
+ @ VisibleForTesting final long numKeepAliveCycles = keepAliveMilis .toMillis () / loopFrequency ;
950
957
951
958
Instant lastResetTime = Instant .ofEpochMilli (0 );
952
959
int numSessionsToClose = 0 ;
@@ -969,8 +976,8 @@ public void run() {
969
976
maintainPool ();
970
977
}
971
978
},
972
- LOOP_FREQUENCY ,
973
- LOOP_FREQUENCY ,
979
+ loopFrequency ,
980
+ loopFrequency ,
974
981
TimeUnit .MILLISECONDS );
975
982
}
976
983
}
@@ -993,7 +1000,7 @@ void maintainPool() {
993
1000
running = true ;
994
1001
}
995
1002
Instant currTime = clock .instant ();
996
- closeIdleSessions (currTime );
1003
+ removeIdleSessions (currTime );
997
1004
// Now go over all the remaining sessions and see if they need to be kept alive explicitly.
998
1005
keepAliveSessions (currTime );
999
1006
replenishPool ();
@@ -1005,46 +1012,43 @@ void maintainPool() {
1005
1012
}
1006
1013
}
1007
1014
1008
- private void closeIdleSessions (Instant currTime ) {
1009
- LinkedList <PooledSession > sessionsToClose = new LinkedList <>();
1015
+ private void removeIdleSessions (Instant currTime ) {
1010
1016
synchronized (lock ) {
1011
- // Every ten minutes figure out how many sessions need to be closed then close them over
1012
- // next ten minutes.
1013
- if (currTime .isAfter (lastResetTime .plus (windowLength ))) {
1014
- int sessionsToKeep =
1015
- Math .max (options .getMinSessions (), maxSessionsInUse + options .getMaxIdleSessions ());
1016
- numSessionsToClose = totalSessions () - sessionsToKeep ;
1017
- sessionsToClosePerLoop = (int ) Math .ceil ((double ) numSessionsToClose / numClosureCycles );
1018
- maxSessionsInUse = 0 ;
1019
- lastResetTime = currTime ;
1020
- }
1021
- if (numSessionsToClose > 0 ) {
1022
- while (sessionsToClose .size () < Math .min (numSessionsToClose , sessionsToClosePerLoop )) {
1023
- PooledSession sess =
1024
- readSessions .size () > 0 ? readSessions .poll () : writePreparedSessions .poll ();
1025
- if (sess != null ) {
1026
- if (sess .state != SessionState .CLOSING ) {
1027
- sess .markClosing ();
1028
- sessionsToClose .add (sess );
1017
+ // Determine the minimum last use time for a session to be deemed to still be alive. Remove
1018
+ // all sessions that have a lastUseTime before that time, unless it would cause us to go
1019
+ // below MinSessions. Prefer to remove read sessions above write-prepared sessions.
1020
+ Instant minLastUseTime = currTime .minus (options .getRemoveInactiveSessionAfter ());
1021
+ for (Iterator <PooledSession > iterator :
1022
+ Arrays .asList (
1023
+ readSessions .descendingIterator (), writePreparedSessions .descendingIterator ())) {
1024
+ while (iterator .hasNext ()) {
1025
+ PooledSession session = iterator .next ();
1026
+ if (session .lastUseTime .isBefore (minLastUseTime )) {
1027
+ if (session .state != SessionState .CLOSING ) {
1028
+ removeFromPool (session );
1029
+ iterator .remove ();
1029
1030
}
1030
- } else {
1031
- break ;
1032
1031
}
1033
1032
}
1034
- numSessionsToClose -= sessionsToClose .size ();
1035
1033
}
1036
1034
}
1037
- for (PooledSession sess : sessionsToClose ) {
1038
- logger .log (Level .FINE , "Closing session {0}" , sess .getName ());
1039
- closeSessionAsync (sess );
1040
- }
1041
1035
}
1042
1036
1043
1037
private void keepAliveSessions (Instant currTime ) {
1044
1038
long numSessionsToKeepAlive = 0 ;
1045
1039
synchronized (lock ) {
1040
+ if (numSessionsInUse >= (options .getMinSessions () + options .getMaxIdleSessions ())) {
1041
+ // At least MinSessions are in use, so we don't have to ping any sessions.
1042
+ return ;
1043
+ }
1046
1044
// In each cycle only keep alive a subset of sessions to prevent burst of traffic.
1047
- numSessionsToKeepAlive = (long ) Math .ceil ((double ) totalSessions () / numKeepAliveCycles );
1045
+ numSessionsToKeepAlive =
1046
+ (long )
1047
+ Math .ceil (
1048
+ (double )
1049
+ ((options .getMinSessions () + options .getMaxIdleSessions ())
1050
+ - numSessionsInUse )
1051
+ / numKeepAliveCycles );
1048
1052
}
1049
1053
// Now go over all the remaining sessions and see if they need to be kept alive explicitly.
1050
1054
Instant keepAliveThreshold = currTime .minus (keepAliveMilis );
@@ -1053,9 +1057,11 @@ private void keepAliveSessions(Instant currTime) {
1053
1057
while (numSessionsToKeepAlive > 0 ) {
1054
1058
PooledSession sessionToKeepAlive = null ;
1055
1059
synchronized (lock ) {
1056
- sessionToKeepAlive = findSessionToKeepAlive (readSessions , keepAliveThreshold );
1060
+ sessionToKeepAlive = findSessionToKeepAlive (readSessions , keepAliveThreshold , 0 );
1057
1061
if (sessionToKeepAlive == null ) {
1058
- sessionToKeepAlive = findSessionToKeepAlive (writePreparedSessions , keepAliveThreshold );
1062
+ sessionToKeepAlive =
1063
+ findSessionToKeepAlive (
1064
+ writePreparedSessions , keepAliveThreshold , readSessions .size ());
1059
1065
}
1060
1066
}
1061
1067
if (sessionToKeepAlive == null ) {
@@ -1137,13 +1143,18 @@ private static enum Position {
1137
1143
@ GuardedBy ("lock" )
1138
1144
private long numSessionsReleased = 0 ;
1139
1145
1146
+ @ GuardedBy ("lock" )
1147
+ private long numIdleSessionsRemoved = 0 ;
1148
+
1140
1149
private AtomicLong numWaiterTimeouts = new AtomicLong ();
1141
1150
1142
1151
@ GuardedBy ("lock" )
1143
1152
private final Set <PooledSession > allSessions = new HashSet <>();
1144
1153
1145
1154
private final SessionConsumer sessionConsumer = new SessionConsumerImpl ();
1146
1155
1156
+ @ VisibleForTesting Function <PooledSession , Void > idleSessionRemovedListener ;
1157
+
1147
1158
/**
1148
1159
* Create a session pool with the given options and for the given database. It will also start
1149
1160
* eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.
@@ -1232,6 +1243,28 @@ private SessionPool(
1232
1243
this .initMetricsCollection (metricRegistry , labelValues );
1233
1244
}
1234
1245
1246
+ @ VisibleForTesting
1247
+ void removeFromPool (PooledSession session ) {
1248
+ synchronized (lock ) {
1249
+ if (isClosed ()) {
1250
+ decrementPendingClosures (1 );
1251
+ return ;
1252
+ }
1253
+ session .markClosing ();
1254
+ allSessions .remove (session );
1255
+ numIdleSessionsRemoved ++;
1256
+ if (idleSessionRemovedListener != null ) {
1257
+ idleSessionRemovedListener .apply (session );
1258
+ }
1259
+ }
1260
+ }
1261
+
1262
+ long numIdleSessionsRemoved () {
1263
+ synchronized (lock ) {
1264
+ return numIdleSessionsRemoved ;
1265
+ }
1266
+ }
1267
+
1235
1268
@ VisibleForTesting
1236
1269
int getNumberOfAvailableWritePreparedSessions () {
1237
1270
synchronized (lock ) {
@@ -1313,14 +1346,18 @@ private void invalidateSession(PooledSession session) {
1313
1346
}
1314
1347
1315
1348
private PooledSession findSessionToKeepAlive (
1316
- Queue <PooledSession > queue , Instant keepAliveThreshold ) {
1349
+ Queue <PooledSession > queue , Instant keepAliveThreshold , int numAlreadyChecked ) {
1350
+ int numChecked = 0 ;
1317
1351
Iterator <PooledSession > iterator = queue .iterator ();
1318
- while (iterator .hasNext ()) {
1352
+ while (iterator .hasNext ()
1353
+ && (numChecked + numAlreadyChecked )
1354
+ < (options .getMinSessions () + options .getMaxIdleSessions () - numSessionsInUse )) {
1319
1355
PooledSession session = iterator .next ();
1320
1356
if (session .lastUseTime .isBefore (keepAliveThreshold )) {
1321
1357
iterator .remove ();
1322
1358
return session ;
1323
1359
}
1360
+ numChecked ++;
1324
1361
}
1325
1362
return null ;
1326
1363
}
0 commit comments