From 8acfb903cb62baabea2b32174ce98b78d840e068 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:46:00 +0200
Subject: [PATCH 4/4] Call logical_rewrite_heap_tuple() when applying
 concurrent data changes.

This is implemented for the sake of completeness, but I think it is currently
not needed. Possible use cases:

1. VACUUM FULL / CLUSTER CONCURRENTLY can process system catalogs.

During logical decoding, system catalogs are scanned using a historic
snapshot, and the "combo CID" information is needed for that. Since a "combo
CID" is associated with a particular "file locator", and VACUUM FULL / CLUSTER
changes that locator, these commands must record the mapping for each
individual tuple moved from the old file to the new one. This is what
logical_rewrite_heap_tuple() does.
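
For reference, the mapping that logical_rewrite_heap_tuple() writes out is the
LogicalRewriteMappingData record defined in src/include/access/rewriteheap.h:
it ties a tuple's TID under the old file locator to its TID under the new one,
which is what lets decoding keep resolving combo CIDs across the rewrite. The
struct is copied below for illustration (comments added here):

    typedef struct LogicalRewriteMappingData
    {
        RelFileLocator old_locator;   /* locator of the old heap file */
        RelFileLocator new_locator;   /* locator of the new heap file */
        ItemPointerData old_tid;      /* tuple's TID in the old file */
        ItemPointerData new_tid;      /* tuple's TID in the new file */
    } LogicalRewriteMappingData;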

However, the logical decoding subsystem currently does not support decoding of
data changes in system catalogs. Therefore, the CONCURRENTLY option cannot be
used for system catalogs.

2. VACUUM FULL / CLUSTER CONCURRENTLY is processing a relation, but after it
has released all its locks (in order to acquire the exclusive lock), another
backend runs VACUUM FULL / CLUSTER CONCURRENTLY on the same table. Since the
relation is treated as a system catalog while these commands are processing it
(so that it can be scanned using a historic snapshot during the "initial
load"), it is important that the second backend does not break the first
backend's decoding of "combo CIDs".

However, it's not practical to let multiple backends run VACUUM FULL / CLUSTER
CONCURRENTLY on the same relation, so we forbid that.
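
For illustration, the apply path in rebuild_relation_finish_concurrent() now
drives the mapping as sketched below (condensed from the diff; the XID and
multixact cutoffs may all be invalid because rwstate only feeds the logical
rewrite mapping, and TID-chain tracking is disabled):

    rwstate = begin_heap_rewrite(OldHeap, NewHeap,
                                 InvalidTransactionId,
                                 InvalidTransactionId,
                                 InvalidTransactionId,
                                 false);    /* tid_chains */
    process_concurrent_changes(ctx, end_of_wal, NewHeap,
                               swap_toast_by_content ? OldHeap : NULL,
                               ident_key, ident_key_nentries, iistate,
                               rwstate);
    end_heap_rewrite(rwstate);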
---
 src/backend/access/heap/heapam_handler.c      |   2 +-
 src/backend/access/heap/rewriteheap.c         |  65 ++++++-----
 src/backend/commands/cluster.c                | 102 ++++++++++++++----
 src/backend/replication/logical/decode.c      |  41 ++++++-
 .../pgoutput_cluster/pgoutput_cluster.c       |  21 ++--
 src/include/access/rewriteheap.h              |   5 +-
 src/include/commands/cluster.h                |   3 +
 src/include/replication/reorderbuffer.h       |   7 ++
 8 files changed, 187 insertions(+), 59 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 02fd6d2983..cccfff62bd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -735,7 +735,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
 	/* Initialize the rewrite operation */
 	rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
-								 *multi_cutoff);
+								 *multi_cutoff, true);
 
 
 	/* Set up sorting if wanted */
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 473f3aa9be..050c8306da 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -214,10 +214,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
 
 /* internal logical remapping prototypes */
 static void logical_begin_heap_rewrite(RewriteState state);
-static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
 static void logical_end_heap_rewrite(RewriteState state);
 
-
 /*
  * Begin a rewrite of a table
  *
@@ -226,18 +224,19 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin	xid used by the caller to determine which tuples are dead
  * freeze_xid	xid before which tuples will be frozen
  * cutoff_multi	multixact before which multis will be removed
+ * tid_chains	need to maintain TID chains?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-				   TransactionId freeze_xid, MultiXactId cutoff_multi)
+				   TransactionId freeze_xid, MultiXactId cutoff_multi,
+				   bool tid_chains)
 {
 	RewriteState state;
 	MemoryContext rw_cxt;
 	MemoryContext old_cxt;
-	HASHCTL		hash_ctl;
 
 	/*
 	 * To ease cleanup, make a separate context that will contain the
@@ -262,29 +261,34 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_cxt = rw_cxt;
 	state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
 
-	/* Initialize hash tables used to track update chains */
-	hash_ctl.keysize = sizeof(TidHashKey);
-	hash_ctl.entrysize = sizeof(UnresolvedTupData);
-	hash_ctl.hcxt = state->rs_cxt;
-
-	state->rs_unresolved_tups =
-		hash_create("Rewrite / Unresolved ctids",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-	hash_ctl.entrysize = sizeof(OldToNewMappingData);
+	if (tid_chains)
+	{
+		HASHCTL		hash_ctl;
+
+		/* Initialize hash tables used to track update chains */
+		hash_ctl.keysize = sizeof(TidHashKey);
+		hash_ctl.entrysize = sizeof(UnresolvedTupData);
+		hash_ctl.hcxt = state->rs_cxt;
+
+		state->rs_unresolved_tups =
+			hash_create("Rewrite / Unresolved ctids",
+						128,		/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		hash_ctl.entrysize = sizeof(OldToNewMappingData);
+
+		state->rs_old_new_tid_map =
+			hash_create("Rewrite / Old to new tid map",
+						128,		/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
 
-	state->rs_old_new_tid_map =
-		hash_create("Rewrite / Old to new tid map",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	logical_begin_heap_rewrite(state);
 
 	MemoryContextSwitchTo(old_cxt);
 
-	logical_begin_heap_rewrite(state);
-
 	return state;
 }
 
@@ -303,12 +307,15 @@ end_heap_rewrite(RewriteState state)
 	 * Write any remaining tuples in the UnresolvedTups table. If we have any
 	 * left, they should in fact be dead, but let's err on the safe side.
 	 */
-	hash_seq_init(&seq_status, state->rs_unresolved_tups);
-
-	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+	if (state->rs_unresolved_tups)
 	{
-		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
-		raw_heap_insert(state, unresolved->tuple);
+		hash_seq_init(&seq_status, state->rs_unresolved_tups);
+
+		while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+		{
+			ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+			raw_heap_insert(state, unresolved->tuple);
+		}
 	}
 
 	/* Write the last page, if any */
@@ -995,7 +1002,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
  * Perform logical remapping for a tuple that's mapped from old_tid to
  * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
  */
-static void
+void
 logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
 						   HeapTuple new_tuple)
 {
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 6397f7f8c4..42e8118b7d 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -21,6 +21,7 @@
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relscan.h"
+#include "access/rewriteheap.h"
 #include "access/tableam.h"
 #include "access/toast_internals.h"
 #include "access/transam.h"
@@ -179,17 +180,21 @@ static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 static HeapTuple get_changed_tuple(ConcurrentChange *change);
 static void apply_concurrent_changes(ClusterDecodingState *dstate,
 									 Relation rel, ScanKey key, int nkeys,
-									 IndexInsertState *iistate);
+									 IndexInsertState *iistate,
+									 RewriteState rwstate);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
 									HeapTuple tup, IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_update(Relation rel, HeapTuple tup,
 									HeapTuple tup_target,
 									ConcurrentChange *change,
 									IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-									ConcurrentChange *change);
+									ConcurrentChange *change,
+									RewriteState rwstate);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
 								   Snapshot snapshot,
@@ -202,7 +207,8 @@ static void process_concurrent_changes(LogicalDecodingContext *ctx,
 									   Relation rel_src,
 									   ScanKey ident_key,
 									   int ident_key_nentries,
-									   IndexInsertState *iistate);
+									   IndexInsertState *iistate,
+									   RewriteState rwstate);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id);
 static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src,
@@ -3073,7 +3079,8 @@ cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
  */
 static void
 apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
-						 ScanKey key, int nkeys, IndexInsertState *iistate)
+						 ScanKey key, int nkeys, IndexInsertState *iistate,
+						 RewriteState rwstate)
 {
 	TupleTableSlot *index_slot, *ident_slot;
 	HeapTuple	tup_old = NULL;
@@ -3144,7 +3151,8 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 		{
 			Assert(tup_old == NULL);
 
-			apply_concurrent_insert(rel, change, tup, iistate, index_slot);
+			apply_concurrent_insert(rel, change, tup, iistate, index_slot,
+									rwstate);
 
 			pfree(tup);
 		}
@@ -3152,7 +3160,7 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 				 change->kind == CHANGE_DELETE)
 		{
 			IndexScanDesc	ind_scan = NULL;
-			HeapTuple	tup_key;
+			HeapTuple	tup_key, tup_exist_cp;
 
 			if (change->kind == CHANGE_UPDATE_NEW)
 			{
@@ -3193,11 +3201,23 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
 
+			/*
+			 * Update the mapping for xmax of the old version.
+			 *
+			 * Use a copy ('tup_exist' can point to a shared buffer), with
+			 * xmin set invalid because its mapping should already have been
+			 * written on insertion.
+			 */
+			tup_exist_cp = heap_copytuple(tup_exist);
+			HeapTupleHeaderSetXmin(tup_exist_cp->t_data, InvalidTransactionId);
+			logical_rewrite_heap_tuple(rwstate, change->old_tid, tup_exist_cp);
+			pfree(tup_exist_cp);
+
 			if (change->kind == CHANGE_UPDATE_NEW)
 				apply_concurrent_update(rel, tup, tup_exist, change, iistate,
-										index_slot);
+										index_slot, rwstate);
 			else
-				apply_concurrent_delete(rel, tup_exist, change);
+				apply_concurrent_delete(rel, tup_exist, change, rwstate);
 
 			ResetClusterCurrentXids();
 
@@ -3238,9 +3258,12 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 
 static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
-						IndexInsertState *iistate, TupleTableSlot *index_slot)
+						IndexInsertState *iistate, TupleTableSlot *index_slot,
+						RewriteState rwstate)
 {
+	HeapTupleHeader	tup_hdr = tup->t_data;
 	Snapshot	snapshot = change->snapshot;
+	ItemPointerData		old_tid;
 	List	   *recheck;
 
 	/*
@@ -3250,6 +3273,9 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 */
 	SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
 
+	/* Remember location in the old heap. */
+	ItemPointerCopy(&tup_hdr->t_ctid, &old_tid);
+
 	/*
 	 * Write the tuple into the new heap.
 	 *
@@ -3265,6 +3291,14 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
 				HEAP_INSERT_NO_LOGICAL, NULL);
 
+	/*
+	 * Update the mapping for xmin (xmax should be invalid). This is needed
+	 * because, while it is being processed, the table is considered a "user
+	 * catalog".
+	 */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, old_tid, tup);
+
 	/*
 	 * Update indexes.
 	 *
@@ -3298,16 +3332,19 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 static void
 apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 						ConcurrentChange *change, IndexInsertState *iistate,
-						TupleTableSlot *index_slot)
+						TupleTableSlot *index_slot, RewriteState rwstate)
 {
 	List	   *recheck;
 	LockTupleMode	lockmode;
 	TU_UpdateIndexes	update_indexes;
-	ItemPointerData		tid_old_new_heap;
+	ItemPointerData		tid_new_old_heap, tid_old_new_heap;
 	TM_Result	res;
 	Snapshot snapshot	= change->snapshot;
 	TM_FailureData tmfd;
 
+	/* Location of the new tuple in the old heap. */
+	ItemPointerCopy(&tup->t_data->t_ctid, &tid_new_old_heap);
+
 	/* Location of the existing tuple in the new heap. */
 	ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
 
@@ -3330,6 +3367,10 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	if (res != TM_Ok)
 		ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
+	/* Update the mapping for xmin of the new version. */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, tid_new_old_heap, tup);
+
 	ExecStoreHeapTuple(tup, index_slot, false);
 
 	if (update_indexes != TU_None)
@@ -3353,7 +3394,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 
 static void
 apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-						ConcurrentChange *change)
+						ConcurrentChange *change, RewriteState rwstate)
 {
 	ItemPointerData		tid_old_new_heap;
 	TM_Result	res;
@@ -3444,7 +3485,8 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
 static void
 process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 						   Relation rel_dst, Relation rel_src, ScanKey ident_key,
-						   int ident_key_nentries, IndexInsertState *iistate)
+						   int ident_key_nentries, IndexInsertState *iistate,
+						   RewriteState rwstate)
 {
 	ClusterDecodingState *dstate;
 
@@ -3468,7 +3510,7 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
 
 		apply_concurrent_changes(dstate, rel_dst, ident_key,
-								 ident_key_nentries, iistate);
+								 ident_key_nentries, iistate, rwstate);
 	}
 	PG_FINALLY();
 	{
@@ -3631,6 +3673,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	bool		is_system_catalog;
 	Oid		ident_idx_old, ident_idx_new;
 	IndexInsertState *iistate;
+	RewriteState	rwstate;
 	ScanKey		ident_key;
 	int		ident_key_nentries;
 	XLogRecPtr	wal_insert_ptr, end_of_wal;
@@ -3708,10 +3751,26 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	 * Apply concurrent changes first time, to minimize the time we need to
 	 * hold AccessExclusiveLock. (Quite some amount of WAL could have been
 	 * written during the data copying and index creation.)
+	 *
+	 * Now we are processing individual tuples, so pass false for
+	 * 'tid_chains'. Since rwstate is only needed for the logical rewrite
+	 * mapping (set up by logical_begin_heap_rewrite()), none of the
+	 * transaction IDs needs to be valid.
 	 */
+	rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 false);
 	process_concurrent_changes(ctx, end_of_wal, NewHeap,
 							   swap_toast_by_content ? OldHeap : NULL,
-							   ident_key, ident_key_nentries, iistate);
+							   ident_key, ident_key_nentries, iistate,
+							   rwstate);
+	/*
+	 * OldHeap will be closed, so end the rewrite here; rwstate has to be
+	 * initialized again for the next call of process_concurrent_changes().
+	 */
+	end_heap_rewrite(rwstate);
 
 	/*
 	 * Release the locks that allowed concurrent data changes, in order to
@@ -3833,9 +3892,16 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	end_of_wal = GetFlushRecPtr(NULL);
 
 	/* Apply the concurrent changes again. */
+	rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 false);
 	process_concurrent_changes(ctx, end_of_wal, NewHeap,
 							   swap_toast_by_content ? OldHeap : NULL,
-							   ident_key, ident_key_nentries, iistate);
+							   ident_key, ident_key_nentries, iistate,
+							   rwstate);
+	end_heap_rewrite(rwstate);
 
 	/* Remember info about rel before closing OldHeap */
 	relpersistence = OldHeap->rd_rel->relpersistence;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 066d96dea2..69a43e3510 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -951,11 +951,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_insert *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber		blknum;
+	HeapTupleHeader	tuphdr;
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -980,6 +982,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
 
+	/*
+	 * The CTID is needed by logical_rewrite_heap_tuple() when doing CLUSTER
+	 * CONCURRENTLY.
+	 */
+	tuphdr = change->data.tp.newtuple->t_data;
+	ItemPointerSet(&tuphdr->t_ctid, blknum, xlrec->offnum);
+
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1001,11 +1010,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	ReorderBufferChange *change;
 	char	   *data;
 	RelFileLocator target_locator;
+	BlockNumber		old_blknum, new_blknum;
 
 	xlrec = (xl_heap_update *) XLogRecGetData(r);
 
+	/* Retrieve the block number so that we can compose the CTID below. */
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum);
+
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1022,6 +1034,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	{
 		Size		datalen;
 		Size		tuplelen;
+		HeapTupleHeader	tuphdr;
 
 		data = XLogRecGetBlockData(r, 0, &datalen);
 
@@ -1031,6 +1044,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
 
 		DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
+
+		/*
+		 * The CTID is needed by logical_rewrite_heap_tuple() when doing
+		 * CLUSTER CONCURRENTLY.
+		 */
+		tuphdr = change->data.tp.newtuple->t_data;
+		ItemPointerSet(&tuphdr->t_ctid, new_blknum, xlrec->new_offnum);
 	}
 
 	if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
@@ -1049,6 +1069,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
 	}
 
+	/*
+	 * Remember the old tuple's CTID for the sake of
+	 * logical_rewrite_heap_tuple().
+	 */
+	if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &old_blknum, NULL))
+		old_blknum = new_blknum;
+	ItemPointerSet(&change->data.tp.old_tid, old_blknum, xlrec->old_offnum);
+
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1067,11 +1095,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_delete *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber		blknum;
 
 	xlrec = (xl_heap_delete *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1103,6 +1132,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
 						datalen, change->data.tp.oldtuple);
+
+		/*
+		 * The CTID is needed by logical_rewrite_heap_tuple() when doing
+		 * CLUSTER CONCURRENTLY.
+		 */
+		ItemPointerSet(&change->data.tp.old_tid, blknum, xlrec->offnum);
 	}
 
 	change->data.tp.clear_toast_afterwards = true;
diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
index 9fe44017a8..2c33fbad82 100644
--- a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
+++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
@@ -34,7 +34,7 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
 							ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
 						 ConcurrentChangeKind kind, HeapTuple tuple,
-						 TransactionId xid);
+						 TransactionId xid, ItemPointer old_tid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -162,7 +162,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (newtuple == NULL)
 					elog(ERROR, "Incomplete insert info.");
 
-				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
+				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid,
+							 NULL);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -180,10 +181,10 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				if (oldtuple != NULL)
 					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
-								 change->txn->xid);
+								 change->txn->xid, NULL);
 
 				store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
-							 change->txn->xid);
+							 change->txn->xid, &change->data.tp.old_tid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
@@ -196,7 +197,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (oldtuple == NULL)
 					elog(ERROR, "Incomplete delete info.");
 
-				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
+				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid,
+							 &change->data.tp.old_tid);
 			}
 			break;
 		default:
@@ -230,13 +232,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (i == nrelations)
 		return;
 
-	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
+	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId, NULL);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-			 HeapTuple tuple, TransactionId xid)
+			 HeapTuple tuple, TransactionId xid, ItemPointer old_tid)
 {
 	ClusterDecodingState *dstate;
 	char	   *change_raw;
@@ -301,6 +303,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 	change->snapshot = dstate->snapshot;
 	dstate->snapshot->active_count++;
 
+	if (old_tid)
+		ItemPointerCopy(old_tid, &change->old_tid);
+	else
+		ItemPointerSetInvalid(&change->old_tid);
+
 	/* The data has been copied. */
 	if (flattened)
 		pfree(tuple);
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 5866a26bdd..de62b6abf8 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -23,11 +23,14 @@ typedef struct RewriteStateData *RewriteState;
 
 extern RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap,
 									   TransactionId oldest_xmin, TransactionId freeze_xid,
-									   MultiXactId cutoff_multi);
+									   MultiXactId cutoff_multi, bool tid_chains);
 extern void end_heap_rewrite(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple,
 							   HeapTuple new_tuple);
 extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple);
+extern void logical_rewrite_heap_tuple(RewriteState state,
+									   ItemPointerData old_tid,
+									   HeapTuple new_tuple);
 
 /*
  * On-Disk data format for an individual logical rewrite mapping.
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index f98b855f21..c394ef3871 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -71,6 +71,9 @@ typedef struct ConcurrentChange
 	/* Transaction that changes the data. */
 	TransactionId	xid;
 
+	/* For UPDATE / DELETE, the location of the old tuple version. */
+	ItemPointerData	old_tid;
+
 	/*
 	 * Historic catalog snapshot that was used to decode this change.
 	 */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 851a001c8b..1fa8f8bd6a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -99,6 +99,13 @@ typedef struct ReorderBufferChange
 			HeapTuple	oldtuple;
 			/* valid for INSERT || UPDATE */
 			HeapTuple	newtuple;
+
+			/*
+			 * CLUSTER CONCURRENTLY needs the old TID, even if the old tuple
+			 * itself is not WAL-logged (i.e. when the identity key does not
+			 * change).
+			 */
+			ItemPointerData	old_tid;
 		}			tp;
 
 		/*
-- 
2.45.2

