summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/test')
-rw-r--r--src/test/regress/expected/subscription.out24
-rw-r--r--src/test/regress/sql/subscription.sql11
-rw-r--r--src/test/subscription/t/022_twophase_cascade.pl179
-rw-r--r--src/test/subscription/t/023_twophase_stream.pl284
4 files changed, 473 insertions, 25 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 67f92b38787..77b4437b693 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -282,27 +282,29 @@ WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ..
--fail - alter of two_phase option not supported.
ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
ERROR: unrecognized subscription parameter: "two_phase"
---fail - cannot set streaming when two_phase enabled
+-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
-ERROR: cannot set streaming = true for two-phase enabled subscription
-ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
(1 row)
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
--- fail - two_phase and streaming are mutually exclusive.
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
-ERROR: two_phase = true and streaming = true are mutually exclusive options
+-- two_phase and streaming are compatible.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
-------+-------+---------+-------------+--------+-----------+------------------+--------------------+----------
-(0 rows)
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+(1 row)
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 88743ab33bd..d42104c1910 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -215,20 +215,21 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
--fail - alter of two_phase option not supported.
ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
---fail - cannot set streaming when two_phase enabled
+-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
-ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
-
\dRs+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
--- fail - two_phase and streaming are mutually exclusive.
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+-- two_phase and streaming are compatible.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
\dRs+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
index d7cc99959f8..a47c62d8fde 100644
--- a/src/test/subscription/t/022_twophase_cascade.pl
+++ b/src/test/subscription/t/022_twophase_cascade.pl
@@ -2,11 +2,14 @@
# Copyright (c) 2021, PostgreSQL Global Development Group
# Test cascading logical replication of 2PC.
+#
+# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
+#
use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 27;
+use Test::More tests => 41;
###############################
# Setup a cascade of pub/sub nodes.
@@ -17,20 +20,26 @@ use Test::More tests => 27;
# node_A
my $node_A = PostgresNode->new('node_A');
$node_A->init(allows_streaming => 'logical');
-$node_A->append_conf('postgresql.conf',
- qq(max_prepared_transactions = 10));
+$node_A->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
$node_A->start;
# node_B
my $node_B = PostgresNode->new('node_B');
$node_B->init(allows_streaming => 'logical');
-$node_B->append_conf('postgresql.conf',
- qq(max_prepared_transactions = 10));
+$node_B->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
$node_B->start;
# node_C
my $node_C = PostgresNode->new('node_C');
$node_C->init(allows_streaming => 'logical');
-$node_C->append_conf('postgresql.conf',
- qq(max_prepared_transactions = 10));
+$node_C->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
$node_C->start;
# Create some pre-existing content on node_A
@@ -45,12 +54,29 @@ $node_B->safe_psql('postgres',
$node_C->safe_psql('postgres',
"CREATE TABLE tab_full (a int PRIMARY KEY)");
+# Create some pre-existing content on node_A (for streaming tests)
+$node_A->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_A->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Create the same tables on node_B and node_C
+# columns a and b are compatible with same table name on node_A
+$node_B->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+$node_C->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
# Setup logical replication
+# -----------------------
+# 2PC NON-STREAMING TESTS
+# -----------------------
+
# node_A (pub) -> node_B (sub)
my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
$node_A->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+ "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab");
my $appname_B = 'tap_sub_B';
$node_B->safe_psql('postgres', "
CREATE SUBSCRIPTION tap_sub_B
@@ -61,7 +87,7 @@ $node_B->safe_psql('postgres', "
# node_B (pub) -> node_C (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
$node_B->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+ "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab");
my $appname_C = 'tap_sub_C';
$node_C->safe_psql('postgres', "
CREATE SUBSCRIPTION tap_sub_C
@@ -203,6 +229,141 @@ is($result, qq(21), 'Rows committed are present on subscriber B');
$result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
is($result, qq(21), 'Rows committed are present on subscriber C');
+# ---------------------
+# 2PC + STREAMING TESTS
+# ---------------------
+
+my $oldpid_B = $node_A->safe_psql('postgres', "
+ SELECT pid FROM pg_stat_replication
+ WHERE application_name = '$appname_B';");
+my $oldpid_C = $node_B->safe_psql('postgres', "
+ SELECT pid FROM pg_stat_replication
+ WHERE application_name = '$appname_C';");
+
+# Setup logical replication (streaming = on)
+
+$node_B->safe_psql('postgres', "
+ ALTER SUBSCRIPTION tap_sub_B
+ SET (streaming = on);");
+$node_C->safe_psql('postgres', "
+ ALTER SUBSCRIPTION tap_sub_C
+ SET (streaming = on)");
+
+# Wait for subscribers to finish initialization
+
+$node_A->poll_query_until('postgres', "
+ SELECT pid != $oldpid_B FROM pg_stat_replication
+ WHERE application_name = '$appname_B';"
+) or die "Timed out while waiting for apply to restart";
+$node_B->poll_query_until('postgres', "
+ SELECT pid != $oldpid_C FROM pg_stat_replication
+ WHERE application_name = '$appname_C';"
+) or die "Timed out while waiting for apply to restart";
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED.
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber(s) after the commit.
+###############################
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+# Then 2PC PREPARE
+$node_A->safe_psql('postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults');
+$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
+# 0. Cleanup from previous test leaving only 2 rows.
+# 1. Insert one more row.
+# 2. Record a SAVEPOINT.
+# 3. Data is streamed using 2PC.
+# 4. Do rollback to SAVEPOINT prior to the streamed inserts.
+# 5. Then COMMIT PREPARED.
+#
+# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1).
+###############################
+
+# First, delete the data except for 2 rows (delete will be replicated)
+$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO test_tab VALUES (9999, 'foobar');
+ SAVEPOINT sp_inner;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ ROLLBACK TO SAVEPOINT sp_inner;
+ PREPARE TRANSACTION 'outer';
+ ");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# All the streamed data (prior to the SAVEPOINT) should be rolled back.
+# (9999, 'foobar') should be committed.
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows committed are present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows committed are present on subscriber C');
+
###############################
# check all the cleanup
###############################
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
new file mode 100644
index 00000000000..c72c6b5ef41
--- /dev/null
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -0,0 +1,284 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test logical replication of 2PC with streaming.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 18;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication (streaming = on)
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', "
+ CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub
+ WITH (streaming = on, two_phase = on)");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+ "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# Check initial data was copied to subscriber
+###############################
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED.
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber side after the commit.
+###############################
+
+# check that 2PC gets replicated to subscriber
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Test 2PC PREPARE / ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. Do rollback prepared.
+#
+# Expect data rolls back leaving only the original 2 rows.
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# Note: both publisher and subscriber do crash/restart.
+###############################
+
+$node_publisher->safe_psql('postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+
+###############################
+# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a ROLLBACK PREPARED.
+#
+# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
+# (the original 2 + inserted 1).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber,
+# but the extra INSERT outside of the 2PC still was replicated
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a COMMIT PREPARED.
+#
+# Expect 2PC data + the extra row are on the subscriber
+# (the 3334 + inserted 1 = 3335).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');