summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2023-01-18 03:33:12 +0000
committerAmit Kapila2023-01-18 03:33:12 +0000
commitd540a02a724b9643205abce8c5644a0f0908f6e3 (patch)
treebb5103b31bfcbfb162bb1544c3e4766d66d13780
parent14bdb3f13de16523609d838b725540af5e23ddd3 (diff)
Display the leader apply worker's PID for parallel apply workers.
Add leader_pid to pg_stat_subscription. leader_pid is the process ID of the leader apply worker if this process is a parallel apply worker. If this field is NULL, it indicates that the process is a leader apply worker or a synchronization worker. The new column makes it easier to distinguish parallel apply workers from other kinds of workers and helps to identify the leader for the parallel workers corresponding to a particular subscription. Additionally, update the leader_pid column in pg_stat_activity as well to display the PID of the leader apply worker for parallel apply workers. Author: Hou Zhijie Reviewed-by: Peter Smith, Sawada Masahiko, Amit Kapila, Shveta Mallik Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
-rw-r--r--doc/src/sgml/logical-replication.sgml3
-rw-r--r--doc/src/sgml/monitoring.sgml36
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/logical/applyparallelworker.c6
-rw-r--r--src/backend/replication/logical/launcher.c64
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c21
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/replication/logicallauncher.h2
-rw-r--r--src/include/replication/worker_internal.h4
-rw-r--r--src/test/regress/expected/rules.out3
11 files changed, 105 insertions, 43 deletions
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 54f48be87f3..f4b4e641beb 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1692,7 +1692,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
subscription. A disabled subscription or a crashed subscription will have
zero rows in this view. If the initial data synchronization of any
table is in progress, there will be additional workers for the tables
- being synchronized.
+ being synchronized. Moreover, if the streaming transaction is applied in
+ parallel, there may be additional parallel apply workers.
</para>
</sect1>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 358d2ff90f8..e3a783abd0f 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -743,9 +743,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<structfield>leader_pid</structfield> <type>integer</type>
</para>
<para>
- Process ID of the parallel group leader, if this process is a
- parallel query worker. <literal>NULL</literal> if this process is a
- parallel group leader or does not participate in parallel query.
+ Process ID of the parallel group leader if this process is a parallel
+ query worker, or process ID of the leader apply worker if this process
+ is a parallel apply worker. <literal>NULL</literal> indicates that this
+ process is a parallel group leader or leader apply worker, or does not
+ participate in any parallel operation.
</para></entry>
</row>
@@ -3208,11 +3210,22 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<row>
<entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>leader_pid</structfield> <type>integer</type>
+ </para>
+ <para>
+ Process ID of the leader apply worker if this process is a parallel
+ apply worker; NULL if this process is a leader apply worker or a
+ synchronization worker
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
<structfield>relid</structfield> <type>oid</type>
</para>
<para>
- OID of the relation that the worker is synchronizing; null for the
- main apply worker
+ OID of the relation that the worker is synchronizing; NULL for the
+ leader apply worker and parallel apply workers
</para></entry>
</row>
@@ -3222,7 +3235,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</para>
<para>
Last write-ahead log location received, the initial value of
- this field being 0
+ this field being 0; NULL for parallel apply workers
</para></entry>
</row>
@@ -3231,7 +3244,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<structfield>last_msg_send_time</structfield> <type>timestamp with time zone</type>
</para>
<para>
- Send time of last message received from origin WAL sender
+ Send time of last message received from origin WAL sender; NULL for
+ parallel apply workers
</para></entry>
</row>
@@ -3240,7 +3254,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<structfield>last_msg_receipt_time</structfield> <type>timestamp with time zone</type>
</para>
<para>
- Receipt time of last message received from origin WAL sender
+ Receipt time of last message received from origin WAL sender; NULL for
+ parallel apply workers
</para></entry>
</row>
@@ -3249,7 +3264,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<structfield>latest_end_lsn</structfield> <type>pg_lsn</type>
</para>
<para>
- Last write-ahead log location reported to origin WAL sender
+ Last write-ahead log location reported to origin WAL sender; NULL for
+ parallel apply workers
</para></entry>
</row>
@@ -3259,7 +3275,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</para>
<para>
Time of last write-ahead log location reported to origin WAL
- sender
+ sender; NULL for parallel apply workers
</para></entry>
</row>
</tbody>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index d2a8c82900e..8608e3fa5b1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -948,6 +948,7 @@ CREATE VIEW pg_stat_subscription AS
su.oid AS subid,
su.subname,
st.pid,
+ st.leader_pid,
st.relid,
st.received_lsn,
st.last_msg_send_time,
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3dfcff2798f..3579e704fe5 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -849,7 +849,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void
pa_shutdown(int code, Datum arg)
{
- SendProcSignal(MyLogicalRepWorker->apply_leader_pid,
+ SendProcSignal(MyLogicalRepWorker->leader_pid,
PROCSIG_PARALLEL_APPLY_MESSAGE,
InvalidBackendId);
@@ -932,7 +932,7 @@ ParallelApplyWorkerMain(Datum main_arg)
error_mqh = shm_mq_attach(mq, seg, NULL);
pq_redirect_to_shm_mq(seg, error_mqh);
- pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+ pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
InvalidBackendId);
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
@@ -950,7 +950,7 @@ ParallelApplyWorkerMain(Datum main_arg)
* The parallel apply worker doesn't need to monopolize this replication
* origin which was already acquired by its leader process.
*/
- replorigin_session_setup(originid, MyLogicalRepWorker->apply_leader_pid);
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
replorigin_session_origin = originid;
CommitTransactionCommand();
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index afb7acddaa6..27e58566cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -410,7 +410,7 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
- worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
+ worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
@@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->relid = InvalidOid;
- worker->apply_leader_pid = InvalidPid;
+ worker->leader_pid = InvalidPid;
worker->parallel_apply = false;
}
@@ -1067,12 +1067,40 @@ IsLogicalLauncher(void)
}
/*
+ * Return the pid of the leader apply worker if the given pid is the pid of a
+ * parallel apply worker, otherwise, return InvalidPid.
+ */
+pid_t
+GetLeaderApplyWorkerPid(pid_t pid)
+{
+ int leader_pid = InvalidPid;
+ int i;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+ {
+ leader_pid = w->leader_pid;
+ break;
+ }
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+
+ return leader_pid;
+}
+
+/*
* Returns state of the subscriptions.
*/
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_COLS 8
+#define PG_STAT_GET_SUBSCRIPTION_COLS 9
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
int i;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if (OidIsValid(subid) && worker.subid != subid)
continue;
- /* Skip if this is a parallel apply worker */
- if (isParallelApplyWorker(&worker))
- continue;
-
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
@@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
else
nulls[1] = true;
values[2] = Int32GetDatum(worker_pid);
- if (XLogRecPtrIsInvalid(worker.last_lsn))
+
+ if (isParallelApplyWorker(&worker))
+ values[3] = Int32GetDatum(worker.leader_pid);
+ else
nulls[3] = true;
+
+ if (XLogRecPtrIsInvalid(worker.last_lsn))
+ nulls[4] = true;
else
- values[3] = LSNGetDatum(worker.last_lsn);
+ values[4] = LSNGetDatum(worker.last_lsn);
if (worker.last_send_time == 0)
- nulls[4] = true;
+ nulls[5] = true;
else
- values[4] = TimestampTzGetDatum(worker.last_send_time);
+ values[5] = TimestampTzGetDatum(worker.last_send_time);
if (worker.last_recv_time == 0)
- nulls[5] = true;
+ nulls[6] = true;
else
- values[5] = TimestampTzGetDatum(worker.last_recv_time);
+ values[6] = TimestampTzGetDatum(worker.last_recv_time);
if (XLogRecPtrIsInvalid(worker.reply_lsn))
- nulls[6] = true;
+ nulls[7] = true;
else
- values[6] = LSNGetDatum(worker.reply_lsn);
+ values[7] = LSNGetDatum(worker.reply_lsn);
if (worker.reply_time == 0)
- nulls[7] = true;
+ nulls[8] = true;
else
- values[7] = TimestampTzGetDatum(worker.reply_time);
+ values[8] = TimestampTzGetDatum(worker.reply_time);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
values, nulls);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 58bd1360b97..67374934022 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -25,6 +25,7 @@
#include "pgstat.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/postmaster.h"
+#include "replication/logicallauncher.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/acl.h"
@@ -409,9 +410,9 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
/*
* If a PGPROC entry was retrieved, display wait events and lock
- * group leader information if any. To avoid extra overhead, no
- * extra lock is being held, so there is no guarantee of
- * consistency across multiple rows.
+ * group leader or apply leader information if any. To avoid
+ * extra overhead, no extra lock is being held, so there is no
+ * guarantee of consistency across multiple rows.
*/
if (proc != NULL)
{
@@ -426,14 +427,24 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
/*
* Show the leader only for active parallel workers. This
- * leaves the field as NULL for the leader of a parallel
- * group.
+ * leaves the field as NULL for the leader of a parallel group
+ * or the leader of parallel apply workers.
*/
if (leader && leader->pid != beentry->st_procpid)
{
values[28] = Int32GetDatum(leader->pid);
nulls[28] = false;
}
+ else if (beentry->st_backendType == B_BG_WORKER)
+ {
+ int leader_pid = GetLeaderApplyWorkerPid(beentry->st_procpid);
+
+ if (leader_pid != InvalidPid)
+ {
+ values[28] = Int32GetDatum(leader_pid);
+ nulls[28] = false;
+ }
+ }
}
if (wait_event_type)
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 2afed46b899..fe35d7c76ae 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202301131
+#define CATALOG_VERSION_NO 202301181
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 3810de7b22d..86eb8e8c58a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5430,9 +5430,9 @@
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,oid,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+ proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
prosrc => 'pg_stat_get_subscription' },
{ oid => '2026', descr => 'statistics: current backend PID',
proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index c85593ad113..360e98702a8 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -27,4 +27,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
extern bool IsLogicalLauncher(void);
+extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
+
#endif /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index db891eea8ae..dc87a4edd13 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,7 +71,7 @@ typedef struct LogicalRepWorker
* PID of leader apply worker if this slot is used for a parallel apply
* worker, InvalidPid otherwise.
*/
- pid_t apply_leader_pid;
+ pid_t leader_pid;
/* Indicates whether apply can be performed in parallel. */
bool parallel_apply;
@@ -303,7 +303,7 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
static inline bool
am_tablesync_worker(void)
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index a969ae63eb5..05eec2adfd2 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,7 @@ pg_stat_ssl| SELECT s.pid,
pg_stat_subscription| SELECT su.oid AS subid,
su.subname,
st.pid,
+ st.leader_pid,
st.relid,
st.received_lsn,
st.last_msg_send_time,
@@ -2101,7 +2102,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
st.latest_end_lsn,
st.latest_end_time
FROM (pg_subscription su
- LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+ LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,