diff options
| author | Amit Kapila | 2021-03-03 01:58:43 +0000 |
|---|---|---|
| committer | Amit Kapila | 2021-03-03 02:04:11 +0000 |
| commit | 19890a064ebf53dedcefed0d8339ed3d449b06e6 (patch) | |
| tree | 76adf39f2d69ce5d6f8bda9efc48c34c0879e6bc /src/backend/replication | |
| parent | ee28cacf619f4d9c23af5a80e1171a5adae97381 (diff) | |
Add option to enable two_phase commits via pg_create_logical_replication_slot.
Commit 0aa8a01d04 extends the output plugin API to allow decoding of
prepared xacts and allowed the user to enable/disable the two-phase option
via pg_logical_slot_get_changes(). This can lead to a problem such that
the first time when it gets changes via pg_logical_slot_get_changes()
without two_phase option enabled it will not get the prepared even though
prepare is after consistent snapshot. Now next time during getting changes,
if the two_phase option is enabled it can skip prepare because by that
time start decoding point has been moved. So the user will only get commit
prepared.
Allow to enable/disable this option at the create slot time and default
will be false. It will break the existing slots which is fine in a major
release.
Author: Ajin Cherian
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/[email protected]
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/logical/logical.c | 12 | ||||
| -rw-r--r-- | src/backend/replication/slot.c | 10 | ||||
| -rw-r--r-- | src/backend/replication/slotfuncs.c | 14 | ||||
| -rw-r--r-- | src/backend/replication/walsender.c | 6 |
4 files changed, 35 insertions, 7 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3f6d723d096..37b75deb728 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin, startup_cb_wrapper(ctx, &ctx->options, true); MemoryContextSwitchTo(old_context); + /* + * We allow decoding of prepared transactions iff the two_phase option is + * enabled at the time of slot creation. + */ + ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; return ctx; @@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn, startup_cb_wrapper(ctx, &ctx->options, false); MemoryContextSwitchTo(old_context); + /* + * We allow decoding of prepared transactions iff the two_phase option is + * enabled at the time of slot creation. + */ + ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; ereport(LOG, diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fb4af2ef52d..75a087c2f9d 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel) * name: Name of the slot * db_specific: logical decoding is db specific; if the slot is going to * be used for that pass true, otherwise false. + * two_phase: Allows decoding of prepared transactions. We allow this option + * to be enabled only at the slot creation time. If we allow this option + * to be changed during decoding then it is quite possible that we skip + * prepare first time because this option was not enabled. Now next time + * during getting changes, if the two_phase option is enabled it can skip + * prepare because by that time start decoding point has been moved. So the + * user will only get commit prepared. */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency) + ReplicationSlotPersistency persistency, bool two_phase) { ReplicationSlot *slot = NULL; int i; @@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, namestrcpy(&slot->data.name, name); slot->data.database = db_specific ? MyDatabaseId : InvalidOid; slot->data.persistency = persistency; + slot->data.two_phase = two_phase; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d24bb5b0b5f..9817b441136 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false); if (immediately_reserve) { @@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) */ static void create_logical_replication_slot(char *name, char *plugin, - bool temporary, XLogRecPtr restart_lsn, + bool temporary, bool two_phase, + XLogRecPtr restart_lsn, bool find_startpoint) { LogicalDecodingContext *ctx = NULL; @@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); /* * Create logical decoding context to find start point or, if we don't @@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name name = PG_GETARG_NAME(0); Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); + bool two_phase = PG_GETARG_BOOL(3); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, + two_phase, InvalidXLogRecPtr, true); @@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 13 +#define PG_GET_REPLICATION_SLOTS_COLS 14 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = Int64GetDatum(failLSN - currlsn); } + values[i++] = BoolGetDatum(slot_contents.data.two_phase); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb3f18ed487..23baa4498af 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, - cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT); + cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, + false); } else { @@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) * they get dropped on error as well. */ ReplicationSlotCreate(cmd->slotname, true, - cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); + cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, + cmd->two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) |
