From 26772eb30bc1f47613a8e661349943eae677e3a7 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Wed, 11 Dec 2024 19:22:42 +0100
Subject: [PATCH 8/8] Call logical_rewrite_heap_tuple() when applying
 concurrent data changes.

This was implemented for the sake of completeness, but I think it is not
currently needed. Possible use cases would be:

1. VACUUM FULL / CLUSTER CONCURRENTLY can process system catalogs.

System catalogs are scanned using a historic snapshot during logical decoding,
and the "combo CID" information is needed for that. Since a "combo CID" is
associated with a particular "file locator", and that locator is changed by
VACUUM FULL / CLUSTER, these commands must record the mapping for each tuple
moved from the old file to the new one. This is what
logical_rewrite_heap_tuple() does.
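
For illustration, a minimal sketch of how the apply code records such a
mapping (modeled on the apply_concurrent_insert() change below; error
handling omitted, 'rwstate' is the RewriteState of the table being
rewritten):

    ItemPointerData old_tid;

    /* Remember the tuple's location in the old heap. */
    ItemPointerCopy(&tup->t_data->t_ctid, &old_tid);

    /* Insert the tuple into the new heap; this assigns tup->t_self. */
    heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
                HEAP_INSERT_NO_LOGICAL, NULL);

    /* Record the old-to-new TID mapping for logical decoding. */
    logical_rewrite_heap_tuple(rwstate, old_tid, tup);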

However, the logical decoding subsystem currently does not support decoding
of data changes in system catalogs. Therefore, the CONCURRENTLY option cannot
be used for system catalogs.

2. VACUUM FULL / CLUSTER CONCURRENTLY is processing a relation and has
temporarily released all its locks (in order to acquire the exclusive lock),
at which point another backend starts VACUUM FULL / CLUSTER CONCURRENTLY on
the same table. Since the relation is treated as a system catalog while these
commands are processing it (so that it can be scanned using a historic
snapshot during the "initial load"), it is important that the 2nd backend
does not break the decoding of "combo CIDs" performed by the 1st backend.

However, it's not practical to let multiple backends run VACUUM FULL / CLUSTER
CONCURRENTLY on the same relation, so we forbid that.
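
For reference, a sketch of how the concurrent-apply code initializes the
rewrite state with the new 'tid_chains' parameter (based on
rebuild_relation_finish_concurrent() below; no freezing or TID-chain tracking
is needed there, so the cutoffs may be invalid):

    RewriteState rwstate;

    rwstate = begin_heap_rewrite(OldHeap, NewHeap,
                                 InvalidTransactionId,  /* oldest_xmin */
                                 InvalidTransactionId,  /* freeze_xid */
                                 InvalidMultiXactId,    /* cutoff_multi */
                                 false);                /* tid_chains */

    /* ... decode and apply the concurrent data changes, calling
     * logical_rewrite_heap_tuple() for each tuple written to the
     * new heap ... */

    end_heap_rewrite(rwstate);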
---
 src/backend/access/heap/heapam_handler.c      |   2 +-
 src/backend/access/heap/rewriteheap.c         |  65 ++++++-----
 src/backend/commands/cluster.c                | 110 +++++++++++++++---
 src/backend/replication/logical/decode.c      |  41 ++++++-
 .../pgoutput_cluster/pgoutput_cluster.c       |  21 ++--
 src/include/access/rewriteheap.h              |   5 +-
 src/include/commands/cluster.h                |   3 +
 src/include/replication/reorderbuffer.h       |   7 ++
 8 files changed, 194 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 585e97335c..c01a6192c2 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -730,7 +730,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
 	/* Initialize the rewrite operation */
 	rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
-								 *multi_cutoff);
+								 *multi_cutoff, true);
 
 
 	/* Set up sorting if wanted */
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 09ef220449..86881e8638 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -214,10 +214,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
 
 /* internal logical remapping prototypes */
 static void logical_begin_heap_rewrite(RewriteState state);
-static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
 static void logical_end_heap_rewrite(RewriteState state);
 
-
 /*
  * Begin a rewrite of a table
  *
@@ -226,18 +224,19 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin	xid used by the caller to determine which tuples are dead
  * freeze_xid	xid before which tuples will be frozen
  * cutoff_multi	multixact before which multis will be removed
+ * tid_chains	need to maintain TID chains?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-				   TransactionId freeze_xid, MultiXactId cutoff_multi)
+				   TransactionId freeze_xid, MultiXactId cutoff_multi,
+				   bool tid_chains)
 {
 	RewriteState state;
 	MemoryContext rw_cxt;
 	MemoryContext old_cxt;
-	HASHCTL		hash_ctl;
 
 	/*
 	 * To ease cleanup, make a separate context that will contain the
@@ -262,29 +261,34 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_cxt = rw_cxt;
 	state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
 
-	/* Initialize hash tables used to track update chains */
-	hash_ctl.keysize = sizeof(TidHashKey);
-	hash_ctl.entrysize = sizeof(UnresolvedTupData);
-	hash_ctl.hcxt = state->rs_cxt;
-
-	state->rs_unresolved_tups =
-		hash_create("Rewrite / Unresolved ctids",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-	hash_ctl.entrysize = sizeof(OldToNewMappingData);
+	if (tid_chains)
+	{
+		HASHCTL		hash_ctl;
+
+		/* Initialize hash tables used to track update chains */
+		hash_ctl.keysize = sizeof(TidHashKey);
+		hash_ctl.entrysize = sizeof(UnresolvedTupData);
+		hash_ctl.hcxt = state->rs_cxt;
+
+		state->rs_unresolved_tups =
+			hash_create("Rewrite / Unresolved ctids",
+						128,		/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		hash_ctl.entrysize = sizeof(OldToNewMappingData);
+
+		state->rs_old_new_tid_map =
+			hash_create("Rewrite / Old to new tid map",
+						128,		/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
 
-	state->rs_old_new_tid_map =
-		hash_create("Rewrite / Old to new tid map",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	logical_begin_heap_rewrite(state);
 
 	MemoryContextSwitchTo(old_cxt);
 
-	logical_begin_heap_rewrite(state);
-
 	return state;
 }
 
@@ -303,12 +307,15 @@ end_heap_rewrite(RewriteState state)
 	 * Write any remaining tuples in the UnresolvedTups table. If we have any
 	 * left, they should in fact be dead, but let's err on the safe side.
 	 */
-	hash_seq_init(&seq_status, state->rs_unresolved_tups);
-
-	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+	if (state->rs_unresolved_tups)
 	{
-		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
-		raw_heap_insert(state, unresolved->tuple);
+		hash_seq_init(&seq_status, state->rs_unresolved_tups);
+
+		while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+		{
+			ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+			raw_heap_insert(state, unresolved->tuple);
+		}
 	}
 
 	/* Write the last page, if any */
@@ -995,7 +1002,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
  * Perform logical remapping for a tuple that's mapped from old_tid to
  * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
  */
-static void
+void
 logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
 						   HeapTuple new_tuple)
 {
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 53b0f7f9b3..59dffc3bdd 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -23,6 +23,7 @@
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relscan.h"
+#include "access/rewriteheap.h"
 #include "access/tableam.h"
 #include "access/toast_internals.h"
 #include "access/transam.h"
@@ -200,17 +201,21 @@ static HeapTuple get_changed_tuple(char *change);
 static void apply_concurrent_changes(ClusterDecodingState *dstate,
 									 Relation rel, ScanKey key, int nkeys,
 									 IndexInsertState *iistate,
-									 struct timeval *must_complete);
+									 struct timeval *must_complete,
+									 RewriteState rwstate);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
 									HeapTuple tup, IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_update(Relation rel, HeapTuple tup,
 									HeapTuple tup_target,
 									ConcurrentChange *change,
 									IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-									ConcurrentChange *change);
+									ConcurrentChange *change,
+									RewriteState rwstate);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
 								   Snapshot snapshot,
@@ -224,7 +229,8 @@ static bool process_concurrent_changes(LogicalDecodingContext *ctx,
 									   ScanKey ident_key,
 									   int ident_key_nentries,
 									   IndexInsertState *iistate,
-									   struct timeval *must_complete);
+									   struct timeval *must_complete,
+									   RewriteState rwstate);
 static bool processing_time_elapsed(struct timeval *must_complete);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id);
@@ -3118,7 +3124,7 @@ cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
 static void
 apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 						 ScanKey key, int nkeys, IndexInsertState *iistate,
-						 struct timeval *must_complete)
+						 struct timeval *must_complete, RewriteState rwstate)
 {
 	TupleTableSlot *index_slot, *ident_slot;
 	HeapTuple	tup_old = NULL;
@@ -3192,7 +3198,8 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 		{
 			Assert(tup_old == NULL);
 
-			apply_concurrent_insert(rel, &change, tup, iistate, index_slot);
+			apply_concurrent_insert(rel, &change, tup, iistate, index_slot,
+									rwstate);
 
 			pfree(tup);
 		}
@@ -3200,7 +3207,7 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 				 change.kind == CHANGE_DELETE)
 		{
 			IndexScanDesc	ind_scan = NULL;
-			HeapTuple	tup_key;
+			HeapTuple	tup_key, tup_exist_cp;
 
 			if (change.kind == CHANGE_UPDATE_NEW)
 			{
@@ -3242,11 +3249,23 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
 
+			/*
+			 * Update the mapping for xmax of the old version.
+			 *
+			 * Use a copy ('tup_exist' can point to a shared buffer) with
+			 * xmin set to invalid, because the mapping for xmin should
+			 * already have been written on insertion.
+			 */
+			tup_exist_cp = heap_copytuple(tup_exist);
+			HeapTupleHeaderSetXmin(tup_exist_cp->t_data, InvalidTransactionId);
+			logical_rewrite_heap_tuple(rwstate, change.old_tid, tup_exist_cp);
+			pfree(tup_exist_cp);
+
 			if (change.kind == CHANGE_UPDATE_NEW)
 				apply_concurrent_update(rel, tup, tup_exist, &change, iistate,
-										index_slot);
+										index_slot, rwstate);
 			else
-				apply_concurrent_delete(rel, tup_exist, &change);
+				apply_concurrent_delete(rel, tup_exist, &change, rwstate);
 
 			ResetClusterCurrentXids();
 
@@ -3299,9 +3318,12 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 
 static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
-						IndexInsertState *iistate, TupleTableSlot *index_slot)
+						IndexInsertState *iistate, TupleTableSlot *index_slot,
+						RewriteState rwstate)
 {
+	HeapTupleHeader	tup_hdr = tup->t_data;
 	Snapshot	snapshot = change->snapshot;
+	ItemPointerData		old_tid;
 	List	   *recheck;
 
 	/*
@@ -3311,6 +3333,9 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 */
 	SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
 
+	/* Remember location in the old heap. */
+	ItemPointerCopy(&tup_hdr->t_ctid, &old_tid);
+
 	/*
 	 * Write the tuple into the new heap.
 	 *
@@ -3326,6 +3351,14 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
 				HEAP_INSERT_NO_LOGICAL, NULL);
 
+	/*
+	 * Update the mapping for xmin (xmax should be invalid). This is needed
+	 * because the table is treated as a "user catalog" while these commands
+	 * are processing it.
+	 */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, old_tid, tup);
+
 	/*
 	 * Update indexes.
 	 *
@@ -3359,15 +3392,22 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 static void
 apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 						ConcurrentChange *change, IndexInsertState *iistate,
-						TupleTableSlot *index_slot)
+						TupleTableSlot *index_slot, RewriteState rwstate)
 {
 	List	   *recheck;
 	LockTupleMode	lockmode;
 	TU_UpdateIndexes	update_indexes;
+	ItemPointerData		tid_new_old_heap, tid_old_new_heap;
 	TM_Result	res;
 	Snapshot snapshot	= change->snapshot;
 	TM_FailureData tmfd;
 
+	/* Location of the new tuple in the old heap. */
+	ItemPointerCopy(&tup->t_data->t_ctid, &tid_new_old_heap);
+
+	/* Location of the existing tuple in the new heap. */
+	ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
+
 	/*
 	 * Write the new tuple into the new heap. ('tup' gets the TID assigned
 	 * here.)
@@ -3377,7 +3417,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	Assert(snapshot->curcid != InvalidCommandId &&
 		   snapshot->curcid > FirstCommandId);
 
-	res = heap_update(rel, &tup_target->t_self, tup,
+	res = heap_update(rel, &tid_old_new_heap, tup,
 					  change->xid, snapshot->curcid - 1,
 					  InvalidSnapshot,
 					  false, /* no wait - only we are doing changes */
@@ -3387,6 +3427,10 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	if (res != TM_Ok)
 		ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
+	/* Update the mapping for xmin of the new version. */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, tid_new_old_heap, tup);
+
 	ExecStoreHeapTuple(tup, index_slot, false);
 
 	if (update_indexes != TU_None)
@@ -3410,8 +3454,9 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 
 static void
 apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-						ConcurrentChange *change)
+						ConcurrentChange *change, RewriteState rwstate)
 {
+	ItemPointerData		tid_old_new_heap;
 	TM_Result	res;
 	TM_FailureData tmfd;
 	Snapshot	snapshot = change->snapshot;
@@ -3420,7 +3465,10 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 	Assert(snapshot->curcid != InvalidCommandId &&
 		   snapshot->curcid > FirstCommandId);
 
-	res = heap_delete(rel, &tup_target->t_self, change->xid,
+	/* Location of the existing tuple in the new heap. */
+	ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
+
+	res = heap_delete(rel, &tid_old_new_heap, change->xid,
 					  snapshot->curcid - 1, InvalidSnapshot, false,
 					  &tmfd, false,
 					  /* wal_logical */
@@ -3501,7 +3549,8 @@ static bool
 process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 						   Relation rel_dst, Relation rel_src, ScanKey ident_key,
 						   int ident_key_nentries, IndexInsertState *iistate,
-						   struct timeval *must_complete)
+						   struct timeval *must_complete,
+						   RewriteState rwstate)
 {
 	ClusterDecodingState *dstate;
 
@@ -3534,7 +3583,8 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
 
 		apply_concurrent_changes(dstate, rel_dst, ident_key,
-								 ident_key_nentries, iistate, must_complete);
+								 ident_key_nentries, iistate, must_complete,
+								 rwstate);
 	}
 	PG_FINALLY();
 	{
@@ -3719,6 +3769,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	bool		is_system_catalog;
 	Oid		ident_idx_old, ident_idx_new;
 	IndexInsertState *iistate;
+	RewriteState	rwstate;
 	ScanKey		ident_key;
 	int		ident_key_nentries;
 	XLogRecPtr	wal_insert_ptr, end_of_wal;
@@ -3804,11 +3855,26 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	 * Apply concurrent changes first time, to minimize the time we need to
 	 * hold AccessExclusiveLock. (Quite some amount of WAL could have been
 	 * written during the data copying and index creation.)
+	 *
+	 * Now we are processing individual tuples, so pass false for
+	 * 'tid_chains'. Since rwstate is now only used for the logical rewrite
+	 * (logical_begin_heap_rewrite() and logical_rewrite_heap_tuple()), none
+	 * of the transaction IDs needs to be valid.
 	 */
+	rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 InvalidMultiXactId,
+								 false);
 	process_concurrent_changes(ctx, end_of_wal, NewHeap,
 							   swap_toast_by_content ? OldHeap : NULL,
 							   ident_key, ident_key_nentries, iistate,
-							   NULL);
+							   NULL, rwstate);
+	/*
+	 * OldHeap will be closed, so end the rewrite now; rwstate will be
+	 * initialized again for the next call of process_concurrent_changes().
+	 */
+	end_heap_rewrite(rwstate);
 
 	/*
 	 * Release the locks that allowed concurrent data changes, in order to
@@ -3930,6 +3996,11 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	end_of_wal = GetFlushRecPtr(NULL);
 
 	/* Apply the concurrent changes again. */
+	rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 InvalidMultiXactId,
+								 false);
 	/*
 	 * This time we have the exclusive lock on the table, so make sure that
 	 * cluster_max_xlock_time is not exceeded.
@@ -3957,11 +4028,12 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	if (!process_concurrent_changes(ctx, end_of_wal, NewHeap,
 									swap_toast_by_content ? OldHeap : NULL,
 									ident_key, ident_key_nentries, iistate,
-									t_end_ptr))
+									t_end_ptr, rwstate))
 		ereport(ERROR,
 				(errmsg("could not process concurrent data changes in time"),
 				 errhint("Please consider adjusting \"cluster_max_xlock_time\".")));
 
+	end_heap_rewrite(rwstate);
 
 	/* Remember info about rel before closing OldHeap */
 	relpersistence = OldHeap->rd_rel->relpersistence;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 140b063a6c..5e3f85fe78 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -983,11 +983,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_insert *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber		blknum;
+	HeapTupleHeader	tuphdr;
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1012,6 +1014,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
 
+	/*
+	 * The CTID is needed by logical_rewrite_heap_tuple() when doing CLUSTER
+	 * CONCURRENTLY.
+	 */
+	tuphdr = change->data.tp.newtuple->t_data;
+	ItemPointerSet(&tuphdr->t_ctid, blknum, xlrec->offnum);
+
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1033,11 +1042,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	ReorderBufferChange *change;
 	char	   *data;
 	RelFileLocator target_locator;
+	BlockNumber		old_blknum, new_blknum;
 
 	xlrec = (xl_heap_update *) XLogRecGetData(r);
 
+	/* Retrieve the block number so that we can compose the CTID below. */
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum);
+
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1054,6 +1066,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	{
 		Size		datalen;
 		Size		tuplelen;
+		HeapTupleHeader	tuphdr;
 
 		data = XLogRecGetBlockData(r, 0, &datalen);
 
@@ -1063,6 +1076,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
 
 		DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
+
+		/*
+		 * The CTID is needed by logical_rewrite_heap_tuple() when doing
+		 * CLUSTER CONCURRENTLY.
+		 */
+		tuphdr = change->data.tp.newtuple->t_data;
+		ItemPointerSet(&tuphdr->t_ctid, new_blknum, xlrec->new_offnum);
 	}
 
 	if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
@@ -1081,6 +1101,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
 	}
 
+	/*
+	 * Remember the old tuple's CTID for logical_rewrite_heap_tuple(). (The
+	 * old page is registered as block 1 only if it differs from the new.)
+	 */
+	if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &old_blknum, NULL))
+		old_blknum = new_blknum;
+	ItemPointerSet(&change->data.tp.old_tid, old_blknum, xlrec->old_offnum);
+
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1099,11 +1127,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_delete *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber		blknum;
 
 	xlrec = (xl_heap_delete *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1135,6 +1164,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
 						datalen, change->data.tp.oldtuple);
+
+		/*
+		 * The CTID is needed by logical_rewrite_heap_tuple() when doing
+		 * CLUSTER CONCURRENTLY.
+		 */
+		ItemPointerSet(&change->data.tp.old_tid, blknum, xlrec->offnum);
 	}
 
 	change->data.tp.clear_toast_afterwards = true;
diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
index 8e915c55fb..f153d1b128 100644
--- a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
+++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
@@ -34,7 +34,7 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
 							ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
 						 ConcurrentChangeKind kind, HeapTuple tuple,
-						 TransactionId xid);
+						 TransactionId xid, ItemPointer old_tid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -169,7 +169,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (newtuple == NULL)
 					elog(ERROR, "Incomplete insert info.");
 
-				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
+				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid,
+							 NULL);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -187,10 +188,10 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				if (oldtuple != NULL)
 					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
-								 change->txn->xid);
+								 change->txn->xid, NULL);
 
 				store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
-							 change->txn->xid);
+							 change->txn->xid, &change->data.tp.old_tid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
@@ -203,7 +204,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (oldtuple == NULL)
 					elog(ERROR, "Incomplete delete info.");
 
-				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
+				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid,
+							 &change->data.tp.old_tid);
 			}
 			break;
 		default:
@@ -237,13 +239,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (i == nrelations)
 		return;
 
-	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
+	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId, NULL);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-			 HeapTuple tuple, TransactionId xid)
+			 HeapTuple tuple, TransactionId xid, ItemPointer old_tid)
 {
 	ClusterDecodingState *dstate;
 	char	   *change_raw;
@@ -316,6 +318,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 	change.snapshot = dstate->snapshot;
 	dstate->snapshot->active_count++;
 
+	if (old_tid)
+		ItemPointerCopy(old_tid, &change.old_tid);
+	else
+		ItemPointerSetInvalid(&change.old_tid);
+
 	/* The data has been copied. */
 	if (flattened)
 		pfree(tuple);
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 5866a26bdd..de62b6abf8 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -23,11 +23,14 @@ typedef struct RewriteStateData *RewriteState;
 
 extern RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap,
 									   TransactionId oldest_xmin, TransactionId freeze_xid,
-									   MultiXactId cutoff_multi);
+									   MultiXactId cutoff_multi, bool tid_chains);
 extern void end_heap_rewrite(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple,
 							   HeapTuple new_tuple);
 extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple);
+extern void logical_rewrite_heap_tuple(RewriteState state,
+									   ItemPointerData old_tid,
+									   HeapTuple new_tuple);
 
 /*
  * On-Disk data format for an individual logical rewrite mapping.
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 89cbb6be59..6844405d25 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -63,6 +63,9 @@ typedef struct ConcurrentChange
 	/* Transaction that changes the data. */
 	TransactionId	xid;
 
+	/* For UPDATE / DELETE, the location of the old tuple version. */
+	ItemPointerData	old_tid;
+
 	/*
 	 * Historic catalog snapshot that was used to decode this change.
 	 */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3bc365a7b0..81dc80596e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -104,6 +104,13 @@ typedef struct ReorderBufferChange
 			HeapTuple	oldtuple;
 			/* valid for INSERT || UPDATE */
 			HeapTuple	newtuple;
+
+			/*
+			 * CLUSTER CONCURRENTLY needs the old TID, even if the old tuple
+			 * itself is not WAL-logged (i.e. when the identity key does not
+			 * change).
+			 */
+			ItemPointerData	old_tid;
 		}			tp;
 
 		/*
-- 
2.45.2

