diff options
Diffstat (limited to 'src/include')
| -rw-r--r-- | src/include/access/rmgrlist.h | 2 | ||||
| -rw-r--r-- | src/include/commands/sequence.h | 1 | ||||
| -rw-r--r-- | src/include/replication/decode.h | 1 | ||||
| -rw-r--r-- | src/include/replication/output_plugin.h | 27 | ||||
| -rw-r--r-- | src/include/replication/reorderbuffer.h | 43 |
5 files changed, 72 insertions, 2 deletions
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9a74721c97c..cf8b6d48193 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -40,7 +40,7 @@ PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, sequence_decode) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index f7542f2bccb..9fecc41954e 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; typedef struct xl_seq_rec { RelFileNode node; + bool created; /* creates a new relfilenode (CREATE/ALTER) */ /* SEQUENCE TUPLE DATA FOLLOWS AT THE END */ } xl_seq_rec; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index a33c2a718a7..8e07bb7409a 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 41157fda7cc..a16bebf76ca 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -89,6 +89,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, const char *message); /* + * Called for the generic logical decoding sequences. + */ +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + +/* * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, @@ -200,6 +212,19 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx const char *message); /* + * Called for the streaming generic logical decoding sequences from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + +/* * Callback for streaming truncates from in-progress transactions. */ typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, @@ -219,6 +244,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -237,6 +263,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa0a73382f6..859424bbd9b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -64,7 +64,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, - REORDER_BUFFER_CHANGE_TRUNCATE + REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE }; /* forward declaration */ @@ -158,6 +159,13 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Context data for Sequence changes */ + struct + { + RelFileNode relnode; + ReorderBufferTupleBuf *tuple; + } sequence; } data; /* @@ -430,6 +438,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* sequence callback signature */ +typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -496,6 +513,15 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream sequence callback signature */ +typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -512,6 +538,12 @@ struct ReorderBuffer HTAB *by_txn; /* + * relfilenode => XID lookup table for sequences created in a transaction + * (also includes altered sequences, which assigns new relfilenode) + */ + HTAB *sequences; + + /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. */ @@ -541,6 +573,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferSequenceCB sequence; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -560,6 +593,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamSequenceCB stream_sequence; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -635,6 +669,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); @@ -682,4 +720,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void StartupReorderBuffer(void); +bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created); + #endif |
