diff options
Diffstat (limited to 'src/backend/catalog/index.c')
-rw-r--r-- | src/backend/catalog/index.c | 123 |
1 files changed, 91 insertions, 32 deletions
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 849a4691277..f2cb6d7fb81 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -56,6 +56,7 @@ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" +#include "optimizer/planner.h" #include "parser/parser.h" #include "rewrite/rewriteManip.h" #include "storage/bufmgr.h" @@ -902,7 +903,7 @@ index_create(Relation heapRelation, Assert(indexRelationId == RelationGetRelid(indexRelation)); /* - * Obtain exclusive lock on it. Although no other backends can see it + * Obtain exclusive lock on it. Although no other transactions can see it * until we commit, this prevents deadlock-risk complaints from lock * manager in cases such as CLUSTER. */ @@ -1159,7 +1160,8 @@ index_create(Relation heapRelation, } else { - index_build(heapRelation, indexRelation, indexInfo, isprimary, false); + index_build(heapRelation, indexRelation, indexInfo, isprimary, false, + true); } /* @@ -1746,6 +1748,7 @@ BuildIndexInfo(Relation index) /* initialize index-build state to default */ ii->ii_Concurrent = false; ii->ii_BrokenHotChain = false; + ii->ii_ParallelWorkers = 0; /* set up for possible use by index AM */ ii->ii_Am = index->rd_rel->relam; @@ -2164,6 +2167,7 @@ index_update_stats(Relation rel, * * isprimary tells whether to mark the index as a primary-key index. * isreindex indicates we are recreating a previously-existing index. + * parallel indicates if parallelism may be useful. * * Note: when reindexing an existing index, isprimary can be false even if * the index is a PK; it's already properly marked and need not be re-marked. @@ -2177,7 +2181,8 @@ index_build(Relation heapRelation, Relation indexRelation, IndexInfo *indexInfo, bool isprimary, - bool isreindex) + bool isreindex, + bool parallel) { IndexBuildResult *stats; Oid save_userid; @@ -2192,10 +2197,31 @@ index_build(Relation heapRelation, Assert(PointerIsValid(indexRelation->rd_amroutine->ambuild)); Assert(PointerIsValid(indexRelation->rd_amroutine->ambuildempty)); - ereport(DEBUG1, - (errmsg("building index \"%s\" on table \"%s\"", - RelationGetRelationName(indexRelation), - RelationGetRelationName(heapRelation)))); + /* + * Determine worker process details for parallel CREATE INDEX. Currently, + * only btree has support for parallel builds. + * + * Note that planner considers parallel safety for us. + */ + if (parallel && IsNormalProcessingMode() && + indexRelation->rd_rel->relam == BTREE_AM_OID) + indexInfo->ii_ParallelWorkers = + plan_create_index_workers(RelationGetRelid(heapRelation), + RelationGetRelid(indexRelation)); + + if (indexInfo->ii_ParallelWorkers == 0) + ereport(DEBUG1, + (errmsg("building index \"%s\" on table \"%s\" serially", + RelationGetRelationName(indexRelation), + RelationGetRelationName(heapRelation)))); + else + ereport(DEBUG1, + (errmsg_plural("building index \"%s\" on table \"%s\" with request for %d parallel worker", + "building index \"%s\" on table \"%s\" with request for %d parallel workers", + indexInfo->ii_ParallelWorkers, + RelationGetRelationName(indexRelation), + RelationGetRelationName(heapRelation), + indexInfo->ii_ParallelWorkers))); /* * Switch to the table owner's userid, so that any index functions are run @@ -2347,13 +2373,14 @@ IndexBuildHeapScan(Relation heapRelation, IndexInfo *indexInfo, bool allow_sync, IndexBuildCallback callback, - void *callback_state) + void *callback_state, + HeapScanDesc scan) { return IndexBuildHeapRangeScan(heapRelation, indexRelation, indexInfo, allow_sync, false, 0, InvalidBlockNumber, - callback, callback_state); + callback, callback_state, scan); } /* @@ -2375,11 +2402,11 @@ IndexBuildHeapRangeScan(Relation heapRelation, BlockNumber start_blockno, BlockNumber numblocks, IndexBuildCallback callback, - void *callback_state) + void *callback_state, + HeapScanDesc scan) { bool is_system_catalog; bool checking_uniqueness; - HeapScanDesc scan; HeapTuple heapTuple; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; @@ -2389,6 +2416,7 @@ IndexBuildHeapRangeScan(Relation heapRelation, EState *estate; ExprContext *econtext; Snapshot snapshot; + bool need_unregister_snapshot = false; TransactionId OldestXmin; BlockNumber root_blkno = InvalidBlockNumber; OffsetNumber root_offsets[MaxHeapTuplesPerPage]; @@ -2432,27 +2460,59 @@ IndexBuildHeapRangeScan(Relation heapRelation, * concurrent build, or during bootstrap, we take a regular MVCC snapshot * and index whatever's live according to that. */ - if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent) - { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); - OldestXmin = InvalidTransactionId; /* not used */ + OldestXmin = InvalidTransactionId; + + /* okay to ignore lazy VACUUMs here */ + if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent) + OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM); - /* "any visible" mode is not compatible with this */ - Assert(!anyvisible); + if (!scan) + { + /* + * Serial index build. + * + * Must begin our own heap scan in this case. We may also need to + * register a snapshot whose lifetime is under our direct control. + */ + if (!TransactionIdIsValid(OldestXmin)) + { + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + need_unregister_snapshot = true; + } + else + snapshot = SnapshotAny; + + scan = heap_beginscan_strat(heapRelation, /* relation */ + snapshot, /* snapshot */ + 0, /* number of keys */ + NULL, /* scan key */ + true, /* buffer access strategy OK */ + allow_sync); /* syncscan OK? */ } else { - snapshot = SnapshotAny; - /* okay to ignore lazy VACUUMs here */ - OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM); + /* + * Parallel index build. + * + * Parallel case never registers/unregisters own snapshot. Snapshot + * is taken from parallel heap scan, and is SnapshotAny or an MVCC + * snapshot, based on same criteria as serial case. + */ + Assert(!IsBootstrapProcessingMode()); + Assert(allow_sync); + snapshot = scan->rs_snapshot; } - scan = heap_beginscan_strat(heapRelation, /* relation */ - snapshot, /* snapshot */ - 0, /* number of keys */ - NULL, /* scan key */ - true, /* buffer access strategy OK */ - allow_sync); /* syncscan OK? */ + /* + * Must call GetOldestXmin() with SnapshotAny. Should never call + * GetOldestXmin() with MVCC snapshot. (It's especially worth checking + * this for parallel builds, since ambuild routines that support parallel + * builds must work these details out for themselves.) + */ + Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot)); + Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) : + !TransactionIdIsValid(OldestXmin)); + Assert(snapshot == SnapshotAny || !anyvisible); /* set our scan endpoints */ if (!allow_sync) @@ -2783,8 +2843,8 @@ IndexBuildHeapRangeScan(Relation heapRelation, heap_endscan(scan); - /* we can now forget our snapshot, if set */ - if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent) + /* we can now forget our snapshot, if set and registered by us */ + if (need_unregister_snapshot) UnregisterSnapshot(snapshot); ExecDropSingleTupleTableSlot(slot); @@ -3027,7 +3087,7 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot) state.tuplesort = tuplesort_begin_datum(INT8OID, Int8LessOperator, InvalidOid, false, maintenance_work_mem, - false); + NULL, false); state.htups = state.itups = state.tups_inserted = 0; (void) index_bulk_delete(&ivinfo, NULL, @@ -3552,7 +3612,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks, char persistence, /* Initialize the index and rebuild */ /* Note: we do not need to re-establish pkey setting */ - index_build(heapRelation, iRel, indexInfo, false, true); + index_build(heapRelation, iRel, indexInfo, false, true, true); } PG_CATCH(); { @@ -3911,8 +3971,7 @@ SetReindexProcessing(Oid heapOid, Oid indexOid) static void ResetReindexProcessing(void) { - if (IsInParallelMode()) - elog(ERROR, "cannot modify reindex state during a parallel operation"); + /* This may be called in leader error path */ currentlyReindexedHeap = InvalidOid; currentlyReindexedIndex = InvalidOid; } |