summaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
authorAlexander Korotkov2024-10-24 11:37:53 +0000
committerAlexander Korotkov2024-10-24 11:37:53 +0000
commit5035172e4ab58e4e8eef1bc60b0712fc59e0be31 (patch)
treecfdb3a6caf509c3c1711e5a2ad379c90caee1a36 /src/backend/access
parentb85a9d046efdd27775cbe7db9e92aad96aab4ada (diff)
Move LSN waiting declarations and definitions to better place
3c5db1d6b implemented the pg_wal_replay_wait() stored procedure. Due to the patch development history, the implementation resided in src/backend/commands/waitlsn.c (src/include/commands/waitlsn.h for headers). 014f9f34d moved pg_wal_replay_wait() itself to src/backend/access/transam/xlogfuncs.c near to the WAL-manipulation functions. But most of the implementation stayed in place. The code in src/backend/commands/waitlsn.c has nothing to do with commands, but is related to WAL. So, this commit moves this code into src/backend/access/transam/xlogwait.c (src/include/access/xlogwait.h for headers). Reported-by: Peter Eisentraut Discussion: https://postgr.es/m/18c0fa64-0475-415e-a1bd-665d922c5201%40eisentraut.org Reviewed-by: Pavel Borisov
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/transam/Makefile3
-rw-r--r--src/backend/access/transam/meson.build1
-rw-r--r--src/backend/access/transam/xact.c2
-rw-r--r--src/backend/access/transam/xlog.c2
-rw-r--r--src/backend/access/transam/xlogfuncs.c2
-rw-r--r--src/backend/access/transam/xlogrecovery.c2
-rw-r--r--src/backend/access/transam/xlogwait.c338
7 files changed, 345 insertions, 5 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 661c55a9db7..a32f473e0a2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,7 +36,8 @@ OBJS = \
xlogreader.o \
xlogrecovery.o \
xlogstats.o \
- xlogutils.o
+ xlogutils.o \
+ xlogwait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 8a3522557cd..91d258f9df1 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,6 +24,7 @@ backend_sources += files(
'xlogrecovery.c',
'xlogstats.c',
'xlogutils.c',
+ 'xlogwait.c',
)
# used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0b05e28790..d8f6c658420 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -31,6 +31,7 @@
#include "access/xloginsert.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
@@ -38,7 +39,6 @@
#include "commands/async.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
-#include "commands/waitlsn.h"
#include "common/pg_prng.h"
#include "executor/spi.h"
#include "libpq/be-fsstubs.h"
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9102c8d772e..ad9b0b612f4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -62,11 +62,11 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
-#include "commands/waitlsn.h"
#include "common/controldata_utils.h"
#include "common/file_utils.h"
#include "executor/instrument.h"
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 3e3d2bb6189..cbf84ef7d8f 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -22,8 +22,8 @@
#include "access/xlog_internal.h"
#include "access/xlogbackup.h"
#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
#include "catalog/pg_type.h"
-#include "commands/waitlsn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 320b14add1a..31caa49d6c3 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,10 +40,10 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
-#include "commands/waitlsn.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
new file mode 100644
index 00000000000..eef58ce69ce
--- /dev/null
+++ b/src/backend/access/transam/xlogwait.c
@@ -0,0 +1,338 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.c
+ * Implements waiting for the given replay LSN, which is used in
+ * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogwait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/wait_event_types.h"
+
+static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+ void *arg);
+
+struct WaitLSNState *waitLSNState = NULL;
+
+/* Report the amount of shared memory space needed for WaitLSNState. */
+Size
+WaitLSNShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitLSNState, procInfos);
+ size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+ return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory. */
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+
+ waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+ WaitLSNShmemSize(),
+ &found);
+ if (!found)
+ {
+ pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
+ pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
+ memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
+ }
+}
+
+/*
+ * Comparison function for waitLSN->waitersHeap heap. Waiting processes are
+ * ordered by lsn, so that the waiter with smallest lsn is at the top.
+ */
+static int
+waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+ const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
+ const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
+
+ if (aproc->waitLSN < bproc->waitLSN)
+ return 1;
+ else if (aproc->waitLSN > bproc->waitLSN)
+ return -1;
+ else
+ return 0;
+}
+
+/*
+ * Update waitLSN->minWaitedLSN according to the current state of
+ * waitLSN->waitersHeap.
+ */
+static void
+updateMinWaitedLSN(void)
+{
+ XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
+
+ if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
+
+ minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
+ }
+
+ pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
+}
+
+/*
+ * Put the current process into the heap of LSN waiters.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ Assert(!procInfo->inHeap);
+
+ procInfo->latch = MyLatch;
+ procInfo->waitLSN = lsn;
+
+ pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
+ procInfo->inHeap = true;
+ updateMinWaitedLSN();
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove the current process from the heap of LSN waiters if it's there.
+ */
+static void
+deleteLSNWaiter(void)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ if (!procInfo->inHeap)
+ {
+ LWLockRelease(WaitLSNLock);
+ return;
+ }
+
+ pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode);
+ procInfo->inHeap = false;
+ updateMinWaitedLSN();
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove waiters whose LSN has been replayed from the heap and set their
+ * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
+ * and set latches for all waiters.
+ */
+void
+WaitLSNSetLatches(XLogRecPtr currentLSN)
+{
+ int i;
+ Latch **wakeUpProcLatches;
+ int numWakeUpProcs = 0;
+
+ wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ /*
+ * Iterate the pairing heap of waiting processes till we find LSN not yet
+ * replayed. Record the process latches to set them later.
+ */
+ while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
+ WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
+
+ if (!XLogRecPtrIsInvalid(currentLSN) &&
+ procInfo->waitLSN > currentLSN)
+ break;
+
+ wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
+ (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
+ procInfo->inHeap = false;
+ }
+
+ updateMinWaitedLSN();
+
+ LWLockRelease(WaitLSNLock);
+
+ /*
+ * Set latches for processes, whose waited LSNs are already replayed. As
+ * the time consuming operations, we do it this outside of WaitLSNLock.
+ * This is actually fine because procLatch isn't ever freed, so we just
+ * can potentially set the wrong process' (or no process') latch.
+ */
+ for (i = 0; i < numWakeUpProcs; i++)
+ {
+ SetLatch(wakeUpProcLatches[i]);
+ }
+ pfree(wakeUpProcLatches);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+ /*
+ * We do a fast-path check of the 'inHeap' flag without the lock. This
+ * flag is set to true only by the process itself. So, it's only possible
+ * to get a false positive. But that will be eliminated by a recheck
+ * inside deleteLSNWaiter().
+ */
+ if (waitLSNState->procInfos[MyProcNumber].inHeap)
+ deleteLSNWaiter();
+}
+
+/*
+ * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ */
+void
+WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
+{
+ XLogRecPtr currentLSN;
+ TimestampTz endtime = 0;
+ int wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+
+ /* Shouldn't be called when shmem isn't initialized */
+ Assert(waitLSNState);
+
+ /* Should have a valid proc number */
+ Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
+
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Recovery is not in progress. Given that we detected this in the
+ * very first check, this procedure was mistakenly called on primary.
+ * However, it's possible that standby was promoted concurrently to
+ * the procedure call, while target LSN is replayed. So, we still
+ * check the last replay LSN before reporting an error.
+ */
+ if (targetLSN <= GetXLogReplayRecPtr(NULL))
+ return;
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for LSN can only be executed during recovery.")));
+ }
+ else
+ {
+ /* If target LSN is already replayed, exit immediately */
+ if (targetLSN <= GetXLogReplayRecPtr(NULL))
+ return;
+ }
+
+ if (timeout > 0)
+ {
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+ wake_events |= WL_TIMEOUT;
+ }
+
+ /*
+ * Add our process to the pairing heap of waiters. It might happen that
+ * target LSN gets replayed before we do. Another check at the beginning
+ * of the loop below prevents the race condition.
+ */
+ addLSNWaiter(targetLSN);
+
+ for (;;)
+ {
+ int rc;
+ long delay_ms = 0;
+
+ /* Recheck that recovery is still in-progress */
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Recovery was ended, but recheck if target LSN was already
+ * replayed.
+ */
+ currentLSN = GetXLogReplayRecPtr(NULL);
+ if (targetLSN <= currentLSN)
+ return;
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
+ LSN_FORMAT_ARGS(targetLSN),
+ LSN_FORMAT_ARGS(currentLSN))));
+ }
+ else
+ {
+ /* Check if the waited LSN has been replayed */
+ currentLSN = GetXLogReplayRecPtr(NULL);
+ if (targetLSN <= currentLSN)
+ break;
+ }
+
+ /*
+ * If the timeout value is specified, calculate the number of
+ * milliseconds before the timeout. Exit if the timeout is already
+ * reached.
+ */
+ if (timeout > 0)
+ {
+ delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
+ if (delay_ms <= 0)
+ break;
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ rc = WaitLatch(MyLatch, wake_events, delay_ms,
+ WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
+ /*
+ * Delete our process from the shared memory pairing heap. We might
+ * already be deleted by the startup process. The 'inHeap' flag prevents
+ * us from the double deletion.
+ */
+ deleteLSNWaiter();
+
+ /*
+ * If we didn't reach the target LSN, we must be exited by timeout.
+ */
+ if (targetLSN > currentLSN)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
+ LSN_FORMAT_ARGS(targetLSN),
+ LSN_FORMAT_ARGS(currentLSN))));
+ }
+}