From 6f77b368ba3d2626838640c934e126ac58b10692 Mon Sep 17 00:00:00 2001 From: Khanna Date: Fri, 22 Nov 2024 12:03:13 +0530 Subject: [PATCH v10] Add support for two-phase commit in pg_createsubscriber This patch introduces the '--enable-two-phase' option to the 'pg_createsubscriber' utility, allowing users to enable two-phase commit for all subscriptions during their creation. By default, two-phase commit is disabled if the option is not provided. When two-phase commit is enabled, prepared transactions are sent to the subscriber at the time of 'PREPARE TRANSACTION', and they are processed as two-phase transactions on the subscriber as well. If disabled, prepared transactions are sent only when committed and are processed immediately by the subscriber. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 28 ++++++++------- src/bin/pg_basebackup/pg_createsubscriber.c | 35 +++++++++++++------ .../t/040_pg_createsubscriber.pl | 12 ++++++- 3 files changed, 51 insertions(+), 24 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index 26b8e64a4e..c9f33d4d3e 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -165,6 +165,19 @@ PostgreSQL documentation + + + + + + Enables two_phase + commit for the subscription. When multiple databases are specified, this + option applies uniformly to all subscriptions created on those databases. + The default is false. + + + + @@ -300,7 +313,9 @@ PostgreSQL documentation greater than or equal to the number of specified databases. The target server must have configured to a value greater than the number of specified databases. The target server - must accept local connections. + must accept local connections. If you are planning to use the + then you will also need to set the + appropriately. @@ -359,17 +374,6 @@ PostgreSQL documentation pg_createsubscriber. - - pg_createsubscriber sets up logical - replication with two-phase commit disabled. This means that any - prepared transactions will be replicated at the time - of COMMIT PREPARED, without advance preparation. - Once setup is complete, you can manually drop and re-create the - subscription(s) with - the two_phase - option enabled. - - pg_createsubscriber changes the system identifier using pg_resetwal. It would avoid diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index faf18ccf13..86bd798a55 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -38,6 +38,7 @@ struct CreateSubscriberOptions char *socket_dir; /* directory for Unix-domain socket, if any */ char *sub_port; /* subscriber port number */ const char *sub_username; /* subscriber username */ + bool two_phase; /* enable-two-phase option */ SimpleStringList database_names; /* list of database names */ SimpleStringList pub_names; /* list of publication names */ SimpleStringList sub_names; /* list of subscription names */ @@ -79,7 +80,7 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo); static char *setup_publisher(struct LogicalRepInfo *dbinfo); static void check_subscriber(const struct LogicalRepInfo *dbinfo); static void setup_subscriber(struct LogicalRepInfo *dbinfo, - const char *consistent_lsn); + const char *consistent_lsn, bool two_phase); static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn); static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, @@ -98,7 +99,9 @@ static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt); static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); -static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo); +static void create_subscription(PGconn *conn, + const struct LogicalRepInfo *dbinfo, + bool two_phase); static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn); static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo); @@ -227,6 +230,7 @@ usage(void) printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n")); printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n")); printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n")); + printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n")); printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" --config-file=FILENAME use specified main server configuration\n" @@ -479,9 +483,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)", dbinfo[i].pubconninfo); - pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i, + pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i, dbinfo[i].subname ? dbinfo[i].subname : "(auto)", - dbinfo[i].subconninfo); + dbinfo[i].subconninfo, + opt->two_phase ? "true" : "false"); if (num_pubs > 0) pubcell = pubcell->next; @@ -1138,7 +1143,8 @@ check_and_drop_existing_subscriptions(PGconn *conn, * replication setup. */ static void -setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn) +setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn, + bool two_phase) { for (int i = 0; i < num_dbs; i++) { @@ -1162,7 +1168,7 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn) */ drop_publication(conn, &dbinfo[i]); - create_subscription(conn, &dbinfo[i]); + create_subscription(conn, &dbinfo[i], two_phase); /* Set the replication progress to the correct LSN */ set_replication_progress(conn, &dbinfo[i], consistent_lsn); @@ -1677,7 +1683,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) * initial location. */ static void -create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) +create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo, + bool two_phase) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; @@ -1699,8 +1706,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) appendPQExpBuffer(str, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " "WITH (create_slot = false, enabled = false, " - "slot_name = %s, copy_data = false)", - subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc); + "slot_name = %s, copy_data = false, two_phase = %s)", + subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc, + two_phase ? "true" : "false"); pg_free(pubname_esc); pg_free(subname_esc); @@ -1872,6 +1880,7 @@ main(int argc, char **argv) {"publisher-server", required_argument, NULL, 'P'}, {"socketdir", required_argument, NULL, 's'}, {"recovery-timeout", required_argument, NULL, 't'}, + {"enable-two-phase", no_argument, NULL, 'T'}, {"subscriber-username", required_argument, NULL, 'U'}, {"verbose", no_argument, NULL, 'v'}, {"version", no_argument, NULL, 'V'}, @@ -1927,6 +1936,7 @@ main(int argc, char **argv) opt.socket_dir = NULL; opt.sub_port = DEFAULT_SUB_PORT; opt.sub_username = NULL; + opt.two_phase = false; opt.database_names = (SimpleStringList) { 0 @@ -1949,7 +1959,7 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v", + while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v", long_options, &option_index)) != -1) { switch (c) @@ -1986,6 +1996,9 @@ main(int argc, char **argv) case 't': opt.recovery_timeout = atoi(optarg); break; + case 'T': + opt.two_phase = true; + break; case 'U': opt.sub_username = pg_strdup(optarg); break; @@ -2229,7 +2242,7 @@ main(int argc, char **argv) * point to the LSN reported by setup_publisher(). It also cleans up * publications created by this tool and replication to the standby. */ - setup_subscriber(dbinfo, consistent_lsn); + setup_subscriber(dbinfo, consistent_lsn, opt.two_phase); /* Remove primary_slot_name if it exists on primary */ drop_primary_replication_slot(dbinfo, primary_slot_name); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 5426159fa5..bd9a4d5032 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -357,6 +357,7 @@ command_ok( 'run pg_createsubscriber without --databases'); # Run pg_createsubscriber on node S +# In passing, also test the --enable-two-phase option command_ok( [ 'pg_createsubscriber', '--verbose', @@ -371,7 +372,7 @@ command_ok( 'replslot1', '--replication-slot', 'replslot2', '--database', $db1, '--database', - $db2 + $db2, '--enable-two-phase' ], 'run pg_createsubscriber on node S'); @@ -390,6 +391,15 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')"); # Start subscriber $node_s->start; +# Verify that all subtwophase states are pending or enabled, +# e.g. there are no subscriptions where subtwophase is disabled ('d') +is( $node_s->safe_psql( + 'postgres', + "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'" + ), + 't', + 'subscriptions are created with the two-phase option enabled'); + # Confirm the pre-existing subscription has been removed $result = $node_s->safe_psql( 'postgres', qq( -- 2.41.0.windows.3