summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/commands/subscriptioncmds.c10
-rw-r--r--src/backend/replication/logical/tablesync.c58
-rw-r--r--src/test/subscription/t/004_sync.pl6
3 files changed, 39 insertions, 35 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abbcaff0838..921cd9674f1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1099,10 +1099,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
*
* It is possible that the origin is not yet created for
* tablesync worker, this can happen for the states before
- * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
- * apply worker can also concurrently try to drop the
- * origin and by this time the origin might be already
- * removed. For these reasons, passing missing_ok = true.
+ * SUBREL_STATE_DATASYNC. The tablesync worker or apply
+ * worker can also concurrently try to drop the origin and
+ * by this time the origin might be already removed. For
+ * these reasons, passing missing_ok = true.
*/
ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
sizeof(originname));
@@ -2174,7 +2174,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
*
* It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true. This can happen for the states
- * before SUBREL_STATE_FINISHEDCOPY.
+ * before SUBREL_STATE_DATASYNC.
*/
ReplicationOriginNameForLogicalRep(subid, relid, originname,
sizeof(originname));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6bb0cbeedad..2522e372036 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1333,13 +1333,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- /* Update the state and make it visible to others. */
+ /*
+ * Update the state, create the replication origin, and make them visible
+ * to others.
+ */
StartTransactionCommand();
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn,
false);
+
+ /*
+ * Create the replication origin in a separate transaction from the one
+ * that sets up the origin in shared memory. This prevents the risk that
+ * changes to the origin in shared memory cannot be rolled back if the
+ * transaction aborts.
+ */
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+
CommitTransactionCommand();
pgstat_report_stat(true);
@@ -1379,37 +1393,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CRS_USE_SNAPSHOT, origin_startpos);
/*
- * Setup replication origin tracking. The purpose of doing this before the
- * copy is to avoid doing the copy again due to any error in setting up
- * origin tracking.
+ * Advance the origin to the LSN got from walrcv_create_slot and then set
+ * up the origin. The advancement is WAL logged for the purpose of
+ * recovery. Locks are to prevent the replication origin from vanishing
+ * while advancing.
+ *
+ * The purpose of doing these before the copy is to avoid doing the copy
+ * again due to any error in advancing or setting up origin tracking.
*/
- originid = replorigin_by_name(originname, true);
- if (!OidIsValid(originid))
- {
- /*
- * Origin tracking does not exist, so create it now.
- *
- * Then advance to the LSN got from walrcv_create_slot. This is WAL
- * logged for the purpose of recovery. Locks are to prevent the
- * replication origin from vanishing while advancing.
- */
- originid = replorigin_create(originname);
-
- LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
- replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
- true /* go backward */ , true /* WAL log */ );
- UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+ true /* go backward */ , true /* WAL log */ );
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
- replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
- }
- else
- {
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("replication origin \"%s\" already exists",
- originname)));
- }
+ replorigin_session_setup(originid, 0);
+ replorigin_session_origin = originid;
/*
* If the user did not opt to run as the owner of the subscription
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index d5eac05a3b3..2c6ae2d023a 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -172,6 +172,12 @@ ok( $node_publisher->poll_query_until(
'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
+# After dropping the subscription, all replication origins, whether created by
+# an apply worker or table sync worker, should have been cleaned up.
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin_status");
+is($result, qq(0), 'all replication origins have been cleaned up');
+
$node_subscriber->stop('fast');
$node_publisher->stop('fast');