|
1 | 1 | # Copyright (c) 2025, PostgreSQL Global Development Group
|
2 | 2 |
|
3 |
| -# Test the conflict detection of conflict type 'multiple_unique_conflicts'. |
| 3 | +# Test conflicts in logical replication |
4 | 4 | use strict;
|
5 | 5 | use warnings FATAL => 'all';
|
6 | 6 | use PostgreSQL::Test::Cluster;
|
|
18 | 18 |
|
19 | 19 | # Create a subscriber node
|
20 | 20 | my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
|
21 |
| -$node_subscriber->init; |
| 21 | +$node_subscriber->init(allows_streaming => 'logical'); |
22 | 22 | $node_subscriber->start;
|
23 | 23 |
|
24 | 24 | # Create a table on publisher
|
|
145 | 145 |
|
146 | 146 | pass('multiple_unique_conflicts detected on a leaf partition during insert');
|
147 | 147 |
|
| 148 | +############################################################################### |
| 149 | +# Setup a bidirectional logical replication between node_A & node_B |
| 150 | +############################################################################### |
| 151 | + |
| 152 | +# Initialize nodes. |
| 153 | + |
| 154 | +# node_A. Increase the log_min_messages setting to DEBUG2 to debug test |
| 155 | +# failures. Disable autovacuum to avoid generating xid that could affect the |
| 156 | +# replication slot's xmin value. |
| 157 | +my $node_A = $node_publisher; |
| 158 | +$node_A->append_conf( |
| 159 | + 'postgresql.conf', |
| 160 | + qq{autovacuum = off |
| 161 | + log_min_messages = 'debug2'}); |
| 162 | +$node_A->restart; |
| 163 | + |
| 164 | +# node_B |
| 165 | +my $node_B = $node_subscriber; |
| 166 | +$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on"); |
| 167 | +$node_B->restart; |
| 168 | + |
| 169 | +# Create table on node_A |
| 170 | +$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); |
| 171 | + |
| 172 | +# Create the same table on node_B |
| 173 | +$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); |
| 174 | + |
| 175 | +my $subname_AB = 'tap_sub_a_b'; |
| 176 | +my $subname_BA = 'tap_sub_b_a'; |
| 177 | + |
| 178 | +# Setup logical replication |
| 179 | +# node_A (pub) -> node_B (sub) |
| 180 | +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; |
| 181 | +$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); |
| 182 | +$node_B->safe_psql( |
| 183 | + 'postgres', " |
| 184 | + CREATE SUBSCRIPTION $subname_BA |
| 185 | + CONNECTION '$node_A_connstr application_name=$subname_BA' |
| 186 | + PUBLICATION tap_pub_A |
| 187 | + WITH (origin = none, retain_conflict_info = true)"); |
| 188 | + |
| 189 | +# node_B (pub) -> node_A (sub) |
| 190 | +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; |
| 191 | +$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); |
| 192 | +$node_A->safe_psql( |
| 193 | + 'postgres', " |
| 194 | + CREATE SUBSCRIPTION $subname_AB |
| 195 | + CONNECTION '$node_B_connstr application_name=$subname_AB' |
| 196 | + PUBLICATION tap_pub_B |
| 197 | + WITH (origin = none, copy_data = off, retain_conflict_info = true)"); |
| 198 | + |
| 199 | +# Wait for initial table sync to finish |
| 200 | +$node_A->wait_for_subscription_sync($node_B, $subname_AB); |
| 201 | +$node_B->wait_for_subscription_sync($node_A, $subname_BA); |
| 202 | + |
| 203 | +is(1, 1, 'Bidirectional replication setup is complete'); |
| 204 | + |
| 205 | +# Confirm that the additional replication slot is created on both nodes and the |
| 206 | +# xmin value is valid. |
| 207 | + |
| 208 | +ok( $node_A->poll_query_until( |
| 209 | + 'postgres', |
| 210 | + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" |
| 211 | + ), |
| 212 | + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); |
| 213 | + |
| 214 | +ok( $node_B->poll_query_until( |
| 215 | + 'postgres', |
| 216 | + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" |
| 217 | + ), |
| 218 | + "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); |
| 219 | + |
| 220 | +my $result = $node_A->safe_psql('postgres', |
| 221 | + "SELECT retain_conflict_info FROM pg_stat_subscription WHERE subname='$subname_AB';"); |
| 222 | +is($result, qq(t), 'worker on node A retains conflict information'); |
| 223 | +$result = $node_B->safe_psql('postgres', |
| 224 | + "SELECT retain_conflict_info FROM pg_stat_subscription WHERE subname='$subname_BA';"); |
| 225 | +is($result, qq(t), 'worker on node B retains conflict information'); |
| 226 | + |
| 227 | +############################################################################### |
| 228 | +# Check that dead tuples on node A cannot be cleaned by VACUUM until the |
| 229 | +# concurrent transactions on Node B have been applied and flushed on Node A. |
| 230 | +############################################################################### |
| 231 | + |
| 232 | +# Insert a record |
| 233 | +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); |
| 234 | +$node_A->wait_for_catchup($subname_BA); |
| 235 | + |
| 236 | +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); |
| 237 | +is($result, qq(1|1 |
| 238 | +2|2), 'check replicated insert on node B'); |
| 239 | + |
| 240 | +# Disable the logical replication from node B to node A |
| 241 | +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); |
| 242 | + |
| 243 | +my $log_location = -s $node_B->logfile; |
| 244 | + |
| 245 | +$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); |
| 246 | +$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); |
| 247 | + |
| 248 | +$node_A->wait_for_catchup($subname_BA); |
| 249 | + |
| 250 | +my ($cmdret, $stdout, $stderr) = $node_A->psql( |
| 251 | + 'postgres', qq(VACUUM (verbose) public.tab;) |
| 252 | +); |
| 253 | + |
| 254 | +ok( $stderr =~ |
| 255 | + qr/1 are dead but not yet removable/, |
| 256 | + 'the deleted column is non-removable'); |
| 257 | + |
| 258 | +$node_A->safe_psql( |
| 259 | + 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); |
| 260 | +$node_B->wait_for_catchup($subname_AB); |
| 261 | + |
| 262 | +# Remember the next transaction ID to be assigned |
| 263 | +my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); |
| 264 | + |
| 265 | +# Confirm that the xmin value is updated |
| 266 | +ok( $node_A->poll_query_until( |
| 267 | + 'postgres', |
| 268 | + "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" |
| 269 | + ), |
| 270 | + "the xmin value of slot 'pg_conflict_detection' is updated on Node A"); |
| 271 | + |
| 272 | +############################################################################### |
| 273 | +# Check that the replication slot pg_conflict_detection is dropped after |
| 274 | +# removing all the subscriptions. |
| 275 | +############################################################################### |
| 276 | + |
| 277 | +$node_B->safe_psql( |
| 278 | + 'postgres', "DROP SUBSCRIPTION $subname_BA"); |
| 279 | + |
| 280 | +ok( $node_B->poll_query_until( |
| 281 | + 'postgres', |
| 282 | + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" |
| 283 | + ), |
| 284 | + "the slot 'pg_conflict_detection' has been dropped on Node B"); |
| 285 | + |
| 286 | +$node_A->safe_psql( |
| 287 | + 'postgres', "DROP SUBSCRIPTION $subname_AB"); |
| 288 | + |
| 289 | +ok( $node_A->poll_query_until( |
| 290 | + 'postgres', |
| 291 | + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" |
| 292 | + ), |
| 293 | + "the slot 'pg_conflict_detection' has been dropped on Node A"); |
| 294 | + |
| 295 | + |
148 | 296 | done_testing();
|
0 commit comments