32
32
#include "postmaster/interrupt.h"
33
33
#include "replication/logicallauncher.h"
34
34
#include "replication/origin.h"
35
+ #include "replication/slot.h"
35
36
#include "replication/walreceiver.h"
36
37
#include "replication/worker_internal.h"
37
38
#include "storage/ipc.h"
@@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL;
91
92
static bool on_commit_launcher_wakeup = false;
92
93
93
94
94
- static void ApplyLauncherWakeup (void );
95
95
static void logicalrep_launcher_onexit (int code , Datum arg );
96
96
static void logicalrep_worker_onexit (int code , Datum arg );
97
97
static void logicalrep_worker_detach (void );
@@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid);
100
100
static void logicalrep_launcher_attach_dshmem (void );
101
101
static void ApplyLauncherSetWorkerStartTime (Oid subid , TimestampTz start_time );
102
102
static TimestampTz ApplyLauncherGetWorkerStartTime (Oid subid );
103
+ static void create_conflict_slot_if_not_exists (void );
104
+ static void advance_conflict_slot_xmin (FullTransactionId new_xmin );
105
+ static void drop_conflict_slot_if_exists (void );
103
106
104
107
105
108
/*
@@ -1106,7 +1109,10 @@ ApplyLauncherWakeupAtCommit(void)
1106
1109
on_commit_launcher_wakeup = true;
1107
1110
}
1108
1111
1109
- static void
1112
+ /*
1113
+ * Wakeup the launcher immediately.
1114
+ */
1115
+ void
1110
1116
ApplyLauncherWakeup (void )
1111
1117
{
1112
1118
if (LogicalRepCtx -> launcher_pid != 0 )
@@ -1119,6 +1125,8 @@ ApplyLauncherWakeup(void)
1119
1125
void
1120
1126
ApplyLauncherMain (Datum main_arg )
1121
1127
{
1128
+ bool slot_maybe_exist = true;
1129
+
1122
1130
ereport (DEBUG1 ,
1123
1131
(errmsg_internal ("logical replication launcher started" )));
1124
1132
@@ -1147,6 +1155,8 @@ ApplyLauncherMain(Datum main_arg)
1147
1155
MemoryContext subctx ;
1148
1156
MemoryContext oldctx ;
1149
1157
long wait_time = DEFAULT_NAPTIME_PER_CYCLE ;
1158
+ bool can_advance_xmin = true;
1159
+ FullTransactionId xmin = InvalidFullTransactionId ;
1150
1160
1151
1161
CHECK_FOR_INTERRUPTS ();
1152
1162
@@ -1166,15 +1176,56 @@ ApplyLauncherMain(Datum main_arg)
1166
1176
TimestampTz now ;
1167
1177
long elapsed ;
1168
1178
1179
+ /*
1180
+ * Create the conflict slot before starting the worker to prevent
1181
+ * it from unnecessarily maintaining its oldest_nonremovable_xid.
1182
+ */
1183
+ create_conflict_slot_if_not_exists ();
1184
+
1169
1185
if (!sub -> enabled )
1186
+ {
1187
+ can_advance_xmin = false;
1170
1188
continue ;
1189
+ }
1171
1190
1172
1191
LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
1173
1192
w = logicalrep_worker_find (sub -> oid , InvalidOid , false);
1174
1193
LWLockRelease (LogicalRepWorkerLock );
1175
1194
1176
1195
if (w != NULL )
1196
+ {
1197
+ /*
1198
+ * Collect non-removable transaction IDs from all apply
1199
+ * workers to determine the xmin for advancing the replication
1200
+ * slot used in conflict detection.
1201
+ */
1202
+ if (can_advance_xmin )
1203
+ {
1204
+ FullTransactionId nonremovable_xid ;
1205
+
1206
+ SpinLockAcquire (& w -> relmutex );
1207
+ nonremovable_xid = w -> oldest_nonremovable_xid ;
1208
+ SpinLockRelease (& w -> relmutex );
1209
+
1210
+ /*
1211
+ * Stop advancing xmin if an invalid non-removable
1212
+ * transaction ID is found, otherwise update xmin.
1213
+ */
1214
+ if (!FullTransactionIdIsValid (nonremovable_xid ))
1215
+ can_advance_xmin = false;
1216
+ else if (!FullTransactionIdIsValid (xmin ) ||
1217
+ FullTransactionIdPrecedes (nonremovable_xid , xmin ))
1218
+ xmin = nonremovable_xid ;
1219
+ }
1220
+
1177
1221
continue ; /* worker is running already */
1222
+ }
1223
+
1224
+ /*
1225
+ * The worker has not yet started, so there is no valid
1226
+ * non-removable transaction ID available for advancement.
1227
+ */
1228
+ can_advance_xmin = false;
1178
1229
1179
1230
/*
1180
1231
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1207,6 +1258,27 @@ ApplyLauncherMain(Datum main_arg)
1207
1258
}
1208
1259
}
1209
1260
1261
+ /*
1262
+ * Maintain the xmin value of the replication slot for conflict
1263
+ * detection if needed.
1264
+ */
1265
+ if (sublist )
1266
+ {
1267
+ if (can_advance_xmin )
1268
+ advance_conflict_slot_xmin (xmin );
1269
+
1270
+ slot_maybe_exist = true;
1271
+ }
1272
+
1273
+ /*
1274
+ * Drop the slot if we're no longer retaining dead tuples.
1275
+ */
1276
+ else if (slot_maybe_exist )
1277
+ {
1278
+ drop_conflict_slot_if_exists ();
1279
+ slot_maybe_exist = false;
1280
+ }
1281
+
1210
1282
/* Switch back to original memory context. */
1211
1283
MemoryContextSwitchTo (oldctx );
1212
1284
/* Clean the temporary memory. */
@@ -1234,6 +1306,110 @@ ApplyLauncherMain(Datum main_arg)
1234
1306
/* Not reachable */
1235
1307
}
1236
1308
1309
+ /*
1310
+ * Create and acquire the replication slot used to retain dead tuples for
1311
+ * conflict detection, if not yet.
1312
+ */
1313
+ static void
1314
+ create_conflict_slot_if_not_exists (void )
1315
+ {
1316
+ TransactionId xmin_horizon ;
1317
+
1318
+ /* Exit early if the replication slot is already created and acquired */
1319
+ if (MyReplicationSlot )
1320
+ return ;
1321
+
1322
+ /* If the replication slot exists, acquire it and exit */
1323
+ if (SearchNamedReplicationSlot (CONFLICT_DETECTION_SLOT , true))
1324
+ {
1325
+ ReplicationSlotAcquire (CONFLICT_DETECTION_SLOT , true, false);
1326
+ return ;
1327
+ }
1328
+
1329
+ ReplicationSlotCreate (CONFLICT_DETECTION_SLOT , false,
1330
+ RS_PERSISTENT , false, false, false);
1331
+
1332
+ LWLockAcquire (ProcArrayLock , LW_EXCLUSIVE );
1333
+
1334
+ xmin_horizon = GetOldestSafeDecodingTransactionId (false);
1335
+
1336
+ SpinLockAcquire (& MyReplicationSlot -> mutex );
1337
+ MyReplicationSlot -> effective_xmin = xmin_horizon ;
1338
+ MyReplicationSlot -> data .xmin = xmin_horizon ;
1339
+ SpinLockRelease (& MyReplicationSlot -> mutex );
1340
+
1341
+ ReplicationSlotsComputeRequiredXmin (true);
1342
+
1343
+ LWLockRelease (ProcArrayLock );
1344
+
1345
+ /* Write this slot to disk */
1346
+ ReplicationSlotMarkDirty ();
1347
+ ReplicationSlotSave ();
1348
+ }
1349
+
1350
+ /*
1351
+ * Attempt to advance the xmin value of the replication slot used to retain
1352
+ * dead tuples for conflict detection.
1353
+ */
1354
+ static void
1355
+ advance_conflict_slot_xmin (FullTransactionId new_xmin )
1356
+ {
1357
+ FullTransactionId full_xmin ;
1358
+ FullTransactionId next_full_xid ;
1359
+
1360
+ Assert (MyReplicationSlot );
1361
+ Assert (FullTransactionIdIsValid (new_xmin ));
1362
+
1363
+ next_full_xid = ReadNextFullTransactionId ();
1364
+
1365
+ /*
1366
+ * Compute FullTransactionId for the current xmin. This handles the case
1367
+ * where transaction ID wraparound has occurred.
1368
+ */
1369
+ full_xmin = FullTransactionIdFromAllowableAt (next_full_xid ,
1370
+ MyReplicationSlot -> data .xmin );
1371
+
1372
+ if (FullTransactionIdPrecedesOrEquals (new_xmin , full_xmin ))
1373
+ return ;
1374
+
1375
+ SpinLockAcquire (& MyReplicationSlot -> mutex );
1376
+ MyReplicationSlot -> data .xmin = XidFromFullTransactionId (new_xmin );
1377
+ SpinLockRelease (& MyReplicationSlot -> mutex );
1378
+
1379
+ /* first write new xmin to disk, so we know what's up after a crash */
1380
+
1381
+ ReplicationSlotMarkDirty ();
1382
+ ReplicationSlotSave ();
1383
+ elog (DEBUG1 , "updated xmin: %u" , MyReplicationSlot -> data .xmin );
1384
+
1385
+ /*
1386
+ * Now the new xmin is safely on disk, we can let the global value
1387
+ * advance. We do not take ProcArrayLock or similar since we only advance
1388
+ * xmin here and there's not much harm done by a concurrent computation
1389
+ * missing that.
1390
+ */
1391
+ SpinLockAcquire (& MyReplicationSlot -> mutex );
1392
+ MyReplicationSlot -> effective_xmin = MyReplicationSlot -> data .xmin ;
1393
+ SpinLockRelease (& MyReplicationSlot -> mutex );
1394
+
1395
+ ReplicationSlotsComputeRequiredXmin (false);
1396
+
1397
+ return ;
1398
+ }
1399
+
1400
+ /*
1401
+ * Drop the replication slot used to retain dead tuples for conflict detection,
1402
+ * if it exists.
1403
+ */
1404
+ static void
1405
+ drop_conflict_slot_if_exists (void )
1406
+ {
1407
+ if (MyReplicationSlot )
1408
+ ReplicationSlotDropAcquired ();
1409
+ else if (SearchNamedReplicationSlot (CONFLICT_DETECTION_SLOT , true))
1410
+ ReplicationSlotDrop (CONFLICT_DETECTION_SLOT , true);
1411
+ }
1412
+
1237
1413
/*
1238
1414
* Is current process the logical replication launcher?
1239
1415
*/
0 commit comments