diff options
Diffstat (limited to 'src/include/utils')
-rw-r--r-- | src/include/utils/logtape.h | 39 | ||||
-rw-r--r-- | src/include/utils/tuplesort.h | 132 |
2 files changed, 159 insertions, 12 deletions
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h index 88662c10a43..9bf1d801424 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -16,15 +16,49 @@ #ifndef LOGTAPE_H #define LOGTAPE_H +#include "storage/sharedfileset.h" + /* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */ typedef struct LogicalTapeSet LogicalTapeSet; /* + * The approach tuplesort.c takes to parallel external sorts is that workers, + * whose state is almost the same as independent serial sorts, are made to + * produce a final materialized tape of sorted output in all cases. This is + * frozen, just like any case requiring a final materialized tape. However, + * there is one difference, which is that freezing will also export an + * underlying shared fileset BufFile for sharing. Freezing produces TapeShare + * metadata for the worker when this happens, which is passed along through + * shared memory to leader. + * + * The leader process can then pass an array of TapeShare metadata (one per + * worker participant) to LogicalTapeSetCreate(), alongside a handle to a + * shared fileset, which is sufficient to construct a new logical tapeset that + * consists of each of the tapes materialized by workers. + * + * Note that while logtape.c does create an empty leader tape at the end of the + * tapeset in the leader case, it can never be written to due to a restriction + * in the shared buffile infrastructure. + */ +typedef struct TapeShare +{ + /* + * firstblocknumber is first block that should be read from materialized + * tape. + * + * buffilesize is the size of associated BufFile following freezing. + */ + long firstblocknumber; + off_t buffilesize; +} TapeShare; + +/* * prototypes for functions in logtape.c */ -extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes); +extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared, + SharedFileSet *fileset, int worker); extern void LogicalTapeSetClose(LogicalTapeSet *lts); extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts); extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, @@ -34,7 +68,8 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size); extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum); -extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum); +extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, + TapeShare *share); extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size); extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index 5d57c503ab2..d2e6754f043 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -8,7 +8,8 @@ * if necessary). It works efficiently for both small and large amounts * of data. Small amounts are sorted in-memory using qsort(). Large * amounts are sorted using temporary files and a standard external sort - * algorithm. + * algorithm. Parallel sorts use a variant of this external sort + * algorithm, and are typically only used for large amounts of data. * * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -23,13 +24,39 @@ #include "access/itup.h" #include "executor/tuptable.h" #include "fmgr.h" +#include "storage/dsm.h" #include "utils/relcache.h" -/* Tuplesortstate is an opaque type whose details are not known outside - * tuplesort.c. +/* + * Tuplesortstate and Sharedsort are opaque types whose details are not + * known outside tuplesort.c. */ typedef struct Tuplesortstate Tuplesortstate; +typedef struct Sharedsort Sharedsort; + +/* + * Tuplesort parallel coordination state, allocated by each participant in + * local memory. Participant caller initializes everything. See usage notes + * below. + */ +typedef struct SortCoordinateData +{ + /* Worker process? If not, must be leader. */ + bool isWorker; + + /* + * Leader-process-passed number of participants known launched (workers + * set this to -1). Includes state within leader needed for it to + * participate as a worker, if any. + */ + int nParticipants; + + /* Private opaque state (points to shared memory) */ + Sharedsort *sharedsort; +} SortCoordinateData; + +typedef struct SortCoordinateData *SortCoordinate; /* * Data structures for reporting sort statistics. Note that @@ -66,6 +93,8 @@ typedef struct TuplesortInstrumentation * sorting HeapTuples and two more for sorting IndexTuples. Yet another * API supports sorting bare Datums. * + * Serial sort callers should pass NULL for their coordinate argument. + * * The "heap" API actually stores/sorts MinimalTuples, which means it doesn't * preserve the system columns (tuple identity and transaction visibility * info). The sort keys are specified by column numbers within the tuples @@ -84,30 +113,107 @@ typedef struct TuplesortInstrumentation * * The "index_hash" API is similar to index_btree, but the tuples are * actually sorted by their hash codes not the raw data. + * + * Parallel sort callers are required to coordinate multiple tuplesort states + * in a leader process and one or more worker processes. The leader process + * must launch workers, and have each perform an independent "partial" + * tuplesort, typically fed by the parallel heap interface. The leader later + * produces the final output (internally, it merges runs output by workers). + * + * Callers must do the following to perform a sort in parallel using multiple + * worker processes: + * + * 1. Request tuplesort-private shared memory for n workers. Use + * tuplesort_estimate_shared() to get the required size. + * 2. Have leader process initialize allocated shared memory using + * tuplesort_initialize_shared(). Launch workers. + * 3. Initialize a coordinate argument within both the leader process, and + * for each worker process. This has a pointer to the shared + * tuplesort-private structure, as well as some caller-initialized fields. + * Leader's coordinate argument reliably indicates number of workers + * launched (this is unused by workers). + * 4. Begin a tuplesort using some appropriate tuplesort_begin* routine, + * (passing the coordinate argument) within each worker. The workMem + * arguments need not be identical. All other arguments should match + * exactly, though. + * 5. tuplesort_attach_shared() should be called by all workers. Feed tuples + * to each worker, and call tuplesort_performsort() within each when input + * is exhausted. + * 6. Call tuplesort_end() in each worker process. Worker processes can shut + * down once tuplesort_end() returns. + * 7. Begin a tuplesort in the leader using the same tuplesort_begin* + * routine, passing a leader-appropriate coordinate argument (this can + * happen as early as during step 3, actually, since we only need to know + * the number of workers successfully launched). The leader must now wait + * for workers to finish. Caller must use own mechanism for ensuring that + * next step isn't reached until all workers have called and returned from + * tuplesort_performsort(). (Note that it's okay if workers have already + * also called tuplesort_end() by then.) + * 8. Call tuplesort_performsort() in leader. Consume output using the + * appropriate tuplesort_get* routine. Leader can skip this step if + * tuplesort turns out to be unnecessary. + * 9. Call tuplesort_end() in leader. + * + * This division of labor assumes nothing about how input tuples are produced, + * but does require that caller combine the state of multiple tuplesorts for + * any purpose other than producing the final output. For example, callers + * must consider that tuplesort_get_stats() reports on only one worker's role + * in a sort (or the leader's role), and not statistics for the sort as a + * whole. + * + * Note that callers may use the leader process to sort runs as if it was an + * independent worker process (prior to the process performing a leader sort + * to produce the final sorted output). Doing so only requires a second + * "partial" tuplesort within the leader process, initialized like that of a + * worker process. The steps above don't touch on this directly. The only + * difference is that the tuplesort_attach_shared() call is never needed within + * leader process, because the backend as a whole holds the shared fileset + * reference. A worker Tuplesortstate in leader is expected to do exactly the + * same amount of total initial processing work as a worker process + * Tuplesortstate, since the leader process has nothing else to do before + * workers finish. + * + * Note that only a very small amount of memory will be allocated prior to + * the leader state first consuming input, and that workers will free the + * vast majority of their memory upon returning from tuplesort_performsort(). + * Callers can rely on this to arrange for memory to be used in a way that + * respects a workMem-style budget across an entire parallel sort operation. + * + * Callers are responsible for parallel safety in general. However, they + * can at least rely on there being no parallel safety hazards within + * tuplesort, because tuplesort thinks of the sort as several independent + * sorts whose results are combined. Since, in general, the behavior of + * sort operators is immutable, caller need only worry about the parallel + * safety of whatever the process is through which input tuples are + * generated (typically, caller uses a parallel heap scan). */ extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, - int workMem, bool randomAccess); + int workMem, SortCoordinate coordinate, + bool randomAccess); extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc, - Relation indexRel, - int workMem, bool randomAccess); + Relation indexRel, int workMem, + SortCoordinate coordinate, bool randomAccess); extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel, Relation indexRel, bool enforceUnique, - int workMem, bool randomAccess); + int workMem, SortCoordinate coordinate, + bool randomAccess); extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel, Relation indexRel, uint32 high_mask, uint32 low_mask, uint32 max_buckets, - int workMem, bool randomAccess); + int workMem, SortCoordinate coordinate, + bool randomAccess); extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, - int workMem, bool randomAccess); + int workMem, SortCoordinate coordinate, + bool randomAccess); extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound); @@ -141,10 +247,16 @@ extern const char *tuplesort_space_type_name(TuplesortSpaceType t); extern int tuplesort_merge_order(int64 allowedMem); +extern Size tuplesort_estimate_shared(int nworkers); +extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, + dsm_segment *seg); +extern void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg); + /* * These routines may only be called if randomAccess was specified 'true'. * Likewise, backwards scan in gettuple/getdatum is only allowed if - * randomAccess was specified. + * randomAccess was specified. Note that parallel sorts do not support + * randomAccess. */ extern void tuplesort_rescan(Tuplesortstate *state); |