diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c index 4a56f19390dc..f29ccd3c2d1f 100644 --- a/src/backend/access/gin/ginget.c +++ b/src/backend/access/gin/ginget.c @@ -332,8 +332,8 @@ startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot) entry->list = NULL; entry->nlist = 0; entry->matchBitmap = NULL; - entry->matchResult = NULL; entry->matchNtuples = -1; + entry->matchResult.blockno = InvalidBlockNumber; entry->reduceResult = false; entry->predictNumberResult = 0; @@ -827,20 +827,19 @@ entryGetItem(GinState *ginstate, GinScanEntry entry, { /* * If we've exhausted all items on this block, move to next block - * in the bitmap. + * in the bitmap. tbm_private_iterate() sets matchResult.blockno + * to InvalidBlockNumber when the bitmap is exhausted. */ - while (entry->matchResult == NULL || - (!entry->matchResult->lossy && + while ((!BlockNumberIsValid(entry->matchResult.blockno)) || + (!entry->matchResult.lossy && entry->offset >= entry->matchNtuples) || - entry->matchResult->blockno < advancePastBlk || + entry->matchResult.blockno < advancePastBlk || (ItemPointerIsLossyPage(&advancePast) && - entry->matchResult->blockno == advancePastBlk)) + entry->matchResult.blockno == advancePastBlk)) { - entry->matchResult = - tbm_private_iterate(entry->matchIterator); - - if (entry->matchResult == NULL) + if (!tbm_private_iterate(entry->matchIterator, &entry->matchResult)) { + Assert(!BlockNumberIsValid(entry->matchResult.blockno)); ItemPointerSetInvalid(&entry->curItem); tbm_end_private_iterate(entry->matchIterator); entry->matchIterator = NULL; @@ -849,14 +848,14 @@ entryGetItem(GinState *ginstate, GinScanEntry entry, } /* Exact pages need their tuple offsets extracted. */ - if (!entry->matchResult->lossy) - entry->matchNtuples = tbm_extract_page_tuple(entry->matchResult, + if (!entry->matchResult.lossy) + entry->matchNtuples = tbm_extract_page_tuple(&entry->matchResult, entry->matchOffsets, TBM_MAX_TUPLES_PER_PAGE); /* * Reset counter to the beginning of entry->matchResult. Note: - * entry->offset is still greater than entry->matchNtuples if + * entry->offset is still greater than matchResult.ntuples if * matchResult is lossy. So, on next call we will get next * result from TIDBitmap. */ @@ -869,10 +868,10 @@ entryGetItem(GinState *ginstate, GinScanEntry entry, * We're now on the first page after advancePast which has any * items on it. If it's a lossy result, return that. */ - if (entry->matchResult->lossy) + if (entry->matchResult.lossy) { ItemPointerSetLossyPage(&entry->curItem, - entry->matchResult->blockno); + entry->matchResult.blockno); /* * We might as well fall out of the loop; we could not @@ -889,7 +888,7 @@ entryGetItem(GinState *ginstate, GinScanEntry entry, Assert(entry->matchNtuples > -1); /* Skip over any offsets <= advancePast, and return that. 
*/ - if (entry->matchResult->blockno == advancePastBlk) + if (entry->matchResult.blockno == advancePastBlk) { Assert(entry->matchNtuples > 0); @@ -910,7 +909,7 @@ entryGetItem(GinState *ginstate, GinScanEntry entry, } ItemPointerSet(&entry->curItem, - entry->matchResult->blockno, + entry->matchResult.blockno, entry->matchOffsets[entry->offset]); entry->offset++; diff --git a/src/backend/access/gin/ginscan.c b/src/backend/access/gin/ginscan.c index f6cdd098a028..c2d1771bd77b 100644 --- a/src/backend/access/gin/ginscan.c +++ b/src/backend/access/gin/ginscan.c @@ -111,7 +111,7 @@ ginFillScanEntry(GinScanOpaque so, OffsetNumber attnum, ItemPointerSetMin(&scanEntry->curItem); scanEntry->matchBitmap = NULL; scanEntry->matchIterator = NULL; - scanEntry->matchResult = NULL; + scanEntry->matchResult.blockno = InvalidBlockNumber; scanEntry->matchNtuples = -1; scanEntry->list = NULL; scanEntry->nlist = 0; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index fa7935a0ed39..65e44b25b74e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -279,6 +279,72 @@ heap_scan_stream_read_next_serial(ReadStream *stream, return scan->rs_prefetch_block; } +/* + * Read stream API callback for bitmap heap scans. + * Returns the next block the caller wants from the read stream or + * InvalidBlockNumber when done. + */ +static BlockNumber +bitmapheap_stream_read_next(ReadStream *pgsr, void *private_data, + void *per_buffer_data) +{ + TBMIterateResult *tbmres = per_buffer_data; + BitmapHeapScanDesc bscan = (BitmapHeapScanDesc) private_data; + HeapScanDesc hscan = (HeapScanDesc) bscan; + TableScanDesc sscan = &hscan->rs_base; + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + /* no more entries in the bitmap */ + if (!tbm_iterate(&sscan->st.rs_tbmiterator, tbmres)) + return InvalidBlockNumber; + + /* + * Ignore any claimed entries past what we think is the end of the + * relation. It may have been extended after the start of our scan (we + * only hold an AccessShareLock, and it could be inserts from this + * backend). We don't take this optimization in SERIALIZABLE + * isolation though, as we need to examine all invisible tuples + * reachable by the index. + */ + if (!IsolationIsSerializable() && + tbmres->blockno >= hscan->rs_nblocks) + continue; + + /* + * We can skip fetching the heap page if we don't need any fields from + * the heap, the bitmap entries don't need rechecking, and all tuples + * on the page are visible to our transaction. + */ + if (!(sscan->rs_flags & SO_NEED_TUPLES) && + !tbmres->recheck && + VM_ALL_VISIBLE(sscan->rs_rd, tbmres->blockno, &bscan->rs_vmbuffer)) + { + OffsetNumber offsets[TBM_MAX_TUPLES_PER_PAGE]; + int noffsets; + + /* can't be lossy in the skip_fetch case */ + Assert(!tbmres->lossy); + Assert(bscan->rs_empty_tuples_pending >= 0); + + /* + * We throw away the offsets, but this is the easiest way to get a + * count of tuples. 
+ */ + noffsets = tbm_extract_page_tuple(tbmres, offsets, TBM_MAX_TUPLES_PER_PAGE); + bscan->rs_empty_tuples_pending += noffsets; + continue; + } + + return tbmres->blockno; + } + + /* not reachable */ + Assert(false); +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -1067,6 +1133,7 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_base.rs_flags = flags; scan->rs_base.rs_parallel = parallel_scan; scan->rs_strategy = NULL; /* set in initscan */ + scan->rs_cbuf = InvalidBuffer; /* * Disable page-at-a-time mode if it's not a MVCC-safe snapshot. @@ -1146,6 +1213,16 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan, 0); } + else if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN) + { + scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_DEFAULT, + scan->rs_strategy, + scan->rs_base.rs_rd, + MAIN_FORKNUM, + bitmapheap_stream_read_next, + scan, + sizeof(TBMIterateResult)); + } return (TableScanDesc) scan; @@ -1180,7 +1257,10 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, * unpin scan buffers */ if (BufferIsValid(scan->rs_cbuf)) + { ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + } if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN) { diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d74f0fbc5cd1..49267ce8cec2 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -56,6 +56,10 @@ static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan); +static bool BitmapHeapScanNextBlock(TableScanDesc scan, + bool *recheck, + uint64 *lossy_pages, uint64 *exact_pages); + /* ------------------------------------------------------------------------ * Slot related callbacks for heap AM @@ -2116,210 +2120,44 @@ heapam_estimate_rel_size(Relation rel, int32 *attr_widths, */ static bool -heapam_scan_bitmap_next_block(TableScanDesc scan, - BlockNumber *blockno, bool *recheck, - uint64 *lossy_pages, uint64 *exact_pages) +heapam_scan_bitmap_next_tuple(TableScanDesc scan, + TupleTableSlot *slot, + bool *recheck, + uint64 *lossy_pages, + uint64 *exact_pages) { BitmapHeapScanDesc bscan = (BitmapHeapScanDesc) scan; HeapScanDesc hscan = (HeapScanDesc) bscan; - BlockNumber block; - Buffer buffer; - Snapshot snapshot; - int ntup; - TBMIterateResult *tbmres; - OffsetNumber offsets[TBM_MAX_TUPLES_PER_PAGE]; - int noffsets = -1; - - Assert(scan->rs_flags & SO_TYPE_BITMAPSCAN); - - hscan->rs_cindex = 0; - hscan->rs_ntuples = 0; - - *blockno = InvalidBlockNumber; - *recheck = true; - - do - { - CHECK_FOR_INTERRUPTS(); - - tbmres = tbm_iterate(&scan->st.rs_tbmiterator); - - if (tbmres == NULL) - return false; - - /* Exact pages need their tuple offsets extracted. */ - if (!tbmres->lossy) - noffsets = tbm_extract_page_tuple(tbmres, offsets, - TBM_MAX_TUPLES_PER_PAGE); - - /* - * Ignore any claimed entries past what we think is the end of the - * relation. It may have been extended after the start of our scan (we - * only hold an AccessShareLock, and it could be inserts from this - * backend). We don't take this optimization in SERIALIZABLE - * isolation though, as we need to examine all invisible tuples - * reachable by the index. 
- */ - } while (!IsolationIsSerializable() && - tbmres->blockno >= hscan->rs_nblocks); - - /* Got a valid block */ - *blockno = tbmres->blockno; - *recheck = tbmres->recheck; - - /* - * We can skip fetching the heap page if we don't need any fields from the - * heap, the bitmap entries don't need rechecking, and all tuples on the - * page are visible to our transaction. - */ - if (!(scan->rs_flags & SO_NEED_TUPLES) && - !tbmres->recheck && - VM_ALL_VISIBLE(scan->rs_rd, tbmres->blockno, &bscan->rs_vmbuffer)) - { - /* can't be lossy in the skip_fetch case */ - Assert(!tbmres->lossy); - Assert(bscan->rs_empty_tuples_pending >= 0); - Assert(noffsets > -1); - - bscan->rs_empty_tuples_pending += noffsets; - - return true; - } - - block = tbmres->blockno; - - /* - * Acquire pin on the target heap page, trading in any pin we held before. - */ - hscan->rs_cbuf = ReleaseAndReadBuffer(hscan->rs_cbuf, - scan->rs_rd, - block); - hscan->rs_cblock = block; - buffer = hscan->rs_cbuf; - snapshot = scan->rs_snapshot; - - ntup = 0; - - /* - * Prune and repair fragmentation for the whole page, if possible. - */ - heap_page_prune_opt(scan->rs_rd, buffer); - - /* - * We must hold share lock on the buffer content while examining tuple - * visibility. Afterwards, however, the tuples we have found to be - * visible are guaranteed good as long as we hold the buffer pin. - */ - LockBuffer(buffer, BUFFER_LOCK_SHARE); + OffsetNumber targoffset; + Page page; + ItemId lp; /* - * We need two separate strategies for lossy and non-lossy cases. + * Out of range? If so, nothing more to look at on this page */ - if (!tbmres->lossy) + while (hscan->rs_cindex >= hscan->rs_ntuples) { /* - * Bitmap is non-lossy, so we just look through the offsets listed in - * tbmres; but we have to follow any HOT chain starting at each such - * offset. + * Emit empty tuples before advancing to the next block */ - int curslot; - - /* We must have extracted the tuple offsets by now */ - Assert(noffsets > -1); - - for (curslot = 0; curslot < noffsets; curslot++) + if (bscan->rs_empty_tuples_pending > 0) { - OffsetNumber offnum = offsets[curslot]; - ItemPointerData tid; - HeapTupleData heapTuple; - - ItemPointerSet(&tid, block, offnum); - if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, - &heapTuple, NULL, true)) - hscan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid); - } - } - else - { - /* - * Bitmap is lossy, so we must examine each line pointer on the page. - * But we can ignore HOT chains, since we'll check each tuple anyway. - */ - Page page = BufferGetPage(buffer); - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offnum; - - for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) - { - ItemId lp; - HeapTupleData loctup; - bool valid; - - lp = PageGetItemId(page, offnum); - if (!ItemIdIsNormal(lp)) - continue; - loctup.t_data = (HeapTupleHeader) PageGetItem(page, lp); - loctup.t_len = ItemIdGetLength(lp); - loctup.t_tableOid = scan->rs_rd->rd_id; - ItemPointerSet(&loctup.t_self, block, offnum); - valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - if (valid) - { - hscan->rs_vistuples[ntup++] = offnum; - PredicateLockTID(scan->rs_rd, &loctup.t_self, snapshot, - HeapTupleHeaderGetXmin(loctup.t_data)); - } - HeapCheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, - buffer, snapshot); + /* + * If we don't have to fetch the tuple, just return nulls. 
+ */ + ExecStoreAllNullTuple(slot); + bscan->rs_empty_tuples_pending--; + return true; } - } - - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - - Assert(ntup <= MaxHeapTuplesPerPage); - hscan->rs_ntuples = ntup; - - if (tbmres->lossy) - (*lossy_pages)++; - else - (*exact_pages)++; - - /* - * Return true to indicate that a valid block was found and the bitmap is - * not exhausted. If there are no visible tuples on this page, - * hscan->rs_ntuples will be 0 and heapam_scan_bitmap_next_tuple() will - * return false returning control to this function to advance to the next - * block in the bitmap. - */ - return true; -} - -static bool -heapam_scan_bitmap_next_tuple(TableScanDesc scan, - TupleTableSlot *slot) -{ - BitmapHeapScanDesc bscan = (BitmapHeapScanDesc) scan; - HeapScanDesc hscan = (HeapScanDesc) bscan; - OffsetNumber targoffset; - Page page; - ItemId lp; - if (bscan->rs_empty_tuples_pending > 0) - { /* - * If we don't have to fetch the tuple, just return nulls. + * Returns false if the bitmap is exhausted and there are no further + * blocks we need to scan. */ - ExecStoreAllNullTuple(slot); - bscan->rs_empty_tuples_pending--; - return true; + if (!BitmapHeapScanNextBlock(scan, recheck, lossy_pages, exact_pages)) + return false; } - /* - * Out of range? If so, nothing more to look at on this page - */ - if (hscan->rs_cindex >= hscan->rs_ntuples) - return false; - targoffset = hscan->rs_vistuples[hscan->rs_cindex]; page = BufferGetPage(hscan->rs_cbuf); lp = PageGetItemId(page, targoffset); @@ -2626,6 +2464,177 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, } } +/* + * Helper function get the next block of a bitmap heap scan. Returns true when + * it got the next block and saved it in the scan descriptor and false when + * the bitmap and or relation are exhausted. + */ +static bool +BitmapHeapScanNextBlock(TableScanDesc scan, + bool *recheck, + uint64 *lossy_pages, uint64 *exact_pages) +{ + BitmapHeapScanDesc bscan = (BitmapHeapScanDesc) scan; + HeapScanDesc hscan = (HeapScanDesc) bscan; + BlockNumber block; + void *per_buffer_data; + Buffer buffer; + Snapshot snapshot; + int ntup; + TBMIterateResult *tbmres; + OffsetNumber offsets[TBM_MAX_TUPLES_PER_PAGE]; + int noffsets = -1; + + Assert(scan->rs_flags & SO_TYPE_BITMAPSCAN); + Assert(hscan->rs_read_stream); + + hscan->rs_cindex = 0; + hscan->rs_ntuples = 0; + + /* Release buffer containing previous block. */ + if (BufferIsValid(hscan->rs_cbuf)) + { + ReleaseBuffer(hscan->rs_cbuf); + hscan->rs_cbuf = InvalidBuffer; + } + + hscan->rs_cbuf = read_stream_next_buffer(hscan->rs_read_stream, + &per_buffer_data); + + if (BufferIsInvalid(hscan->rs_cbuf)) + { + if (BufferIsValid(bscan->rs_vmbuffer)) + { + ReleaseBuffer(bscan->rs_vmbuffer); + bscan->rs_vmbuffer = InvalidBuffer; + } + + /* + * Bitmap is exhausted. Time to emit empty tuples if relevant. We emit + * all empty tuples at the end instead of emitting them per block we + * skip fetching. This is necessary because the streaming read API + * will only return TBMIterateResults for blocks actually fetched. + * When we skip fetching a block, we keep track of how many empty + * tuples to emit at the end of the BitmapHeapScan. We do not recheck + * all NULL tuples. + */ + *recheck = false; + return bscan->rs_empty_tuples_pending > 0; + } + + Assert(per_buffer_data); + + tbmres = per_buffer_data; + + Assert(BlockNumberIsValid(tbmres->blockno)); + Assert(BufferGetBlockNumber(hscan->rs_cbuf) == tbmres->blockno); + + /* Exact pages need their tuple offsets extracted. 
*/ + if (!tbmres->lossy) + noffsets = tbm_extract_page_tuple(tbmres, offsets, + TBM_MAX_TUPLES_PER_PAGE); + + *recheck = tbmres->recheck; + + block = hscan->rs_cblock = tbmres->blockno; + buffer = hscan->rs_cbuf; + snapshot = scan->rs_snapshot; + + ntup = 0; + + /* + * Prune and repair fragmentation for the whole page, if possible. + */ + heap_page_prune_opt(scan->rs_rd, buffer); + + /* + * We must hold share lock on the buffer content while examining tuple + * visibility. Afterwards, however, the tuples we have found to be + * visible are guaranteed good as long as we hold the buffer pin. + */ + LockBuffer(buffer, BUFFER_LOCK_SHARE); + + /* + * We need two separate strategies for lossy and non-lossy cases. + */ + if (!tbmres->lossy) + { + /* + * Bitmap is non-lossy, so we just look through the offsets listed in + * tbmres; but we have to follow any HOT chain starting at each such + * offset. + */ + int curslot; + + /* We must have extracted the tuple offsets by now */ + Assert(noffsets > -1); + + for (curslot = 0; curslot < noffsets; curslot++) + { + OffsetNumber offnum = offsets[curslot]; + ItemPointerData tid; + HeapTupleData heapTuple; + + ItemPointerSet(&tid, block, offnum); + if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, + &heapTuple, NULL, true)) + hscan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid); + } + } + else + { + /* + * Bitmap is lossy, so we must examine each line pointer on the page. + * But we can ignore HOT chains, since we'll check each tuple anyway. + */ + Page page = BufferGetPage(buffer); + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offnum; + + for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) + { + ItemId lp; + HeapTupleData loctup; + bool valid; + + lp = PageGetItemId(page, offnum); + if (!ItemIdIsNormal(lp)) + continue; + loctup.t_data = (HeapTupleHeader) PageGetItem(page, lp); + loctup.t_len = ItemIdGetLength(lp); + loctup.t_tableOid = scan->rs_rd->rd_id; + ItemPointerSet(&loctup.t_self, block, offnum); + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + if (valid) + { + hscan->rs_vistuples[ntup++] = offnum; + PredicateLockTID(scan->rs_rd, &loctup.t_self, snapshot, + HeapTupleHeaderGetXmin(loctup.t_data)); + } + HeapCheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, + buffer, snapshot); + } + } + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + Assert(ntup <= MaxHeapTuplesPerPage); + hscan->rs_ntuples = ntup; + + if (tbmres->lossy) + (*lossy_pages)++; + else + (*exact_pages)++; + + /* + * Return true to indicate that a valid block was found and the bitmap is + * not exhausted. If there are no visible tuples on this page, + * hscan->rs_ntuples will be 0 and heapam_scan_bitmap_next_tuple() will + * return false returning control to this function to advance to the next + * block in the bitmap. + */ + return true; +} /* ------------------------------------------------------------------------ * Definition of the heap table access method. 
@@ -2685,7 +2694,6 @@ static const TableAmRoutine heapam_methods = { .relation_estimate_size = heapam_estimate_rel_size, - .scan_bitmap_next_block = heapam_scan_bitmap_next_block, .scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple, .scan_sample_next_block = heapam_scan_sample_next_block, .scan_sample_next_tuple = heapam_scan_sample_next_tuple diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index 760a36fd2a17..476663b66aad 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -91,9 +91,6 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->relation_estimate_size != NULL); - /* optional, but one callback implies presence of the other */ - Assert((routine->scan_bitmap_next_block == NULL) == - (routine->scan_bitmap_next_tuple == NULL)); Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index be0d24d901b5..3e33360c0fce 100644 --- a/src/backend/executor/nodeBitmapHeapscan.c +++ b/src/backend/executor/nodeBitmapHeapscan.c @@ -51,10 +51,6 @@ static void BitmapTableScanSetup(BitmapHeapScanState *node); static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node); static inline void BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate); -static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node); -static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node); -static inline void BitmapPrefetch(BitmapHeapScanState *node, - TableScanDesc scan); static bool BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate); @@ -62,14 +58,6 @@ static bool BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate); * Do the underlying index scan, build the bitmap, set up the parallel state * needed for parallel workers to iterate through the bitmap, and set up the * underlying table scan descriptor. - * - * For prefetching, we use *two* iterators, one for the pages we are actually - * scanning and another that runs ahead of the first for prefetching. - * node->prefetch_pages tracks exactly how many pages ahead the prefetch - * iterator is. Also, node->prefetch_target tracks the desired prefetch - * distance, which starts small and increases up to the - * node->prefetch_maximum. This is to avoid doing a lot of prefetching in a - * scan that stops after a few tuples because of a LIMIT. */ static void BitmapTableScanSetup(BitmapHeapScanState *node) @@ -102,14 +90,6 @@ BitmapTableScanSetup(BitmapHeapScanState *node) */ pstate->tbmiterator = tbm_prepare_shared_iterate(node->tbm); -#ifdef USE_PREFETCH - if (node->prefetch_maximum > 0) - { - pstate->prefetch_iterator = - tbm_prepare_shared_iterate(node->tbm); - } -#endif /* USE_PREFETCH */ - /* We have initialized the shared state so wake up others. */ BitmapDoneInitializingSharedState(pstate); } @@ -119,15 +99,6 @@ BitmapTableScanSetup(BitmapHeapScanState *node) pstate->tbmiterator : InvalidDsaPointer); -#ifdef USE_PREFETCH - if (node->prefetch_maximum > 0) - node->prefetch_iterator = - tbm_begin_iterate(node->tbm, dsa, - pstate ? - pstate->prefetch_iterator : - InvalidDsaPointer); -#endif /* USE_PREFETCH */ - /* * If this is the first scan of the underlying table, create the table * scan descriptor and begin the scan. 
@@ -158,7 +129,6 @@ BitmapTableScanSetup(BitmapHeapScanState *node) node->initialized = true; } - /* ---------------------------------------------------------------- * BitmapHeapNext * @@ -168,120 +138,44 @@ BitmapTableScanSetup(BitmapHeapScanState *node) static TupleTableSlot * BitmapHeapNext(BitmapHeapScanState *node) { - ExprContext *econtext; - TableScanDesc scan; - TupleTableSlot *slot; - -#ifdef USE_PREFETCH - ParallelBitmapHeapState *pstate = node->pstate; -#endif - - /* - * extract necessary information from index scan node - */ - econtext = node->ss.ps.ps_ExprContext; - slot = node->ss.ss_ScanTupleSlot; - scan = node->ss.ss_currentScanDesc; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* * If we haven't yet performed the underlying index scan, do it, and begin * the iteration over the bitmap. */ if (!node->initialized) - { BitmapTableScanSetup(node); - scan = node->ss.ss_currentScanDesc; - goto new_page; - } - for (;;) + while (table_scan_bitmap_next_tuple(node->ss.ss_currentScanDesc, + slot, &node->recheck, + &node->stats.lossy_pages, + &node->stats.exact_pages)) { - while (table_scan_bitmap_next_tuple(scan, slot)) - { - /* - * Continuing in previously obtained page. - */ - - CHECK_FOR_INTERRUPTS(); - -#ifdef USE_PREFETCH - - /* - * Try to prefetch at least a few pages even before we get to the - * second page if we don't stop reading after the first tuple. - */ - if (!pstate) - { - if (node->prefetch_target < node->prefetch_maximum) - node->prefetch_target++; - } - else if (pstate->prefetch_target < node->prefetch_maximum) - { - /* take spinlock while updating shared state */ - SpinLockAcquire(&pstate->mutex); - if (pstate->prefetch_target < node->prefetch_maximum) - pstate->prefetch_target++; - SpinLockRelease(&pstate->mutex); - } -#endif /* USE_PREFETCH */ - - /* - * We issue prefetch requests *after* fetching the current page to - * try to avoid having prefetching interfere with the main I/O. - * Also, this should happen only when we have determined there is - * still something to do on the current page, else we may - * uselessly prefetch the same page we are just about to request - * for real. - */ - BitmapPrefetch(node, scan); - - /* - * If we are using lossy info, we have to recheck the qual - * conditions at every tuple. - */ - if (node->recheck) - { - econtext->ecxt_scantuple = slot; - if (!ExecQualAndReset(node->bitmapqualorig, econtext)) - { - /* Fails recheck, so drop it and loop back for another */ - InstrCountFiltered2(node, 1); - ExecClearTuple(slot); - continue; - } - } - - /* OK to return this tuple */ - return slot; - } - -new_page: - - BitmapAdjustPrefetchIterator(node); - /* - * Returns false if the bitmap is exhausted and there are no further - * blocks we need to scan. + * Continuing in previously obtained page. */ - if (!table_scan_bitmap_next_block(scan, &node->blockno, - &node->recheck, - &node->stats.lossy_pages, - &node->stats.exact_pages)) - break; + CHECK_FOR_INTERRUPTS(); /* - * If serial, we can error out if the prefetch block doesn't stay - * ahead of the current block. + * If we are using lossy info, we have to recheck the qual conditions + * at every tuple. */ - if (node->pstate == NULL && - !tbm_exhausted(&node->prefetch_iterator) && - node->prefetch_blockno < node->blockno) - elog(ERROR, - "prefetch and main iterators are out of sync. pfblockno: %d. 
blockno: %d", - node->prefetch_blockno, node->blockno); - - /* Adjust the prefetch target */ - BitmapAdjustPrefetchTarget(node); + if (node->recheck) + { + econtext->ecxt_scantuple = slot; + if (!ExecQualAndReset(node->bitmapqualorig, econtext)) + { + /* Fails recheck, so drop it and loop back for another */ + InstrCountFiltered2(node, 1); + ExecClearTuple(slot); + continue; + } + } + + /* OK to return this tuple */ + return slot; } /* @@ -305,226 +199,6 @@ BitmapDoneInitializingSharedState(ParallelBitmapHeapState *pstate) ConditionVariableBroadcast(&pstate->cv); } -/* - * BitmapAdjustPrefetchIterator - Adjust the prefetch iterator - * - * We keep track of how far the prefetch iterator is ahead of the main - * iterator in prefetch_pages. For each block the main iterator returns, we - * decrement prefetch_pages. - */ -static inline void -BitmapAdjustPrefetchIterator(BitmapHeapScanState *node) -{ -#ifdef USE_PREFETCH - ParallelBitmapHeapState *pstate = node->pstate; - TBMIterateResult *tbmpre; - - if (pstate == NULL) - { - TBMIterator *prefetch_iterator = &node->prefetch_iterator; - - if (node->prefetch_pages > 0) - { - /* The main iterator has closed the distance by one page */ - node->prefetch_pages--; - } - else if (!tbm_exhausted(prefetch_iterator)) - { - tbmpre = tbm_iterate(prefetch_iterator); - node->prefetch_blockno = tbmpre ? tbmpre->blockno : - InvalidBlockNumber; - } - return; - } - - /* - * XXX: There is a known issue with keeping the prefetch and current block - * iterators in sync for parallel bitmap table scans. This can lead to - * prefetching blocks that have already been read. See the discussion - * here: - * https://postgr.es/m/20240315211449.en2jcmdqxv5o6tlz%40alap3.anarazel.de - * Note that moving the call site of BitmapAdjustPrefetchIterator() - * exacerbates the effects of this bug. - */ - if (node->prefetch_maximum > 0) - { - TBMIterator *prefetch_iterator = &node->prefetch_iterator; - - SpinLockAcquire(&pstate->mutex); - if (pstate->prefetch_pages > 0) - { - pstate->prefetch_pages--; - SpinLockRelease(&pstate->mutex); - } - else - { - /* Release the mutex before iterating */ - SpinLockRelease(&pstate->mutex); - - /* - * In case of shared mode, we can not ensure that the current - * blockno of the main iterator and that of the prefetch iterator - * are same. It's possible that whatever blockno we are - * prefetching will be processed by another process. Therefore, - * we don't validate the blockno here as we do in non-parallel - * case. - */ - if (!tbm_exhausted(prefetch_iterator)) - { - tbmpre = tbm_iterate(prefetch_iterator); - node->prefetch_blockno = tbmpre ? tbmpre->blockno : - InvalidBlockNumber; - } - } - } -#endif /* USE_PREFETCH */ -} - -/* - * BitmapAdjustPrefetchTarget - Adjust the prefetch target - * - * Increase prefetch target if it's not yet at the max. Note that - * we will increase it to zero after fetching the very first - * page/tuple, then to one after the second tuple is fetched, then - * it doubles as later pages are fetched. 
- */ -static inline void -BitmapAdjustPrefetchTarget(BitmapHeapScanState *node) -{ -#ifdef USE_PREFETCH - ParallelBitmapHeapState *pstate = node->pstate; - - if (pstate == NULL) - { - if (node->prefetch_target >= node->prefetch_maximum) - /* don't increase any further */ ; - else if (node->prefetch_target >= node->prefetch_maximum / 2) - node->prefetch_target = node->prefetch_maximum; - else if (node->prefetch_target > 0) - node->prefetch_target *= 2; - else - node->prefetch_target++; - return; - } - - /* Do an unlocked check first to save spinlock acquisitions. */ - if (pstate->prefetch_target < node->prefetch_maximum) - { - SpinLockAcquire(&pstate->mutex); - if (pstate->prefetch_target >= node->prefetch_maximum) - /* don't increase any further */ ; - else if (pstate->prefetch_target >= node->prefetch_maximum / 2) - pstate->prefetch_target = node->prefetch_maximum; - else if (pstate->prefetch_target > 0) - pstate->prefetch_target *= 2; - else - pstate->prefetch_target++; - SpinLockRelease(&pstate->mutex); - } -#endif /* USE_PREFETCH */ -} - -/* - * BitmapPrefetch - Prefetch, if prefetch_pages are behind prefetch_target - */ -static inline void -BitmapPrefetch(BitmapHeapScanState *node, TableScanDesc scan) -{ -#ifdef USE_PREFETCH - ParallelBitmapHeapState *pstate = node->pstate; - - if (pstate == NULL) - { - TBMIterator *prefetch_iterator = &node->prefetch_iterator; - - if (!tbm_exhausted(prefetch_iterator)) - { - while (node->prefetch_pages < node->prefetch_target) - { - TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); - bool skip_fetch; - - if (tbmpre == NULL) - { - /* No more pages to prefetch */ - tbm_end_iterate(prefetch_iterator); - break; - } - node->prefetch_pages++; - node->prefetch_blockno = tbmpre->blockno; - - /* - * If we expect not to have to actually read this heap page, - * skip this prefetch call, but continue to run the prefetch - * logic normally. (Would it be better not to increment - * prefetch_pages?) - */ - skip_fetch = (!(scan->rs_flags & SO_NEED_TUPLES) && - !tbmpre->recheck && - VM_ALL_VISIBLE(node->ss.ss_currentRelation, - tbmpre->blockno, - &node->pvmbuffer)); - - if (!skip_fetch) - PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); - } - } - - return; - } - - if (pstate->prefetch_pages < pstate->prefetch_target) - { - TBMIterator *prefetch_iterator = &node->prefetch_iterator; - - if (!tbm_exhausted(prefetch_iterator)) - { - while (1) - { - TBMIterateResult *tbmpre; - bool do_prefetch = false; - bool skip_fetch; - - /* - * Recheck under the mutex. If some other process has already - * done enough prefetching then we need not to do anything. 
- */ - SpinLockAcquire(&pstate->mutex); - if (pstate->prefetch_pages < pstate->prefetch_target) - { - pstate->prefetch_pages++; - do_prefetch = true; - } - SpinLockRelease(&pstate->mutex); - - if (!do_prefetch) - return; - - tbmpre = tbm_iterate(prefetch_iterator); - if (tbmpre == NULL) - { - /* No more pages to prefetch */ - tbm_end_iterate(prefetch_iterator); - break; - } - - node->prefetch_blockno = tbmpre->blockno; - - /* As above, skip prefetch if we expect not to need page */ - skip_fetch = (!(scan->rs_flags & SO_NEED_TUPLES) && - !tbmpre->recheck && - VM_ALL_VISIBLE(node->ss.ss_currentRelation, - tbmpre->blockno, - &node->pvmbuffer)); - - if (!skip_fetch) - PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); - } - } - } -#endif /* USE_PREFETCH */ -} - /* * BitmapHeapRecheck -- access method routine to recheck a tuple in EvalPlanQual */ @@ -581,24 +255,12 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node) table_rescan(node->ss.ss_currentScanDesc, NULL); } - /* If we did not already clean up the prefetch iterator, do so now. */ - if (!tbm_exhausted(&node->prefetch_iterator)) - tbm_end_iterate(&node->prefetch_iterator); - /* release bitmaps and buffers if any */ if (node->tbm) tbm_free(node->tbm); - if (node->pvmbuffer != InvalidBuffer) - ReleaseBuffer(node->pvmbuffer); node->tbm = NULL; node->initialized = false; - node->pvmbuffer = InvalidBuffer; node->recheck = true; - /* Only used for serial BHS */ - node->blockno = InvalidBlockNumber; - node->prefetch_blockno = InvalidBlockNumber; - node->prefetch_pages = 0; - node->prefetch_target = -1; ExecScanReScan(&node->ss); @@ -667,17 +329,11 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node) table_endscan(scanDesc); } - /* If we did not already clean up the prefetch iterator, do so now. */ - if (!tbm_exhausted(&node->prefetch_iterator)) - tbm_end_iterate(&node->prefetch_iterator); - /* * release bitmaps and buffers if any */ if (node->tbm) tbm_free(node->tbm); - if (node->pvmbuffer != InvalidBuffer) - ReleaseBuffer(node->pvmbuffer); } /* ---------------------------------------------------------------- @@ -710,18 +366,13 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags) scanstate->ss.ps.ExecProcNode = ExecBitmapHeapScan; scanstate->tbm = NULL; - scanstate->pvmbuffer = InvalidBuffer; /* Zero the statistics counters */ memset(&scanstate->stats, 0, sizeof(BitmapHeapScanInstrumentation)); - scanstate->prefetch_pages = 0; - scanstate->prefetch_target = -1; scanstate->initialized = false; scanstate->pstate = NULL; scanstate->recheck = true; - scanstate->blockno = InvalidBlockNumber; - scanstate->prefetch_blockno = InvalidBlockNumber; /* * Miscellaneous initialization @@ -761,13 +412,6 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags) scanstate->bitmapqualorig = ExecInitQual(node->bitmapqualorig, (PlanState *) scanstate); - /* - * Maximum number of prefetches for the tablespace if configured, - * otherwise the current value of the effective_io_concurrency GUC. 
- */ - scanstate->prefetch_maximum = - get_tablespace_io_concurrency(currentRelation->rd_rel->reltablespace); - scanstate->ss.ss_currentRelation = currentRelation; /* @@ -871,12 +515,9 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, sinstrument = (SharedBitmapHeapInstrumentation *) ptr; pstate->tbmiterator = 0; - pstate->prefetch_iterator = 0; /* Initialize the mutex */ SpinLockInit(&pstate->mutex); - pstate->prefetch_pages = 0; - pstate->prefetch_target = -1; pstate->state = BM_INITIAL; ConditionVariableInit(&pstate->cv); @@ -913,17 +554,11 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, return; pstate->state = BM_INITIAL; - pstate->prefetch_pages = 0; - pstate->prefetch_target = -1; if (DsaPointerIsValid(pstate->tbmiterator)) tbm_free_shared_area(dsa, pstate->tbmiterator); - if (DsaPointerIsValid(pstate->prefetch_iterator)) - tbm_free_shared_area(dsa, pstate->prefetch_iterator); - pstate->tbmiterator = InvalidDsaPointer; - pstate->prefetch_iterator = InvalidDsaPointer; } /* ---------------------------------------------------------------- diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c index 3d835024caa2..41031aa8f2fa 100644 --- a/src/backend/nodes/tidbitmap.c +++ b/src/backend/nodes/tidbitmap.c @@ -172,7 +172,6 @@ struct TBMPrivateIterator int spageptr; /* next spages index */ int schunkptr; /* next schunks index */ int schunkbit; /* next bit to check in current schunk */ - TBMIterateResult output; }; /* @@ -213,7 +212,6 @@ struct TBMSharedIterator PTEntryArray *ptbase; /* pagetable element array */ PTIterationArray *ptpages; /* sorted exact page index list */ PTIterationArray *ptchunks; /* sorted lossy page index list */ - TBMIterateResult output; }; /* Local function prototypes */ @@ -957,21 +955,28 @@ tbm_advance_schunkbit(PagetableEntry *chunk, int *schunkbitp) /* * tbm_private_iterate - scan through next page of a TIDBitmap * - * Returns a TBMIterateResult representing one page, or NULL if there are - * no more pages to scan. Pages are guaranteed to be delivered in numerical - * order. If lossy is true, then the bitmap is "lossy" and failed to - * remember the exact tuples to look at on this page --- the caller must - * examine all tuples on the page and check if they meet the intended - * condition. result->ntuples is set to -1 when the bitmap is lossy. - * If result->recheck is true, only the indicated tuples need - * be examined, but the condition must be rechecked anyway. (For ease of - * testing, recheck is always set true when lossy is true.) + * Caller must pass in a TBMIterateResult to be filled. + * + * Pages are guaranteed to be delivered in numerical order. + * + * Returns false when there are no more pages to scan and true otherwise. When + * there are no more pages to scan, tbmres->blockno is set to + * InvalidBlockNumber. + * + * If lossy is true, then the bitmap is "lossy" and failed to remember + * the exact tuples to look at on this page --- the caller must examine all + * tuples on the page and check if they meet the intended condition. If lossy + * is false, the caller must later extract the tuple offsets from the page + * pointed to by internal_page with tbm_extract_page_tuple. + * + * If tbmres->recheck is true, only the indicated tuples need be examined, but + * the condition must be rechecked anyway. (For ease of testing, recheck is + * always set true when lossy is true.) 
*/ -TBMIterateResult * -tbm_private_iterate(TBMPrivateIterator *iterator) +bool +tbm_private_iterate(TBMPrivateIterator *iterator, TBMIterateResult *tbmres) { TIDBitmap *tbm = iterator->tbm; - TBMIterateResult *output = &(iterator->output); Assert(tbm->iterating == TBM_ITERATING_PRIVATE); @@ -1009,12 +1014,12 @@ tbm_private_iterate(TBMPrivateIterator *iterator) chunk_blockno < tbm->spages[iterator->spageptr]->blockno) { /* Return a lossy page indicator from the chunk */ - output->blockno = chunk_blockno; - output->lossy = true; - output->recheck = true; - output->internal_page = NULL; + tbmres->blockno = chunk_blockno; + tbmres->lossy = true; + tbmres->recheck = true; + tbmres->internal_page = NULL; iterator->schunkbit++; - return output; + return true; } } @@ -1028,16 +1033,17 @@ tbm_private_iterate(TBMPrivateIterator *iterator) else page = tbm->spages[iterator->spageptr]; - output->internal_page = page; - output->blockno = page->blockno; - output->lossy = false; - output->recheck = page->recheck; + tbmres->internal_page = page; + tbmres->blockno = page->blockno; + tbmres->lossy = false; + tbmres->recheck = page->recheck; iterator->spageptr++; - return output; + return true; } /* Nothing more in the bitmap */ - return NULL; + tbmres->blockno = InvalidBlockNumber; + return false; } /* @@ -1047,10 +1053,9 @@ tbm_private_iterate(TBMPrivateIterator *iterator) * across multiple processes. We need to acquire the iterator LWLock, * before accessing the shared members. */ -TBMIterateResult * -tbm_shared_iterate(TBMSharedIterator *iterator) +bool +tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres) { - TBMIterateResult *output = &iterator->output; TBMSharedIteratorState *istate = iterator->state; PagetableEntry *ptbase = NULL; int *idxpages = NULL; @@ -1101,14 +1106,14 @@ tbm_shared_iterate(TBMSharedIterator *iterator) chunk_blockno < ptbase[idxpages[istate->spageptr]].blockno) { /* Return a lossy page indicator from the chunk */ - output->blockno = chunk_blockno; - output->lossy = true; - output->recheck = true; - output->internal_page = NULL; + tbmres->blockno = chunk_blockno; + tbmres->lossy = true; + tbmres->recheck = true; + tbmres->internal_page = NULL; istate->schunkbit++; LWLockRelease(&istate->lock); - return output; + return true; } } @@ -1116,21 +1121,22 @@ tbm_shared_iterate(TBMSharedIterator *iterator) { PagetableEntry *page = &ptbase[idxpages[istate->spageptr]]; - output->internal_page = page; - output->blockno = page->blockno; - output->lossy = false; - output->recheck = page->recheck; + tbmres->internal_page = page; + tbmres->blockno = page->blockno; + tbmres->lossy = false; + tbmres->recheck = page->recheck; istate->spageptr++; LWLockRelease(&istate->lock); - return output; + return true; } LWLockRelease(&istate->lock); /* Nothing more in the bitmap */ - return NULL; + tbmres->blockno = InvalidBlockNumber; + return false; } /* @@ -1604,15 +1610,17 @@ tbm_end_iterate(TBMIterator *iterator) } /* - * Get the next TBMIterateResult from the shared or private bitmap iterator. + * Populate the next TBMIterateResult using the shared or private bitmap + * iterator. Returns false when there is nothing more to scan. 
*/ -TBMIterateResult * -tbm_iterate(TBMIterator *iterator) +bool +tbm_iterate(TBMIterator *iterator, TBMIterateResult *tbmres) { Assert(iterator); + Assert(tbmres); if (iterator->shared) - return tbm_shared_iterate(iterator->i.shared_iterator); + return tbm_shared_iterate(iterator->i.shared_iterator, tbmres); else - return tbm_private_iterate(iterator->i.private_iterator); + return tbm_private_iterate(iterator->i.private_iterator, tbmres); } diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index 71abb01f6558..0489ad366446 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -325,7 +325,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent, info->amcanparallel = amroutine->amcanparallel; info->amhasgettuple = (amroutine->amgettuple != NULL); info->amhasgetbitmap = amroutine->amgetbitmap != NULL && - relation->rd_tableam->scan_bitmap_next_block != NULL; + relation->rd_tableam->scan_bitmap_next_tuple != NULL; info->amcanmarkpos = (amroutine->ammarkpos != NULL && amroutine->amrestrpos != NULL); info->amcostestimate = amroutine->amcostestimate; diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 36fb9fe152cf..51c15330117c 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -17,30 +17,12 @@ * pending read. When that isn't possible, the existing pending read is sent * to StartReadBuffers() so that a new one can begin to form. * - * The algorithm for controlling the look-ahead distance tries to classify the - * stream into three ideal behaviors: + * The algorithm for controlling the look-ahead distance is based on recent + * cache hits and misses: * - * A) No I/O is necessary, because the requested blocks are fully cached - * already. There is no benefit to looking ahead more than one block, so - * distance is 1. This is the default initial assumption. - * - * B) I/O is necessary, but read-ahead advice is undesirable because the - * access is sequential and we can rely on the kernel's read-ahead heuristics, - * or impossible because direct I/O is enabled, or the system doesn't support - * read-ahead advice. There is no benefit in looking ahead more than - * io_combine_limit, because in this case the only goal is larger read system - * calls. Looking further ahead would pin many buffers and perform - * speculative work for no benefit. - * - * C) I/O is necessary, it appears to be random, and this system supports - * read-ahead advice. We'll look further ahead in order to reach the - * configured level of I/O concurrency. - * - * The distance increases rapidly and decays slowly, so that it moves towards - * those levels as different I/O patterns are discovered. For example, a - * sequential scan of fully cached data doesn't bother looking ahead, but a - * sequential scan that hits a region of uncached blocks will start issuing - * increasingly wide read calls until it plateaus at io_combine_limit. + * When no I/O is necessary, there is no point in looking ahead more than one + * block. This is the default initial assumption. Otherwise rapidly increase + * the distance to try to benefit from I/O combining and I/O concurrency. 
* * The main data structure is a circular queue of buffers of size * max_pinned_buffers plus some extra space for technical reasons, ready to be @@ -113,9 +95,12 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; + bool temporary; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -132,6 +117,7 @@ struct ReadStream /* Next expected block, for detecting sequential access. */ BlockNumber seq_blocknum; + BlockNumber seq_until_processed; /* The read operation we are currently preparing. */ BlockNumber pending_read_blocknum; @@ -225,21 +211,34 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } -static void -read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +/* + * Start as much of the current pending read as we can. If we have to split it + * because of the per-backend buffer limit, or the buffer manager decides to + * split it, then the pending read is adjusted to hold the remaining portion. + * + * We can always start a read of at least size one if we have no progress yet. + * Otherwise it's possible that we can't start a read at all because of a lack + * of buffers, and then false is returned. Buffer shortages also reduce the + * distance to a level that prevents look-ahead until buffers are released. + */ +static bool +read_stream_start_pending_read(ReadStream *stream) { bool need_wait; + int requested_nblocks; int nblocks; - int flags; + int flags = 0; + int forwarded; int16 io_index; int16 overflow; int16 buffer_index; + int16 buffer_limit; /* This should only be called with a pending read. */ Assert(stream->pending_read_nblocks > 0); Assert(stream->pending_read_nblocks <= stream->io_combine_limit); - /* We had better not exceed the pin limit by starting this read. */ + /* We had better not exceed the per-stream buffer limit with this read. */ Assert(stream->pinned_buffers + stream->pending_read_nblocks <= stream->max_pinned_buffers); @@ -249,21 +248,101 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else Assert(stream->next_buffer_index == stream->oldest_buffer_index); + /* Do we need to issue read-ahead advice? */ + if (stream->advice_enabled) + { + bool no_wait; + + /* + * We only issue advice if we won't immediately have to call + * WaitReadBuffers(). + */ + no_wait = stream->pinned_buffers > 0 || + stream->pending_read_nblocks < stream->distance; + + if (stream->pending_read_blocknum == stream->seq_blocknum) + { + /* + * Sequential: issue advice only until the WaitReadBuffers() calls + * catch up with the first advice issued for this sequential + * region, so the kernel can see sequential access. + */ + if (stream->seq_until_processed != InvalidBlockNumber && no_wait) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + else + { + /* Random jump: start tracking new region. */ + stream->seq_until_processed = stream->pending_read_blocknum; + if (no_wait) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + } + /* - * If advice hasn't been suppressed, this system supports it, and this - * isn't a strictly sequential pattern, then we'll issue advice. + * Compute the remaining portion of the per-backend buffer limit. If we + * already have some forwarded buffers, we can certainly use those. 
They + * are already pinned, and are mapped to the starting blocks of the pending + * read, they just don't have any I/O started yet and are not counted in + * stream->pinned_buffers. */ - if (!suppress_advice && - stream->advice_enabled && - stream->pending_read_blocknum != stream->seq_blocknum) - flags = READ_BUFFERS_ISSUE_ADVICE; + if (stream->temporary) + buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); else - flags = 0; + buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffer_limit += stream->forwarded_buffers; + if (buffer_limit == 0 && stream->pinned_buffers == 0) + buffer_limit = 1; /* guarantee progress */ + + /* Does the per-backend buffer limit affect this read? */ + nblocks = stream->pending_read_nblocks; + if (buffer_limit < nblocks) + { + int16 new_distance; - /* We say how many blocks we want to read, but may be smaller on return. */ + /* Shrink distance: no more look-ahead until buffers are released. */ + new_distance = stream->pinned_buffers + buffer_limit; + if (stream->distance > new_distance) + stream->distance = new_distance; + + /* If we've already made progress, just give up and wait for buffers. */ + if (stream->pinned_buffers > 0) + return false; + + /* A short read is required to make progress. */ + nblocks = buffer_limit; + } + + /* + * We say how many blocks we want to read, but it may be smaller on return + * if the buffer manager decides it needs a short read at its level. + */ + requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks); + nblocks = requested_nblocks; buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = stream->pending_read_nblocks; + + /* + * The first time around the queue we initialize it as we go, including + * the overflow zone, because otherwise the entries would appear as + * forwarded buffers. This avoids initializing the whole queue up front + * in cases where it is large but we don't ever use it due to the + * all-cached fast path or small scans. + */ + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + + /* + * Start the I/O. Any buffers that are not InvalidBuffer will be + * interpreted as already pinned, forwarded by an earlier call to + * StartReadBuffers(), and must map to the expected blocks. The nblocks + * value may be smaller on return indicating the size of the I/O that + * could be started. Buffers beyond the output nblocks number may also + * have been pinned without starting I/O due to various edge cases. In + * that case we'll just leave them in the queue ahead of us, "forwarded" + * to the next call, avoiding the need to unpin/repin. + */ need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -274,7 +353,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Remember whether we need to wait before returning this buffer. */ if (!need_wait) { - /* Look-ahead distance decays, no I/O necessary (behavior A). */ + /* Look-ahead distance decays, no I/O necessary. */ if (stream->distance > 1) stream->distance--; } @@ -292,16 +371,35 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } + /* + * How many pins were acquired but forwarded to the next call? 
These need + * to be passed to the next StartReadBuffers() call, or released if the + * stream ends early. We need the number for accounting purposes, since + * they are not counted in stream->pinned_buffers but we already hold + * them. + */ + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + /* * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. */ - overflow = (buffer_index + nblocks) - stream->queue_size; + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Compute location of start of next read, without using % operator. */ buffer_index += nblocks; @@ -313,10 +411,12 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Adjust the pending read to cover the remaining portion, if any. */ stream->pending_read_blocknum += nblocks; stream->pending_read_nblocks -= nblocks; + + return true; } static void -read_stream_look_ahead(ReadStream *stream, bool suppress_advice) +read_stream_look_ahead(ReadStream *stream) { while (stream->ios_in_progress < stream->max_ios && stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) @@ -327,8 +427,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) if (stream->pending_read_nblocks == stream->io_combine_limit) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; + read_stream_start_pending_read(stream); continue; } @@ -361,11 +460,10 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; - if (stream->ios_in_progress == stream->max_ios) + if (!read_stream_start_pending_read(stream) || + stream->ios_in_progress == stream->max_ios) { - /* And we've hit the limit. Rewind, and stop here. */ + /* And we've hit a buffer or I/O limit. Rewind and wait. */ read_stream_unget_block(stream, blocknum); return; } @@ -382,15 +480,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * io_combine_limit size once more buffers have been consumed. However, * if we've already reached io_combine_limit, or we've reached the * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. + * signaled end-of-stream, we start the read immediately. Note that the + * pending read could even exceed the distance goal, if the latter was + * reduced on buffer limit exhaustion. 
*/ if (stream->pending_read_nblocks > 0 && (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks == stream->distance && + (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream); + + /* + * There should always be something pinned when we leave this function, + * whether started by this call or not, unless we've hit the end of the + * stream. In the worst case we can always make progress one buffer at a + * time. + */ + Assert(stream->pinned_buffers > 0 || stream->distance == 0); } /* @@ -420,6 +528,7 @@ read_stream_begin_impl(int flags, int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; + uint32 max_possible_buffer_limit; Oid tablespace_id; /* @@ -444,6 +553,15 @@ read_stream_begin_impl(int flags, else max_ios = get_tablespace_io_concurrency(tablespace_id); + /* + * XXX Since we don't have asynchronous I/O yet, if direct I/O is enabled + * then just behave as though I/O concurrency is set to 0. Otherwise we + * would look ahead pinning many buffers for no benefit, for lack of + * advice and AIO. + */ + if (io_direct_flags & IO_DIRECT_DATA) + max_ios = 0; + /* Cap to INT16_MAX to avoid overflowing below */ max_ios = Min(max_ios, PG_INT16_MAX); @@ -475,12 +593,23 @@ read_stream_begin_impl(int flags, strategy_pin_limit = GetAccessStrategyPinLimit(strategy); max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers); - /* Don't allow this backend to pin more than its share of buffers. */ + /* + * Also limit our queue to the maximum number of pins we could possibly + * ever be allowed to acquire according to the buffer manager. We may not + * really be able to use them all due to other pins held by this backend, + * but we'll check that later in read_stream_start_pending_read(). + */ if (SmgrIsTemp(smgr)) - LimitAdditionalLocalPins(&max_pinned_buffers); + max_possible_buffer_limit = GetSoftLocalPinLimit(); else - LimitAdditionalPins(&max_pinned_buffers); - Assert(max_pinned_buffers > 0); + max_possible_buffer_limit = GetSoftPinLimit(); + max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit); + + /* + * The soft limit might be zero on a system configured with more + * connections than buffers. We need at least one to make progress. + */ + max_pinned_buffers = Max(1, max_pinned_buffers); /* * We need one extra entry for buffers and per-buffer data, because users @@ -546,11 +675,14 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->seq_blocknum = InvalidBlockNumber; + stream->seq_until_processed = InvalidBlockNumber; + stream->temporary = SmgrIsTemp(smgr); /* * Skip the initial ramp-up phase if the caller says we're going to be * reading the whole relation. This way we start out assuming we'll be - * doing full io_combine_limit sized reads (behavior B). + * doing full io_combine_limit sized reads. */ if (flags & READ_STREAM_FULL) stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); @@ -641,10 +773,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* - * A fast path for all-cached scans (behavior A). 
This is the same as the - * usual algorithm, but it is specialized for no I/O and no per-buffer - * data, so we can skip the queue management code, stay in the same buffer - * slot and use singular StartReadBuffer(). + * A fast path for all-cached scans. This is the same as the usual + * algorithm, but it is specialized for no I/O and no per-buffer data, so + * we can skip the queue management code, stay in the same buffer slot and + * use singular StartReadBuffer(). */ if (likely(stream->fast_path)) { @@ -652,10 +784,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. */ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -674,6 +808,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * arbitrary I/O entry (they're all free). We don't have to * adjust pinned_buffers because we're transferring one to caller * but pinning one more. + * + * In the fast path we don't need to check the pin limit. We're + * always allowed at least one pin so that progress can be made, + * and that's all we need here. Although two pins are momentarily + * held at the same time, the model used here is that the stream + * holds only one, and the other now belongs to the caller. */ if (likely(!StartReadBuffer(&stream->ios[0].op, &stream->buffers[oldest_buffer_index], @@ -698,6 +838,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -719,7 +860,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * space for more, but if we're just starting up we'll need to crank * the handle to get started. */ - read_stream_look_ahead(stream, true); + read_stream_look_ahead(stream); /* End of stream reached? */ if (stream->pinned_buffers == 0) @@ -758,34 +899,31 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE) - { - /* Distance ramps up fast (behavior C). */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - } - else - { - /* No advice; move towards io_combine_limit (behavior B). */ - if (stream->distance > stream->io_combine_limit) - { - stream->distance--; - } - else - { - distance = stream->distance * 2; - distance = Min(distance, stream->io_combine_limit); - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - } - } + /* Look-ahead distance ramps up quickly after we do I/O. */ + distance = stream->distance * 2; + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; + + /* + * If we've caught up with the first advice issued for the current + * sequential region, cancel further advice until the next random + * jump. The kernel should be able to see the pattern now that we're + * actually making sequential preadv() calls. 
+ */ + if (stream->advice_enabled && + stream->ios[io_index].op.blocknum == stream->seq_until_processed) + stream->seq_until_processed = InvalidBlockNumber; } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. If it's potentially in the overflow zone (ie it wrapped around + * the queue), also zap that copy. + */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -825,11 +963,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->oldest_buffer_index = 0; /* Prepare for the next call. */ - read_stream_look_ahead(stream, false); + read_stream_look_ahead(stream); #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -865,6 +1004,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. */ @@ -874,10 +1014,31 @@ read_stream_reset(ReadStream *stream) stream->buffered_blocknum = InvalidBlockNumber; stream->fast_path = false; + /* There is no point in reading whatever was pending. */ + stream->pending_read_nblocks = 0; + /* Unpin anything that wasn't consumed. */ while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. 
*/ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + if (++index == stream->queue_size) + index = 0; + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7915ed624c12..d56bff96cec4 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0; static uint32 PrivateRefCountClock = 0; static PrivateRefCountEntry *ReservedRefCountEntry = NULL; +static uint32 MaxProportionalPins; + static void ReservePrivateRefCountEntry(void); static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer); static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move); @@ -1255,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, - int flags) + int flags, + bool allow_forwarding) { int actual_nblocks = *nblocks; - int io_buffers_len = 0; int maxcombine = 0; Assert(*nblocks > 0); @@ -1268,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, { bool found; - buffers[i] = PinBufferForBlock(operation->rel, - operation->smgr, - operation->persistence, - operation->forknum, - blockNum + i, - operation->strategy, - &found); + if (allow_forwarding && buffers[i] != InvalidBuffer) + { + BufferDesc *bufHdr; + + /* + * This is a buffer that was pinned by an earlier call to + * StartReadBuffers(), but couldn't be handled in one operation at + * that time. The operation was split, and the caller has passed + * an already pinned buffer back to us to handle the rest of the + * operation. It must continue at the expected block number. + */ + Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i); + + /* + * It might be an already valid buffer (a hit) that followed the + * final contiguous block of an earlier I/O (a miss) marking the + * end of it, or a buffer that some other backend has since made + * valid by performing the I/O for us, in which case we can handle + * it as a hit now. It is safe to check for a BM_VALID flag with + * a relaxed load, because we got a fresh view of it while pinning + * it in the previous call. + * + * On the other hand if we don't see BM_VALID yet, it must be an + * I/O that was split by the previous call and we need to try to + * start a new I/O from this block. We're also racing against any + * other backend that might start the I/O or even manage to mark + * it BM_VALID after this check, but StartBufferIO() will handle + * those cases. + */ + if (BufferIsLocal(buffers[i])) + bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1); + else + bufHdr = GetBufferDescriptor(buffers[i] - 1); + found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID; + } + else + { + buffers[i] = PinBufferForBlock(operation->rel, + operation->smgr, + operation->persistence, + operation->forknum, + blockNum + i, + operation->strategy, + &found); + } if (found) { /* - * Terminate the read as soon as we get a hit. It could be a - * single buffer hit, or it could be a hit that follows a readable - * range. 
We don't want to create more than one readable range, - * so we stop here. + * We have a hit. If it's the first block in the requested range, + * we can return it immediately and report that WaitReadBuffers() + * does not need to be called. If the initial value of *nblocks + * was larger, the caller will have to call again for the rest. */ - actual_nblocks = i + 1; + if (i == 0) + { + *nblocks = 1; + return false; + } + + /* + * Otherwise we already have an I/O to perform, but this block + * can't be included as it is already valid. Split the I/O here. + * There may or may not be more blocks requiring I/O after this + * one, we haven't checked, but it can't be contiguous with this + * hit in the way. We'll leave this buffer pinned, forwarding it + * to the next call, avoiding the need to unpin it here and re-pin + * it in the next call. + */ + actual_nblocks = i; break; } else { - /* Extend the readable range to cover this block. */ - io_buffers_len++; - /* * Check how many blocks we can cover with the same IO. The smgr * implementation might e.g. be limited due to a segment boundary. @@ -1312,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, } *nblocks = actual_nblocks; - if (likely(io_buffers_len == 0)) - return false; - /* Populate information needed for I/O. */ operation->buffers = buffers; operation->blocknum = blockNum; operation->flags = flags; operation->nblocks = actual_nblocks; - operation->io_buffers_len = io_buffers_len; if (flags & READ_BUFFERS_ISSUE_ADVICE) { @@ -1335,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, smgrprefetch(operation->smgr, operation->forknum, blockNum, - operation->io_buffers_len); + actual_nblocks); } /* Indicate that WaitReadBuffers() should be called. */ @@ -1349,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, * actual number, which may be fewer than requested. Caller sets some of the * members of operation; see struct definition. * + * The initial contents of the elements of buffers up to *nblocks should + * either be InvalidBuffer or an already-pinned buffer that was left by a + * preceding call to StartReadBuffers() that had to be split. On return, some + * elements of buffers may hold pinned buffers beyond the number indicated by + * the updated value of *nblocks. Operations are split on boundaries known to + * smgr (eg md.c segment boundaries that require crossing into a different + * underlying file), or when already cached blocks are found in the buffer + * pool that prevent the formation of a contiguous read. + * * If false is returned, no I/O is necessary. If true is returned, one I/O * has been started, and WaitReadBuffers() must be called with the same * operation object before the buffers are accessed. Along with the operation * object, the caller-supplied array of buffers must remain valid until - * WaitReadBuffers() is called. + * WaitReadBuffers() is called, and any forwarded buffers must also be + * preserved for a future call unless explicitly released. * * Currently the I/O is only started with optional operating system advice if * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O @@ -1367,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation, int *nblocks, int flags) { - return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags, + true /* expect forwarded buffers */ ); } /* * Single block version of the StartReadBuffers(). 
This might save a few * instructions when called from another translation unit, because it is * specialized for nblocks == 1. + * + * This version does not support "forwarded" buffers: they cannot be created + * by reading only one block, and the current contents of *buffer is ignored + * on entry. */ bool StartReadBuffer(ReadBuffersOperation *operation, @@ -1384,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation, int nblocks = 1; bool result; - result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags, + false /* single block, no forwarding */ ); Assert(nblocks == 1); /* single block can't be short */ return result; @@ -1414,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation) IOObject io_object; char persistence; - /* - * Currently operations are only allowed to include a read of some range, - * with an optional extra buffer that is already pinned at the end. So - * nblocks can be at most one more than io_buffers_len. - */ - Assert((operation->nblocks == operation->io_buffers_len) || - (operation->nblocks == operation->io_buffers_len + 1)); - /* Find the range of the physical read we need to perform. */ - nblocks = operation->io_buffers_len; - if (nblocks == 0) - return; /* nothing to do */ - + nblocks = operation->nblocks; buffers = &operation->buffers[0]; blocknum = operation->blocknum; forknum = operation->forknum; persistence = operation->persistence; + Assert(nblocks > 0); + Assert(nblocks <= MAX_IO_COMBINE_LIMIT); + if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; @@ -2097,43 +2153,67 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) return buf; } +/* + * Return the maximum number of buffers that this backend should try to pin at + * once, to avoid pinning more than its fair share. This is the highest value + * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return. + * + * It's called a soft limit because nothing stops a backend from trying to + * acquire more pins than this if it needs them to make progress, but code that + * wants optional extra buffers for optimizations should respect this + * per-backend limit. + */ +uint32 +GetSoftPinLimit(void) +{ + return MaxProportionalPins; +} + +/* + * Return the maximum number of additional buffers that this backend should + * pin if it wants to stay under the per-backend soft limit, considering the + * number of buffers it has already pinned. Unlike LimitAdditionalPins(), the + * result can be zero, so the caller is expected to adjust it if required to + * make progress. + */ +uint32 +GetAdditionalPinLimit(void) +{ + uint32 estimated_pins_held; + + /* + * We get the number of "overflowed" pins for free, but don't know the + * number of pins in PrivateRefCountArray. The cost of calculating that + * exactly doesn't seem worth it, so just assume the max. + */ + estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; + + /* Is this backend already holding more than its fair share? */ + if (estimated_pins_held > MaxProportionalPins) + return 0; + + return MaxProportionalPins - estimated_pins_held; +} + /* * Limit the number of pins a batch operation may additionally acquire, to * avoid running out of pinnable buffers. * - * One additional pin is always allowed, as otherwise the operation likely - * cannot be performed at all. 
- * - * The number of allowed pins for a backend is computed based on - * shared_buffers and the maximum number of connections possible. That's very - * pessimistic, but outside of toy-sized shared_buffers it should allow - * sufficient pins. + * One additional pin is always allowed, on the assumption that the operation + * requires at least one to make progress. */ void LimitAdditionalPins(uint32 *additional_pins) { - uint32 max_backends; - int max_proportional_pins; + uint32 limit; if (*additional_pins <= 1) return; - max_backends = MaxBackends + NUM_AUXILIARY_PROCS; - max_proportional_pins = NBuffers / max_backends; - - /* - * Subtract the approximate number of buffers already pinned by this - * backend. We get the number of "overflowed" pins for free, but don't - * know the number of pins in PrivateRefCountArray. The cost of - * calculating that exactly doesn't seem worth it, so just assume the max. - */ - max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; - - if (max_proportional_pins <= 0) - max_proportional_pins = 1; - - if (*additional_pins > max_proportional_pins) - *additional_pins = max_proportional_pins; + limit = GetAdditionalPinLimit(); + limit = Max(limit, 1); + if (limit < *additional_pins) + *additional_pins = limit; } /* @@ -3575,6 +3655,15 @@ InitBufferManagerAccess(void) { HASHCTL hash_ctl; + /* + * The soft limit on the number of pins each backend should respect, based + * on shared_buffers and the maximum number of connections possible. + * That's very pessimistic, but outside toy-sized shared_buffers it should + * allow plenty of pins. LimitAdditionalPins() or GetAdditionalPinLimit() + * can be used to check the remaining balance. + */ + MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); hash_ctl.keysize = sizeof(int32); diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 80b83444eb2d..5378ba843162 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -286,6 +286,22 @@ GetLocalVictimBuffer(void) return BufferDescriptorGetBuffer(bufHdr); } +/* see GetSoftPinLimit() */ +uint32 +GetSoftLocalPinLimit(void) +{ + /* Every backend has its own temporary buffers, and can pin them all. */ + return num_temp_buffers; +} + +/* see GetAdditionalPinLimit() */ +uint32 +GetAdditionalLocalPinLimit(void) +{ + Assert(NLocalPinnedBuffers <= num_temp_buffers); + return num_temp_buffers - NLocalPinnedBuffers; +} + /* see LimitAdditionalPins() */ void LimitAdditionalLocalPins(uint32 *additional_pins) diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h index 95d8805b66f2..aee1f70c22ee 100644 --- a/src/include/access/gin_private.h +++ b/src/include/access/gin_private.h @@ -354,7 +354,12 @@ typedef struct GinScanEntryData /* for a partial-match or full-scan query, we accumulate all TIDs here */ TIDBitmap *matchBitmap; TBMPrivateIterator *matchIterator; - TBMIterateResult *matchResult; + + /* + * If blockno is InvalidBlockNumber, all of the other fields in the + * matchResult are meaningless. 
+ */ + TBMIterateResult matchResult; OffsetNumber matchOffsets[TBM_MAX_TUPLES_PER_PAGE]; int matchNtuples; diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 131c050c15f1..b8cb1e744ad1 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -780,59 +780,23 @@ typedef struct TableAmRoutine */ /* - * Prepare to fetch / check / return tuples from `blockno` as part of a - * bitmap table scan. `scan` was started via table_beginscan_bm(). Return - * false if the bitmap is exhausted and true otherwise. - * - * This will typically read and pin the target block, and do the necessary - * work to allow scan_bitmap_next_tuple() to return tuples (e.g. it might - * make sense to perform tuple visibility checks at this time). - * - * `lossy_pages` and `exact_pages` are EXPLAIN counters that can be - * incremented by the table AM to indicate whether or not the block's - * representation in the bitmap is lossy. - * - * `recheck` is set by the table AM to indicate whether or not the tuples - * from this block should be rechecked. Tuples from lossy pages will - * always need to be rechecked, but some non-lossy pages' tuples may also - * require recheck. - * - * `blockno` is the current block and is set by the table AM. The table AM - * is responsible for advancing the main iterator, but the bitmap table - * scan code still advances the prefetch iterator. `blockno` is used by - * bitmap table scan code to validate that the prefetch block stays ahead - * of the current block. + * Fetch the next tuple of a bitmap table scan into `slot` and return true + * if a visible tuple was found, false otherwise. * - * XXX: Currently this may only be implemented if the AM uses md.c as its - * storage manager, and uses ItemPointer->ip_blkid in a manner that maps - * blockids directly to the underlying storage. nodeBitmapHeapscan.c - * performs prefetching directly using that interface. This probably - * needs to be rectified at a later point. + * `lossy_pages` is incremented if the bitmap is lossy for the selected + * page; otherwise, `exact_pages` is incremented. These are tracked for + * display in EXPLAIN ANALYZE output. * - * XXX: Currently this may only be implemented if the AM uses the - * visibilitymap, as nodeBitmapHeapscan.c unconditionally accesses it to - * perform prefetching. This probably needs to be rectified at a later - * point. + * Prefetching additional data from the bitmap is left to the table AM. * - * Optional callback, but either both scan_bitmap_next_block and - * scan_bitmap_next_tuple need to exist, or neither. + * This is an optional callback. */ - bool (*scan_bitmap_next_block) (TableScanDesc scan, - BlockNumber *blockno, + bool (*scan_bitmap_next_tuple) (TableScanDesc scan, + TupleTableSlot *slot, bool *recheck, uint64 *lossy_pages, uint64 *exact_pages); - /* - * Fetch the next tuple of a bitmap table scan into `slot` and return true - * if a visible tuple was found, false otherwise. - * - * Optional callback, but either both scan_bitmap_next_block and - * scan_bitmap_next_tuple need to exist, or neither. - */ - bool (*scan_bitmap_next_tuple) (TableScanDesc scan, - TupleTableSlot *slot); - /* * Prepare to fetch tuples from the next block in a sample scan. Return * false if the sample scan is finished, true otherwise. `scan` was @@ -1955,56 +1919,24 @@ table_relation_estimate_size(Relation rel, int32 *attr_widths, */ /* - * Prepare to fetch / check / return tuples as part of a bitmap table scan. 
- * `scan` needs to have been started via table_beginscan_bm(). Returns false - * if there are no more blocks in the bitmap, true otherwise. - * - * `lossy_pages` and `exact_pages` are EXPLAIN counters that can be - * incremented by the table AM to indicate whether or not the block's - * representation in the bitmap is lossy. + * Fetch / check / return tuples as part of a bitmap table scan. `scan` needs + * to have been started via table_beginscan_bm(). Fetch the next tuple of a + * bitmap table scan into `slot` and return true if a visible tuple was found, + * false otherwise. * - * `recheck` is set by the table AM to indicate whether or not the tuples - * from this block should be rechecked. + * `recheck` is set by the table AM to indicate whether or not the tuple in + * `slot` should be rechecked. Tuples from lossy pages will always need to be + * rechecked, but some non-lossy pages' tuples may also require recheck. * - * `blockno` is the current block and is set by the table AM and is used by - * bitmap table scan code to validate that the prefetch block stays ahead of - * the current block. - * - * Note, this is an optionally implemented function, therefore should only be - * used after verifying the presence (at plan time or such). + * `lossy_pages` is incremented if the block's representation in the bitmap is + * lossy; otherwise, `exact_pages` is incremented. */ static inline bool -table_scan_bitmap_next_block(TableScanDesc scan, - BlockNumber *blockno, +table_scan_bitmap_next_tuple(TableScanDesc scan, + TupleTableSlot *slot, bool *recheck, uint64 *lossy_pages, uint64 *exact_pages) -{ - /* - * We don't expect direct calls to table_scan_bitmap_next_block with valid - * CheckXidAlive for catalog or regular tables. See detailed comments in - * xact.c where these variables are declared. - */ - if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan)) - elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding"); - - return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan, - blockno, recheck, - lossy_pages, - exact_pages); -} - -/* - * Fetch the next tuple of a bitmap table scan into `slot` and return true if - * a visible tuple was found, false otherwise. - * table_scan_bitmap_next_block() needs to previously have selected a - * block (i.e. returned true), and no previous - * table_scan_bitmap_next_tuple() for the same block may have - * returned false. 
- */ -static inline bool -table_scan_bitmap_next_tuple(TableScanDesc scan, - TupleTableSlot *slot) { /* * We don't expect direct calls to table_scan_bitmap_next_tuple with valid @@ -2015,7 +1947,10 @@ table_scan_bitmap_next_tuple(TableScanDesc scan, elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding"); return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan, - slot); + slot, + recheck, + lossy_pages, + exact_pages); } /* diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 575b0b1bd246..d4d4e655180c 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1840,11 +1840,7 @@ typedef enum /* ---------------- * ParallelBitmapHeapState information * tbmiterator iterator for scanning current pages - * prefetch_iterator iterator for prefetching ahead of current page - * mutex mutual exclusion for the prefetching variable - * and state - * prefetch_pages # pages prefetch iterator is ahead of current - * prefetch_target current target prefetch distance + * mutex mutual exclusion for state * state current state of the TIDBitmap * cv conditional wait variable * ---------------- @@ -1852,10 +1848,7 @@ typedef enum typedef struct ParallelBitmapHeapState { dsa_pointer tbmiterator; - dsa_pointer prefetch_iterator; slock_t mutex; - int prefetch_pages; - int prefetch_target; SharedBitmapState state; ConditionVariable cv; } ParallelBitmapHeapState; @@ -1879,18 +1872,11 @@ typedef struct SharedBitmapHeapInstrumentation * * bitmapqualorig execution state for bitmapqualorig expressions * tbm bitmap obtained from child index scan(s) - * pvmbuffer buffer for visibility-map lookups of prefetched pages * stats execution statistics - * prefetch_iterator iterator for prefetching ahead of current page - * prefetch_pages # pages prefetch iterator is ahead of current - * prefetch_target current target prefetch distance - * prefetch_maximum maximum value for prefetch_target * initialized is node is ready to iterate * pstate shared state for parallel bitmap scan * sinstrument statistics for parallel workers * recheck do current page's tuples need recheck - * blockno used to validate pf and current block stay in sync - * prefetch_blockno used to validate pf stays ahead of current block * ---------------- */ typedef struct BitmapHeapScanState @@ -1898,18 +1884,11 @@ typedef struct BitmapHeapScanState ScanState ss; /* its first field is NodeTag */ ExprState *bitmapqualorig; TIDBitmap *tbm; - Buffer pvmbuffer; BitmapHeapScanInstrumentation stats; - TBMIterator prefetch_iterator; - int prefetch_pages; - int prefetch_target; - int prefetch_maximum; bool initialized; ParallelBitmapHeapState *pstate; SharedBitmapHeapInstrumentation *sinstrument; bool recheck; - BlockNumber blockno; - BlockNumber prefetch_blockno; } BitmapHeapScanState; /* ---------------- diff --git a/src/include/nodes/tidbitmap.h b/src/include/nodes/tidbitmap.h index e185635c10be..99f795ceab52 100644 --- a/src/include/nodes/tidbitmap.h +++ b/src/include/nodes/tidbitmap.h @@ -101,8 +101,8 @@ extern bool tbm_is_empty(const TIDBitmap *tbm); extern TBMPrivateIterator *tbm_begin_private_iterate(TIDBitmap *tbm); extern dsa_pointer tbm_prepare_shared_iterate(TIDBitmap *tbm); -extern TBMIterateResult *tbm_private_iterate(TBMPrivateIterator *iterator); -extern TBMIterateResult *tbm_shared_iterate(TBMSharedIterator *iterator); +extern bool tbm_private_iterate(TBMPrivateIterator *iterator, TBMIterateResult *tbmres); +extern bool tbm_shared_iterate(TBMSharedIterator *iterator, 
TBMIterateResult *tbmres); extern void tbm_end_private_iterate(TBMPrivateIterator *iterator); extern void tbm_end_shared_iterate(TBMSharedIterator *iterator); extern TBMSharedIterator *tbm_attach_shared_iterate(dsa_area *dsa, @@ -113,7 +113,7 @@ extern TBMIterator tbm_begin_iterate(TIDBitmap *tbm, dsa_area *dsa, dsa_pointer dsp); extern void tbm_end_iterate(TBMIterator *iterator); -extern TBMIterateResult *tbm_iterate(TBMIterator *iterator); +extern bool tbm_iterate(TBMIterator *iterator, TBMIterateResult *tbmres); static inline bool tbm_exhausted(TBMIterator *iterator) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index b204e4731c18..307f36af3849 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -130,7 +130,6 @@ struct ReadBuffersOperation BlockNumber blocknum; int flags; int16 nblocks; - int16 io_buffers_len; }; typedef struct ReadBuffersOperation ReadBuffersOperation; @@ -290,6 +289,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern uint32 GetSoftPinLimit(void); +extern uint32 GetSoftLocalPinLimit(void); +extern uint32 GetAdditionalPinLimit(void); +extern uint32 GetAdditionalLocalPinLimit(void); extern void LimitAdditionalPins(uint32 *additional_pins); extern void LimitAdditionalLocalPins(uint32 *additional_pins);
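The pin-limit additions above work together: GetSoftPinLimit() reports this backend's proportional share, GetAdditionalPinLimit() reports how much of that share is still unused, and LimitAdditionalPins() clamps a requested batch size in place. The sketch below shows how a caller sizing an optional batch of pins might combine them; it is illustrative only, not part of the patch, and choose_readahead_pins() and want_pins are invented names.

#include "postgres.h"
#include "storage/bufmgr.h"

static uint32
choose_readahead_pins(uint32 want_pins)
{
	uint32		allowance = GetAdditionalPinLimit();

	/*
	 * The allowance can be zero if this backend is already at its fair
	 * share; always permit one pin so the operation can make progress,
	 * mirroring what LimitAdditionalPins() does internally.
	 */
	allowance = Max(allowance, 1);

	return Min(want_pins, allowance);
}

Callers that already hold the request in a variable can get the same clamping in one step with LimitAdditionalPins(&want_pins).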
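The tidbitmap.h changes switch all three iterator flavours to a caller-supplied result struct and a boolean return value, instead of a pointer that goes NULL at the end. A small sketch of the new private-iteration pattern follows; it is not taken from the patch, scan_bitmap_sketch() is an invented name, and tbm is assumed to be an already-populated TIDBitmap.

#include "postgres.h"
#include "nodes/tidbitmap.h"
#include "utils/dsa.h"

static void
scan_bitmap_sketch(TIDBitmap *tbm)
{
	TBMIterator iterator;
	TBMIterateResult tbmres;

	/* A NULL dsa_area and InvalidDsaPointer select private iteration. */
	iterator = tbm_begin_iterate(tbm, NULL, InvalidDsaPointer);

	/* tbm_iterate() fills tbmres and returns false when exhausted. */
	while (tbm_iterate(&iterator, &tbmres))
	{
		if (tbmres.lossy)
		{
			/* Whole-page match: every tuple on tbmres.blockno needs recheck. */
		}
		else
		{
			/* Exact match: per-tuple offsets are extracted separately. */
		}
	}

	tbm_end_iterate(&iterator);
}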
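Finally, a hedged sketch of the StartReadBuffers() forwarding contract described in the bufmgr.c comments. read_range_sketch(), rel, and first_block are invented for the example, error handling is omitted, and a real caller would loop, passing any forwarded buffers left beyond *nblocks back into the next call rather than leaking their pins.

#include "postgres.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"

static void
read_range_sketch(Relation rel, BlockNumber first_block)
{
	ReadBuffersOperation op = {0};
	Buffer		buffers[MAX_IO_COMBINE_LIMIT];
	int			nblocks = 8;

	/* Caller-set members of the operation; see the struct definition. */
	op.rel = rel;
	op.smgr = RelationGetSmgr(rel);
	op.persistence = rel->rd_rel->relpersistence;
	op.forknum = MAIN_FORKNUM;
	op.strategy = NULL;

	/* Slots must start out InvalidBuffer unless they carry forwarded pins. */
	for (int i = 0; i < nblocks; i++)
		buffers[i] = InvalidBuffer;

	if (StartReadBuffers(&op, buffers, first_block, &nblocks, 0))
		WaitReadBuffers(&op);

	/*
	 * nblocks may now be smaller than requested because the read was split
	 * on a cached block or an smgr boundary.  Any pinned buffers left in
	 * buffers[nblocks..] were forwarded and must be fed into a follow-up
	 * StartReadBuffers() call for the remaining blocks, or released; that
	 * loop is omitted here.
	 */
}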