@@ -159,6 +159,8 @@ bthandler(PG_FUNCTION_ARGS)
    amroutine->ambeginscan = btbeginscan;
    amroutine->amrescan = btrescan;
    amroutine->amgettuple = btgettuple;
+   amroutine->amgetbatch = btgetbatch;
+   amroutine->amfreebatch = btfreebatch;
    amroutine->amgetbitmap = btgetbitmap;
    amroutine->amendscan = btendscan;
    amroutine->ammarkpos = btmarkpos;
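
For reference, a minimal sketch of how a caller in indexam.c could test whether
an opened index supports the new batching interface; the helper name
index_can_batch is hypothetical, but rd_indam and the two callback fields are
the ones wired up above:

/* Hypothetical helper: does this index AM support batched scans? */
static inline bool
index_can_batch(Relation indexRelation)
{
    IndexAmRoutine *amroutine = indexRelation->rd_indam;

    /* amgetbatch and amfreebatch are only meaningful as a pair */
    return amroutine->amgetbatch != NULL && amroutine->amfreebatch != NULL;
}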
@@ -279,6 +281,158 @@ btgettuple(IndexScanDesc scan, ScanDirection dir)
    return res;
}

+/* FIXME duplicate from indexam.c */
+#define INDEX_SCAN_BATCH(scan, idx) \
+   ((scan)->xs_batches->batches[(idx) % (scan)->xs_batches->maxBatches])
+
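
The modulo in INDEX_SCAN_BATCH treats the fixed-size batches array as a ring
buffer: firstBatch and nextBatch grow monotonically, and a slot is reused once
the batch occupying it has been freed. A standalone sketch of the same indexing
scheme, with made-up values purely for illustration:

/* with maxBatches = 8, logical batch numbers map onto slots like this */
int     maxBatches = 8;
int     firstBatch = 10;    /* oldest batch still alive */
int     nextBatch = 13;     /* one past the newest batch */

/* the newest batch, logical number 12, lives in slot 12 % 8 = 4 */
int     slot = (nextBatch - 1) % maxBatches;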
+/*
+ * btgetbatch() -- Get the next batch of tuples in the scan.
+ *
+ * XXX Simplified version of btgettuple(), but for batches of tuples.
+ */
+IndexScanBatch
+btgetbatch(IndexScanDesc scan, ScanDirection dir)
+{
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+   IndexScanBatch res;
+   BTBatchScanPos pos = NULL;
+
+   /* batching does not work with regular scan-level positions */
+   Assert(!BTScanPosIsValid(so->currPos));
+   Assert(!BTScanPosIsValid(so->markPos));
+
+   /* btree indexes are never lossy */
+   scan->xs_recheck = false;
+
+   if (scan->xs_batches->firstBatch < scan->xs_batches->nextBatch)
+   {
+       IndexScanBatch batch = INDEX_SCAN_BATCH(scan, scan->xs_batches->nextBatch - 1);
+
+       pos = (BTBatchScanPos) batch->opaque;
+   }
+
+   /* Each loop iteration performs another primitive index scan */
+   do
+   {
+       /*
+        * If we've already initialized this scan, we can just advance it in
+        * the appropriate direction. If we haven't done so yet, we call
+        * _bt_first_batch() to get the first item in the scan.
+        */
+       if (pos == NULL)
+           res = _bt_first_batch(scan, dir);
+       else
+       {
+           /* Now continue the scan. */
+           res = _bt_next_batch(scan, pos, dir);
+       }
+
+       /* If we have a batch, return it ... */
+       if (res)
+           break;
+
+       /*
+        * XXX We need to invoke _bt_first_batch on the next iteration, to
+        * advance SAOP keys etc. indexam.c already does this, but only after
+        * this returns, so maybe it should be done in some other way; it is
+        * not clear who should be responsible for setting currentBatch.
+        *
+        * XXX Maybe we don't even need that field? What is a current batch
+        * anyway? There seem to be multiple concepts of "current" batch, one
+        * for the read stream, another for the executor ...
+        */
+       // scan->xs_batches->currentBatch = res;
+
+       /*
+        * We may do a new scan, depending on what _bt_start_prim_scan says.
+        * In that case we need to start from scratch, not from the position
+        * of the last batch. In regular non-batched scans we have currPos,
+        * because there is just one leaf page for the whole scan, and we
+        * invalidate it before loading the next one. But with batching that
+        * doesn't work - we have many leaf pages, it's not clear which one
+        * is 'current' (well, it's the last one), and we can't invalidate
+        * it, that's up to amfreebatch(). For now we deduce the position
+        * and reset it to NULL, to indicate the same thing.
+        *
+        * XXX Maybe we should have something like 'currentBatch'? But then
+        * that probably should be in BTScanOpaque, not in the generic
+        * indexam.c part? Or is it a sufficiently generic thing? How would
+        * we keep it in sync with the batch queue? If freeing batches is
+        * up to indexam, how do we ensure currentBatch does not point to
+        * an already-removed batch?
+        */
+       pos = NULL;
+
+       /* ... otherwise see if we need another primitive index scan */
+   } while (so->numArrayKeys && _bt_start_prim_scan(scan, dir));
+
+   return res;
+}
+
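
To make the intended call pattern concrete, here is a hedged sketch of how a
consumer such as indexam.c might drive the batch interface. The loop is an
assumption pieced together from the comments above (the real caller goes
through the amroutine callbacks and may keep several batches in flight), not
code from the patch:

/* Illustrative consumer loop: fetch, process, and free batches */
IndexScanBatch batch;

while ((batch = btgetbatch(scan, ForwardScanDirection)) != NULL)
{
    /* ... process the batch's tuples here ... */

    /* the AM owns the batch's memory; hand it back for cleanup */
    btfreebatch(scan, batch);
}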
+/*
+ * btfreebatch() -- Free a batch of tuples returned by btgetbatch().
+ *
+ * XXX Counterpart to btgetbatch(); releases everything the batch allocated.
+ */
+void
+btfreebatch(IndexScanDesc scan, IndexScanBatch batch)
+{
+   BTScanOpaque so PG_USED_FOR_ASSERTS_ONLY = (BTScanOpaque) scan->opaque;
+
+   /* batching does not work with regular scan-level positions */
+   Assert(!BTScanPosIsValid(so->currPos));
+   Assert(!BTScanPosIsValid(so->markPos));
+
+   /*
+    * Check to see if we should kill tuples from the previous batch.
+    */
+   _bt_kill_batch(scan, batch);
+
+   /* free all the stuff that might be allocated */
+
+   if (batch->items)
+       pfree(batch->items);
+
+   if (batch->itups)
+       pfree(batch->itups);
+
+   if (batch->htups)
+       pfree(batch->htups);
+
+   if (batch->recheck)
+       pfree(batch->recheck);
+
+   if (batch->privateData)
+       pfree(batch->privateData);
+
+   if (batch->orderbyvals)
+       pfree(batch->orderbyvals);
+
+   if (batch->orderbynulls)
+       pfree(batch->orderbynulls);
+
+   if (batch->currTuples)
+       pfree(batch->currTuples);
+
+   if (batch->opaque)
+   {
+       BTBatchScanPos pos = (BTBatchScanPos) batch->opaque;
+
+       Assert(BTBatchScanPosIsValid(*pos));
+       Assert(BTBatchScanPosIsPinned(*pos));
+
+       BTBatchScanPosUnpinIfPinned(*pos);
+
+       pfree(batch->opaque);
+   }
+
+   /* and finally free the batch itself */
+   pfree(batch);
+}
+
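
The run of conditional pfree() calls above could be collapsed into a small
helper; a sketch of that alternative (purely illustrative, not part of the
patch), so the body would read pfree_if_set(batch->items) and so on:

/* Hypothetical helper: pfree a pointer only if it was ever allocated */
static inline void
pfree_if_set(void *ptr)
{
    if (ptr)
        pfree(ptr);
}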
/*
 * btgetbitmap() -- gets all matching tuples, and adds them to a bitmap
 */
@@ -376,6 +530,10 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
/*
 * btrescan() -- rescan an index relation
+ *
+ * Batches should have been freed by indexam.c using btfreebatch() before we
+ * get here; some of the generic scan state is reset here, but we shouldn't
+ * need to do anything else in particular, I think.
 */
void
btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
@@ -400,6 +558,10 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
    BTScanPosUnpinIfPinned(so->markPos);
    BTScanPosInvalidate(so->markPos);

+   /* FIXME should be in indexam.c I think */
+   // if (scan->xs_batches)
+   //     scan->xs_batches->currentBatch = NULL;
+
    /*
     * Allocate tuple workspace arrays, if needed for an index-only scan and
     * not already done in a previous rescan call. To save on palloc
@@ -433,6 +595,10 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
/*
 * btendscan() -- close down a scan
+ *
+ * Batches should have been freed by indexam.c using btfreebatch() before we
+ * get here; some of the generic scan state is reset here, but we shouldn't
+ * need to do anything else in particular, I think.
 */
void
btendscan(IndexScanDesc scan)
@@ -469,12 +635,18 @@ btendscan(IndexScanDesc scan)
/*
 * btmarkpos() -- save current scan position
+ *
+ * With batching, all the interesting markpos() stuff happens in indexam.c.
+ * We should not even get here.
 */
void
btmarkpos(IndexScanDesc scan)
{
    BTScanOpaque so = (BTScanOpaque) scan->opaque;

+   /* with batching, mark/restore is handled in indexam.c */
+   Assert(scan->xs_batches == NULL);
+
    /* There may be an old mark with a pin (but no lock). */
    BTScanPosUnpinIfPinned(so->markPos);
@@ -495,12 +667,18 @@ btmarkpos(IndexScanDesc scan)
/*
 * btrestrpos() -- restore scan to last saved position
+ *
+ * With batching, all the interesting restrpos() stuff happens in indexam.c.
+ * We should not even get here.
 */
void
btrestrpos(IndexScanDesc scan)
{
    BTScanOpaque so = (BTScanOpaque) scan->opaque;

+   /* with batching, mark/restore is handled in indexam.c */
+   Assert(scan->xs_batches == NULL);
+
    if (so->markItemIndex >= 0)
    {
        /*
@@ -900,6 +1078,147 @@ _bt_parallel_seize(IndexScanDesc scan, BlockNumber *next_scan_page,
    return status;
}

+/*
+ * _bt_parallel_seize_batch() -- Begin the process of advancing the scan to a
+ *     new page. Other scans must wait until we call _bt_parallel_release()
+ *     or _bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not. The latter case occurs when no pages remain, or when
+ * another primitive index scan is scheduled that caller's backend cannot
+ * start just yet (only backends that call from _bt_first are capable of
+ * starting primitive index scans, which they indicate by passing first=true).
+ *
+ * If the return value is true, *next_scan_page returns the next page of the
+ * scan, and *last_curr_page returns the page that *next_scan_page came from.
+ * An invalid *next_scan_page means the scan hasn't yet started, or that
+ * caller needs to start the next primitive index scan (if it's the latter
+ * case we'll set so->needPrimScan).
+ *
+ * Callers should ignore the value of *next_scan_page and *last_curr_page if
+ * the return value is false.
+ */
+bool
+_bt_parallel_seize_batch(IndexScanDesc scan, BTBatchScanPos pos,
+                        BlockNumber *next_scan_page,
+                        BlockNumber *last_curr_page, bool first)
+{
+   Relation    rel = scan->indexRelation;
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+   bool        exit_loop = false,
+               status = true,
+               endscan = false;
+   ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+   BTParallelScanDesc btscan;
+
+   *next_scan_page = InvalidBlockNumber;
+   *last_curr_page = InvalidBlockNumber;
+
+   /*
+    * Reset so->currPos, and initialize moreLeft/moreRight such that the next
+    * call to _bt_readnextpage treats this backend similarly to a serial
+    * backend that steps from *last_curr_page to *next_scan_page (unless this
+    * backend's so->currPos is initialized by _bt_readfirstpage before then).
+    */
+   BTScanPosInvalidate(so->currPos);
+   pos->moreLeft = pos->moreRight = true;
+
+   if (first)
+   {
+       /*
+        * Initialize array related state when called from _bt_first, assuming
+        * that this will be the first primitive index scan for the scan
+        */
+       so->needPrimScan = false;
+       so->scanBehind = false;
+       so->oppositeDirCheck = false;
+   }
+   else
+   {
+       /*
+        * Don't attempt to seize the scan when it requires another primitive
+        * index scan, since caller's backend cannot start it right now
+        */
+       if (so->needPrimScan)
+           return false;
+   }
+
+   btscan = (BTParallelScanDesc) OffsetToPointer(parallel_scan,
+                                                 parallel_scan->ps_offset_am);
+
+   while (1)
+   {
+       LWLockAcquire(&btscan->btps_lock, LW_EXCLUSIVE);
+
+       if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+       {
+           /* We're done with this parallel index scan */
+           status = false;
+       }
+       else if (btscan->btps_pageStatus == BTPARALLEL_IDLE &&
+                btscan->btps_nextScanPage == P_NONE)
+       {
+           /* End this parallel index scan */
+           status = false;
+           endscan = true;
+       }
+       else if (btscan->btps_pageStatus == BTPARALLEL_NEED_PRIMSCAN)
+       {
+           Assert(so->numArrayKeys);
+
+           if (first)
+           {
+               /* Can start scheduled primitive scan right away, so do so */
+               btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+
+               /* Restore scan's array keys from serialized values */
+               _bt_parallel_restore_arrays(rel, btscan, so);
+               exit_loop = true;
+           }
+           else
+           {
+               /*
+                * Don't attempt to seize the scan when it requires another
+                * primitive index scan, since caller's backend cannot start
+                * it right now
+                */
+               status = false;
+           }
+
+           /*
+            * Either way, update backend local state to indicate that a
+            * pending primitive scan is required
+            */
+           so->needPrimScan = true;
+           so->scanBehind = false;
+           so->oppositeDirCheck = false;
+       }
+       else if (btscan->btps_pageStatus != BTPARALLEL_ADVANCING)
+       {
+           /*
+            * We have successfully seized control of the scan for the purpose
+            * of advancing it to a new page!
+            */
+           btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+           Assert(btscan->btps_nextScanPage != P_NONE);
+           *next_scan_page = btscan->btps_nextScanPage;
+           *last_curr_page = btscan->btps_lastCurrPage;
+           exit_loop = true;
+       }
+       LWLockRelease(&btscan->btps_lock);
+       if (exit_loop || !status)
+           break;
+       ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
+   }
+   ConditionVariableCancelSleep();
+
+   /* When the scan has reached the rightmost (or leftmost) page, end it */
+   if (endscan)
+       _bt_parallel_done(scan);
+
+   return status;
+}
+
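
For orientation, a hedged sketch of the seize/release protocol from a worker's
perspective. The page-reading logic is elided and the _bt_parallel_release()
arguments follow the comment below, so treat this as an assumption about the
call pattern rather than code from the patch:

/* Illustrative worker loop: seize the scan, read a page, release */
BlockNumber next_scan_page;
BlockNumber last_curr_page;

while (_bt_parallel_seize_batch(scan, pos, &next_scan_page,
                                &last_curr_page, false))
{
    /* ... read next_scan_page, stash matching tuples in a batch ... */

    /* let other workers advance the scan to the following page */
    _bt_parallel_release(scan, next_scan_page, last_curr_page);
}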
/*
 * _bt_parallel_release() -- Complete the process of advancing the scan to a
 *     new page. We now have the new value btps_nextScanPage; another backend