diff options
author | Tomas Vondra | 2022-03-25 23:45:21 +0000 |
---|---|---|
committer | Tomas Vondra | 2022-03-26 00:01:27 +0000 |
commit | 923def9a533a7d986acfb524139d8b9e5466d0a5 (patch) | |
tree | b6ce8d5bfe8d932e3cc89e52aba68519558e8033 /src/backend/commands/publicationcmds.c | |
parent | 05843b1aa49df2ecc9b97c693b755bd1b6f856a9 (diff) |
Allow specifying column lists for logical replication
This allows specifying an optional column list when adding a table to
logical replication. The column list may be specified after the table
name, enclosed in parentheses. Columns not included in this list are not
sent to the subscriber, allowing the schema on the subscriber to be a
subset of the publisher schema.
For UPDATE/DELETE publications, the column list needs to cover all
REPLICA IDENTITY columns. For INSERT publications, the column list is
arbitrary and may omit some REPLICA IDENTITY columns. Furthermore, if
the table uses REPLICA IDENTITY FULL, column list is not allowed.
The column list can contain only simple column references. Complex
expressions, function calls etc. are not allowed. This restriction could
be relaxed in the future.
During the initial table synchronization, only columns included in the
column list are copied to the subscriber. If the subscription has
several publications, containing the same table with different column
lists, columns specified in any of the lists will be copied.
This means all columns are replicated if the table has no column list
at all (which is treated as column list with all columns), or when of
the publications is defined as FOR ALL TABLES (possibly IN SCHEMA that
matches the schema of the table).
For partitioned tables, publish_via_partition_root determines whether
the column list for the root or the leaf relation will be used. If the
parameter is 'false' (the default), the list defined for the leaf
relation is used. Otherwise, the column list for the root partition
will be used.
Psql commands \dRp+ and \d <table-name> now display any column lists.
Author: Tomas Vondra, Alvaro Herrera, Rahila Syed
Reviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed,
Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yu
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
Diffstat (limited to 'src/backend/commands/publicationcmds.c')
-rw-r--r-- | src/backend/commands/publicationcmds.c | 265 |
1 files changed, 254 insertions, 11 deletions
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index e449e8e8f29..84e37df783b 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -342,7 +342,7 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context) * Returns true if any invalid column is found. */ bool -contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, +pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot) { HeapTuple rftuple; @@ -411,6 +411,114 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, return result; } +/* + * Check if all columns referenced in the REPLICA IDENTITY are covered by + * the column list. + * + * Returns true if any replica identity column is not covered by column list. + */ +bool +pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, + bool pubviaroot) +{ + HeapTuple tuple; + Oid relid = RelationGetRelid(relation); + Oid publish_as_relid = RelationGetRelid(relation); + bool result = false; + Datum datum; + bool isnull; + + /* + * For a partition, if pubviaroot is true, find the topmost ancestor that + * is published via this publication as we need to use its column list + * for the changes. + * + * Note that even though the column list used is for an ancestor, the + * REPLICA IDENTITY used will be for the actual child table. + */ + if (pubviaroot && relation->rd_rel->relispartition) + { + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + + if (!OidIsValid(publish_as_relid)) + publish_as_relid = relid; + } + + tuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pubid)); + + if (!HeapTupleIsValid(tuple)) + return false; + + datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + + if (!isnull) + { + int x; + Bitmapset *idattrs; + Bitmapset *columns = NULL; + + /* With REPLICA IDENTITY FULL, no column list is allowed. */ + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + result = true; + + /* Transform the column list datum to a bitmapset. */ + columns = pub_collist_to_bitmapset(NULL, datum, NULL); + + /* Remember columns that are part of the REPLICA IDENTITY */ + idattrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* + * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are + * offset (to handle system columns the usual way), while column list + * does not use offset, so we can't do bms_is_subset(). Instead, we have + * to loop over the idattrs and check all of them are in the list. + */ + x = -1; + while ((x = bms_next_member(idattrs, x)) >= 0) + { + AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); + + /* + * If pubviaroot is true, we are validating the column list of the + * parent table, but the bitmap contains the replica identity + * information of the child table. The parent/child attnums may not + * match, so translate them to the parent - get the attname from + * the child, and look it up in the parent. + */ + if (pubviaroot) + { + /* attribute name in the child table */ + char *colname = get_attname(relid, attnum, false); + + /* + * Determine the attnum for the attribute name in parent (we + * are using the column list defined on the parent). + */ + attnum = get_attnum(publish_as_relid, colname); + } + + /* replica identity column, not covered by the column list */ + if (!bms_is_member(attnum, columns)) + { + result = true; + break; + } + } + + bms_free(idattrs); + bms_free(columns); + } + + ReleaseSysCache(tuple); + + return result; +} + /* check_functions_in_node callback */ static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context) @@ -652,6 +760,39 @@ TransformPubWhereClauses(List *tables, const char *queryString, } } + +/* + * Check the publication column lists expression for all relations in the list. + */ +static void +CheckPubRelationColumnList(List *tables, const char *queryString, + bool pubviaroot) +{ + ListCell *lc; + + foreach(lc, tables) + { + PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); + + if (pri->columns == NIL) + continue; + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's column list will be used. So disallow using + * the column list on partitioned table in this case. + */ + if (!pubviaroot && + pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use publication column list for relation \"%s\"", + RelationGetRelationName(pri->relation)), + errdetail("column list cannot be used for a partitioned table when %s is false.", + "publish_via_partition_root"))); + } +} + /* * Create new publication. */ @@ -808,6 +949,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); + CheckPubRelationColumnList(rels, pstate->p_sourcetext, + publish_via_partition_root); + PublicationAddRelations(puboid, rels, true, NULL); CloseRelationList(rels); } @@ -895,8 +1039,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, /* * If the publication doesn't publish changes via the root partitioned - * table, the partition's row filter will be used. So disallow using WHERE - * clause on partitioned table in this case. + * table, the partition's row filter and column list will be used. So disallow + * using WHERE clause and column lists on partitioned table in this case. */ if (!pubform->puballtables && publish_via_partition_root_given && !publish_via_partition_root) @@ -904,7 +1048,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, /* * Lock the publication so nobody else can do anything with it. This * prevents concurrent alter to add partitioned table(s) with WHERE - * clause(s) which we don't allow when not publishing via root. + * clause(s) and/or column lists which we don't allow when not + * publishing via root. */ LockDatabaseObject(PublicationRelationId, pubform->oid, 0, AccessShareLock); @@ -917,13 +1062,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, { HeapTuple rftuple; Oid relid = lfirst_oid(lc); + bool has_column_list; + bool has_row_filter; rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubform->oid)); + has_row_filter + = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL); + + has_column_list + = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL); + if (HeapTupleIsValid(rftuple) && - !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL)) + (has_row_filter || has_column_list)) { HeapTuple tuple; @@ -932,7 +1085,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, { Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple); - if (relform->relkind == RELKIND_PARTITIONED_TABLE) + if ((relform->relkind == RELKIND_PARTITIONED_TABLE) && + has_row_filter) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot set %s for publication \"%s\"", @@ -943,6 +1097,18 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, NameStr(relform->relname), "publish_via_partition_root"))); + if ((relform->relkind == RELKIND_PARTITIONED_TABLE) && + has_column_list) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set %s for publication \"%s\"", + "publish_via_partition_root = false", + stmt->pubname), + errdetail("The publication contains a column list for a partitioned table \"%s\" " + "which is not allowed when %s is false.", + NameStr(relform->relname), + "publish_via_partition_root"))); + ReleaseSysCache(tuple); } @@ -1107,6 +1273,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + PublicationAddRelations(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) @@ -1124,6 +1292,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + /* * To recreate the relation list for the publication, look for * existing relations that do not need to be dropped. @@ -1135,42 +1305,79 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PublicationRelInfo *oldrel; bool found = false; HeapTuple rftuple; - bool rfisnull = true; Node *oldrelwhereclause = NULL; + Bitmapset *oldcolumns = NULL; /* look up the cache for the old relmap */ rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(oldrelid), ObjectIdGetDatum(pubid)); + /* + * See if the existing relation currently has a WHERE clause or a + * column list. We need to compare those too. + */ if (HeapTupleIsValid(rftuple)) { + bool isnull = true; Datum whereClauseDatum; + Datum columnListDatum; + /* Load the WHERE clause for this table. */ whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, - &rfisnull); - if (!rfisnull) + &isnull); + if (!isnull) oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum)); + /* Transform the int2vector column list to a bitmap. */ + columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prattrs, + &isnull); + + if (!isnull) + oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL); + ReleaseSysCache(rftuple); } foreach(newlc, rels) { PublicationRelInfo *newpubrel; + Oid newrelid; + Bitmapset *newcolumns = NULL; newpubrel = (PublicationRelInfo *) lfirst(newlc); + newrelid = RelationGetRelid(newpubrel->relation); + + /* + * If the new publication has column list, transform it to + * a bitmap too. + */ + if (newpubrel->columns) + { + ListCell *lc; + + foreach(lc, newpubrel->columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(newrelid, colname); + + newcolumns = bms_add_member(newcolumns, attnum); + } + } /* * Check if any of the new set of relations matches with the * existing relations in the publication. Additionally, if the * relation has an associated WHERE clause, check the WHERE - * expressions also match. Drop the rest. + * expressions also match. Same for the column list. Drop the + * rest. */ if (RelationGetRelid(newpubrel->relation) == oldrelid) { - if (equal(oldrelwhereclause, newpubrel->whereClause)) + if (equal(oldrelwhereclause, newpubrel->whereClause) && + bms_equal(oldcolumns, newcolumns)) { found = true; break; @@ -1186,6 +1393,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, { oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; + oldrel->columns = NIL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1401,6 +1609,7 @@ AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, { oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; + oldrel->columns = NULL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1660,6 +1869,7 @@ OpenRelationList(List *rels, char objectType) List *result = NIL; ListCell *lc; List *relids_with_rf = NIL; + List *relids_with_collist = NIL; /* * Open, share-lock, and check all the explicitly-specified relations @@ -1710,6 +1920,13 @@ OpenRelationList(List *rels, char objectType) errmsg("conflicting or redundant WHERE clauses for table \"%s\"", RelationGetRelationName(rel)))); + /* Disallow duplicate tables if there are any with column lists. */ + if (t->columns || list_member_oid(relids_with_collist, myrelid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant column lists for table \"%s\"", + RelationGetRelationName(rel)))); + table_close(rel, ShareUpdateExclusiveLock); continue; } @@ -1717,12 +1934,16 @@ OpenRelationList(List *rels, char objectType) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; + pub_rel->columns = t->columns; result = lappend(result, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) relids_with_rf = lappend_oid(relids_with_rf, myrelid); + if (t->columns) + relids_with_collist = lappend_oid(relids_with_collist, myrelid); + /* * Add children of this rel, if requested, so that they too are added * to the publication. A partitioned table can't have any inheritance @@ -1762,6 +1983,18 @@ OpenRelationList(List *rels, char objectType) errmsg("conflicting or redundant WHERE clauses for table \"%s\"", RelationGetRelationName(rel)))); + /* + * We don't allow to specify column list for both parent + * and child table at the same time as it is not very + * clear which one should be given preference. + */ + if (childrelid != myrelid && + (t->columns || list_member_oid(relids_with_collist, childrelid))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant column lists for table \"%s\"", + RelationGetRelationName(rel)))); + continue; } @@ -1771,11 +2004,16 @@ OpenRelationList(List *rels, char objectType) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; + /* child inherits column list from parent */ + pub_rel->columns = t->columns; result = lappend(result, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) relids_with_rf = lappend_oid(relids_with_rf, childrelid); + + if (t->columns) + relids_with_collist = lappend_oid(relids_with_collist, childrelid); } } } @@ -1884,6 +2122,11 @@ PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); + if (pubrel->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); |