summaryrefslogtreecommitdiff
path: root/contrib/test_decoding/test_decoding.c
diff options
context:
space:
mode:
authorAmit Kapila2020-12-30 10:47:26 +0000
committerAmit Kapila2020-12-30 10:47:26 +0000
commit0aa8a01d04c8fe200b7a106878eebc3d0af9105c (patch)
tree79fe885496f4d3493ae327156c0baf1aa0e1e43a /contrib/test_decoding/test_decoding.c
parentfa744697c79189a661f802d9a979d959b4454df0 (diff)
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support for streaming changes of two-phase transactions at prepare time. * begin_prepare * filter_prepare * prepare * commit_prepared * rollback_prepared * stream_prepare Most of this is a simple extension of the existing methods, with the semantic difference that the transaction is not yet committed and maybe aborted later. Until now two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase commands were communicated to the subscriber. This patch provides the infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. This also extends the 'test_decoding' plugin, implementing these new methods. This commit simply adds these new APIs and the upcoming patch to "allow the decoding at prepare time in ReorderBuffer" will use these APIs. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/[email protected] https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r--contrib/test_decoding/test_decoding.c167
1 files changed, 167 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e12278beb58..05763553a40 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+ const char *gid);
+static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->filter_prepare_cb = pg_decode_filter_prepare;
+ cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
+ cb->prepare_cb = pg_decode_prepare_txn;
+ cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+ cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
cb->stream_start_cb = pg_decode_stream_start;
cb->stream_stop_cb = pg_decode_stream_stop;
cb->stream_abort_cb = pg_decode_stream_abort;
+ cb->stream_prepare_cb = pg_decode_stream_prepare;
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ListCell *option;
TestDecodingData *data;
bool enable_streaming = false;
+ bool enable_twophase = false;
data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "two-phase-commit") == 0)
+ {
+ if (elem->arg == NULL)
+ continue;
+ else if (!parse_bool(strVal(elem->arg), &enable_twophase))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
ctx->streaming &= enable_streaming;
+ ctx->twophase &= enable_twophase;
}
/* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+/* BEGIN PREPARE callback */
+static void
+pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
+
+ if (data->skip_empty_xacts)
+ return;
+
+ pg_output_begin(ctx, data, txn, true);
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* ROLLBACK PREPARED callback */
+static void
+pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here we demonstrate a
+ * simple logic by checking the GID. If the GID contains the "_nodecode"
+ * substring, then we filter it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+{
+ if (strstr(gid, "_nodecode") != NULL)
+ return true;
+
+ return false;
+}
+
static bool
pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
@@ -702,6 +842,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
}
static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
+ quote_literal_cstr(txn->gid), txn->xid);
+ else
+ appendStringInfo(ctx->out, "preparing streamed transaction %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)