From 94d0f92c807895e6edadf583a06bb39c5dc52a4c Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Tue, 27 Jan 2026 14:07:55 +0900 Subject: [PATCH v5 2/5] SeqScan: add batch-driven variants returning slots Teach SeqScan to drive the table AM via the new batch API added in the previous commit, while still returning one TupleTableSlot at a time to callers. This reduces per tuple AM crossings without changing the node interface seen by parents. Add TupleBatch and supporting code in execBatch.c/h to hold executor side batching state. PlanState gains ps_Batch to carry the active TupleBatch when a node supports batching. Add executor_batch_rows GUC to specify the maximum number of rows that can be added into a batch. Wire up runtime selection in ExecInitSeqScan using ScanCanUseBatching(). When executor_batch_rows > 1, EPQ is inactive, the scan is not backward, and the relation supports batching, ps.ExecProcNode is set to a batch-driven variant. Otherwise the non-batch path is used. Plan shape and EXPLAIN output remain unchanged; only the internal tuple flow differs when batching is enabled and allowed. Notes / current limits: - With the current heapam, batches are composed from a single page, so the batch may not always be full. Future work may let SeqScan and/or AMs top up batches across pages when safe to do so. 
Reviewed-by: Daniil Davydov <3danissimo@gmail.com> Reviewed-by: ChangAo Chen <2624345507@qq.com> Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com --- src/backend/access/heap/heapam.c | 28 ++++ src/backend/access/heap/heapam_handler.c | 16 ++ src/backend/access/table/tableam.c | 11 ++ src/backend/executor/Makefile | 1 + src/backend/executor/execBatch.c | 112 ++++++++++++++ src/backend/executor/execScan.c | 31 ++++ src/backend/executor/meson.build | 1 + src/backend/executor/nodeSeqscan.c | 176 +++++++++++++++++++++- src/backend/utils/init/globals.c | 3 + src/backend/utils/misc/guc_parameters.dat | 9 ++ src/include/access/heapam.h | 1 + src/include/access/tableam.h | 27 ++++ src/include/executor/execBatch.h | 99 ++++++++++++ src/include/executor/execScan.h | 69 +++++++++ src/include/executor/executor.h | 4 + src/include/miscadmin.h | 1 + src/include/nodes/execnodes.h | 4 + 17 files changed, 592 insertions(+), 1 deletion(-) create mode 100644 src/backend/executor/execBatch.c create mode 100644 src/include/executor/execBatch.h diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index d8d1bdf5191..db91085b07c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1644,6 +1644,34 @@ heap_begin_batch(TableScanDesc sscan, int maxitems) return hb; } +/* + * heap_materialize_batch_all + * + * Bind all tuples of the current batch into 'slots'. We bind the + * HeapTupleData header that points into the pinned page. No per-row copy. 
+ */ +void +heap_materialize_batch_all(void *am_batch, TupleTableSlot **slots, int n) +{ + HeapBatch *hb = (HeapBatch *) am_batch; + + Assert(n <= hb->nitems); + + for (int i = 0; i < n; i++) + { + HeapTupleData *tuple = &hb->tupdata[i]; + HeapTupleTableSlot *slot = (HeapTupleTableSlot *) slots[i]; + + /* Inline of ExecStoreHeapTuple(tuple, slot, false) */ + slot->tuple = tuple; + slot->off = 0; + slot->base.tts_nvalid = 0; + slot->base.tts_flags &= ~(TTS_FLAG_EMPTY | TTS_FLAG_SHOULDFREE); + slot->base.tts_tid = tuple->t_self; + slot->base.tts_tableOid = tuple->t_tableOid; + } +} + /* * heap_scan_end_batch * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index e4cf7fc296b..0f6bda7b69f 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -72,6 +72,21 @@ heapam_slot_callbacks(Relation relation) return &TTSOpsBufferHeapTuple; } +/* ------------------------------------------------------------------------ + * TupleBatch related callbacks for heap AM + * ------------------------------------------------------------------------ + */ + +static const TupleBatchOps TupleBatchHeapOps = +{ + .materialize_all = heap_materialize_batch_all +}; + +static const TupleBatchOps * +heapam_batch_callbacks(Relation relation) +{ + return &TupleBatchHeapOps; +} /* ------------------------------------------------------------------------ * Index Scan Callbacks for heap AM @@ -2631,6 +2646,7 @@ static const TableAmRoutine heapam_methods = { .type = T_TableAmRoutine, .slot_callbacks = heapam_slot_callbacks, + .batch_callbacks = heapam_batch_callbacks, .scan_begin = heap_beginscan, .scan_end = heap_endscan, diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 87491796523..ffb3b738f6a 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -103,6 +103,17 @@ table_slot_create(Relation relation, List **reglist) return slot; } 
+/* ---------------------------------------------------------------------------- + * TupleBatch support routines + * ---------------------------------------------------------------------------- + */ +const TupleBatchOps * +table_batch_callbacks(Relation relation) +{ + if (relation->rd_tableam) + return relation->rd_tableam->batch_callbacks(relation); + elog(ERROR, "relation does not support TupleBatch operations"); +} /* ---------------------------------------------------------------------------- * Table scan functions. diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 11118d0ce02..3e72f3fe03c 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ execAsync.o \ + execBatch.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execBatch.c b/src/backend/executor/execBatch.c new file mode 100644 index 00000000000..1ef4117b87c --- /dev/null +++ b/src/backend/executor/execBatch.c @@ -0,0 +1,112 @@ +/*------------------------------------------------------------------------- + * + * execBatch.c + * Helpers for TupleBatch + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execBatch.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "executor/execBatch.h" + +/* + * TupleBatchCreate + * Allocate and initialize a new TupleBatch envelope. 
+ */ +TupleBatch * +TupleBatchCreate(TupleDesc scandesc, int capacity) +{ + TupleBatch *b; + TupleTableSlot **inslots, + **outslots; + Size alloc_size; + + /* Single allocation for TupleBatch + inslots + outslots arrays */ + alloc_size = sizeof(TupleBatch) + 2 * sizeof(TupleTableSlot *) * capacity; + b = palloc(alloc_size); + inslots = (TupleTableSlot **) ((char *) b + sizeof(TupleBatch)); + outslots = (TupleTableSlot **) ((char *) b + sizeof(TupleBatch) + + sizeof(TupleTableSlot *) * capacity); + + for (int i = 0; i < capacity; i++) + inslots[i] = MakeSingleTupleTableSlot(scandesc, &TTSOpsHeapTuple); + + /* Initial state: empty envelope */ + b->am_payload = NULL; + b->ntuples = 0; + b->inslots = inslots; + b->outslots = outslots; + b->activeslots = NULL; + b->maxslots = capacity; + + b->nvalid = 0; + b->next = 0; + + return b; +} + +/* + * TupleBatchReset + * Reset an existing TupleBatch envelope to empty. + */ +void +TupleBatchReset(TupleBatch *b, bool drop_slots) +{ + Assert(b != NULL); + + for (int i = 0; i < b->maxslots; i++) + { + ExecClearTuple(b->inslots[i]); + if (drop_slots) + ExecDropSingleTupleTableSlot(b->inslots[i]); + } + + b->ntuples = 0; + b->nvalid = 0; + b->next = 0; + b->activeslots = NULL; +} + +void +TupleBatchUseInput(TupleBatch *b, int nvalid) +{ + b->materialized = true; + b->activeslots = b->inslots; + b->nvalid = nvalid; + b->next = 0; +} + +void +TupleBatchUseOutput(TupleBatch *b, int nvalid) +{ + b->materialized = true; + b->activeslots = b->outslots; + b->nvalid = nvalid; + b->next = 0; +} + +bool +TupleBatchIsValid(TupleBatch *b) +{ + return b != NULL && + b->maxslots > 0 && + b->inslots != NULL && + b->outslots != NULL; +} + +void +TupleBatchRewind(TupleBatch *b) +{ + b->next = 0; +} + +int +TupleBatchGetNumValid(TupleBatch *b) +{ + return b->nvalid; +} diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c index 9f68be17b99..5023eb6756a 100644 --- a/src/backend/executor/execScan.c +++ 
b/src/backend/executor/execScan.c @@ -18,6 +18,7 @@ */ #include "postgres.h" +#include "access/tableam.h" #include "executor/executor.h" #include "executor/execScan.h" #include "miscadmin.h" @@ -154,3 +155,33 @@ ExecScanReScan(ScanState *node) } } } + +bool +ScanCanUseBatching(ScanState *scanstate, int eflags) +{ + Relation relation = scanstate->ss_currentRelation; + + return executor_batch_rows > 1 && + (scanstate->ps.state->es_epq_active == NULL) && + !(eflags & EXEC_FLAG_BACKWARD) && + relation && table_supports_batching(relation); +} + +void +ScanResetBatching(ScanState *scanstate, bool drop) +{ + TupleBatch *b = scanstate->ps.ps_Batch; + + if (b) + { + TupleBatchReset(b, drop); + if (b->am_payload) + { + table_scan_end_batch(scanstate->ss_currentScanDesc, + b->am_payload); + b->am_payload = NULL; + } + if (drop) + pfree(b); + } +} diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build index dc45be0b2ce..e5af90e3a0f 100644 --- a/src/backend/executor/meson.build +++ b/src/backend/executor/meson.build @@ -3,6 +3,7 @@ backend_sources += files( 'execAmi.c', 'execAsync.c', + 'execBatch.c', 'execCurrent.c', 'execExpr.c', 'execExprInterp.c', diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index af3c788ce8b..08d93e6f0be 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -203,6 +203,171 @@ ExecSeqScanEPQ(PlanState *pstate) (ExecScanRecheckMtd) SeqRecheck); } +/* ---------------------------------------------------------------- + * Batch Support + * ---------------------------------------------------------------- + */ +static bool +SeqNextBatch(SeqScanState *node) +{ + TableScanDesc scandesc; + EState *estate; + ScanDirection direction; + + Assert(node->ss.ps.ps_Batch != NULL); + + /* + * get information from the estate and scan state + */ + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; + direction = estate->es_direction; + 
Assert(ScanDirectionIsForward(direction)); + + if (scandesc == NULL) + { + /* + * We reach here if the scan is not parallel, or if we're serially + * executing a scan that was planned to be parallel. + */ + scandesc = table_beginscan(node->ss.ss_currentRelation, + estate->es_snapshot, + 0, NULL); + node->ss.ss_currentScanDesc = scandesc; + } + + /* Lazily create the AM batch payload. */ + if (node->ss.ps.ps_Batch->am_payload == NULL) + { + const TableAmRoutine *tam PG_USED_FOR_ASSERTS_ONLY = scandesc->rs_rd->rd_tableam; + + Assert(tam && tam->scan_begin_batch); + node->ss.ps.ps_Batch->am_payload = + table_scan_begin_batch(scandesc, node->ss.ps.ps_Batch->maxslots); + node->ss.ps.ps_Batch->ops = table_batch_callbacks(node->ss.ss_currentRelation); + } + + node->ss.ps.ps_Batch->ntuples = + table_scan_getnextbatch(scandesc, node->ss.ps.ps_Batch->am_payload, direction); + node->ss.ps.ps_Batch->nvalid = node->ss.ps.ps_Batch->ntuples; + node->ss.ps.ps_Batch->materialized = false; + + return node->ss.ps.ps_Batch->ntuples > 0; +} + +static bool +SeqNextBatchMaterialize(SeqScanState *node) +{ + if (SeqNextBatch(node)) + { + TupleBatchMaterializeAll(node->ss.ps.ps_Batch); + return true; + } + + return false; +} + +static TupleTableSlot * +ExecSeqScanBatchSlot(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + Assert(pstate->qual == NULL); + Assert(pstate->ps_ProjInfo == NULL); + + return ExecScanExtendedBatchSlot(&node->ss, + (ExecScanAccessBatchMtd) SeqNextBatchMaterialize, + NULL, NULL); +} + +static TupleTableSlot * +ExecSeqScanBatchSlotWithQual(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + /* + * Use pg_assume() for != NULL tests to make the compiler realize no + * runtime check for the field is needed in ExecScanExtended(). 
+ */ + Assert(pstate->state->es_epq_active == NULL); + pg_assume(pstate->qual != NULL); + Assert(pstate->ps_ProjInfo == NULL); + + return ExecScanExtendedBatchSlot(&node->ss, + (ExecScanAccessBatchMtd) SeqNextBatchMaterialize, + pstate->qual, NULL); +} + +/* + * Variant of ExecSeqScan() but when projection is required. + */ +static TupleTableSlot * +ExecSeqScanBatchSlotWithProject(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + Assert(pstate->qual == NULL); + pg_assume(pstate->ps_ProjInfo != NULL); + + return ExecScanExtendedBatchSlot(&node->ss, + (ExecScanAccessBatchMtd) SeqNextBatchMaterialize, + NULL, pstate->ps_ProjInfo); +} + +/* + * Variant of ExecSeqScan() but when qual evaluation and projection are + * required. + */ +static TupleTableSlot * +ExecSeqScanBatchSlotWithQualProject(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + pg_assume(pstate->qual != NULL); + pg_assume(pstate->ps_ProjInfo != NULL); + + return ExecScanExtendedBatchSlot(&node->ss, + (ExecScanAccessBatchMtd) SeqNextBatchMaterialize, + pstate->qual, pstate->ps_ProjInfo); +} + +/* Batch SeqScan enablement and dispatch */ +static void +SeqScanInitBatching(SeqScanState *scanstate, int eflags) +{ + const int cap = executor_batch_rows; + TupleDesc scandesc = RelationGetDescr(scanstate->ss.ss_currentRelation); + + scanstate->ss.ps.ps_Batch = TupleBatchCreate(scandesc, cap); + + /* Choose batch variant to preserve your specialization matrix */ + if (scanstate->ss.ps.qual == NULL) + { + if (scanstate->ss.ps.ps_ProjInfo == NULL) + { + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot; + } + else + { + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject; + } + } + else + { + if (scanstate->ss.ps.ps_ProjInfo == NULL) + { + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual; + } + else + { + scanstate->ss.ps.ExecProcNode = 
ExecSeqScanBatchSlotWithQualProject; + } + } +} + /* ---------------------------------------------------------------- * ExecInitSeqScan * ---------------------------------------------------------------- @@ -211,6 +376,7 @@ SeqScanState * ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) { SeqScanState *scanstate; + bool use_batching; /* * Once upon a time it was possible to have an outerPlan of a SeqScan, but @@ -241,9 +407,12 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) node->scan.scanrelid, eflags); + use_batching = ScanCanUseBatching(&scanstate->ss, eflags); + /* and create slot with the appropriate rowtype */ ExecInitScanTupleSlot(estate, &scanstate->ss, RelationGetDescr(scanstate->ss.ss_currentRelation), + use_batching ? &TTSOpsHeapTuple : table_slot_callbacks(scanstate->ss.ss_currentRelation)); /* @@ -280,6 +449,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) scanstate->ss.ps.ExecProcNode = ExecSeqScanWithQualProject; } + if (use_batching) + SeqScanInitBatching(scanstate, eflags); + return scanstate; } @@ -299,6 +471,8 @@ ExecEndSeqScan(SeqScanState *node) */ scanDesc = node->ss.ss_currentScanDesc; + ScanResetBatching(&node->ss, true); + /* * close heap scan */ @@ -327,7 +501,7 @@ ExecReScanSeqScan(SeqScanState *node) if (scan != NULL) table_rescan(scan, /* scan desc */ NULL); /* new scan keys */ - + ScanResetBatching(&node->ss, false); ExecScanReScan((ScanState *) node); } diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 36ad708b360..535e29d7823 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -165,3 +165,6 @@ int notify_buffers = 16; int serializable_buffers = 32; int subtransaction_buffers = 0; int transaction_buffers = 0; + +/* executor batching */ +int executor_batch_rows = 64; diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index f0260e6e412..4c422c854d0 100644 --- 
a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1004,6 +1004,15 @@ boot_val => 'true', }, +{ name => 'executor_batch_rows', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS', + short_desc => 'Number of rows to include in batches during execution.', + flags => 'GUC_NOT_IN_SAMPLE', + variable => 'executor_batch_rows', + boot_val => '64', + min => '0', + max => '1024', +}, + { name => 'exit_on_error', type => 'bool', context => 'PGC_USERSET', group => 'ERROR_HANDLING_OPTIONS', short_desc => 'Terminate session on any error.', variable => 'ExitOnAnyError', diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e2417650c5f..d6154d5ab15 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -354,6 +354,7 @@ extern bool heap_getnextslot(TableScanDesc sscan, extern void *heap_begin_batch(TableScanDesc sscan, int maxitems); extern void heap_end_batch(TableScanDesc sscan, void *am_batch); extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir); +extern void heap_materialize_batch_all(void *am_batch, TupleTableSlot **slots, int n); extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, ItemPointer maxtid); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 584b580f7a1..bdf733c8b22 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -21,6 +21,7 @@ #include "access/sdir.h" #include "access/xact.h" #include "commands/vacuum.h" +#include "executor/execBatch.h" #include "executor/tuptable.h" #include "storage/read_stream.h" #include "utils/rel.h" @@ -39,6 +40,7 @@ typedef struct BulkInsertStateData BulkInsertStateData; typedef struct IndexInfo IndexInfo; typedef struct SampleScanState SampleScanState; typedef struct ValidateIndexState ValidateIndexState; +typedef struct TupleBatchOps TupleBatchOps; /* * Bitmask values for the flags argument to the scan_begin callback. 
@@ -301,6 +303,7 @@ typedef struct TableAmRoutine * Return slot implementation suitable for storing a tuple of this AM. */ const TupleTableSlotOps *(*slot_callbacks) (Relation rel); + const TupleBatchOps *(*batch_callbacks)(Relation rel); /* ------------------------------------------------------------------------ @@ -361,6 +364,7 @@ typedef struct TableAmRoutine ScanDirection dir); void (*scan_end_batch)(TableScanDesc sscan, void *am_batch); + /*----------- * Optional functions to provide scanning for ranges of ItemPointers. * Implementations must either provide both of these functions, or neither @@ -872,6 +876,16 @@ extern const TupleTableSlotOps *table_slot_callbacks(Relation relation); */ extern TupleTableSlot *table_slot_create(Relation relation, List **reglist); +/* ---------------------------------------------------------------------------- + * TupleBatch functions. + * ---------------------------------------------------------------------------- + */ + +/* + * Returns callbacks for manipulating TupleBatch for tuples of the given + * relation. + */ +extern const TupleBatchOps *table_batch_callbacks(Relation relation); /* ---------------------------------------------------------------------------- * Table scan functions. @@ -1046,6 +1060,18 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); } +/* + * table_supports_batching + * Does the relation's AM support batching? + */ +static inline bool +table_supports_batching(Relation relation) +{ + const TableAmRoutine *tam = relation->rd_tableam; + + return tam->scan_getnextbatch != NULL; +} + /* * table_scan_begin_batch * Allocate AM-owned batch payload with capacity 'maxitems'. 
@@ -2128,5 +2154,6 @@ extern const TableAmRoutine *GetTableAmRoutine(Oid amhandler); */ extern const TableAmRoutine *GetHeapamTableAmRoutine(void); +extern struct TupleBatchOps *GetHeapamTupleBatchOps(void); #endif /* TABLEAM_H */ diff --git a/src/include/executor/execBatch.h b/src/include/executor/execBatch.h new file mode 100644 index 00000000000..2d0066103ce --- /dev/null +++ b/src/include/executor/execBatch.h @@ -0,0 +1,99 @@ +/*------------------------------------------------------------------------- + * + * execBatch.h + * Executor batch envelope for passing tuple batch state upward + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execBatch.h + *------------------------------------------------------------------------- + */ +#ifndef EXECBATCH_H +#define EXECBATCH_H + +#include "executor/tuptable.h" + +/* + * TupleBatchOps -- AM-specific helpers for lazy materialization. + */ +typedef struct TupleBatchOps +{ + void (*materialize_all)(void *am_payload, + TupleTableSlot **dst, + int maxslots); +} TupleBatchOps; + +/* + * TupleBatch + * + * Envelope for a batch of tuples produced by a plan node (e.g., SeqScan) per + * call to a batch variant of ExecSeqScan(). + */ +typedef struct TupleBatch +{ + void *am_payload; + const TupleBatchOps *ops; + int ntuples; /* number of tuples in am_payload */ + bool materialized; /* tuples in slots valid? 
*/ + struct TupleTableSlot **inslots; /* slots for tuples read "into" batch */ + struct TupleTableSlot **outslots; /* slots for tuples going "out of" + * batch */ + struct TupleTableSlot **activeslots; + int maxslots; + + int nvalid; /* number of returnable tuples in outslots */ + int next; /* 0-based index of next tuple to be returned */ +} TupleBatch; + + +/* Helpers */ +extern TupleBatch *TupleBatchCreate(TupleDesc scandesc, int capacity); +extern void TupleBatchReset(TupleBatch *b, bool drop_slots); +extern void TupleBatchUseInput(TupleBatch *b, int nvalid); +extern void TupleBatchUseOutput(TupleBatch *b, int nvalid); +extern bool TupleBatchIsValid(TupleBatch *b); +extern void TupleBatchRewind(TupleBatch *b); +extern int TupleBatchGetNumValid(TupleBatch *b); + +static inline TupleTableSlot * +TupleBatchGetNextSlot(TupleBatch *b) +{ + return b->next < b->nvalid ? b->activeslots[b->next++] : NULL; +} + +static inline TupleTableSlot * +TupleBatchGetSlot(TupleBatch *b, int index) +{ + Assert(index < b->nvalid); + return b->activeslots[index]; +} + +static inline void +TupleBatchStoreInOut(TupleBatch *b, int index, TupleTableSlot *out) +{ + Assert(TupleBatchIsValid(b)); + b->outslots[index] = out; +} + +static inline bool +TupleBatchHasMore(TupleBatch *b) +{ + return b->activeslots && b->next < b->nvalid; +} + +static inline void +TupleBatchMaterializeAll(TupleBatch *b) +{ + if (b->materialized) + return; + + if (b->ops == NULL || b->ops->materialize_all == NULL) + elog(ERROR, "TupleBatch has no slots and no materialize_all op"); + + b->ops->materialize_all(b->am_payload, b->inslots, b->ntuples); + TupleBatchUseInput(b, b->ntuples); +} + +#endif /* EXECBATCH_H */ diff --git a/src/include/executor/execScan.h b/src/include/executor/execScan.h index 028edb8d9fd..d9185331e22 100644 --- a/src/include/executor/execScan.h +++ b/src/include/executor/execScan.h @@ -251,4 +251,73 @@ ExecScanExtended(ScanState *node, } } +/* + * ExecScanExtendedBatchSlot + * Batch-driven 
variant of ExecScanExtended. + * + * Returns one tuple at a time to callers, but internally fetches tuples + * in batches from the AM via accessBatchMtd. This reduces per-tuple AM + * call overhead while preserving the single-slot interface expected by + * parent nodes. + * + * The batch is refilled when exhausted by calling accessBatchMtd, which + * returns false at end-of-scan. + * + * Note: EPQ is not supported in the batch path; callers must ensure + * es_epq_active is NULL before using this function. + */ +static inline TupleTableSlot * +ExecScanExtendedBatchSlot(ScanState *node, + ExecScanAccessBatchMtd accessBatchMtd, + ExprState *qual, ProjectionInfo *projInfo) +{ + ExprContext *econtext = node->ps.ps_ExprContext; + TupleBatch *b = node->ps.ps_Batch; + + /* Batch path does not support EPQ */ + Assert(node->ps.state->es_epq_active == NULL); + Assert(TupleBatchIsValid(b)); + + for (;;) + { + TupleTableSlot *in; + + CHECK_FOR_INTERRUPTS(); + + /* Get next input slot from current batch, or refill */ + if (!TupleBatchHasMore(b)) + { + if (!accessBatchMtd(node)) + return NULL; + } + + in = TupleBatchGetNextSlot(b); + Assert(in); + + /* No qual, no projection: direct return */ + if (qual == NULL && projInfo == NULL) + return in; + + ResetExprContext(econtext); + econtext->ecxt_scantuple = in; + + /* Qual only */ + if (projInfo == NULL) + { + if (qual == NULL || ExecQual(qual, econtext)) + return in; + else + InstrCountFiltered1(node, 1); + continue; + } + + /* Projection (with or without qual) */ + if (qual == NULL || ExecQual(qual, econtext)) + return ExecProject(projInfo); + else + InstrCountFiltered1(node, 1); + /* else try next tuple */ + } +} + #endif /* EXECSCAN_H */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 5929aabc353..e82fd6c0c8a 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -578,12 +578,16 @@ extern Datum ExecMakeFunctionResultSet(SetExprState *fcache, */ typedef 
TupleTableSlot *(*ExecScanAccessMtd) (ScanState *node); typedef bool (*ExecScanRecheckMtd) (ScanState *node, TupleTableSlot *slot); +typedef bool (*ExecScanAccessBatchMtd)(ScanState *node); extern TupleTableSlot *ExecScan(ScanState *node, ExecScanAccessMtd accessMtd, ExecScanRecheckMtd recheckMtd); + extern void ExecAssignScanProjectionInfo(ScanState *node); extern void ExecAssignScanProjectionInfoWithVarno(ScanState *node, int varno); extern void ExecScanReScan(ScanState *node); +extern bool ScanCanUseBatching(ScanState *scanstate, int eflags); +extern void ScanResetBatching(ScanState *scanstate, bool drop); /* * prototypes from functions in execTuples.c diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index db559b39c4d..f6bd59f2af1 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -288,6 +288,7 @@ extern PGDLLIMPORT double VacuumCostDelay; extern PGDLLIMPORT int VacuumCostBalance; extern PGDLLIMPORT bool VacuumCostActive; +extern PGDLLIMPORT int executor_batch_rows; /* in utils/misc/stack_depth.c */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index f8053d9e572..6a191202ced 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -31,6 +31,7 @@ #include "access/skey.h" #include "access/tupconvert.h" +#include "executor/execBatch.h" #include "executor/instrument.h" #include "executor/instrument_node.h" #include "fmgr.h" @@ -1206,6 +1207,9 @@ typedef struct PlanState ExprContext *ps_ExprContext; /* node's expression-evaluation context */ ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */ + /* Batching state if node supports it. */ + TupleBatch *ps_Batch; + bool async_capable; /* true if node is async-capable */ /* -- 2.47.3