summaryrefslogtreecommitdiff
path: root/src/backend/storage
diff options
context:
space:
mode:
authorRobert Haas2014-02-01 03:45:17 +0000
committerRobert Haas2014-02-01 03:45:36 +0000
commit858ec11858a914d4c380971985709b6d6b7dd6fc (patch)
tree59eb508185cd8544c3485919a25dee15f3818c21 /src/backend/storage
parent5bdef38b8917cfbe206d14969c61a5d38fc822b6 (diff)
Introduce replication slots.
Replication slots are a crash-safe data structure which can be created on either a master or a standby to prevent premature removal of write-ahead log segments needed by a standby, as well as (with hot_standby_feedback=on) pruning of tuples whose removal would cause replication conflicts. Slots have some advantages over existing techniques, as explained in the documentation. In a few places, we refer to the type of replication slots introduced by this patch as "physical" slots, because forthcoming patches for logical decoding will also have slots, but with somewhat different properties. Andres Freund and Robert Haas
Diffstat (limited to 'src/backend/storage')
-rw-r--r--src/backend/storage/ipc/ipci.c3
-rw-r--r--src/backend/storage/ipc/procarray.c42
-rw-r--r--src/backend/storage/lmgr/lwlock.c4
-rw-r--r--src/backend/storage/lmgr/proc.c5
4 files changed, 54 insertions, 0 deletions
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2e717457b12..c392d4fa228 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -27,6 +27,7 @@
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
+ size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
ProcSignalShmemInit();
CheckpointerShmemInit();
AutoVacuumShmemInit();
+ ReplicationSlotsShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index b68c95612c5..082115b4fff 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -82,6 +82,9 @@ typedef struct ProcArrayStruct
*/
TransactionId lastOverflowedXid;
+ /* oldest xmin of any replication slot */
+ TransactionId replication_slot_xmin;
+
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
* but actually it is maxProcs entries long.
@@ -228,6 +231,7 @@ CreateSharedProcArray(void)
*/
procArray->numProcs = 0;
procArray->maxProcs = PROCARRAY_MAXPROCS;
+ procArray->replication_slot_xmin = InvalidTransactionId;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
@@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
@@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
}
}
+ /* fetch into volatile var while ProcArrayLock is held */
+ replication_slot_xmin = procArray->replication_slot_xmin;
+
if (RecoveryInProgress())
{
/*
@@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
result = FirstNormalTransactionId;
}
+ /*
+ * Check whether there are replication slots requiring an older xmin.
+ */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, result))
+ result = replication_slot_xmin;
+
return result;
}
@@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
@@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot)
suboverflowed = true;
}
+
+ /* fetch into volatile var while ProcArrayLock is held */
+ replication_slot_xmin = procArray->replication_slot_xmin;
+
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
+
LWLockRelease(ProcArrayLock);
/*
@@ -1506,6 +1527,12 @@ GetSnapshotData(Snapshot snapshot)
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
+
+ /* Check whether there's a replication slot requiring an older xmin. */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = replication_slot_xmin;
+
RecentXmin = xmin;
snapshot->xmin = xmin;
@@ -2491,6 +2518,21 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
return true; /* timed out, still conflicts */
}
+/*
+ * ProcArraySetReplicationSlotXmin
+ *
+ * Install limits to future computations of the xmin horizon to prevent vacuum
+ * and HOT pruning from removing affected rows still needed by clients with
+ * replicaton slots.
+ */
+void
+ProcArraySetReplicationSlotXmin(TransactionId xmin)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ procArray->replication_slot_xmin = xmin;
+ LWLockRelease(ProcArrayLock);
+}
+
#define XidCacheRemove(i) \
do { \
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 55d9d7837ca..82ef4409494 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -27,6 +27,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
@@ -238,6 +239,9 @@ NumLWLocks(void)
/* predicate.c needs one per old serializable xid buffer */
numLocks += NUM_OLDSERXID_BUFFERS;
+ /* slot.c needs one for each slot */
+ numLocks += max_replication_slots;
+
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9d32f9405d5..fb449a88204 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -40,6 +40,7 @@
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -780,6 +781,10 @@ ProcKill(int code, Datum arg)
/* Make sure we're out of the sync rep lists */
SyncRepCleanupAtProcExit();
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
#ifdef USE_ASSERT_CHECKING
if (assert_enabled)
{