@@ -127,6 +127,11 @@ typedef struct
 
     int         num_requests;   /* current # of requests */
     int         max_requests;   /* allocated array size */
+
+    int         head;           /* Index of the first request in the ring
+                                 * buffer */
+    int         tail;           /* Index of the last request in the ring
+                                 * buffer */
 
     CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
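The hunk above turns the flat request array into a ring buffer: head indexes the oldest pending request, tail indexes the slot the next request will occupy, and num_requests records occupancy so an empty queue can be told apart from a full one. Below is a minimal standalone sketch of the same indexing scheme; the RingBuffer and rb_* names are illustrative only and are not part of the patch or of PostgreSQL.

#include <stdbool.h>
#include <stdio.h>

#define RB_CAPACITY 8               /* illustrative fixed capacity */

/* Toy ring buffer; names are illustrative, not PostgreSQL code. */
typedef struct
{
    int         items[RB_CAPACITY];
    int         head;               /* index of the oldest element */
    int         tail;               /* index the next element will occupy */
    int         count;              /* current number of elements */
} RingBuffer;

/* Append one element at the tail; fails when the buffer is full. */
static bool
rb_push(RingBuffer *rb, int value)
{
    if (rb->count == RB_CAPACITY)
        return false;
    rb->items[rb->tail] = value;
    rb->tail = (rb->tail + 1) % RB_CAPACITY;    /* wrap around */
    rb->count++;
    return true;
}

/* Remove the oldest element from the head; fails when the buffer is empty. */
static bool
rb_pop(RingBuffer *rb, int *value)
{
    if (rb->count == 0)
        return false;
    *value = rb->items[rb->head];
    rb->head = (rb->head + 1) % RB_CAPACITY;    /* wrap around */
    rb->count--;
    return true;
}

int
main(void)
{
    RingBuffer  rb = {.head = 0, .tail = 0, .count = 0};
    int         v;

    for (int i = 0; i < 5; i++)
        rb_push(&rb, i);
    while (rb_pop(&rb, &v))
        printf("%d\n", v);          /* prints 0..4 in FIFO order */
    return 0;
}

In the patch, ForwardSyncRequest follows the rb_push pattern and AbsorbSyncRequests the rb_pop pattern, with CheckpointerShmem->max_requests playing the role of the fixed capacity.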
@@ -135,6 +140,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB 1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
@@ -974,7 +985,8 @@ CheckpointerShmemInit(void)
974
985
*/
975
986
MemSet (CheckpointerShmem , 0 , size );
976
987
SpinLockInit (& CheckpointerShmem -> ckpt_lck );
977
- CheckpointerShmem -> max_requests = NBuffers ;
988
+ CheckpointerShmem -> max_requests = Min (NBuffers , MAX_CHECKPOINT_REQUESTS );
989
+ CheckpointerShmem -> head = CheckpointerShmem -> tail = 0 ;
978
990
ConditionVariableInit (& CheckpointerShmem -> start_cv );
979
991
ConditionVariableInit (& CheckpointerShmem -> done_cv );
980
992
}
@@ -1152,6 +1164,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
    CheckpointerRequest *request;
    bool        too_full;
+   int         insert_pos;
 
    if (!IsUnderPostmaster)
        return false;           /* probably shouldn't even get here */
@@ -1175,10 +1188,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
    }
 
    /* OK, insert request */
-   request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+   insert_pos = CheckpointerShmem->tail;
+   request = &CheckpointerShmem->requests[insert_pos];
    request->ftag = *ftag;
    request->type = type;
 
+   CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+   CheckpointerShmem->num_requests++;
+
    /* If queue is more than half full, nudge the checkpointer to empty it */
    too_full = (CheckpointerShmem->num_requests >=
                CheckpointerShmem->max_requests / 2);
@@ -1220,12 +1237,16 @@ CompactCheckpointerRequestQueue(void)
    struct CheckpointerSlotMapping
    {
        CheckpointerRequest request;
-       int         slot;
+       int         ring_idx;
    };
 
-   int         n,
-               preserve_count;
+   int         n;
    int         num_skipped = 0;
+   int         head;
+   int         max_requests;
+   int         num_requests;
+   int         read_idx,
+               write_idx;
    HASHCTL     ctl;
    HTAB       *htab;
    bool       *skip_slot;
@@ -1237,8 +1258,13 @@ CompactCheckpointerRequestQueue(void)
1237
1258
if (CritSectionCount > 0 )
1238
1259
return false;
1239
1260
1261
+ max_requests = CheckpointerShmem -> max_requests ;
1262
+ num_requests = CheckpointerShmem -> num_requests ;
1263
+
1240
1264
/* Initialize skip_slot array */
1241
- skip_slot = palloc0 (sizeof (bool ) * CheckpointerShmem -> num_requests );
1265
+ skip_slot = palloc0 (sizeof (bool ) * max_requests );
1266
+
1267
+ head = CheckpointerShmem -> head ;
1242
1268
1243
1269
/* Initialize temporary hash table */
1244
1270
ctl .keysize = sizeof (CheckpointerRequest );
@@ -1262,7 +1288,8 @@ CompactCheckpointerRequestQueue(void)
1262
1288
* away preceding entries that would end up being canceled anyhow), but
1263
1289
* it's not clear that the extra complexity would buy us anything.
1264
1290
*/
1265
- for (n = 0 ; n < CheckpointerShmem -> num_requests ; n ++ )
1291
+ read_idx = head ;
1292
+ for (n = 0 ; n < num_requests ; n ++ )
1266
1293
{
1267
1294
CheckpointerRequest * request ;
1268
1295
struct CheckpointerSlotMapping * slotmap ;
@@ -1275,16 +1302,19 @@ CompactCheckpointerRequestQueue(void)
1275
1302
* CheckpointerShmemInit. Note also that RelFileLocator had better
1276
1303
* contain no pad bytes.
1277
1304
*/
1278
- request = & CheckpointerShmem -> requests [n ];
1305
+ request = & CheckpointerShmem -> requests [read_idx ];
1279
1306
slotmap = hash_search (htab , request , HASH_ENTER , & found );
1280
1307
if (found )
1281
1308
{
1282
1309
/* Duplicate, so mark the previous occurrence as skippable */
1283
- skip_slot [slotmap -> slot ] = true;
1310
+ skip_slot [slotmap -> ring_idx ] = true;
1284
1311
num_skipped ++ ;
1285
1312
}
1286
1313
/* Remember slot containing latest occurrence of this request value */
1287
- slotmap -> slot = n ;
1314
+ slotmap -> ring_idx = read_idx ;
1315
+
1316
+ /* Move to the next request in the ring buffer */
1317
+ read_idx = (read_idx + 1 ) % max_requests ;
1288
1318
}
1289
1319
1290
1320
/* Done with the hash table. */
@@ -1298,17 +1328,34 @@ CompactCheckpointerRequestQueue(void)
    }
 
    /* We found some duplicates; remove them. */
-   preserve_count = 0;
-   for (n = 0; n < CheckpointerShmem->num_requests; n++)
+   read_idx = write_idx = head;
+   for (n = 0; n < num_requests; n++)
    {
-       if (skip_slot[n])
-           continue;
-       CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+       /* If this slot is NOT skipped, keep it */
+       if (!skip_slot[read_idx])
+       {
+           /* If the read and write positions are different, copy the request */
+           if (write_idx != read_idx)
+               CheckpointerShmem->requests[write_idx] =
+                   CheckpointerShmem->requests[read_idx];
+
+           /* Advance the write position */
+           write_idx = (write_idx + 1) % max_requests;
+       }
+
+       read_idx = (read_idx + 1) % max_requests;
    }
+
+   /*
+    * Update ring buffer state: head remains the same, tail moves, count
+    * decreases
+    */
+   CheckpointerShmem->tail = write_idx;
+   CheckpointerShmem->num_requests -= num_skipped;
+
    ereport(DEBUG1,
            (errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-                            CheckpointerShmem->num_requests, preserve_count)));
-   CheckpointerShmem->num_requests = preserve_count;
+                            num_requests, CheckpointerShmem->num_requests)));
 
    /* Cleanup. */
    pfree(skip_slot);
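The compaction hunk above removes duplicates in place with two cursors: read_idx visits every live entry starting at head, while write_idx trails behind and only advances for entries that are kept, so head stays where it was, tail becomes the final write position, and num_requests shrinks by the number of skipped duplicates. A sketch of the same two-cursor pattern on a generic ring follows; the buf, capacity and skip names are illustrative stand-ins, not part of the patch.

#include <stdbool.h>

/*
 * Compact a ring buffer in place, dropping entries whose skip flag is set.
 * Returns the new tail position; the caller subtracts the number of dropped
 * entries from its element count, as CompactCheckpointerRequestQueue does.
 */
int
compact_ring(int *buf, int capacity, int head, int count, const bool *skip)
{
    int         read_idx = head;
    int         write_idx = head;

    for (int n = 0; n < count; n++)
    {
        if (!skip[read_idx])
        {
            /* keep this entry: copy it back only if the cursors diverged */
            if (write_idx != read_idx)
                buf[write_idx] = buf[read_idx];
            write_idx = (write_idx + 1) % capacity;
        }
        read_idx = (read_idx + 1) % capacity;
    }
    return write_idx;           /* head is unchanged */
}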
@@ -1329,40 +1376,61 @@ AbsorbSyncRequests(void)
 {
    CheckpointerRequest *requests = NULL;
    CheckpointerRequest *request;
-   int         n;
+   int         n,
+               i;
+   bool        loop;
 
    if (!AmCheckpointerProcess())
        return;
 
-   LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-   /*
-    * We try to avoid holding the lock for a long time by copying the request
-    * array, and processing the requests after releasing the lock.
-    *
-    * Once we have cleared the requests from shared memory, we have to PANIC
-    * if we then fail to absorb them (eg, because our hashtable runs out of
-    * memory).  This is because the system cannot run safely if we are unable
-    * to fsync what we have been told to fsync.  Fortunately, the hashtable
-    * is so small that the problem is quite unlikely to arise in practice.
-    */
-   n = CheckpointerShmem->num_requests;
-   if (n > 0)
+   do
    {
-       requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-       memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-   }
+       LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+       /*
+        * We try to avoid holding the lock for a long time by copying the
+        * request array, and processing the requests after releasing the
+        * lock.
+        *
+        * Once we have cleared the requests from shared memory, we have to
+        * PANIC if we then fail to absorb them (eg, because our hashtable
+        * runs out of memory).  This is because the system cannot run safely
+        * if we are unable to fsync what we have been told to fsync.
+        * Fortunately, the hashtable is so small that the problem is quite
+        * unlikely to arise in practice.
+        *
+        * Note: we could not palloc more than 1Gb of memory, thus make sure
+        * that the maximum number of elements will fit in the requests
+        * buffer.
+        */
+       n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+       if (n > 0)
+       {
+           if (!requests)
+               requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-   START_CRIT_SECTION();
+           for (i = 0; i < n; i++)
+           {
+               requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+               CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+           }
 
-   CheckpointerShmem->num_requests = 0;
+           CheckpointerShmem->num_requests -= n;
 
-   LWLockRelease(CheckpointerCommLock);
+       }
+
+       START_CRIT_SECTION();
+
+       /* Are there any requests in the queue? If so, keep going. */
+       loop = CheckpointerShmem->num_requests != 0;
+
+       LWLockRelease(CheckpointerCommLock);
 
-   for (request = requests; n > 0; request++, n--)
-       RememberSyncRequest(&request->ftag, request->type);
+       for (request = requests; n > 0; request++, n--)
+           RememberSyncRequest(&request->ftag, request->type);
 
-   END_CRIT_SECTION();
+       END_CRIT_SECTION();
+   } while (loop);
 
    if (requests)
        pfree(requests);
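The rewritten AbsorbSyncRequests drains the queue in batches of at most CKPT_REQ_BATCH_SIZE: each pass copies a batch out of shared memory and advances head while holding CheckpointerCommLock, then releases the lock before calling RememberSyncRequest, and loops until the queue is observed empty. The palloc'd buffer is reused across passes; a second pass can only happen when the first batch was already CKPT_REQ_BATCH_SIZE entries, so the reused buffer is always large enough. Below is a standalone, lock-free sketch of the same batched-drain shape; the queue, q_head, q_count and BATCH_SIZE names are illustrative, not part of the patch.

#include <stdbool.h>
#include <stdio.h>

#define QUEUE_CAP 16                /* illustrative queue capacity */
#define BATCH_SIZE 4                /* stands in for CKPT_REQ_BATCH_SIZE */

/* Toy single-threaded queue; names are illustrative, not PostgreSQL code. */
static int  queue[QUEUE_CAP];
static int  q_head = 0;
static int  q_count = 0;

static void
process(int value)
{
    printf("processing %d\n", value);   /* stands in for RememberSyncRequest */
}

static void
drain_in_batches(void)
{
    int         batch[BATCH_SIZE];
    bool        more;

    do
    {
        int         n = (q_count < BATCH_SIZE) ? q_count : BATCH_SIZE;

        /* In the patch, this copy-out runs under CheckpointerCommLock. */
        for (int i = 0; i < n; i++)
        {
            batch[i] = queue[q_head];
            q_head = (q_head + 1) % QUEUE_CAP;
        }
        q_count -= n;
        more = (q_count != 0);

        /* In the patch, the lock is released before this processing step. */
        for (int i = 0; i < n; i++)
            process(batch[i]);
    } while (more);
}

int
main(void)
{
    for (int v = 0; v < 10; v++)    /* enqueue a few dummy requests */
        queue[q_count++] = v;
    drain_in_batches();
    return 0;
}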