From a0d9494b6fa8949d0a45d688feffbad4ace91027 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Fri, 7 Mar 2025 09:38:40 +0900 Subject: [PATCH] Avoid invalidating all entries when pg_namespace is modified --- src/backend/replication/pgoutput/pgoutput.c | 36 +++++++++++---------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 693a766e6d75..aa4e278f59db 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -185,6 +185,9 @@ typedef struct RelationSyncEntry * row filter expressions, column list, etc. */ MemoryContext entry_cxt; + + /* Hash value of schema OID. Used for pg_namespace syscache callback */ + uint32 schema_hashvalue; } RelationSyncEntry; /* @@ -227,8 +230,7 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); -static void rel_sync_cache_publication_cb(Datum arg, int cacheid, - uint32 hashvalue); +static void rel_sync_cache_schema_cb(Datum arg, int cacheid, uint32 hashvalue); static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, @@ -1974,18 +1976,11 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0); /* - * Flush all cache entries after a pg_namespace change, in case it was a + * Flush related entries after a pg_namespace change, in case it was a * schema rename affecting a relation being replicated. - * - * XXX: It is not a good idea to invalidate all the relation entries in - * RelationSyncCache on schema rename. We can optimize it to invalidate - * only the required relations by either having a specific invalidation - * message containing impacted relations or by having schema information - * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid - * passed to the callback. */ CacheRegisterSyscacheCallback(NAMESPACEOID, - rel_sync_cache_publication_cb, + rel_sync_cache_schema_cb, (Datum) 0); relation_callbacks_registered = true; @@ -2057,6 +2052,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->publish_as_relid = InvalidOid; entry->columns = NULL; entry->attrmap = NULL; + entry->schema_hashvalue = 0; } /* Validate the entry */ @@ -2303,6 +2299,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free(schemaPubids); list_free(rel_publications); + entry->schema_hashvalue = + GetSysCacheHashValue1(NAMESPACEOID, + ObjectIdGetDatum(RelationGetNamespace(relation))); + entry->replicate_valid = true; } @@ -2401,12 +2401,12 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } /* - * Publication relation/schema map syscache invalidation callback + * schema syscache invalidation callback * * Called for invalidations on pg_namespace. */ -static void -rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) +void +rel_sync_cache_schema_cb(Datum arg, int cacheid, uint32 hashvalue) { HASH_SEQ_STATUS status; RelationSyncEntry *entry; @@ -2420,13 +2420,15 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) return; /* - * We have no easy way to identify which cache entries this invalidation - * event might have affected, so just mark them all invalid. + * Identify entries which belongs to the specified schema, and invalidate + * it. */ hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) { - entry->replicate_valid = false; + /* hashvalue == 0 means a cache reset, must clear all state */ + if (hashvalue == 0 || entry->schema_hashvalue == hashvalue) + entry->replicate_valid = false; } }