Skip to content

Commit e7ae97b

Browse files
Zhijie Hou, Commitfest Bot
Zhijie Hou
authored and
Commitfest Bot
committed
Re-create the replication slot if the conflict retention duration is reduced
This patch allows the launcher to drop and re-create the invalidated slot once at least one apply worker has confirmed that the retention duration is now within max_conflict_retention_duration.
1 parent d0081c7 commit e7ae97b

File tree

3 files changed

+61
-95
lines changed

3 files changed

+61
-95
lines changed

doc/src/sgml/config.sgml

+4-1
Original file line numberDiff line numberDiff line change
@@ -5403,7 +5403,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
54035403
<literal>max_conflict_retention_duration</literal>. If the replication
54045404
slot is invalidated, you can disable
54055405
<literal>retain_conflict_info</literal> and re-enable it after
5406-
confirming this replication slot has been dropped.
5406+
confirming this replication slot has been dropped. Alternatively, the
5407+
invalidated slot will be automatically dropped and re-created once an
5408+
apply worker confirms that the retention duration is within the
5409+
specified limit.
54075410
</para>
54085411
<para>
54095412
This option is effective only if a subscription with

src/backend/replication/logical/launcher.c

+21-16
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
449449
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
450450
worker->parallel_apply = is_parallel_apply_worker;
451451
worker->oldest_nonremovable_xid = InvalidFullTransactionId;
452-
worker->stop_conflict_info_retention = false;
452+
worker->stop_conflict_info_retention = (MyReplicationSlot &&
453+
MyReplicationSlot->data.invalidated != RS_INVAL_NONE);
453454
worker->last_lsn = InvalidXLogRecPtr;
454455
TIMESTAMP_NOBEGIN(worker->last_send_time);
455456
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -1287,27 +1288,20 @@ ApplyLauncherMain(Datum main_arg)
12871288
}
12881289
}
12891290

1290-
/*
1291-
* Do nothing if the replication slot is invalidated due to conflict
1292-
* retention duration.
1293-
*/
1294-
if (nretain_conflict_info &&
1295-
MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
1296-
{
1297-
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_CONFLICT_RETENTION_DURATION);
1298-
}
1299-
13001291
/*
13011292
* Invalidate the conflict slot if all workers with
13021293
* retain_conflict_info enabled have stopped further conflict
13031294
* retention.
13041295
*/
1305-
else if (nstop_retention && nretain_conflict_info == nstop_retention)
1296+
if (nstop_retention && nretain_conflict_info == nstop_retention)
13061297
{
1307-
ReplicationSlotRelease();
1308-
InvalidateObsoleteReplicationSlots(RS_INVAL_CONFLICT_RETENTION_DURATION,
1309-
InvalidXLogRecPtr, InvalidOid,
1310-
InvalidTransactionId);
1298+
if (MyReplicationSlot->data.invalidated == RS_INVAL_NONE)
1299+
{
1300+
ReplicationSlotRelease();
1301+
InvalidateObsoleteReplicationSlots(RS_INVAL_CONFLICT_RETENTION_DURATION,
1302+
InvalidXLogRecPtr, InvalidOid,
1303+
InvalidTransactionId);
1304+
}
13111305
}
13121306

13131307
/*
@@ -1316,6 +1310,17 @@ ApplyLauncherMain(Datum main_arg)
13161310
*/
13171311
else if (nretain_conflict_info)
13181312
{
1313+
/*
1314+
* Re-create the replication slot if it was invalidated because
1315+
* all workers stopped conflict retention, and an apply worker has
1316+
* now resumed the process.
1317+
*/
1318+
if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
1319+
{
1320+
drop_conflict_slot_if_exists();
1321+
create_conflict_slot_if_not_exists();
1322+
}
1323+
13191324
if (can_advance_xmin)
13201325
advance_conflict_slot_xmin(xmin);
13211326

src/backend/replication/logical/worker.c

+36-78
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,6 @@ static void reset_conflict_info_fields(RetainConflictInfoData *data);
456456
static bool should_stop_conflict_info_retention(RetainConflictInfoData *data);
457457
static void adjust_xid_advance_interval(RetainConflictInfoData *data,
458458
bool new_xid_found);
459-
static void update_conflict_retention_status(void);
460459

461460
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
462461
static void apply_handle_insert_internal(ApplyExecutionData *edata,
@@ -4126,10 +4125,6 @@ maybe_advance_nonremovable_xid(RetainConflictInfoData *data,
41264125
if (!am_leader_apply_worker())
41274126
return;
41284127

4129-
/* Exit early if we have already stopped retaining */
4130-
if (MyLogicalRepWorker->stop_conflict_info_retention)
4131-
return;
4132-
41334128
switch (data->phase)
41344129
{
41354130
case RCI_GET_CANDIDATE_XID:
@@ -4374,6 +4369,19 @@ wait_for_local_flush(RetainConflictInfoData *data)
43744369
if (last_flushpos < data->remote_lsn)
43754370
return;
43764371

4372+
/*
4373+
* If conflict info retention was previously stopped due to a timeout, and
4374+
* the time required to advance the non-removable transaction ID has now
4375+
* decreased to within acceptable limits, log a message.
4376+
*/
4377+
if (MyLogicalRepWorker->stop_conflict_info_retention)
4378+
ereport(LOG,
4379+
errmsg("logical replication worker for subscription \"%s\" will resume retaining conflict information",
4380+
MySubscription->name),
4381+
errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.",
4382+
LSN_FORMAT_ARGS(data->remote_lsn),
4383+
max_conflict_retention_duration));
4384+
43774385
/*
43784386
* Reaching here means the remote WAL position has been received, and
43794387
* all transactions up to that position on the publisher have been
@@ -4382,6 +4390,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
43824390
*/
43834391
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
43844392
MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid;
4393+
MyLogicalRepWorker->stop_conflict_info_retention = false;
43854394
SpinLockRelease(&MyLogicalRepWorker->relmutex);
43864395

43874396
elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u",
@@ -4423,9 +4432,8 @@ reset_conflict_info_fields(RetainConflictInfoData *data)
44234432
* LogicalRepWorker->stop_conflict_info_retention to true, notify the launcher to
44244433
* invalidate the slot, and return true. Return false otherwise.
44254434
*
4426-
* Currently, the retention will not resume automatically unless user manually
4427-
* disables retain_conflict_info and re-enables it after confirming that the
4428-
* replication slot has been dropped.
4435+
* The retention will resume automatically if the worker has confirmed that the
4436+
* retention duration is now within the max_conflict_retention_duration.
44294437
*/
44304438
static bool
44314439
should_stop_conflict_info_retention(RetainConflictInfoData *data)
@@ -4450,19 +4458,26 @@ should_stop_conflict_info_retention(RetainConflictInfoData *data)
44504458
max_conflict_retention_duration))
44514459
return false;
44524460

4453-
ereport(LOG,
4454-
errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information",
4455-
MySubscription->name),
4456-
errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.",
4457-
max_conflict_retention_duration));
4458-
4459-
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4460-
MyLogicalRepWorker->oldest_nonremovable_xid = InvalidFullTransactionId;
4461-
MyLogicalRepWorker->stop_conflict_info_retention = true;
4462-
SpinLockRelease(&MyLogicalRepWorker->relmutex);
4463-
4464-
/* Notify launcher to invalidate the conflict slot */
4465-
ApplyLauncherWakeup();
4461+
/*
4462+
* Log a message and reset relevant data when the worker is about to stop
4463+
* retaining conflict information.
4464+
*/
4465+
if (!MyLogicalRepWorker->stop_conflict_info_retention)
4466+
{
4467+
ereport(LOG,
4468+
errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information",
4469+
MySubscription->name),
4470+
errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.",
4471+
max_conflict_retention_duration));
4472+
4473+
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4474+
MyLogicalRepWorker->oldest_nonremovable_xid = InvalidFullTransactionId;
4475+
MyLogicalRepWorker->stop_conflict_info_retention = true;
4476+
SpinLockRelease(&MyLogicalRepWorker->relmutex);
4477+
4478+
/* Notify launcher to invalidate the conflict slot */
4479+
ApplyLauncherWakeup();
4480+
}
44664481

44674482
reset_conflict_info_fields(data);
44684483

@@ -4510,51 +4525,6 @@ adjust_xid_advance_interval(RetainConflictInfoData *data, bool new_xid_found)
45104525
}
45114526
}
45124527

4513-
/*
4514-
* Update the conflict retention status for the current apply worker. It checks
4515-
* whether the worker should stop retaining conflict information due to
4516-
* invalidation of the replication slot ("pg_conflict_detection").
4517-
*
4518-
* Currently, the replication slot is invalidated only if the duration for
4519-
* retaining conflict information exceeds the allowed maximum.
4520-
*/
4521-
static void
4522-
update_conflict_retention_status(void)
4523-
{
4524-
ReplicationSlotInvalidationCause cause = RS_INVAL_NONE;
4525-
ReplicationSlot *slot;
4526-
4527-
/* Exit early if retaining conflict information is not required */
4528-
if (!MySubscription->retainconflictinfo)
4529-
return;
4530-
4531-
/*
4532-
* Only the leader apply worker manages conflict retention (see
4533-
* maybe_advance_nonremovable_xid() for details).
4534-
*/
4535-
if (!am_leader_apply_worker())
4536-
return;
4537-
4538-
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
4539-
4540-
slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, false);
4541-
4542-
if (slot)
4543-
{
4544-
SpinLockAcquire(&slot->mutex);
4545-
cause = slot->data.invalidated;
4546-
SpinLockRelease(&slot->mutex);
4547-
4548-
Assert(cause == RS_INVAL_NONE || cause == RS_INVAL_CONFLICT_RETENTION_DURATION);
4549-
}
4550-
4551-
LWLockRelease(ReplicationSlotControlLock);
4552-
4553-
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4554-
MyLogicalRepWorker->stop_conflict_info_retention = cause != RS_INVAL_NONE;
4555-
SpinLockRelease(&MyLogicalRepWorker->relmutex);
4556-
}
4557-
45584528
/*
45594529
* Exit routine for apply workers due to subscription parameter changes.
45604530
*/
@@ -4726,16 +4696,6 @@ maybe_reread_subscription(void)
47264696
CommitTransactionCommand();
47274697

47284698
MySubscriptionValid = true;
4729-
4730-
/*
4731-
* Update worker status to avoid unnecessary conflict retention if the
4732-
* replication slot ("pg_conflict_detection") was invalidated prior to
4733-
* enabling the retain_conflict_info option. This is also necessary to
4734-
* restart conflict retention if the user has disabled and subsequently
4735-
* re-enabled the retain_conflict_info option, resulting in the
4736-
* replication slot being recreated.
4737-
*/
4738-
update_conflict_retention_status();
47394699
}
47404700

47414701
/*
@@ -5382,8 +5342,6 @@ InitializeLogRepWorker(void)
53825342
MySubscription->name)));
53835343

53845344
CommitTransactionCommand();
5385-
5386-
update_conflict_retention_status();
53875345
}
53885346

53895347
/*

0 commit comments

Comments
 (0)