Skip to content

Commit 216a784

Browse files
author
Amit Kapila
committed
Perform apply of large transactions by parallel workers.
Currently, for large transactions, the publisher sends the data in multiple streams (changes divided into chunks depending upon logical_decoding_work_mem), and then on the subscriber-side, the apply worker writes the changes into temporary files and once it receives the commit, it reads from those files and applies the entire transaction. To improve the performance of such transactions, we can instead allow them to be applied via parallel workers. In this approach, we assign a new parallel apply worker (if available) as soon as the xact's first stream is received and the leader apply worker will send changes to this new worker via shared memory. The parallel apply worker will directly apply the change instead of writing it to temporary files. However, if the leader apply worker times out while attempting to send a message to the parallel apply worker, it will switch to "partial serialize" mode - in this mode, the leader serializes all remaining changes to a file and notifies the parallel apply workers to read and apply them at the end of the transaction. We use a non-blocking way to send the messages from the leader apply worker to the parallel apply to avoid deadlocks. We keep this parallel apply assigned till the transaction commit is received and also wait for the worker to finish at commit. This preserves commit ordering and avoid writing to and reading from files in most cases. We still need to spill if there is no worker available. This patch also extends the SUBSCRIPTION 'streaming' parameter so that the user can control whether to apply the streaming transaction in a parallel apply worker or spill the change to disk. The user can set the streaming parameter to 'on/off', or 'parallel'. The parameter value 'parallel' means the streaming will be applied via a parallel apply worker, if available. The parameter value 'on' means the streaming transaction will be spilled to disk. The default value is 'off' (same as current behaviour). In addition, the patch extends the logical replication STREAM_ABORT message so that abort_lsn and abort_time can also be sent which can be used to update the replication origin in parallel apply worker when the streaming transaction is aborted. Because this message extension is needed to support parallel streaming, parallel streaming is not supported for publications on servers < PG16. Author: Hou Zhijie, Wang wei, Amit Kapila with design inputs from Sawada Masahiko Reviewed-by: Sawada Masahiko, Peter Smith, Dilip Kumar, Shi yu, Kuroda Hayato, Shveta Mallik Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
1 parent 5687e78 commit 216a784

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+4511
-759
lines changed

doc/src/sgml/catalogs.sgml

+8-3
Original file line numberDiff line numberDiff line change
@@ -7913,11 +7913,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
79137913

79147914
<row>
79157915
<entry role="catalog_table_entry"><para role="column_definition">
7916-
<structfield>substream</structfield> <type>bool</type>
7916+
<structfield>substream</structfield> <type>char</type>
79177917
</para>
79187918
<para>
7919-
If true, the subscription will allow streaming of in-progress
7920-
transactions
7919+
Controls how to handle the streaming of in-progress transactions:
7920+
<literal>f</literal> = disallow streaming of in-progress transactions,
7921+
<literal>t</literal> = spill the changes of in-progress transactions to
7922+
disk and apply at once after the transaction is committed on the
7923+
publisher and received by the subscriber,
7924+
<literal>p</literal> = apply changes directly using a parallel apply
7925+
worker if available (same as 't' if no worker is available)
79217926
</para></entry>
79227927
</row>
79237928

doc/src/sgml/config.sgml

+27-1
Original file line numberDiff line numberDiff line change
@@ -4968,7 +4968,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
49684968
<listitem>
49694969
<para>
49704970
Specifies maximum number of logical replication workers. This includes
4971-
both apply workers and table synchronization workers.
4971+
leader apply workers, parallel apply workers, and table synchronization
4972+
workers.
49724973
</para>
49734974
<para>
49744975
Logical replication workers are taken from the pool defined by
@@ -5008,6 +5009,31 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
50085009
</listitem>
50095010
</varlistentry>
50105011

5012+
<varlistentry id="guc-max-parallel-apply-workers-per-subscription" xreflabel="max_parallel_apply_workers_per_subscription">
5013+
<term><varname>max_parallel_apply_workers_per_subscription</varname> (<type>integer</type>)
5014+
<indexterm>
5015+
<primary><varname>max_parallel_apply_workers_per_subscription</varname> configuration parameter</primary>
5016+
</indexterm>
5017+
</term>
5018+
<listitem>
5019+
<para>
5020+
Maximum number of parallel apply workers per subscription. This
5021+
parameter controls the amount of parallelism for streaming of
5022+
in-progress transactions with subscription parameter
5023+
<literal>streaming = parallel</literal>.
5024+
</para>
5025+
<para>
5026+
The parallel apply workers are taken from the pool defined by
5027+
<varname>max_logical_replication_workers</varname>.
5028+
</para>
5029+
<para>
5030+
The default value is 2. This parameter can only be set in the
5031+
<filename>postgresql.conf</filename> file or on the server command
5032+
line.
5033+
</para>
5034+
</listitem>
5035+
</varlistentry>
5036+
50115037
</variablelist>
50125038
</sect2>
50135039

doc/src/sgml/logical-replication.sgml

+20-2
Original file line numberDiff line numberDiff line change
@@ -1501,6 +1501,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
15011501
might not violate any constraint. This can easily make the subscriber
15021502
inconsistent.
15031503
</para>
1504+
1505+
<para>
1506+
When the streaming mode is <literal>parallel</literal>, the finish LSN of
1507+
failed transactions may not be logged. In that case, it may be necessary to
1508+
change the streaming mode to <literal>on</literal> or <literal>off</literal> and
1509+
cause the same conflicts again so the finish LSN of the failed transaction will
1510+
be written to the server log. For the usage of finish LSN, please refer to <link
1511+
linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ...
1512+
SKIP</command></link>.
1513+
</para>
15041514
</sect1>
15051515

15061516
<sect1 id="logical-replication-restrictions">
@@ -1809,8 +1819,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
18091819

18101820
<para>
18111821
<link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link>
1812-
must be set to at least the number of subscriptions (for apply workers), plus
1813-
some reserve for the table synchronization workers.
1822+
must be set to at least the number of subscriptions (for leader apply
1823+
workers), plus some reserve for the table synchronization workers and
1824+
parallel apply workers.
18141825
</para>
18151826

18161827
<para>
@@ -1827,6 +1838,13 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
18271838
subscription initialization or when new tables are added.
18281839
</para>
18291840

1841+
<para>
1842+
<link linkend="guc-max-parallel-apply-workers-per-subscription"><varname>max_parallel_apply_workers_per_subscription</varname></link>
1843+
controls the amount of parallelism for streaming of in-progress
1844+
transactions with subscription parameter
1845+
<literal>streaming = parallel</literal>.
1846+
</para>
1847+
18301848
<para>
18311849
Logical replication workers are also affected by
18321850
<link linkend="guc-wal-receiver-timeout"><varname>wal_receiver_timeout</varname></link>,

doc/src/sgml/monitoring.sgml

+5
Original file line numberDiff line numberDiff line change
@@ -1858,6 +1858,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
18581858
<entry><literal>advisory</literal></entry>
18591859
<entry>Waiting to acquire an advisory user lock.</entry>
18601860
</row>
1861+
<row>
1862+
<entry><literal>applytransaction</literal></entry>
1863+
<entry>Waiting to acquire a lock on a remote transaction being applied
1864+
by a logical replication subscriber.</entry>
1865+
</row>
18611866
<row>
18621867
<entry><literal>extend</literal></entry>
18631868
<entry>Waiting to extend a relation.</entry>

doc/src/sgml/protocol.sgml

+28-1
Original file line numberDiff line numberDiff line change
@@ -3103,7 +3103,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
31033103
<listitem>
31043104
<para>
31053105
Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
3106-
and <literal>3</literal> are supported.
3106+
<literal>3</literal>, and <literal>4</literal> are supported.
31073107
</para>
31083108
<para>
31093109
Version <literal>2</literal> is supported only for server version 14
@@ -3113,6 +3113,11 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
31133113
Version <literal>3</literal> is supported only for server version 15
31143114
and above, and it allows streaming of two-phase commits.
31153115
</para>
3116+
<para>
3117+
Version <literal>4</literal> is supported only for server version 16
3118+
and above, and it allows streams of large in-progress transactions to
3119+
be applied in parallel.
3120+
</para>
31163121
</listitem>
31173122
</varlistentry>
31183123

@@ -6883,6 +6888,28 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
68836888
</para>
68846889
</listitem>
68856890
</varlistentry>
6891+
6892+
<varlistentry>
6893+
<term>Int64 (XLogRecPtr)</term>
6894+
<listitem>
6895+
<para>
6896+
The LSN of the abort. This field is available since protocol version
6897+
4.
6898+
</para>
6899+
</listitem>
6900+
</varlistentry>
6901+
6902+
<varlistentry>
6903+
<term>Int64 (TimestampTz)</term>
6904+
<listitem>
6905+
<para>
6906+
Abort timestamp of the transaction. The value is in number
6907+
of microseconds since PostgreSQL epoch (2000-01-01). This field is
6908+
available since protocol version 4.
6909+
</para>
6910+
</listitem>
6911+
</varlistentry>
6912+
68866913
</variablelist>
68876914
</listitem>
68886915
</varlistentry>

doc/src/sgml/ref/create_subscription.sgml

+20-4
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,29 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
228228
</varlistentry>
229229

230230
<varlistentry>
231-
<term><literal>streaming</literal> (<type>boolean</type>)</term>
231+
<term><literal>streaming</literal> (<type>enum</type>)</term>
232232
<listitem>
233233
<para>
234234
Specifies whether to enable streaming of in-progress transactions
235-
for this subscription. By default, all transactions
236-
are fully decoded on the publisher and only then sent to the
237-
subscriber as a whole.
235+
for this subscription. The default value is <literal>off</literal>,
236+
meaning all transactions are fully decoded on the publisher and only
237+
then sent to the subscriber as a whole.
238+
</para>
239+
240+
<para>
241+
If set to <literal>on</literal>, the incoming changes are written to
242+
temporary files and then applied only after the transaction is
243+
committed on the publisher and received by the subscriber.
244+
</para>
245+
246+
<para>
247+
If set to <literal>parallel</literal>, incoming changes are directly
248+
applied via one of the parallel apply workers, if available. If no
249+
parallel apply worker is free to handle streaming transactions then
250+
the changes are written to temporary files and applied after the
251+
transaction is committed. Note that if an error happens in a
252+
parallel apply worker, the finish LSN of the remote transaction
253+
might not be reported in the server log.
238254
</para>
239255
</listitem>
240256
</varlistentry>

doc/src/sgml/system-views.sgml

+12-2
Original file line numberDiff line numberDiff line change
@@ -1379,8 +1379,9 @@
13791379
<literal>virtualxid</literal>,
13801380
<literal>spectoken</literal>,
13811381
<literal>object</literal>,
1382-
<literal>userlock</literal>, or
1383-
<literal>advisory</literal>.
1382+
<literal>userlock</literal>,
1383+
<literal>advisory</literal>, or
1384+
<literal>applytransaction</literal>.
13841385
(See also <xref linkend="wait-event-lock-table"/>.)
13851386
</para></entry>
13861387
</row>
@@ -1594,6 +1595,15 @@
15941595
so the <structfield>database</structfield> column is meaningful for an advisory lock.
15951596
</para>
15961597

1598+
<para>
1599+
Apply transaction locks are used in parallel mode to apply the transaction
1600+
in logical replication. The remote transaction id is displayed in the
1601+
<structfield>transactionid</structfield> column. The <structfield>objsubid</structfield>
1602+
displays the lock subtype which is 0 for the lock used to synchronize the
1603+
set of changes, and 1 for the lock used to wait for the transaction to
1604+
finish to ensure commit order.
1605+
</para>
1606+
15971607
<para>
15981608
<structname>pg_locks</structname> provides a global view of all locks
15991609
in the database cluster, not only those relevant to the current database.

src/backend/access/transam/xact.c

+18-6
Original file line numberDiff line numberDiff line change
@@ -1713,6 +1713,7 @@ RecordTransactionAbort(bool isSubXact)
17131713
int nchildren;
17141714
TransactionId *children;
17151715
TimestampTz xact_time;
1716+
bool replorigin;
17161717

17171718
/*
17181719
* If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1743,6 +1744,13 @@ RecordTransactionAbort(bool isSubXact)
17431744
elog(PANIC, "cannot abort transaction %u, it was already committed",
17441745
xid);
17451746

1747+
/*
1748+
* Are we using the replication origins feature? Or, in other words, are
1749+
* we replaying remote actions?
1750+
*/
1751+
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1752+
replorigin_session_origin != DoNotReplicateId);
1753+
17461754
/* Fetch the data we need for the abort record */
17471755
nrels = smgrGetPendingDeletes(false, &rels);
17481756
nchildren = xactGetCommittedChildren(&children);
@@ -1766,6 +1774,11 @@ RecordTransactionAbort(bool isSubXact)
17661774
MyXactFlags, InvalidTransactionId,
17671775
NULL);
17681776

1777+
if (replorigin)
1778+
/* Move LSNs forward for this replication origin */
1779+
replorigin_session_advance(replorigin_session_origin_lsn,
1780+
XactLastRecEnd);
1781+
17691782
/*
17701783
* Report the latest async abort LSN, so that the WAL writer knows to
17711784
* flush this abort. There's nothing to be gained by delaying this, since
@@ -5873,11 +5886,10 @@ XactLogAbortRecord(TimestampTz abort_time,
58735886
}
58745887

58755888
/*
5876-
* Dump transaction origin information only for abort prepared. We need
5877-
* this during recovery to update the replication origin progress.
5889+
* Dump transaction origin information. We need this during recovery to
5890+
* update the replication origin progress.
58785891
*/
5879-
if ((replorigin_session_origin != InvalidRepOriginId) &&
5880-
TransactionIdIsValid(twophase_xid))
5892+
if (replorigin_session_origin != InvalidRepOriginId)
58815893
{
58825894
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
58835895

@@ -5934,8 +5946,8 @@ XactLogAbortRecord(TimestampTz abort_time,
59345946
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
59355947
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
59365948

5937-
if (TransactionIdIsValid(twophase_xid))
5938-
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
5949+
/* Include the replication origin */
5950+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
59395951

59405952
return XLogInsert(RM_XACT_ID, info);
59415953
}

src/backend/commands/subscriptioncmds.c

+62-5
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ typedef struct SubOpts
8585
bool copy_data;
8686
bool refresh;
8787
bool binary;
88-
bool streaming;
88+
char streaming;
8989
bool twophase;
9090
bool disableonerr;
9191
char *origin;
@@ -139,7 +139,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
139139
if (IsSet(supported_opts, SUBOPT_BINARY))
140140
opts->binary = false;
141141
if (IsSet(supported_opts, SUBOPT_STREAMING))
142-
opts->streaming = false;
142+
opts->streaming = LOGICALREP_STREAM_OFF;
143143
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
144144
opts->twophase = false;
145145
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
@@ -242,7 +242,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
242242
errorConflictingDefElem(defel, pstate);
243243

244244
opts->specified_opts |= SUBOPT_STREAMING;
245-
opts->streaming = defGetBoolean(defel);
245+
opts->streaming = defGetStreamingMode(defel);
246246
}
247247
else if (strcmp(defel->defname, "two_phase") == 0)
248248
{
@@ -630,7 +630,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
630630
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
631631
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
632632
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
633-
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
633+
values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
634634
values[Anum_pg_subscription_subtwophasestate - 1] =
635635
CharGetDatum(opts.twophase ?
636636
LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1099,7 +1099,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10991099
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
11001100
{
11011101
values[Anum_pg_subscription_substream - 1] =
1102-
BoolGetDatum(opts.streaming);
1102+
CharGetDatum(opts.streaming);
11031103
replaces[Anum_pg_subscription_substream - 1] = true;
11041104
}
11051105

@@ -2128,3 +2128,60 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
21282128

21292129
return oldpublist;
21302130
}
2131+
2132+
/*
2133+
* Extract the streaming mode value from a DefElem. This is like
2134+
* defGetBoolean() but also accepts the special value of "parallel".
2135+
*/
2136+
char
2137+
defGetStreamingMode(DefElem *def)
2138+
{
2139+
/*
2140+
* If no parameter value given, assume "true" is meant.
2141+
*/
2142+
if (!def->arg)
2143+
return LOGICALREP_STREAM_ON;
2144+
2145+
/*
2146+
* Allow 0, 1, "false", "true", "off", "on" or "parallel".
2147+
*/
2148+
switch (nodeTag(def->arg))
2149+
{
2150+
case T_Integer:
2151+
switch (intVal(def->arg))
2152+
{
2153+
case 0:
2154+
return LOGICALREP_STREAM_OFF;
2155+
case 1:
2156+
return LOGICALREP_STREAM_ON;
2157+
default:
2158+
/* otherwise, error out below */
2159+
break;
2160+
}
2161+
break;
2162+
default:
2163+
{
2164+
char *sval = defGetString(def);
2165+
2166+
/*
2167+
* The set of strings accepted here should match up with the
2168+
* grammar's opt_boolean_or_string production.
2169+
*/
2170+
if (pg_strcasecmp(sval, "false") == 0 ||
2171+
pg_strcasecmp(sval, "off") == 0)
2172+
return LOGICALREP_STREAM_OFF;
2173+
if (pg_strcasecmp(sval, "true") == 0 ||
2174+
pg_strcasecmp(sval, "on") == 0)
2175+
return LOGICALREP_STREAM_ON;
2176+
if (pg_strcasecmp(sval, "parallel") == 0)
2177+
return LOGICALREP_STREAM_PARALLEL;
2178+
}
2179+
break;
2180+
}
2181+
2182+
ereport(ERROR,
2183+
(errcode(ERRCODE_SYNTAX_ERROR),
2184+
errmsg("%s requires a Boolean value or \"parallel\"",
2185+
def->defname)));
2186+
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2187+
}

0 commit comments

Comments
 (0)