From 0da92dc530c9251735fc70b20cd004d9630a1266 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 10 Feb 2022 18:43:28 +0100 Subject: Logical decoding of sequences This extends the logical decoding to also decode sequence increments. We differentiate between sequences created in the current (in-progress) transaction, and sequences created earlier. This mixed behavior is necessary because while sequences are not transactional (increments are not subject to ROLLBACK), relfilenode changes are. So we do this: * Changes for sequences created in the same top-level transaction are treated as transactional, i.e. just like any other change from that transaction, and discarded in case of a rollback. * Changes for sequences created earlier are applied immediately, as if performed outside any transaction. This applies also after ALTER SEQUENCE, which may create a new relfilenode. Moreover, if we ever get support for DDL replication, the sequence won't exist until the transaction gets applied. Sequences created in the current transaction are tracked in a simple hash table, identified by a relfilenode. That means a sequence may already exist, but if a transaction does ALTER SEQUENCE then the increments for the new relfilenode will be treated as transactional. For each relfilenode we track the XID of (sub)transaction that created it, which is needed for cleanup at transaction end. We don't need to check the XID to decide if an increment is transactional - if we find a match in the hash table, it has to be the same transaction. This requires two minor changes to WAL-logging. Firstly, we need to ensure the sequence record has a valid XID - until now the the increment might have XID 0 if it was the first change in a subxact. But the sequence might have been created in the same top-level transaction. So we ensure the XID is assigned when WAL-logging increments. The other change is addition of "created" flag, marking increments for newly created relfilenodes. This makes it easier to maintain the hash table of sequences that need transactional handling. Note: This is needed because of subxacts. A XID 0 might still have the sequence created in a different subxact of the same top-level xact. This does not include any changes to test_decoding and/or the built-in replication - those will be committed in separate patches. A patch adding decoding of sequences was originally submitted by Cary Huang. This commit reworks various important aspects (e.g. the WAL logging and transactional/non-transactional handling). However, the original patch and reviews were very useful. Author: Tomas Vondra, Cary Huang Reviewed-by: Peter Eisentraut, Hannu Krosing, Andres Freund Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca --- src/include/access/rmgrlist.h | 2 +- src/include/commands/sequence.h | 1 + src/include/replication/decode.h | 1 + src/include/replication/output_plugin.h | 27 +++++++++++++++++++++ src/include/replication/reorderbuffer.h | 43 ++++++++++++++++++++++++++++++++- 5 files changed, 72 insertions(+), 2 deletions(-) (limited to 'src/include') diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9a74721c97c..cf8b6d48193 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -40,7 +40,7 @@ PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, sequence_decode) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index f7542f2bccb..9fecc41954e 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; typedef struct xl_seq_rec { RelFileNode node; + bool created; /* creates a new relfilenode (CREATE/ALTER) */ /* SEQUENCE TUPLE DATA FOLLOWS AT THE END */ } xl_seq_rec; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index a33c2a718a7..8e07bb7409a 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 41157fda7cc..a16bebf76ca 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -88,6 +88,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size message_size, const char *message); +/* + * Called for the generic logical decoding sequences. + */ +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + /* * Filter changes by origin. */ @@ -199,6 +211,19 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx Size message_size, const char *message); +/* + * Called for the streaming generic logical decoding sequences from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + /* * Callback for streaming truncates from in-progress transactions. */ @@ -219,6 +244,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -237,6 +263,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa0a73382f6..859424bbd9b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -64,7 +64,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, - REORDER_BUFFER_CHANGE_TRUNCATE + REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE }; /* forward declaration */ @@ -158,6 +159,13 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Context data for Sequence changes */ + struct + { + RelFileNode relnode; + ReorderBufferTupleBuf *tuple; + } sequence; } data; /* @@ -430,6 +438,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* sequence callback signature */ +typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -496,6 +513,15 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream sequence callback signature */ +typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -511,6 +537,12 @@ struct ReorderBuffer */ HTAB *by_txn; + /* + * relfilenode => XID lookup table for sequences created in a transaction + * (also includes altered sequences, which assigns new relfilenode) + */ + HTAB *sequences; + /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. @@ -541,6 +573,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferSequenceCB sequence; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -560,6 +593,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamSequenceCB stream_sequence; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -635,6 +669,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); @@ -682,4 +720,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void StartupReorderBuffer(void); +bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created); + #endif -- cgit v1.2.3