diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index d849f8e54ba8..81eff5f31c4f 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf, appendStringInfo(buf, " relmap db %u", msg->rm.dbId); else if (msg->id == SHAREDINVALSNAPSHOT_ID) appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else if (msg->id == SHAREDINVALRELSYNC_ID) + appendStringInfo(buf, " relsync %u", msg->rs.relid); else appendStringInfo(buf, " unrecognized id %d", msg->id); } diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 78c1d4e1b848..3b7114b85946 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name) InvokeObjectPostAlterHook(classId, objectId, 0); + /* Do post catalog-update tasks */ + if (classId == PublicationRelationId) + { + Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup); + + /* + * Invalidate relsynccache entries. + * + * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a + * publication does not impact the publication status of tables. So, we + * don't need to invalidate relcache to rebuild the rd_pubdesc. + * Instead, we invalidate only the relsyncache. + */ + InvalidatePubRelSyncCache(pub->oid, pub->puballtables); + } + /* Release memory */ pfree(values); pfree(nulls); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 150a768d16f4..94a5a2b6c767 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, return *invalid_column_list || *invalid_gen_col; } +/* + * Invalidate entries in the RelationSyncCache for relations included in the + * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA. + * + * If 'puballtables' is true, invalidate all cache entries. + */ +void +InvalidatePubRelSyncCache(Oid pubid, bool puballtables) +{ + if (puballtables) + { + CacheInvalidateRelSyncAll(); + } + else + { + List *relids = NIL; + List *schemarelids = NIL; + + /* + * For partitioned tables, we must invalidate all partitions and + * itself. WAL records for INSERT/UPDATE/DELETE specify leaf + * tables as a target. However, WAL records for TRUNCATE specify + * both a root and its leaves. + */ + relids = GetPublicationRelations(pubid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(pubid, + PUBLICATION_PART_ALL); + + relids = list_concat_unique_oid(relids, schemarelids); + + /* Invalidate the relsyncache */ + foreach_oid(relid, relids) + CacheInvalidateRelSync(relid); + } + + return; +} + /* check_functions_in_node callback */ static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9063af6e1df1..ed806c543004 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, CacheRegisterSyscacheCallback(PUBLICATIONOID, publication_invalidation_cb, (Datum) 0); + CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb, + (Datum) 0); publication_callback_registered = true; } @@ -1789,12 +1791,6 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) { publications_valid = false; - - /* - * Also invalidate per-relation cache so that next time the filtering info - * is checked it will be updated with the new publication settings. - */ - rel_sync_cache_publication_cb(arg, cacheid, hashvalue); } /* diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 700ccb6df9b8..4eb67720737e 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -271,6 +271,7 @@ int debug_discard_caches = 0; #define MAX_SYSCACHE_CALLBACKS 64 #define MAX_RELCACHE_CALLBACKS 10 +#define MAX_RELSYNC_CALLBACKS 10 static struct SYSCACHECALLBACK { @@ -292,6 +293,15 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static struct RELSYNCCALLBACK +{ + RelSyncCallbackFunction function; + Datum arg; +} relsync_callback_list[MAX_RELSYNC_CALLBACKS]; + +static int relsync_callback_count = 0; + + /* ---------------------------------------------------------------- * Invalidation subgroup support functions * ---------------------------------------------------------------- @@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group, AddInvalidationMessage(group, RelCacheMsgs, &msg); } +/* + * Add a relsync inval entry + * + * We put these into the relcache subgroup for simplicity. This message is the + * same as AddRelcacheInvalidationMessage() except that it is for + * RelationSyncCache maintained by decoding plugin pgoutput. + */ +static void +AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group, + Oid dbId, Oid relId) +{ + SharedInvalidationMessage msg; + + /* Don't add a duplicate item. */ + ProcessMessageSubGroup(group, RelCacheMsgs, + if (msg->rc.id == SHAREDINVALRELSYNC_ID && + (msg->rc.relId == relId || + msg->rc.relId == InvalidOid)) + return); + + /* OK, add the item */ + msg.rc.id = SHAREDINVALRELSYNC_ID; + msg.rc.dbId = dbId; + msg.rc.relId = relId; + /* check AddCatcacheInvalidationMessage() for an explanation */ + VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); + + AddInvalidationMessage(group, RelCacheMsgs, &msg); +} + /* * Add a snapshot inval entry * @@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId) info->RelcacheInitFileInval = true; } +/* + * RegisterRelsyncInvalidation + * + * As above, but register a relsynccache invalidation event. + */ +static void +RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId) +{ + AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId); +} + /* * RegisterSnapshotInvalidation * @@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard) ccitem->function(ccitem->arg, InvalidOid); } + + for (i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, InvalidOid); + } } /* @@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) else if (msg->sn.dbId == MyDatabaseId) InvalidateCatalogSnapshot(); } + else if (msg->id == SHAREDINVALRELSYNC_ID) + { + /* We only care about our own database */ + if (msg->rs.dbId == MyDatabaseId) + CallRelSyncCallbacks(msg->rs.relid); + } else elog(FATAL, "unrecognized SI message ID: %d", msg->id); } @@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid) ReleaseSysCache(tup); } +/* + * CacheInvalidateRelSync + * Register invalidation of the cache in logical decoding output plugin + * for a database. + * + * This type of invalidation message is used for the specific purpose of output + * plugins. Processes which do not decode WALs would do nothing even when it + * receives the message. + */ +void +CacheInvalidateRelSync(Oid relid) +{ + RegisterRelsyncInvalidation(PrepareInvalidationState(), + MyDatabaseId, relid); +} + +/* + * CacheInvalidateRelSyncAll + * Register invalidation of the whole cache in logical decoding output + * plugin. + */ +void +CacheInvalidateRelSyncAll(void) +{ + CacheInvalidateRelSync(InvalidOid); +} /* * CacheInvalidateSmgr @@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, ++relcache_callback_count; } +/* + * CacheRegisterRelSyncCallback + * Register the specified function to be called for all future + * relsynccache invalidation events. + * + * This function is intended to be call from the logical decoding output + * plugins. + */ +void +CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg) +{ + if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS) + elog(FATAL, "out of relsync_callback_list slots"); + + relsync_callback_list[relsync_callback_count].function = func; + relsync_callback_list[relsync_callback_count].arg = arg; + + ++relsync_callback_count; +} + /* * CallSyscacheCallbacks * @@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) } } +/* + * CallSyscacheCallbacks + */ +void +CallRelSyncCallbacks(Oid relid) +{ + for (int i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, relid); + } +} + /* * LogLogicalInvalidations * diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index e11a942ea0fd..e41df6db0389 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col); +extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables); #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h index 449e50bd78c2..125d3eb5fff5 100644 --- a/src/include/pg_config_manual.h +++ b/src/include/pg_config_manual.h @@ -282,10 +282,10 @@ /* * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable - * use of the debug_discard_caches GUC to aggressively flush syscache/relcache - * entries whenever it's possible to deliver invalidations. See - * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for - * details. + * use of the debug_discard_caches GUC to aggressively flush + * syscache/relcache/relsynccache entries whenever it's possible to deliver + * invalidations. See AcceptInvalidationMessages() in + * src/backend/utils/cache/inval.c for details. * * USE_ASSERT_CHECKING builds default to enabling this. It's possible to use * DISCARD_CACHES_ENABLED without a cassert build and the implied diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 2463c0f9fac0..f168b5fbf8c0 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -27,6 +27,7 @@ * * invalidate an smgr cache entry for a specific physical relation * * invalidate the mapped-relation mapping for a given database * * invalidate any saved snapshot that might be used to scan a given relation + * * invalidate a RelationSyncCache entry for a specific relation * More types could be added if needed. The message type is identified by * the first "int8" field of the message struct. Zero or positive means a * specific-catcache inval message (and also serves as the catcache ID field). @@ -46,12 +47,12 @@ * catcache inval messages must be generated for each of its caches, since * the hash keys will generally be different. * - * Catcache, relcache, and snapshot invalidations are transactional, and so - * are sent to other backends upon commit. Internally to the generating - * backend, they are also processed at CommandCounterIncrement so that later - * commands in the same transaction see the new state. The generating backend - * also has to process them at abort, to flush out any cache state it's loaded - * from no-longer-valid entries. + * Catcache, relcache, relsynccache, and snapshot invalidations are + * transactional, and so are sent to other backends upon commit. Internally + * to the generating backend, they are also processed at + * CommandCounterIncrement so that later commands in the same transaction see + * the new state. The generating backend also has to process them at abort, + * to flush out any cache state it's loaded from no-longer-valid entries. * * smgr and relation mapping invalidations are non-transactional: they are * sent immediately when the underlying file change is made. @@ -110,6 +111,15 @@ typedef struct Oid relId; /* relation ID */ } SharedInvalSnapshotMsg; +#define SHAREDINVALRELSYNC_ID (-6) + +typedef struct +{ + int8 id; /* type field --- must be first */ + Oid dbId; /* database ID */ + Oid relid; /* relation ID, or 0 if whole RelationSyncCache */ +} SharedInvalRelSyncMsg; + typedef union { int8 id; /* type field --- must be first */ @@ -119,6 +129,7 @@ typedef union SharedInvalSmgrMsg sm; SharedInvalRelmapMsg rm; SharedInvalSnapshotMsg sn; + SharedInvalRelSyncMsg rs; } SharedInvalidationMessage; diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index 40658ba2ffcb..9b871caef622 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches; typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue); typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid); +typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid); extern void AcceptInvalidationMessages(void); @@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple); extern void CacheInvalidateRelcacheByRelid(Oid relid); +extern void CacheInvalidateRelSync(Oid relid); + +extern void CacheInvalidateRelSyncAll(void); + extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator); extern void CacheInvalidateRelmap(Oid databaseId); @@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid, extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg); +extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg); + extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); +extern void CallRelSyncCallbacks(Oid relid); + extern void InvalidateSystemCaches(void); extern void InvalidateSystemCachesExtended(bool debug_discard); diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl index 4d3b917ac04a..e663e9f5158c 100644 --- a/src/test/subscription/t/007_ddl.pl +++ b/src/test/subscription/t/007_ddl.pl @@ -69,6 +69,87 @@ "Alter subscription set publication throws warning for non-existent publication" ); +# Cleanup +$node_publisher->safe_psql('postgres', "DROP PUBLICATION mypub;"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1"); + +# +# Test ALTER PUBLICATION RENAME command during the replication +# + +pass "renaming publications can work"; + +# Test function for swaping name of publications +sub test_swap +{ + my ($table_name, $pubname, $appname) = @_; + + # Confirms tuples can be replicated + $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (1);"); + $node_publisher->wait_for_catchup($appname); + my $result = + $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name"); + is($result, qq(1), 'check replication worked well'); + + # Swap the name of publications; $pubname <-> pub_empty + $node_publisher->safe_psql('postgres', qq[ + ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp; + ALTER PUBLICATION pub_empty RENAME TO $pubname; + ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty; + ]); + + # Insert the data again + $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (2);"); + $node_publisher->wait_for_catchup($appname); + + # Confirms the second tuple won't be replicated because $pubname does not + # contains relations anymore. + $result = + $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name ORDER BY a"); + is($result, qq(1), + 'check the tuple inserted after the RENAME was not replicated'); + + # Swap the name of publications again + $node_publisher->safe_psql('postgres', qq[ + ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp; + ALTER PUBLICATION pub_empty RENAME TO $pubname; + ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty; + ]); + + # Confirms the replication is now resumed + $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (3);"); + $result = + $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name ORDER BY a"); + is($result, qq(1 +3), 'check replicated resumed after renaming gain'); +} + +# Create another table +$ddl = "CREATE TABLE test2 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Create publications and a subscription +$node_publisher->safe_psql('postgres', qq[ + CREATE PUBLICATION pub_empty; + CREATE PUBLICATION pub_for_tab FOR TABLE test1; + CREATE PUBLICATION pub_for_all_tables FOR ALL TABLES; +]); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION pub_for_tab WITH (copy_data = off)" +); + +# Confirms RENAME command works well for a publication +test_swap('test1', 'pub_for_tab', 'tap_sub'); + +# Switches a publication which includes all tables +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION pub_for_all_tables WITH (refresh = true, copy_data = false);" +); + +# Confirms RENAME command works well for ALL TABLES publication +test_swap('test2', 'pub_for_all_tables', 'tap_sub'); + $node_subscriber->stop; $node_publisher->stop;