From d1e5d6c33e193626f05911462e66a6c96366bfa6 Mon Sep 17 00:00:00 2001
From: Jehan-Guillaume de Rorthais <jgdr@dalibo.com>
Date: Tue, 31 Dec 2019 18:29:13 +0100
Subject: [PATCH] Always expose available stats from wal receiver

Makes admin function pg_stat_get_wal_receiver() return available data
from WalRcv in shared memory, whatever the state of the wal receiver
process.

This allows supervision or HA tools to gather various physical
replication stats even when the wal receiver is stopped. For example,
the latest timeline the wal receiver was receiving before shutting
down.

The behavior of the pg_stat_wal_receiver view has been kept to avoid
regressions: it returns no row when the wal receiver is shut down.
---
 src/backend/replication/walreceiver.c      | 61 ++++++++++++----------
 src/include/catalog/pg_proc.dat            |  6 +--
 src/test/recovery/t/004_timeline_switch.pl | 12 ++++-
 3 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a5e85d32f3..e4273b7f55 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -226,7 +226,6 @@ WalReceiverMain(void)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -295,6 +294,7 @@ WalReceiverMain(void)
 		strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
 
 	walrcv->sender_port = sender_port;
+	walrcv->walRcvState = WALRCV_STREAMING;
 	walrcv->ready_to_display = true;
 	SpinLockRelease(&walrcv->mutex);
 
@@ -1368,6 +1368,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	TimeLineID	receive_start_tli;
 	XLogRecPtr	received_lsn;
 	TimeLineID	received_tli;
+	XLogRecPtr	applied_lsn;
+	TimeLineID	applied_tli;
 	TimestampTz last_send_time;
 	TimestampTz last_receipt_time;
 	XLogRecPtr	latest_end_lsn;
@@ -1379,6 +1381,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 
 	/* Take a lock to ensure value consistency */
 	SpinLockAcquire(&WalRcv->mutex);
+	applied_lsn = GetXLogReplayRecPtr(&applied_tli);
 	pid = (int) WalRcv->pid;
 	ready_to_display = WalRcv->ready_to_display;
 	state = WalRcv->walRcvState;
@@ -1396,13 +1399,6 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
 	SpinLockRelease(&WalRcv->mutex);
 
-	/*
-	 * No WAL receiver (or not ready yet), just return a tuple with NULL
-	 * values
-	 */
-	if (pid == 0 || !ready_to_display)
-		PG_RETURN_NULL();
-
 	/* determine result type */
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 		elog(ERROR, "return type must be a row type");
@@ -1411,7 +1407,10 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	nulls = palloc0(sizeof(bool) * tupdesc->natts);
 
 	/* Fetch values */
-	values[0] = Int32GetDatum(pid);
+	if (pid == 0)
+		nulls[0] = true;
+	else
+		values[0] = Int32GetDatum(pid);
 
 	if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
 	{
@@ -1436,38 +1435,46 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 		else
 			values[4] = LSNGetDatum(received_lsn);
 		values[5] = Int32GetDatum(received_tli);
-		if (last_send_time == 0)
+		if (XLogRecPtrIsInvalid(applied_lsn)) {
 			nulls[6] = true;
-		else
-			values[6] = TimestampTzGetDatum(last_send_time);
-		if (last_receipt_time == 0)
 			nulls[7] = true;
-		else
-			values[7] = TimestampTzGetDatum(last_receipt_time);
-		if (XLogRecPtrIsInvalid(latest_end_lsn))
+		}
+		else {
+			values[6] = LSNGetDatum(applied_lsn);
+			values[7] = Int32GetDatum(applied_tli);
+		}
+		if (last_send_time == 0)
 			nulls[8] = true;
 		else
-			values[8] = LSNGetDatum(latest_end_lsn);
-		if (latest_end_time == 0)
+			values[8] = TimestampTzGetDatum(last_send_time);
+		if (last_receipt_time == 0)
 			nulls[9] = true;
 		else
-			values[9] = TimestampTzGetDatum(latest_end_time);
-		if (*slotname == '\0')
+			values[9] = TimestampTzGetDatum(last_receipt_time);
+		if (XLogRecPtrIsInvalid(latest_end_lsn))
 			nulls[10] = true;
 		else
-			values[10] = CStringGetTextDatum(slotname);
-		if (*sender_host == '\0')
+			values[10] = LSNGetDatum(latest_end_lsn);
+		if (latest_end_time == 0)
 			nulls[11] = true;
 		else
-			values[11] = CStringGetTextDatum(sender_host);
-		if (sender_port == 0)
+			values[11] = TimestampTzGetDatum(latest_end_time);
+		if (*slotname == '\0')
 			nulls[12] = true;
 		else
-			values[12] = Int32GetDatum(sender_port);
-		if (*conninfo == '\0')
+			values[12] = CStringGetTextDatum(slotname);
+		if (*sender_host == '\0' || !ready_to_display)
 			nulls[13] = true;
 		else
-			values[13] = CStringGetTextDatum(conninfo);
+			values[13] = CStringGetTextDatum(sender_host);
+		if (sender_port == 0 || !ready_to_display)
+			nulls[14] = true;
+		else
+			values[14] = Int32GetDatum(sender_port);
+		if (*conninfo == '\0' || !ready_to_display)
+			nulls[15] = true;
+		else
+			values[15] = CStringGetTextDatum(conninfo);
 	}
 
 	/* Returns the record as Datum */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fcf2a1214c..f6a6bc5b7b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5180,9 +5180,9 @@
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,int4,pg_lsn,int4,timestamptz,timestamptz,pg_lsn,timestamptz,text,text,int4,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
+  proallargtypes => '{int4,text,pg_lsn,int4,pg_lsn,int4,pg_lsn,int4,timestamptz,timestamptz,pg_lsn,timestamptz,text,text,int4,text}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,applied_lsn,applied_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/test/recovery/t/004_timeline_switch.pl b/src/test/recovery/t/004_timeline_switch.pl
index 7e952d3667..cdcdd2d981 100644
--- a/src/test/recovery/t/004_timeline_switch.pl
+++ b/src/test/recovery/t/004_timeline_switch.pl
@@ -6,7 +6,7 @@ use warnings;
 use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 4;
 
 $ENV{PGDATABASE} = 'postgres';
 
@@ -37,6 +37,11 @@ $node_master->safe_psql('postgres',
 $node_master->wait_for_catchup($node_standby_1, 'replay',
 	$node_master->lsn('write'));
 
+# Check received timeline from pg_stat_get_wal_receiver() on standby 1
+my $node_standby_1_lsn = $node_standby_1->safe_psql('postgres',
+	'SELECT received_tli FROM pg_stat_get_wal_receiver()');
+is($node_standby_1_lsn, 1, 'check received timeline on standby 1');
+
 # Stop and remove master
 $node_master->teardown_node;
 
@@ -66,3 +71,8 @@ $node_standby_1->wait_for_catchup($node_standby_2, 'replay',
 my $result =
   $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
 is($result, qq(2000), 'check content of standby 2');
+
+# Check received timeline from pg_stat_get_wal_receiver() on standby 2
+my $node_standby_2_lsn = $node_standby_2->safe_psql('postgres',
+	'SELECT received_tli FROM pg_stat_get_wal_receiver()');
+is($node_standby_2_lsn, 2, 'check received timeline on standby 2');
-- 
2.20.1

