From 0e851beb823d0936b5f627a3723afe2818c9e672 Mon Sep 17 00:00:00 2001
From: Yura Sokolov
Date: Sun, 19 Jan 2025 17:40:28 +0300
Subject: [PATCH] Lock-free XLog Reservation using lock-free hash-table

Remove PrevBytePos to eliminate lock contention, allowing atomic updates
to CurrBytePos.  Use a lock-free hash table based on 4-way Cuckoo hashing
to store the link to PrevBytePos.
---
 src/backend/access/transam/xlog.c | 585 +++++++++++++++++++++++++++---
 src/tools/pgindent/typedefs.list  |   2 +
 2 files changed, 532 insertions(+), 55 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2d4c346473b7..0dff9addfe10 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -68,6 +68,8 @@
 #include "catalog/pg_database.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
+#include "common/hashfn.h"
+#include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -379,6 +381,94 @@ typedef union WALInsertLockPadded
    char        pad[PG_CACHE_LINE_SIZE];
 } WALInsertLockPadded;

+/* #define WAL_LINK_64 0 */
+#ifndef WAL_LINK_64
+#ifdef PG_HAVE_ATOMIC_U64_SIMULATION
+#define WAL_LINK_64 0
+#else
+#define WAL_LINK_64 1
+#endif
+#endif
+
+/*
+ * Links the current reserved position with the previous one.
+ * - CurrPosId is (CurrBytePos ^ (CurrBytePos>>32)).
+ *   Since CurrBytePos grows monotonically and is MAXALIGNed, CurrPosId
+ *   identifies CurrBytePos uniquely for at least 4*2^32 bytes = 16GB of WAL.
+ * - CurrPosHigh is (CurrBytePos>>32); it is stored for a stronger
+ *   uniqueness check.
+ * - PrevSize is the difference between CurrBytePos and PrevBytePos.
+ */
+typedef struct
+{
+#if WAL_LINK_64
+    uint64      CurrPos;
+    uint64      PrevPos;
+#define WAL_PREV_EMPTY (~((uint64)0))
+#define WALLinkEmpty(l) ((l).PrevPos == WAL_PREV_EMPTY)
+#define WALLinkSamePos(a, b) ((a).CurrPos == (b).CurrPos)
+#define WALLinkCopyPrev(a, b) do {(a).PrevPos = (b).PrevPos;} while(0)
+#else
+    uint32      CurrPosId;
+    uint32      CurrPosHigh;
+    uint32      PrevSize;
+#define WALLinkEmpty(l) ((l).PrevSize == 0)
+#define WALLinkSamePos(a, b) ((a).CurrPosId == (b).CurrPosId && (a).CurrPosHigh == (b).CurrPosHigh)
+#define WALLinkCopyPrev(a, b) do {(a).PrevSize = (b).PrevSize;} while(0)
+#endif
+} WALPrevPosLinkVal;
+
+/*
+ * This is an element of the lock-free hash table.
+ * In 32-bit mode the lowest bit of PrevSize is used as a lock, relying on
+ * the fact that sizes are MAXALIGNed.  In 64-bit mode the lock protocol is
+ * more complex.
+ */
+typedef struct
+{
+#if WAL_LINK_64
+    pg_atomic_uint64 CurrPos;
+    pg_atomic_uint64 PrevPos;
+#else
+    pg_atomic_uint32 CurrPosId;
+    uint32      CurrPosHigh;
+    pg_atomic_uint32 PrevSize;
+    uint32      pad;            /* to align to 16 bytes */
+#endif
+} WALPrevPosLink;
+
+StaticAssertDecl(sizeof(WALPrevPosLink) == 16, "WALPrevPosLink should be 16 bytes");
+
+#define PREV_LINKS_HASH_CAPA (NUM_XLOGINSERT_LOCKS * 2)
+StaticAssertDecl(!(PREV_LINKS_HASH_CAPA & (PREV_LINKS_HASH_CAPA - 1)),
+                 "PREV_LINKS_HASH_CAPA should be power of two");
+StaticAssertDecl(PREV_LINKS_HASH_CAPA < UINT16_MAX,
+                 "PREV_LINKS_HASH_CAPA is too large");
+
+/*-----------
+ * PREV_LINKS_HASH_STRATEGY - the way slots are chosen in the hash table
+ * 1 - 4 positions h1,h1+1,h2,h2+2 - guarantees at least 3 distinct slots,
+ *     but they may be spread across 4 cache lines.
+ * 2 - 4 positions h,h^1,h^2,h^3 - 4 slots in a single cache line.
+ * 3 - 8 positions h1,h1^1,h1^2,h1^3,h2,h2^1,h2^2,h2^3 - 8 distinct slots
+ *     in two cache lines.
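+ *
+ * Worked example (illustrative only, assuming PREV_LINKS_HASH_CAPA were 16):
+ * for hash = 0x1234ABCD strategy 1 would probe slots 13, 14, 4 and 6
+ * (h1 = 0xD, h1+1, h2 = (hash>>16) & 15 = 4, h2+2), while strategies 2 and 3
+ * would start from the 4-slot bucket {12,13,14,15} that contains slot 13.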
+ */
+#ifndef PREV_LINKS_HASH_STRATEGY
+#define PREV_LINKS_HASH_STRATEGY 3
+#endif
+
+#if PREV_LINKS_HASH_STRATEGY <= 2
+#define PREV_LINKS_LOOKUPS 4
+#else
+#define PREV_LINKS_LOOKUPS 8
+#endif
+
+struct WALPrevLinksLookups
+{
+    uint16      pos[PREV_LINKS_LOOKUPS];
+};
+
+#define SWAP_ONCE_IN 128
+
 /*
  * Session status of running backup, used for sanity checks in SQL-callable
  * functions to start and stop backups.
@@ -390,17 +480,18 @@ static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
  */
 typedef struct XLogCtlInsert
 {
-    slock_t     insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
-
     /*
     * CurrBytePos is the end of reserved WAL. The next record will be
-    * inserted at that position. PrevBytePos is the start position of the
-    * previously inserted (or rather, reserved) record - it is copied to the
-    * prev-link of the next record. These are stored as "usable byte
-    * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+    * inserted at that position.
+    *
+    * The start position of the previously inserted (or rather, reserved)
+    * record (it is copied to the prev-link of the next record) is stored in
+    * PrevLinksHash.
+    *
+    * These are stored as "usable byte positions" rather than XLogRecPtrs
+    * (see XLogBytePosToRecPtr()).
     */
-    uint64      CurrBytePos;
-    uint64      PrevBytePos;
+    pg_atomic_uint64 CurrBytePos;

     /*
     * Make sure the above heavily-contended spinlock and byte positions are
@@ -437,8 +528,37 @@ typedef struct XLogCtlInsert
     * WAL insertion locks.
     */
     WALInsertLockPadded *WALInsertLocks;
+
+    /*
+    * PrevLinksHash is a lock-free hash table based on the Cuckoo algorithm.
+    *
+    * With PREV_LINKS_HASH_STRATEGY == 1 it is mostly 4-way: for every
+    * element two positions h1 and h2 are computed, and the neighbours h1+1
+    * and h2+2 are used as well.  This way, even on a collision we have 3
+    * distinct slots, which gives us ~75% fill rate without unsolvable
+    * cycles (per Cuckoo hashing theory).  But the chosen slots may be in 4
+    * distinct cache lines.
+    *
+    * With PREV_LINKS_HASH_STRATEGY == 3 (the default) it takes two buckets
+    * of 4 elements each - 8 positions in total, guaranteed to be in two
+    * cache lines.  It provides a very high fill rate - up to 90%.
+    *
+    * With PREV_LINKS_HASH_STRATEGY == 2 it takes only one bucket with 4
+    * elements.  Strictly speaking this is not Cuckoo hashing, but it should
+    * work for our case.
+    *
+    * Crucially, we rely on the fact that elements are deleted at the same
+    * rate as they are added, so even unsolvable cycles are soon destroyed
+    * by concurrent deletions.
+    */
+    WALPrevPosLink *PrevLinksHash;
+
 } XLogCtlInsert;

+StaticAssertDecl(offsetof(XLogCtlInsert, RedoRecPtr) / PG_CACHE_LINE_SIZE !=
+                 offsetof(XLogCtlInsert, CurrBytePos) / PG_CACHE_LINE_SIZE,
+                 "CurrBytePos must not share a cache line with RedoRecPtr");
+
 /*
  * Total shared-memory state for XLOG.
  */
@@ -579,6 +699,9 @@ static XLogCtlData *XLogCtl = NULL;
 /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
 static WALInsertLockPadded *WALInsertLocks = NULL;

+/* same for XLogCtl->Insert.PrevLinksHash */
+static WALPrevPosLink *PrevLinksHash = NULL;
+
 /*
  * We maintain an image of pg_control in shared memory.
 */
@@ -711,6 +834,19 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
                                XLogRecData *rdata,
                                XLogRecPtr StartPos, XLogRecPtr EndPos,
                                TimeLineID tli);
+
+static void WALPrevPosLinkValCompose(WALPrevPosLinkVal *val, XLogRecPtr StartPos, XLogRecPtr PrevPos);
+static void WALPrevPosLinkValGetPrev(WALPrevPosLinkVal val, XLogRecPtr *PrevPos);
+static void CalcCuckooPositions(WALPrevPosLinkVal linkval, struct WALPrevLinksLookups *pos);
+
+static bool WALPrevPosLinkInsert(WALPrevPosLink *link, WALPrevPosLinkVal val);
+static bool WALPrevPosLinkConsume(WALPrevPosLink *link, WALPrevPosLinkVal *val);
+static bool WALPrevPosLinkSwap(WALPrevPosLink *link, WALPrevPosLinkVal *val);
+static void LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos,
+                               XLogRecPtr *PrevPtr);
+static void LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec);
+static XLogRecPtr ReadInsertCurrBytePos(void);
+
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
                                      XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -1097,6 +1233,341 @@ XLogInsertRecord(XLogRecData *rdata,
    return EndPos;
 }

+static pg_attribute_always_inline void
+WALPrevPosLinkValCompose(WALPrevPosLinkVal *val, XLogRecPtr StartPos, XLogRecPtr PrevPos)
+{
+#if WAL_LINK_64
+    val->CurrPos = StartPos;
+    val->PrevPos = PrevPos;
+#else
+    val->CurrPosHigh = StartPos >> 32;
+    val->CurrPosId = StartPos ^ val->CurrPosHigh;
+    val->PrevSize = StartPos - PrevPos;
+#endif
+}
+
+static pg_attribute_always_inline void
+WALPrevPosLinkValGetPrev(WALPrevPosLinkVal val, XLogRecPtr *PrevPos)
+{
+#if WAL_LINK_64
+    *PrevPos = val.PrevPos;
+#else
+    XLogRecPtr  StartPos = val.CurrPosHigh;
+
+    StartPos ^= (StartPos << 32) | val.CurrPosId;
+    *PrevPos = StartPos - val.PrevSize;
+#endif
+}
+
+static pg_attribute_always_inline void
+CalcCuckooPositions(WALPrevPosLinkVal linkval, struct WALPrevLinksLookups *pos)
+{
+    uint32      hash;
+#if PREV_LINKS_HASH_STRATEGY == 3
+    uint32      offset;
+#endif
+
+#if WAL_LINK_64
+    hash = murmurhash32(linkval.CurrPos ^ (linkval.CurrPos >> 32));
+#else
+    hash = murmurhash32(linkval.CurrPosId);
+#endif
+
+#if PREV_LINKS_HASH_STRATEGY == 1
+    pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
+    pos->pos[1] = (pos->pos[0] + 1) % PREV_LINKS_HASH_CAPA;
+    pos->pos[2] = (hash >> 16) % PREV_LINKS_HASH_CAPA;
+    pos->pos[3] = (pos->pos[2] + 2) % PREV_LINKS_HASH_CAPA;
+#else
+    pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
+    pos->pos[1] = pos->pos[0] ^ 1;
+    pos->pos[2] = pos->pos[0] ^ 2;
+    pos->pos[3] = pos->pos[0] ^ 3;
+#if PREV_LINKS_HASH_STRATEGY == 3
+    /* use multiplication to compute 0 <= offset < PREV_LINKS_HASH_CAPA-4 */
+    offset = (hash / PREV_LINKS_HASH_CAPA) * (PREV_LINKS_HASH_CAPA - 4);
+    offset /= UINT32_MAX / PREV_LINKS_HASH_CAPA + 1;
+    /* add the start of the next bucket */
+    offset += (pos->pos[0] | 3) + 1;
+    /* yields a position in a strictly different bucket */
+    pos->pos[4] = offset % PREV_LINKS_HASH_CAPA;
+    pos->pos[5] = pos->pos[4] ^ 1;
+    pos->pos[6] = pos->pos[4] ^ 2;
+    pos->pos[7] = pos->pos[4] ^ 3;
+#endif
+#endif
+}
+
+/*
+ * Attempt to write into an empty link.
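+ *
+ * Locking protocol, as implemented below: in 32-bit mode an empty slot has
+ * PrevSize == 0; the compare-exchange from 0 to 1 claims the slot, the
+ * CurrPosId/CurrPosHigh stores follow, and the final write of the real
+ * PrevSize both publishes the entry and releases the lock.  In 64-bit mode
+ * the compare-exchange on PrevPos against WAL_PREV_EMPTY plays the same role.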
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkInsert(WALPrevPosLink *link, WALPrevPosLinkVal val)
+{
+#if WAL_LINK_64
+    uint64      empty = WAL_PREV_EMPTY;
+
+    if (pg_atomic_read_u64(&link->PrevPos) != WAL_PREV_EMPTY)
+        return false;
+    if (!pg_atomic_compare_exchange_u64(&link->PrevPos, &empty, val.PrevPos))
+        return false;
+    /* we may ignore a concurrent lock of CurrPos here */
+    pg_atomic_write_u64(&link->CurrPos, val.CurrPos);
+    return true;
+#else
+    uint32      empty = 0;
+
+    /* first do a cheap read-only check */
+    if (pg_atomic_read_u32(&link->PrevSize) != 0)
+        return false;
+    if (!pg_atomic_compare_exchange_u32(&link->PrevSize, &empty, 1))
+        /* someone else occupied the entry */
+        return false;
+
+    pg_atomic_write_u32(&link->CurrPosId, val.CurrPosId);
+    link->CurrPosHigh = val.CurrPosHigh;
+    pg_write_barrier();
+    /* This write acts as unlock as well. */
+    pg_atomic_write_u32(&link->PrevSize, val.PrevSize);
+    return true;
+#endif
+}
+
+/*
+ * Attempt to consume a matching link.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkConsume(WALPrevPosLink *link, WALPrevPosLinkVal *val)
+{
+#if WAL_LINK_64
+    uint64      oldCurr;
+
+    if (pg_atomic_read_u64(&link->CurrPos) != val->CurrPos)
+        return false;
+    /* lock against a concurrent swapper */
+    oldCurr = pg_atomic_fetch_or_u64(&link->CurrPos, 1);
+    if (oldCurr & 1)
+        return false;           /* lock failed */
+    if (oldCurr != val->CurrPos)
+    {
+        /* link was swapped */
+        pg_atomic_write_u64(&link->CurrPos, oldCurr);
+        return false;
+    }
+    val->PrevPos = pg_atomic_read_u64(&link->PrevPos);
+    pg_atomic_write_u64(&link->PrevPos, WAL_PREV_EMPTY);
+
+    /*
+     * A concurrent inserter may already be reusing this link, so we don't
+     * check the result of the compare-exchange.
+     */
+    oldCurr |= 1;
+    pg_atomic_compare_exchange_u64(&link->CurrPos, &oldCurr, 0);
+    return true;
+#else
+    if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId)
+        return false;
+
+    /* Try lock */
+    val->PrevSize = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+    if (val->PrevSize & 1)
+        /* Lock failed */
+        return false;
+
+    if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId ||
+        link->CurrPosHigh != val->CurrPosHigh)
+    {
+        /* unlock with the old value */
+        pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+        return false;
+    }
+
+    pg_atomic_write_u32(&link->CurrPosId, 0);
+    link->CurrPosHigh = 0;
+    pg_write_barrier();
+    /* This write acts as unlock as well. */
+    pg_atomic_write_u32(&link->PrevSize, 0);
+    return true;
+#endif
+}
+
+/*
+ * Attempt to swap the entry: remember the existing link and write ours.
+ * We may happen to consume an empty entry; the caller detects that by
+ * checking the remembered value.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkSwap(WALPrevPosLink *link, WALPrevPosLinkVal *val)
+{
+#if WAL_LINK_64
+    uint64      oldCurr;
+    uint64      oldPrev;
+
+    /* lock against a concurrent swapper or consumer */
+    oldCurr = pg_atomic_fetch_or_u64(&link->CurrPos, 1);
+    if (oldCurr & 1)
+        return false;           /* lock failed */
+    if (oldCurr == 0)
+    {
+        /* link was empty */
+        oldPrev = WAL_PREV_EMPTY;
+        /* but an inserter may be inserting concurrently */
+        if (!pg_atomic_compare_exchange_u64(&link->PrevPos, &oldPrev, val->PrevPos))
+            return false;       /* concurrent inserter won; it will overwrite
+                                 * CurrPos */
+        /* this write acts as unlock */
+        pg_atomic_write_u64(&link->CurrPos, val->CurrPos);
+        val->CurrPos = 0;
+        val->PrevPos = WAL_PREV_EMPTY;
+        return true;
+    }
+    oldPrev = pg_atomic_read_u64(&link->PrevPos);
+    pg_atomic_write_u64(&link->PrevPos, val->PrevPos);
+    pg_write_barrier();
+    /* this write acts as unlock */
+    pg_atomic_write_u64(&link->CurrPos, val->CurrPos);
+    val->CurrPos = oldCurr;
+    val->PrevPos = oldPrev;
+    return true;
+#else
+    uint32      oldPrev;
+    uint32      oldCurId;
+    uint32      oldCurHigh;
+
+    /* Attempt to lock the entry against a concurrent consumer or swapper */
+    oldPrev = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+    if (oldPrev & 1)
+        /* Lock failed */
+        return false;
+
+    oldCurId = pg_atomic_read_u32(&link->CurrPosId);
+    oldCurHigh = link->CurrPosHigh;
+    pg_atomic_write_u32(&link->CurrPosId, val->CurrPosId);
+    link->CurrPosHigh = val->CurrPosHigh;
+    pg_write_barrier();
+    /* This write acts as unlock as well. */
+    pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+
+    val->CurrPosId = oldCurId;
+    val->CurrPosHigh = oldCurHigh;
+    val->PrevSize = oldPrev;
+    return true;
+#endif
+}
+
+/*
+ * Write the new link (EndPos, StartPos) and find PrevPtr for StartPos.
+ *
+ * Links are stored in a lock-free Cuckoo-based hash table.  We use
+ * mostly-4-way Cuckoo hashing, which provides a high fill rate without hard
+ * cycle collisions.  We also rely on concurrent consumers of existing
+ * entries, so any cycles are broken in the meantime.
+ *
+ * Cuckoo hashing relies on re-insertion for balancing, so occasionally we
+ * swap an entry and try to insert the swapped value instead of ours.
+ */
+static void
+LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos, XLogRecPtr *PrevPtr)
+{
+    SpinDelayStatus spin_stat;
+    WALPrevPosLinkVal lookup;
+    WALPrevPosLinkVal insert;
+    struct WALPrevLinksLookups lookup_pos;
+    struct WALPrevLinksLookups insert_pos;
+    uint32      i;
+    uint32      rand = 0;
+    bool        inserted = false;
+    bool        found = false;
+
+    /* pass StartPos a second time to get PrevSize = 0 */
+    WALPrevPosLinkValCompose(&lookup, StartPos, StartPos);
+    WALPrevPosLinkValCompose(&insert, EndPos, StartPos);
+
+    CalcCuckooPositions(lookup, &lookup_pos);
+    CalcCuckooPositions(insert, &insert_pos);
+
+    init_local_spin_delay(&spin_stat);
+
+    while (!inserted || !found)
+    {
+        for (i = 0; !found && i < PREV_LINKS_LOOKUPS; i++)
+            found = WALPrevPosLinkConsume(&PrevLinksHash[lookup_pos.pos[i]], &lookup);
+
+        if (inserted)
+        {
+            /*
+             * We may sleep only after we have inserted our value, since
+             * another backend is waiting for it.
+             */
+            perform_spin_delay(&spin_stat);
+            goto next;
+        }
+
+        for (i = 0; !inserted && i < PREV_LINKS_LOOKUPS; i++)
+            inserted = WALPrevPosLinkInsert(&PrevLinksHash[insert_pos.pos[i]], insert);
+
+        if (inserted)
+            goto next;
+
+        rand = pg_prng_uint32(&pg_global_prng_state);
+        if (rand % SWAP_ONCE_IN != 0)
+            goto next;
+
+        i = rand / SWAP_ONCE_IN % PREV_LINKS_LOOKUPS;
+        if (!WALPrevPosLinkSwap(&PrevLinksHash[insert_pos.pos[i]], &insert))
+            goto next;
+
+        if (WALLinkEmpty(insert))
+            /* Lucky case: the entry became empty and we inserted into it */
+            inserted = true;
+        else if (WALLinkSamePos(lookup, insert))
+        {
+            /*
+             * We happened to replace the very entry we were looking for.
+             * No need to insert it again.
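+             * The swapped-out value now carries the prev-link we were
+             * looking for, so copy it out and leave the loop.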
+             */
+            inserted = true;
+            Assert(!found);
+            found = true;
+            WALLinkCopyPrev(lookup, insert);
+            break;
+        }
+        else
+            CalcCuckooPositions(insert, &insert_pos);
+
+next:
+        pg_spin_delay();
+        pg_read_barrier();
+    }
+
+    WALPrevPosLinkValGetPrev(lookup, PrevPtr);
+}
+
+static pg_attribute_always_inline void
+LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec)
+{
+    WALPrevPosLinkVal insert;
+    struct WALPrevLinksLookups insert_pos;
+
+    WALPrevPosLinkValCompose(&insert, EndOfLog, LastRec);
+    CalcCuckooPositions(insert, &insert_pos);
+#if WAL_LINK_64
+    pg_atomic_write_u64(&PrevLinksHash[insert_pos.pos[0]].CurrPos, insert.CurrPos);
+    pg_atomic_write_u64(&PrevLinksHash[insert_pos.pos[0]].PrevPos, insert.PrevPos);
+#else
+    pg_atomic_write_u32(&PrevLinksHash[insert_pos.pos[0]].CurrPosId, insert.CurrPosId);
+    PrevLinksHash[insert_pos.pos[0]].CurrPosHigh = insert.CurrPosHigh;
+    pg_atomic_write_u32(&PrevLinksHash[insert_pos.pos[0]].PrevSize, insert.PrevSize);
+#endif
+}
+
+static pg_attribute_always_inline XLogRecPtr
+ReadInsertCurrBytePos(void)
+{
+    return pg_atomic_read_u64(&XLogCtl->Insert.CurrBytePos);
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1129,25 +1600,9 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
    /* All (non xlog-switch) records should contain data. */
    Assert(size > SizeOfXLogRecord);

-   /*
-    * The duration the spinlock needs to be held is minimized by minimizing
-    * the calculations that have to be done while holding the lock. The
-    * current tip of reserved WAL is kept in CurrBytePos, as a byte position
-    * that only counts "usable" bytes in WAL, that is, it excludes all WAL
-    * page headers. The mapping between "usable" byte positions and physical
-    * positions (XLogRecPtrs) can be done outside the locked region, and
-    * because the usable byte position doesn't include any headers, reserving
-    * X bytes from WAL is almost as simple as "CurrBytePos += X".
-    */
-   SpinLockAcquire(&Insert->insertpos_lck);
-
-   startbytepos = Insert->CurrBytePos;
+   startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);
    endbytepos = startbytepos + size;
-   prevbytepos = Insert->PrevBytePos;
-   Insert->CurrBytePos = endbytepos;
-   Insert->PrevBytePos = startbytepos;
-
-   SpinLockRelease(&Insert->insertpos_lck);
+   LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);

    *StartPos = XLogBytePosToRecPtr(startbytepos);
    *EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1183,26 +1638,24 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
    uint32      segleft;

    /*
-    * These calculations are a bit heavy-weight to be done while holding a
-    * spinlock, but since we're holding all the WAL insertion locks, there
-    * are no other inserters competing for it. GetXLogInsertRecPtr() does
-    * compete for it, but that's not called very frequently.
+    * Currently this function is called while all WAL insertion locks are
+    * held exclusively, so there is no contention on CurrBytePos.  But we
+    * still use a CAS loop, for uniformity with ReserveXLogInsertLocation().
+    *
+    * We will probably get rid of the exclusive lock in the future.
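+    * Until then the CAS below cannot fail, since nothing else can advance
+    * CurrBytePos while we hold all the insertion locks.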
     */
-   SpinLockAcquire(&Insert->insertpos_lck);
-   startbytepos = Insert->CurrBytePos;
+repeat:
+   startbytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
    ptr = XLogBytePosToEndRecPtr(startbytepos);
    if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
    {
-       SpinLockRelease(&Insert->insertpos_lck);
        *EndPos = *StartPos = ptr;
        return false;
    }

    endbytepos = startbytepos + size;
-   prevbytepos = Insert->PrevBytePos;
-
    *StartPos = XLogBytePosToRecPtr(startbytepos);
    *EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1213,10 +1666,19 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
        *EndPos += segleft;
        endbytepos = XLogRecPtrToBytePos(*EndPos);
    }
-   Insert->CurrBytePos = endbytepos;
-   Insert->PrevBytePos = startbytepos;
-
-   SpinLockRelease(&Insert->insertpos_lck);
+   if (!pg_atomic_compare_exchange_u64(&Insert->CurrBytePos,
+                                       &startbytepos,
+                                       endbytepos))
+   {
+       /*
+        * Don't use a spin delay here: perform_spin_delay() mainly helps
+        * with single-core contention, but on a single core we will simply
+        * succeed on the next attempt.
+        */
+       goto repeat;
+   }
+   LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);

    *PrevPtr = XLogBytePosToRecPtr(prevbytepos);

@@ -1518,7 +1980,6 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
    XLogRecPtr  inserted;
    XLogRecPtr  reservedUpto;
    XLogRecPtr  finishedUpto;
-   XLogCtlInsert *Insert = &XLogCtl->Insert;
    int         i;

    if (MyProc == NULL)
@@ -1533,9 +1994,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
        return inserted;

    /* Read the current insert position */
-   SpinLockAcquire(&Insert->insertpos_lck);
-   bytepos = Insert->CurrBytePos;
-   SpinLockRelease(&Insert->insertpos_lck);
+   bytepos = ReadInsertCurrBytePos();
    reservedUpto = XLogBytePosToEndRecPtr(bytepos);

    /*
@@ -5079,6 +5538,8 @@ XLOGShmemSize(void)
    /* WAL insertion locks, plus alignment */
    size = add_size(size, mul_size(sizeof(WALInsertLockPadded),
                                   NUM_XLOGINSERT_LOCKS + 1));
+   /* PrevLinksHash; it piggybacks on the WAL insertion locks' alignment. */
+   size = add_size(size, mul_size(sizeof(WALPrevPosLink), PREV_LINKS_HASH_CAPA));
    /* xlblocks array */
    size = add_size(size, mul_size(sizeof(pg_atomic_uint64), XLOGbuffers));
    /* extra alignment padding for XLOG I/O buffers */
@@ -5133,8 +5594,9 @@ XLOGShmemInit(void)
        /* both should be present or neither */
        Assert(foundCFile && foundXLog);

-       /* Initialize local copy of WALInsertLocks */
+       /* Initialize local copies of WALInsertLocks and PrevLinksHash */
        WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+       PrevLinksHash = XLogCtl->Insert.PrevLinksHash;

        if (localControlFile)
            pfree(localControlFile);
@@ -5180,6 +5642,9 @@ XLOGShmemInit(void)
        WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
    }

+   PrevLinksHash = XLogCtl->Insert.PrevLinksHash = (WALPrevPosLink *) allocptr;
+   allocptr += sizeof(WALPrevPosLink) * PREV_LINKS_HASH_CAPA;
+
    /*
    * Align the start of the page buffers to a full xlog block size boundary.
    * This simplifies some calculations in XLOG insertion.  It is also
It is also @@ -5198,7 +5663,6 @@ XLOGShmemInit(void) XLogCtl->InstallXLogFileSegmentActive = false; XLogCtl->WalWriterSleeping = false; - SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr); @@ -5208,6 +5672,19 @@ XLOGShmemInit(void) pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr); ConditionVariableInit(&XLogCtl->InitializedUpToCondVar); + + pg_atomic_init_u64(&XLogCtl->Insert.CurrBytePos, 0); + + for (i = 0; i < PREV_LINKS_HASH_CAPA; i++) + { +#if WAL_LINK_64 + pg_atomic_init_u64(&PrevLinksHash[i].CurrPos, 0); + pg_atomic_init_u64(&PrevLinksHash[i].PrevPos, WAL_PREV_EMPTY); +#else + pg_atomic_init_u32(&PrevLinksHash[i].CurrPosId, 0); + pg_atomic_init_u32(&PrevLinksHash[i].PrevSize, 0); +#endif + } } /* @@ -6203,8 +6680,13 @@ StartupXLOG(void) * previous incarnation. */ Insert = &XLogCtl->Insert; - Insert->PrevBytePos = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec); - Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); + { + XLogRecPtr endOfLog = XLogRecPtrToBytePos(EndOfLog); + XLogRecPtr lastRec = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec); + + pg_atomic_write_u64(&Insert->CurrBytePos, endOfLog); + LinkStartPrevPos(endOfLog, lastRec); + } /* * Tricky point here: lastPage contains the *last* block that the LastRec @@ -7193,7 +7675,7 @@ CreateCheckPoint(int flags) if (shutdown) { - XLogRecPtr curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos); + XLogRecPtr curInsert = XLogBytePosToRecPtr(ReadInsertCurrBytePos()); /* * Compute new REDO record ptr = location of next XLOG record. @@ -9611,14 +10093,7 @@ register_persistent_abort_backup_handler(void) XLogRecPtr GetXLogInsertRecPtr(void) { - XLogCtlInsert *Insert = &XLogCtl->Insert; - uint64 current_bytepos; - - SpinLockAcquire(&Insert->insertpos_lck); - current_bytepos = Insert->CurrBytePos; - SpinLockRelease(&Insert->insertpos_lck); - - return XLogBytePosToRecPtr(current_bytepos); + return XLogBytePosToRecPtr(ReadInsertCurrBytePos()); } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e5879e00dffe..f0bfb4762c3e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3211,6 +3211,8 @@ WALAvailability WALInsertLock WALInsertLockPadded WALOpenSegment +WALPrevPosLink +WALPrevPosLinkVal WALReadError WALSegmentCloseCB WALSegmentContext