Skip to content

Commit ce0fdbf

Browse files
author
Amit Kapila
committed
Allow multiple xacts during table sync in logical replication.
For the initial table data synchronization in logical replication, we use a single transaction to copy the entire table and then synchronize the position in the stream with the main apply worker. There are multiple downsides of this approach: (a) We have to perform the entire copy operation again if there is any error (network breakdown, error in the database operation, etc.) while we synchronize the WAL position between tablesync worker and apply worker; this will be onerous especially for large copies, (b) Using a single transaction in the synchronization-phase (where we can receive WAL from multiple transactions) will have the risk of exceeding the CID limit, (c) The slot will hold the WAL till the entire sync is complete because we never commit till the end. This patch solves all the above downsides by allowing multiple transactions during the tablesync phase. The initial copy is done in a single transaction and after that, we commit each transaction as we receive. To allow recovery after any error or crash, we use a permanent slot and origin to track the progress. The slot and origin will be removed once we finish the synchronization of the table. We also remove slot and origin of tablesync workers if the user performs DROP SUBSCRIPTION .. or ALTER SUBSCRIPTION .. REFERESH and some of the table syncs are still not finished. The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh option as true cannot be executed inside a transaction block because they can now drop the slots for which we have no provision to rollback. This will also open up the path for logical replication of 2PC transactions on the subscriber side. Previously, we can't do that because of the requirement of maintaining a single transaction in tablesync workers. Bump catalog version due to change of state in the catalog (pg_subscription_rel). Author: Peter Smith, Amit Kapila, and Takamichi Osumi Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com
1 parent 3063eb1 commit ce0fdbf

File tree

23 files changed

+767
-326
lines changed

23 files changed

+767
-326
lines changed

doc/src/sgml/catalogs.sgml

+1
Original file line numberDiff line numberDiff line change
@@ -7673,6 +7673,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
76737673
State code:
76747674
<literal>i</literal> = initialize,
76757675
<literal>d</literal> = data is being copied,
7676+
<literal>f</literal> = finished table copy,
76767677
<literal>s</literal> = synchronized,
76777678
<literal>r</literal> = ready (normal replication)
76787679
</para></entry>

doc/src/sgml/logical-replication.sgml

+37-22
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@
186186

187187
<para>
188188
Each subscription will receive changes via one replication slot (see
189-
<xref linkend="streaming-replication-slots"/>). Additional temporary
190-
replication slots may be required for the initial data synchronization
191-
of pre-existing table data.
189+
<xref linkend="streaming-replication-slots"/>). Additional replication
190+
slots may be required for the initial data synchronization of
191+
pre-existing table data and those will be dropped at the end of data
192+
synchronization.
192193
</para>
193194

194195
<para>
@@ -248,13 +249,23 @@
248249

249250
<para>
250251
As mentioned earlier, each (active) subscription receives changes from a
251-
replication slot on the remote (publishing) side. Normally, the remote
252-
replication slot is created automatically when the subscription is created
253-
using <command>CREATE SUBSCRIPTION</command> and it is dropped
254-
automatically when the subscription is dropped using <command>DROP
255-
SUBSCRIPTION</command>. In some situations, however, it can be useful or
256-
necessary to manipulate the subscription and the underlying replication
257-
slot separately. Here are some scenarios:
252+
replication slot on the remote (publishing) side.
253+
</para>
254+
<para>
255+
Additional table synchronization slots are normally transient, created
256+
internally to perform initial table synchronization and dropped
257+
automatically when they are no longer needed. These table synchronization
258+
slots have generated names: <quote><literal>pg_%u_sync_%u_%llu</literal></quote>
259+
(parameters: Subscription <parameter>oid</parameter>,
260+
Table <parameter>relid</parameter>, system identifier <parameter>sysid</parameter>)
261+
</para>
262+
<para>
263+
Normally, the remote replication slot is created automatically when the
264+
subscription is created using <command>CREATE SUBSCRIPTION</command> and it
265+
is dropped automatically when the subscription is dropped using
266+
<command>DROP SUBSCRIPTION</command>. In some situations, however, it can
267+
be useful or necessary to manipulate the subscription and the underlying
268+
replication slot separately. Here are some scenarios:
258269

259270
<itemizedlist>
260271
<listitem>
@@ -294,8 +305,9 @@
294305
using <command>ALTER SUBSCRIPTION</command> before attempting to drop
295306
the subscription. If the remote database instance no longer exists, no
296307
further action is then necessary. If, however, the remote database
297-
instance is just unreachable, the replication slot should then be
298-
dropped manually; otherwise it would continue to reserve WAL and might
308+
instance is just unreachable, the replication slot (and any still
309+
remaining table synchronization slots) should then be
310+
dropped manually; otherwise it/they would continue to reserve WAL and might
299311
eventually cause the disk to fill up. Such cases should be carefully
300312
investigated.
301313
</para>
@@ -468,16 +480,19 @@
468480
<sect2 id="logical-replication-snapshot">
469481
<title>Initial Snapshot</title>
470482
<para>
471-
The initial data in existing subscribed tables are snapshotted and
472-
copied in a parallel instance of a special kind of apply process.
473-
This process will create its own temporary replication slot and
474-
copy the existing data. Once existing data is copied, the worker
475-
enters synchronization mode, which ensures that the table is brought
476-
up to a synchronized state with the main apply process by streaming
477-
any changes that happened during the initial data copy using standard
478-
logical replication. Once the synchronization is done, the control
479-
of the replication of the table is given back to the main apply
480-
process where the replication continues as normal.
483+
The initial data in existing subscribed tables are snapshotted and
484+
copied in a parallel instance of a special kind of apply process.
485+
This process will create its own replication slot and copy the existing
486+
data. As soon as the copy is finished the table contents will become
487+
visible to other backends. Once existing data is copied, the worker
488+
enters synchronization mode, which ensures that the table is brought
489+
up to a synchronized state with the main apply process by streaming
490+
any changes that happened during the initial data copy using standard
491+
logical replication. During this synchronization phase, the changes
492+
are applied and committed in the same order as they happened on the
493+
publisher. Once the synchronization is done, the control of the
494+
replication of the table is given back to the main apply process where
495+
the replication continues as normal.
481496
</para>
482497
</sect2>
483498
</sect1>

doc/src/sgml/ref/alter_subscription.sgml

+18
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
4848
(Currently, all subscription owners must be superusers, so the owner checks
4949
will be bypassed in practice. But this might change in the future.)
5050
</para>
51+
52+
<para>
53+
When refreshing a publication we remove the relations that are no longer
54+
part of the publication and we also remove the tablesync slots if there are
55+
any. It is necessary to remove tablesync slots so that the resources
56+
allocated for the subscription on the remote host are released. If due to
57+
network breakdown or some other error, <productname>PostgreSQL</productname>
58+
is unable to remove the slots, an ERROR will be reported. To proceed in this
59+
situation, either the user need to retry the operation or disassociate the
60+
slot from the subscription and drop the subscription as explained in
61+
<xref linkend="sql-dropsubscription"/>.
62+
</para>
63+
64+
<para>
65+
Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
66+
<command>ALTER SUBSCRIPTION ... SET PUBLICATION ...</command> with refresh
67+
option as true cannot be executed inside a transaction block.
68+
</para>
5169
</refsect1>
5270

5371
<refsect1>

doc/src/sgml/ref/drop_subscription.sgml

+4-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
7979
<para>
8080
When dropping a subscription that is associated with a replication slot on
8181
the remote host (the normal state), <command>DROP SUBSCRIPTION</command>
82-
will connect to the remote host and try to drop the replication slot as
82+
will connect to the remote host and try to drop the replication slot (and
83+
any remaining table synchronization slots) as
8384
part of its operation. This is necessary so that the resources allocated
8485
for the subscription on the remote host are released. If this fails,
8586
either because the remote host is not reachable or because the remote
@@ -89,7 +90,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
8990
executing <literal>ALTER SUBSCRIPTION ... SET (slot_name = NONE)</literal>.
9091
After that, <command>DROP SUBSCRIPTION</command> will no longer attempt any
9192
actions on a remote host. Note that if the remote replication slot still
92-
exists, it should then be dropped manually; otherwise it will continue to
93+
exists, it (and any related table synchronization slots) should then be
94+
dropped manually; otherwise it/they will continue to
9395
reserve WAL and might eventually cause the disk to fill up. See
9496
also <xref linkend="logical-replication-subscription-slot"/>.
9597
</para>

src/backend/access/transam/xact.c

-11
Original file line numberDiff line numberDiff line change
@@ -2432,15 +2432,6 @@ PrepareTransaction(void)
24322432
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
24332433
errmsg("cannot PREPARE a transaction that has exported snapshots")));
24342434

2435-
/*
2436-
* Don't allow PREPARE but for transaction that has/might kill logical
2437-
* replication workers.
2438-
*/
2439-
if (XactManipulatesLogicalReplicationWorkers())
2440-
ereport(ERROR,
2441-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2442-
errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
2443-
24442435
/* Prevent cancel/die interrupt while cleaning up */
24452436
HOLD_INTERRUPTS();
24462437

@@ -4899,7 +4890,6 @@ CommitSubTransaction(void)
48994890
AtEOSubXact_HashTables(true, s->nestingLevel);
49004891
AtEOSubXact_PgStat(true, s->nestingLevel);
49014892
AtSubCommit_Snapshot(s->nestingLevel);
4902-
AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
49034893

49044894
/*
49054895
* We need to restore the upper transaction's read-only state, in case the
@@ -5059,7 +5049,6 @@ AbortSubTransaction(void)
50595049
AtEOSubXact_HashTables(false, s->nestingLevel);
50605050
AtEOSubXact_PgStat(false, s->nestingLevel);
50615051
AtSubAbort_Snapshot(s->nestingLevel);
5062-
AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
50635052
}
50645053

50655054
/*

src/backend/catalog/pg_subscription.c

+38
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "utils/array.h"
3030
#include "utils/builtins.h"
3131
#include "utils/fmgroids.h"
32+
#include "utils/lsyscache.h"
3233
#include "utils/pg_lsn.h"
3334
#include "utils/rel.h"
3435
#include "utils/syscache.h"
@@ -337,6 +338,13 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
337338
char substate;
338339
bool isnull;
339340
Datum d;
341+
Relation rel;
342+
343+
/*
344+
* This is to avoid the race condition with AlterSubscription which tries
345+
* to remove this relstate.
346+
*/
347+
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
340348

341349
/* Try finding the mapping. */
342350
tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
@@ -363,6 +371,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
363371
/* Cleanup */
364372
ReleaseSysCache(tup);
365373

374+
table_close(rel, AccessShareLock);
375+
366376
return substate;
367377
}
368378

@@ -403,6 +413,34 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
403413
scan = table_beginscan_catalog(rel, nkeys, skey);
404414
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
405415
{
416+
Form_pg_subscription_rel subrel;
417+
418+
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
419+
420+
/*
421+
* We don't allow to drop the relation mapping when the table
422+
* synchronization is in progress unless the caller updates the
423+
* corresponding subscription as well. This is to ensure that we don't
424+
* leave tablesync slots or origins in the system when the
425+
* corresponding table is dropped.
426+
*/
427+
if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
428+
{
429+
ereport(ERROR,
430+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
431+
errmsg("could not drop relation mapping for subscription \"%s\"",
432+
get_subscription_name(subrel->srsubid, false)),
433+
errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
434+
get_rel_name(relid), subrel->srsubstate),
435+
/*
436+
* translator: first %s is a SQL ALTER command and second %s is a
437+
* SQL DROP command
438+
*/
439+
errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
440+
"ALTER SUBSCRIPTION ... ENABLE",
441+
"DROP SUBSCRIPTION ...")));
442+
}
443+
406444
CatalogTupleDelete(rel, &tup->t_self);
407445
}
408446
table_endscan(scan);

0 commit comments

Comments
 (0)