From aab6a7cbb2ae7d0d181062f972d2e559bbd4cef6 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossartn@amazon.com>
Date: Fri, 19 Nov 2021 01:04:41 +0000
Subject: [PATCH v16 1/3] Introduce archive modules infrastructure.

---
 src/backend/access/transam/xlog.c             |   2 +-
 src/backend/postmaster/pgarch.c               | 111 ++++++++++++++++--
 src/backend/postmaster/shell_archive.c        |  24 +++-
 src/backend/utils/init/miscinit.c             |   1 +
 src/backend/utils/misc/guc.c                  |  12 +-
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 -
 src/include/postmaster/pgarch.h               |  52 +++++++-
 8 files changed, 189 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dfe2a0bcce..958220c495 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8831,7 +8831,7 @@ ShutdownXLOG(int code, Datum arg)
 		 * process one more time at the end of shutdown). The checkpoint
 		 * record will go to the next XLOG file and won't be archived (yet).
 		 */
-		if (XLogArchivingActive() && XLogArchiveCommandSet())
+		if (XLogArchivingActive())
 			RequestXLogSwitch(false);
 
 		CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 6e3fcedc97..865f1930df 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -89,6 +89,8 @@ typedef struct PgArchData
 	slock_t		arch_lck;
 } PgArchData;
 
+char *XLogArchiveLibrary = "";
+
 
 /* ----------
  * Local data
@@ -96,6 +98,8 @@ typedef struct PgArchData
  */
 static time_t last_sigterm_time = 0;
 static PgArchData *PgArch = NULL;
+static ArchiveModuleCallbacks ArchiveContext;
+
 
 /*
  * Stuff for tracking multiple files to archive from each scan of
@@ -140,6 +144,8 @@ static void pgarch_archiveDone(char *xlog);
 static void pgarch_die(int code, Datum arg);
 static void HandlePgArchInterrupts(void);
 static int ready_file_comparator(Datum a, Datum b, void *arg);
+static void LoadArchiveLibrary(void);
+static void call_archive_module_shutdown_callback(int code, Datum arg);
 
 /* Report shared memory space needed by PgArchShmemInit */
 Size
@@ -244,7 +250,16 @@ PgArchiverMain(void)
 	arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
 												ready_file_comparator, NULL);
 
-	pgarch_MainLoop();
+	/* Load the archive_library. */
+	LoadArchiveLibrary();
+
+	PG_ENSURE_ERROR_CLEANUP(call_archive_module_shutdown_callback, 0);
+	{
+		pgarch_MainLoop();
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(call_archive_module_shutdown_callback, 0);
+
+	call_archive_module_shutdown_callback(0, 0);
 
 	proc_exit(0);
 }
@@ -407,11 +422,12 @@ pgarch_ArchiverCopyLoop(void)
 			 */
 			HandlePgArchInterrupts();
 
-			/* can't do anything if no command ... */
-			if (!XLogArchiveCommandSet())
+			/* can't do anything if not configured ... */
+			if (ArchiveContext.check_configured_cb != NULL &&
+				!ArchiveContext.check_configured_cb())
 			{
 				ereport(WARNING,
-						(errmsg("archive_mode enabled, yet archive_command is not set")));
+						(errmsg("archive_mode enabled, yet archiving is not configured")));
 				return;
 			}
 
@@ -492,7 +508,7 @@ pgarch_ArchiverCopyLoop(void)
 /*
  * pgarch_archiveXlog
  *
- * Invokes system(3) to copy one archive file to wherever it should go
+ * Invokes archive_file_cb to copy one archive file to wherever it should go
  *
  * Returns true if successful
  */
@@ -509,7 +525,7 @@ pgarch_archiveXlog(char *xlog)
 	snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
 	set_ps_display(activitymsg);
 
-	ret = shell_archive_file(xlog, pathname);
+	ret = ArchiveContext.archive_file_cb(xlog, pathname);
 	if (ret)
 		snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
 	else
@@ -759,13 +775,90 @@ HandlePgArchInterrupts(void)
 	if (ProcSignalBarrierPending)
 		ProcessProcSignalBarrier();
 
+	/* Perform logging of memory contexts of this process */
+	if (LogMemoryContextPending)
+		ProcessLogMemoryContextInterrupt();
+
 	if (ConfigReloadPending)
 	{
+		char	   *archiveLib = pstrdup(XLogArchiveLibrary);
+		bool		archiveLibChanged;
+
 		ConfigReloadPending = false;
 		ProcessConfigFile(PGC_SIGHUP);
+
+		archiveLibChanged = strcmp(XLogArchiveLibrary, archiveLib) != 0;
+		pfree(archiveLib);
+
+		if (archiveLibChanged)
+		{
+			/*
+			 * Call the currently loaded archive module's shutdown callback, if
+			 * one is defined.
+			 */
+			call_archive_module_shutdown_callback(0, 0);
+
+			/*
+			 * Ideally, we would simply unload the previous archive module and
+			 * load the new one, but there is presently no mechanism for
+			 * unloading a library (see the comment above
+			 * internal_unload_library()).  To deal with this, we simply restart
+			 * the archiver.  The new archive module will be loaded when the new
+			 * archiver process starts up.
+			 */
+			ereport(LOG,
+					(errmsg("restarting archiver process because value of "
+							"\"archive_library\" was changed")));
+
+			proc_exit(0);
+		}
 	}
+}
 
-	/* Perform logging of memory contexts of this process */
-	if (LogMemoryContextPending)
-		ProcessLogMemoryContextInterrupt();
+/*
+ * LoadArchiveLibrary
+ *
+ * Loads the archiving callbacks into our local ArchiveContext.
+ */
+static void
+LoadArchiveLibrary(void)
+{
+	ArchiveModuleInit archive_init;
+
+	memset(&ArchiveContext, 0, sizeof(ArchiveModuleCallbacks));
+
+	/*
+	 * If shell archiving is enabled, use our special initialization
+	 * function.  Otherwise, load the library and call its
+	 * _PG_archive_module_init().
+	 */
+	if (ShellArchivingEnabled())
+		archive_init = shell_archive_init;
+	else
+		archive_init = (ArchiveModuleInit)
+			load_external_function(XLogArchiveLibrary,
+								   "_PG_archive_module_init", false, NULL);
+
+	if (archive_init == NULL)
+		ereport(ERROR,
+				(errmsg("archive modules have to declare the "
+						"_PG_archive_module_init symbol")));
+
+	(*archive_init) (&ArchiveContext);
+
+	if (ArchiveContext.archive_file_cb == NULL)
+		ereport(ERROR,
+				(errmsg("archive modules must register an archive callback")));
+}
+
+/*
+ * call_archive_module_shutdown_callback
+ *
+ * Calls the loaded archive module's shutdown callback, if one is defined.
+ */
+static void
+call_archive_module_shutdown_callback(int code, Datum arg)
+{
+	if (ArchiveContext.shutdown_cb != NULL)
+		ArchiveContext.shutdown_cb();
 }
diff --git a/src/backend/postmaster/shell_archive.c b/src/backend/postmaster/shell_archive.c
index b54e701da4..19e240c205 100644
--- a/src/backend/postmaster/shell_archive.c
+++ b/src/backend/postmaster/shell_archive.c
@@ -2,6 +2,10 @@
  *
  * shell_archive.c
  *
+ * This archiving function uses a user-specified shell command (the
+ * archive_command GUC) to copy write-ahead log files.  It is used as the
+ * default, but other modules may define their own custom archiving logic.
+ *
  * Copyright (c) 2022, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
@@ -17,7 +21,25 @@
 #include "pgstat.h"
 #include "postmaster/pgarch.h"
 
-bool
+static bool shell_archive_configured(void);
+static bool shell_archive_file(const char *file, const char *path);
+
+void
+shell_archive_init(ArchiveModuleCallbacks *cb)
+{
+	AssertVariableIsOfType(&shell_archive_init, ArchiveModuleInit);
+
+	cb->check_configured_cb = shell_archive_configured;
+	cb->archive_file_cb = shell_archive_file;
+}
+
+static bool
+shell_archive_configured(void)
+{
+	return XLogArchiveCommand[0] != '\0';
+}
+
+static bool
 shell_archive_file(const char *file, const char *path)
 {
 	char		xlogarchcmd[MAXPGPATH];
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 0f2570d626..0868e5a24f 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -38,6 +38,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/interrupt.h"
+#include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4c94f09c64..86b223821f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3881,13 +3881,23 @@ static struct config_string ConfigureNamesString[] =
 	{
 		{"archive_command", PGC_SIGHUP, WAL_ARCHIVING,
 			gettext_noop("Sets the shell command that will be called to archive a WAL file."),
-			NULL
+			gettext_noop("This is unused if \"archive_library\" does not indicate archiving via shell is enabled.")
 		},
 		&XLogArchiveCommand,
 		"",
 		NULL, NULL, show_archive_command
 	},
 
+	{
+		{"archive_library", PGC_SIGHUP, WAL_ARCHIVING,
+			gettext_noop("Sets the library that will be called to archive a WAL file."),
+			gettext_noop("A value of \"shell\" or an empty string indicates that \"archive_command\" should be used.")
+		},
+		&XLogArchiveLibrary,
+		"shell",
+		NULL, NULL, NULL
+	},
+
 	{
 		{"restore_command", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
 			gettext_noop("Sets the shell command that will be called to retrieve an archived WAL file."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 817d5f5324..b4376d76aa 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -245,6 +245,7 @@
 
 #archive_mode = off		# enables archiving; off, on, or always
 				# (change requires restart)
+#archive_library = 'shell'	# library to use to archive a logfile segment
 #archive_command = ''		# command to use to archive a logfile segment
 				# placeholders: %p = path of file to archive
 				#               %f = file name only
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index bb0c52686a..85114b2e5f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -155,7 +155,6 @@ extern PGDLLIMPORT int wal_level;
 /* Is WAL archiving enabled always (even during recovery)? */
 #define XLogArchivingAlways() \
 	(AssertMacro(XLogArchiveMode == ARCHIVE_MODE_OFF || wal_level >= WAL_LEVEL_REPLICA), XLogArchiveMode == ARCHIVE_MODE_ALWAYS)
-#define XLogArchiveCommandSet() (XLogArchiveCommand[0] != '\0')
 
 /*
  * Is WAL-logging necessary for archival or log-shipping, or can we skip
diff --git a/src/include/postmaster/pgarch.h b/src/include/postmaster/pgarch.h
index 991a6d0616..732b12c0ba 100644
--- a/src/include/postmaster/pgarch.h
+++ b/src/include/postmaster/pgarch.h
@@ -33,7 +33,55 @@ extern void PgArchiverMain(void) pg_attribute_noreturn();
 extern void PgArchWakeup(void);
 extern void PgArchForceDirScan(void);
 
-/* in shell_archive.c */
-extern bool shell_archive_file(const char *file, const char *path);
+/*
+ * The value of the archive_library GUC.
+ */
+extern char *XLogArchiveLibrary;
+
+/*
+ * Callback that gets called to determine if the archive module is
+ * configured.
+ */
+typedef bool (*ArchiveCheckConfiguredCB) (void);
+
+/*
+ * Callback called to archive a single WAL file.
+ */
+typedef bool (*ArchiveFileCB) (const char *file, const char *path);
+
+/*
+ * Called to shutdown an archive module.
+ */
+typedef void (*ArchiveShutdownCB) (void);
+
+/*
+ * Archive module callbacks
+ */
+typedef struct ArchiveModuleCallbacks
+{
+	ArchiveCheckConfiguredCB check_configured_cb;
+	ArchiveFileCB archive_file_cb;
+	ArchiveShutdownCB shutdown_cb;
+} ArchiveModuleCallbacks;
+
+/*
+ * Type of the shared library symbol _PG_archive_module_init that is looked
+ * up when loading an archive library.
+ */
+typedef void (*ArchiveModuleInit) (ArchiveModuleCallbacks *cb);
+
+/*
+ * Since the logic for archiving via a shell command is in the core server
+ * and does not need to be loaded via a shared library, it has a special
+ * initialization function.
+ */
+extern void shell_archive_init(ArchiveModuleCallbacks *cb);
+
+/*
+ * We consider archiving via shell to be enabled if archive_library is
+ * empty or if archive_library is set to "shell".
+ */
+#define ShellArchivingEnabled() \
+	(XLogArchiveLibrary[0] == '\0' || strcmp(XLogArchiveLibrary, "shell") == 0)
 
 #endif							/* _PGARCH_H */
-- 
2.25.1

