diff options
Diffstat (limited to 'src/include/replication')
| -rw-r--r-- | src/include/replication/decode.h | 1 | ||||
| -rw-r--r-- | src/include/replication/output_plugin.h | 27 | ||||
| -rw-r--r-- | src/include/replication/reorderbuffer.h | 43 |
3 files changed, 70 insertions, 1 deletions
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index a33c2a718a7..8e07bb7409a 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 41157fda7cc..a16bebf76ca 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -89,6 +89,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, const char *message); /* + * Called for the generic logical decoding sequences. + */ +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + +/* * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, @@ -200,6 +212,19 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx const char *message); /* + * Called for the streaming generic logical decoding sequences from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + +/* * Callback for streaming truncates from in-progress transactions. */ typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, @@ -219,6 +244,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -237,6 +263,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa0a73382f6..859424bbd9b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -64,7 +64,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, - REORDER_BUFFER_CHANGE_TRUNCATE + REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE }; /* forward declaration */ @@ -158,6 +159,13 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Context data for Sequence changes */ + struct + { + RelFileNode relnode; + ReorderBufferTupleBuf *tuple; + } sequence; } data; /* @@ -430,6 +438,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* sequence callback signature */ +typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -496,6 +513,15 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream sequence callback signature */ +typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -512,6 +538,12 @@ struct ReorderBuffer HTAB *by_txn; /* + * relfilenode => XID lookup table for sequences created in a transaction + * (also includes altered sequences, which assigns new relfilenode) + */ + HTAB *sequences; + + /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. */ @@ -541,6 +573,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferSequenceCB sequence; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -560,6 +593,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamSequenceCB stream_sequence; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -635,6 +669,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); @@ -682,4 +720,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void StartupReorderBuffer(void); +bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created); + #endif |
