Diffstat (limited to 'src/backend/commands')
-rw-r--r--   src/backend/commands/vacuum.c           |  78
-rw-r--r--   src/backend/commands/vacuumparallel.c   | 100
2 files changed, 71 insertions, 107 deletions
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index e63c86cae45..b589279d49f 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -116,7 +116,6 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
 static bool vac_tid_reaped(ItemPointer itemptr, void *state);
-static int	vac_cmp_itemptr(const void *left, const void *right);
 
 /*
  * GUC check function to ensure GUC value specified is within the allowable
@@ -2489,16 +2488,16 @@ get_vacoptval_from_boolean(DefElem *def)
  */
 IndexBulkDeleteResult *
 vac_bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat,
-					  VacDeadItems *dead_items)
+					  TidStore *dead_items, VacDeadItemsInfo *dead_items_info)
 {
 	/* Do bulk deletion */
 	istat = index_bulk_delete(ivinfo, istat, vac_tid_reaped,
 							  (void *) dead_items);
 
 	ereport(ivinfo->message_level,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
+			(errmsg("scanned index \"%s\" to remove %lld row versions",
 					RelationGetRelationName(ivinfo->index),
-					dead_items->num_items)));
+					(long long) dead_items_info->num_items)));
 
 	return istat;
 }
@@ -2530,81 +2529,14 @@ vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
 }
 
 /*
- * Returns the total required space for VACUUM's dead_items array given a
- * max_items value.
- */
-Size
-vac_max_items_to_alloc_size(int max_items)
-{
-	Assert(max_items <= MAXDEADITEMS(MaxAllocSize));
-
-	return offsetof(VacDeadItems, items) + sizeof(ItemPointerData) * max_items;
-}
-
-/*
  * vac_tid_reaped() -- is a particular tid deletable?
  *
  *		This has the right signature to be an IndexBulkDeleteCallback.
- *
- *		Assumes dead_items array is sorted (in ascending TID order).
  */
 static bool
 vac_tid_reaped(ItemPointer itemptr, void *state)
 {
-	VacDeadItems *dead_items = (VacDeadItems *) state;
-	int64		litem,
-				ritem,
-				item;
-	ItemPointer res;
-
-	litem = itemptr_encode(&dead_items->items[0]);
-	ritem = itemptr_encode(&dead_items->items[dead_items->num_items - 1]);
-	item = itemptr_encode(itemptr);
-
-	/*
-	 * Doing a simple bound check before bsearch() is useful to avoid the
-	 * extra cost of bsearch(), especially if dead items on the heap are
-	 * concentrated in a certain range.  Since this function is called for
-	 * every index tuple, it pays to be really fast.
-	 */
-	if (item < litem || item > ritem)
-		return false;
-
-	res = (ItemPointer) bsearch(itemptr,
-								dead_items->items,
-								dead_items->num_items,
-								sizeof(ItemPointerData),
-								vac_cmp_itemptr);
-
-	return (res != NULL);
-}
-
-/*
- * Comparator routines for use with qsort() and bsearch().
- */
-static int
-vac_cmp_itemptr(const void *left, const void *right)
-{
-	BlockNumber lblk,
-				rblk;
-	OffsetNumber loff,
-				roff;
-
-	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
-	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
-
-	if (lblk < rblk)
-		return -1;
-	if (lblk > rblk)
-		return 1;
-
-	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
-	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
-
-	if (loff < roff)
-		return -1;
-	if (loff > roff)
-		return 1;
+	TidStore   *dead_items = (TidStore *) state;
 
-	return 0;
+	return TidStoreIsMember(dead_items, itemptr);
 }
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index befda1c1050..5174a4e9753 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -8,8 +8,8 @@
  *
  * In a parallel vacuum, we perform both index bulk deletion and index cleanup
  * with parallel worker processes. Individual indexes are processed by one
- * vacuum process. ParallelVacuumState contains shared information as well as
- * the memory space for storing dead items allocated in the DSM segment. We
+ * vacuum process. ParalleVacuumState contains shared information as well as
+ * the memory space for storing dead items allocated in the DSA area. We
  * launch parallel worker processes at the start of parallel index
  * bulk-deletion and index cleanup and once all indexes are processed, the
  * parallel worker processes exit. Each time we process indexes in parallel,
@@ -45,11 +45,10 @@
  * use small integers.
  */
 #define PARALLEL_VACUUM_KEY_SHARED			1
-#define PARALLEL_VACUUM_KEY_DEAD_ITEMS		2
-#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
-#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
-#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
-#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT		2
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	3
+#define PARALLEL_VACUUM_KEY_WAL_USAGE		4
+#define PARALLEL_VACUUM_KEY_INDEX_STATS		5
 
 /*
  * Shared information among parallel workers. So this is allocated in the DSM
@@ -110,6 +109,15 @@ typedef struct PVShared
 
 	/* Counter for vacuuming and cleanup */
 	pg_atomic_uint32 idx;
+
+	/* DSA handle where the TidStore lives */
+	dsa_handle	dead_items_dsa_handle;
+
+	/* DSA pointer to the shared TidStore */
+	dsa_pointer dead_items_handle;
+
+	/* Statistics of shared dead items */
+	VacDeadItemsInfo dead_items_info;
 } PVShared;
 
 /* Status used during parallel index vacuum or cleanup */
@@ -176,7 +184,7 @@ struct ParallelVacuumState
 	PVIndStats *indstats;
 
 	/* Shared dead items space among parallel vacuum workers */
-	VacDeadItems *dead_items;
+	TidStore   *dead_items;
 
 	/* Points to buffer usage area in DSM */
 	BufferUsage *buffer_usage;
@@ -232,20 +240,19 @@ static void parallel_vacuum_error_callback(void *arg);
  */
 ParallelVacuumState *
 parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
-					 int nrequested_workers, int max_items,
+					 int nrequested_workers, int vac_work_mem,
 					 int elevel, BufferAccessStrategy bstrategy)
 {
 	ParallelVacuumState *pvs;
 	ParallelContext *pcxt;
 	PVShared   *shared;
-	VacDeadItems *dead_items;
+	TidStore   *dead_items;
 	PVIndStats *indstats;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
 	bool	   *will_parallel_vacuum;
 	Size		est_indstats_len;
 	Size		est_shared_len;
-	Size		est_dead_items_len;
 	int			nindexes_mwm = 0;
 	int			parallel_workers = 0;
 	int			querylen;
@@ -294,11 +301,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
-	est_dead_items_len = vac_max_items_to_alloc_size(max_items);
-	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
 	/*
 	 * Estimate space for BufferUsage and WalUsage --
 	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
@@ -371,6 +373,14 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 		(nindexes_mwm > 0) ?
 		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
 		maintenance_work_mem;
+	shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
+
+	/* Prepare DSA space for dead items */
+	dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
+									  LWTRANCHE_PARALLEL_VACUUM_DSA);
+	pvs->dead_items = dead_items;
+	shared->dead_items_handle = TidStoreGetHandle(dead_items);
+	shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
 
 	/* Use the same buffer size for all workers */
 	shared->ring_nbuffers = GetAccessStrategyBufferCount(bstrategy);
@@ -382,15 +392,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	pvs->shared = shared;
 
-	/* Prepare the dead_items space */
-	dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
-												   est_dead_items_len);
-	dead_items->max_items = max_items;
-	dead_items->num_items = 0;
-	MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
-	pvs->dead_items = dead_items;
-
 	/*
 	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
 	 * initialize
@@ -448,6 +449,8 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 		istats[i] = NULL;
 	}
 
+	TidStoreDestroy(pvs->dead_items);
+
 	DestroyParallelContext(pvs->pcxt);
 	ExitParallelMode();
 
@@ -455,13 +458,40 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 	pfree(pvs);
 }
 
-/* Returns the dead items space */
-VacDeadItems *
-parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
+/*
+ * Returns the dead items space and dead items information.
+ */
+TidStore *
+parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p)
 {
+	*dead_items_info_p = &(pvs->shared->dead_items_info);
 	return pvs->dead_items;
 }
 
+/* Forget all items in dead_items */
+void
+parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs)
+{
+	TidStore   *dead_items = pvs->dead_items;
+	VacDeadItemsInfo *dead_items_info = &(pvs->shared->dead_items_info);
+
+	/*
+	 * Free the current tidstore and return allocated DSA segments to the
+	 * operating system. Then we recreate the tidstore with the same max_bytes
+	 * limitation we just used.
+	 */
+	TidStoreDestroy(dead_items);
+	pvs->dead_items = TidStoreCreateShared(dead_items_info->max_bytes,
+										   LWTRANCHE_PARALLEL_VACUUM_DSA);
+
+	/* Update the DSA pointer for dead_items to the new one */
+	pvs->shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
+	pvs->shared->dead_items_handle = TidStoreGetHandle(dead_items);
+
+	/* Reset the counter */
+	dead_items_info->num_items = 0;
+}
+
 /*
  * Do parallel index bulk-deletion with parallel workers.
  */
@@ -861,7 +891,8 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	switch (indstats->status)
 	{
 		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
-			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items,
+											  &pvs->shared->dead_items_info);
 			break;
 		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
 			istat_res = vac_cleanup_one_index(&ivinfo, istat);
@@ -961,7 +992,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	Relation   *indrels;
 	PVIndStats *indstats;
 	PVShared   *shared;
-	VacDeadItems *dead_items;
+	TidStore   *dead_items;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
 	int			nindexes;
@@ -1005,10 +1036,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 											  PARALLEL_VACUUM_KEY_INDEX_STATS,
 											  false);
 
-	/* Set dead_items space */
-	dead_items = (VacDeadItems *) shm_toc_lookup(toc,
-												 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
-												 false);
+	/* Find dead_items in shared memory */
+	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
+								shared->dead_items_handle);
 
 	/* Set cost-based vacuum delay */
 	VacuumUpdateCosts();
@@ -1056,6 +1086,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
 						  &wal_usage[ParallelWorkerNumber]);
 
+	TidStoreDetach(dead_items);
+
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
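
The vacuumparallel.c hunks above give the shared dead-items TidStore a create/publish/attach/detach lifecycle: the leader creates the store in a DSA area sized from vac_work_mem, publishes the DSA handle and the store's dsa_pointer through PVShared (which already travels via PARALLEL_VACUUM_KEY_SHARED, so the separate PARALLEL_VACUUM_KEY_DEAD_ITEMS entry goes away), workers attach before index vacuuming and detach afterwards, and a TID is tested with TidStoreIsMember() instead of bsearch() over a sorted ItemPointerData array. Below is a minimal sketch of that flow using only the calls visible in the diff; the DeadItemsShared struct and the two helper functions are illustrative stand-ins, not code from the tree.

/* Illustrative sketch only -- not the actual PostgreSQL code. */
#include "postgres.h"
#include "access/tidstore.h"
#include "storage/itemptr.h"
#include "storage/lwlock.h"
#include "utils/dsa.h"

/* Stand-in for the PVShared fields added by this commit */
typedef struct DeadItemsShared
{
	dsa_handle	dead_items_dsa_handle;	/* DSA area holding the TidStore */
	dsa_pointer dead_items_handle;		/* TidStore control object within it */
	size_t		max_bytes;				/* memory limit, from vac_work_mem */
} DeadItemsShared;

/* Leader: create the shared TidStore and publish its handles. */
static TidStore *
leader_create_dead_items(DeadItemsShared *shared, int vac_work_mem_kb)
{
	TidStore   *dead_items;

	shared->max_bytes = vac_work_mem_kb * 1024L;
	dead_items = TidStoreCreateShared(shared->max_bytes,
									  LWTRANCHE_PARALLEL_VACUUM_DSA);
	shared->dead_items_handle = TidStoreGetHandle(dead_items);
	shared->dead_items_dsa_handle = dsa_get_handle(TidStoreGetDSA(dead_items));
	return dead_items;
}

/* Worker: attach once, probe a TID the way vac_tid_reaped() now does, detach. */
static bool
worker_probe_dead_item(DeadItemsShared *shared, ItemPointer tid)
{
	TidStore   *dead_items;
	bool		is_dead;

	dead_items = TidStoreAttach(shared->dead_items_dsa_handle,
								shared->dead_items_handle);
	is_dead = TidStoreIsMember(dead_items, tid);
	TidStoreDetach(dead_items);
	return is_dead;
}

When the leader is done with all index passes, it frees the store with TidStoreDestroy(), as parallel_vacuum_end() now does.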

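On the caller side, the reworked entry points above are used roughly as follows: parallel_vacuum_get_dead_items() hands back the VacDeadItemsInfo through an out-parameter, vac_bulkdel_one_index() takes both the TidStore and that info struct, and clearing the store between heap passes goes through parallel_vacuum_reset_dead_items() rather than resetting an array count; num_items is wide enough that the patch prints it with %lld via a (long long) cast. A hedged caller-side sketch, assuming the prototypes shown in the diff -- the wrapper function and the elog() call are illustrative, not taken from the tree:

/* Illustrative caller-side sketch; only the vacuum APIs are real. */
#include "postgres.h"
#include "access/genam.h"
#include "access/tidstore.h"
#include "commands/vacuum.h"

static void
example_one_index_pass(ParallelVacuumState *pvs, IndexVacuumInfo *ivinfo,
					   IndexBulkDeleteResult **istat)
{
	VacDeadItemsInfo *dead_items_info;
	TidStore   *dead_items;

	/* New signature: the info struct comes back via an out-parameter */
	dead_items = parallel_vacuum_get_dead_items(pvs, &dead_items_info);

	/* Bulk deletion now takes the TidStore plus its info struct */
	*istat = vac_bulkdel_one_index(ivinfo, *istat, dead_items, dead_items_info);

	/* num_items is printed with a (long long) cast, as in the errmsg() above */
	elog(DEBUG1, "removed %lld dead item pointers",
		 (long long) dead_items_info->num_items);

	/* Start the next heap pass with an empty store */
	parallel_vacuum_reset_dead_items(pvs);
}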