Diffstat (limited to 'src/backend')
-rw-r--r--  src/backend/catalog/pg_publication.c          59
-rw-r--r--  src/backend/commands/publicationcmds.c        583
-rw-r--r--  src/backend/executor/execReplication.c        39
-rw-r--r--  src/backend/nodes/copyfuncs.c                 1
-rw-r--r--  src/backend/nodes/equalfuncs.c                1
-rw-r--r--  src/backend/parser/gram.y                     38
-rw-r--r--  src/backend/replication/logical/proto.c       36
-rw-r--r--  src/backend/replication/logical/tablesync.c   142
-rw-r--r--  src/backend/replication/pgoutput/pgoutput.c   833
-rw-r--r--  src/backend/utils/cache/relcache.c            98
10 files changed, 1621 insertions(+), 209 deletions(-)
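The diff below adds row filtering to logical replication publications: a per-table WHERE clause is stored in the catalog, validated, applied during initial table synchronization, and evaluated by pgoutput when decoding changes. A minimal SQL usage sketch (table, column, and publication names are hypothetical):

    CREATE TABLE accounts (id int PRIMARY KEY, status text, balance numeric);
    -- Publish only rows that satisfy the filter.
    CREATE PUBLICATION pub_active FOR TABLE accounts WHERE (status = 'active');

Rows that do not satisfy the expression are skipped both by the initial COPY and by subsequent INSERT/UPDATE/DELETE decoding, as the hunks below implement.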
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index e14ca2f5630..25998fbb39b 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -276,17 +276,56 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, } /* + * Returns the relid of the topmost ancestor that is published via this + * publication if any, otherwise returns InvalidOid. + * + * Note that the list of ancestors should be ordered such that the topmost + * ancestor is at the end of the list. + */ +Oid +GetTopMostAncestorInPublication(Oid puboid, List *ancestors) +{ + ListCell *lc; + Oid topmost_relid = InvalidOid; + + /* + * Find the "topmost" ancestor that is in this publication. + */ + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *apubids = GetRelationPublications(ancestor); + List *aschemaPubids = NIL; + + if (list_member_oid(apubids, puboid)) + topmost_relid = ancestor; + else + { + aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); + if (list_member_oid(aschemaPubids, puboid)) + topmost_relid = ancestor; + } + + list_free(apubids); + list_free(aschemaPubids); + } + + return topmost_relid; +} + +/* * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, +publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel->relation); + Relation targetrel = pri->relation; + Oid relid = RelationGetRelid(targetrel); Oid pubreloid; Publication *pub = GetPublication(pubid); ObjectAddress myself, @@ -311,10 +350,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel->relation), pub->name))); + RelationGetRelationName(targetrel), pub->name))); } - check_publication_add_relation(targetrel->relation); + check_publication_add_relation(targetrel); /* Form a tuple. */ memset(values, 0, sizeof(values)); @@ -328,6 +367,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + /* Add qualifications, if available */ + if (pri->whereClause != NULL) + values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause)); + else + nulls[Anum_pg_publication_rel_prqual - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -345,6 +390,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectAddressSet(referenced, RelationRelationId, relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* Add dependency on the objects mentioned in the qualifications */ + if (pri->whereClause) + recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid, + DEPENDENCY_NORMAL, DEPENDENCY_NORMAL, + false); + /* Close the table. 
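The pg_publication.c hunk above stores the filter as a serialized expression tree in pg_publication_rel.prqual and records dependencies on the objects it references. Assuming the hypothetical publication from the sketch above, the stored filter can be inspected with:

    SELECT prrelid::regclass AS relation,
           pg_get_expr(prqual, prrelid) AS row_filter
      FROM pg_publication_rel pr
           JOIN pg_publication p ON p.oid = pr.prpubid
     WHERE p.pubname = 'pub_active';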
*/ table_close(rel, RowExclusiveLock); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 0e4bb97fb73..16b8661a1b7 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -26,6 +26,7 @@ #include "catalog/partition.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" @@ -36,6 +37,10 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/nodeFuncs.h" +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_relation.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" @@ -48,6 +53,19 @@ #include "utils/syscache.h" #include "utils/varlena.h" +/* + * Information used to validate the columns in the row filter expression. See + * contain_invalid_rfcolumn_walker for details. + */ +typedef struct rf_context +{ + Bitmapset *bms_replident; /* bitset of replica identity columns */ + bool pubviaroot; /* true if we are validating the parent + * relation's row filter */ + Oid relid; /* relid of the relation */ + Oid parentid; /* relid of the parent relation */ +} rf_context; + static List *OpenRelIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); @@ -235,6 +253,362 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, } /* + * Returns true if any of the columns used in the row filter WHERE expression is + * not part of REPLICA IDENTITY, false otherwise. + */ +static bool +contain_invalid_rfcolumn_walker(Node *node, rf_context *context) +{ + if (node == NULL) + return false; + + if (IsA(node, Var)) + { + Var *var = (Var *) node; + AttrNumber attnum = var->varattno; + + /* + * If pubviaroot is true, we are validating the row filter of the + * parent table, but the bitmap contains the replica identity + * information of the child table. So, get the column number of the + * child table as parent and child column order could be different. + */ + if (context->pubviaroot) + { + char *colname = get_attname(context->parentid, attnum, false); + + attnum = get_attnum(context->relid, colname); + } + + if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, + context->bms_replident)) + return true; + } + + return expression_tree_walker(node, contain_invalid_rfcolumn_walker, + (void *) context); +} + +/* + * Check if all columns referenced in the filter expression are part of the + * REPLICA IDENTITY index or not. + * + * Returns true if any invalid column is found. + */ +bool +contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, + bool pubviaroot) +{ + HeapTuple rftuple; + Oid relid = RelationGetRelid(relation); + Oid publish_as_relid = RelationGetRelid(relation); + bool result = false; + Datum rfdatum; + bool rfisnull; + + /* + * FULL means all columns are in the REPLICA IDENTITY, so all columns are + * allowed in the row filter and we can skip the validation. + */ + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + return false; + + /* + * For a partition, if pubviaroot is true, find the topmost ancestor that + * is published via this publication as we need to use its row filter + * expression to filter the partition's changes. 
+ * + * Note that even though the row filter used is for an ancestor, the + * REPLICA IDENTITY used will be for the actual child table. + */ + if (pubviaroot && relation->rd_rel->relispartition) + { + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors); + + if (!OidIsValid(publish_as_relid)) + publish_as_relid = relid; + } + + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pubid)); + + if (!HeapTupleIsValid(rftuple)) + return false; + + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prqual, + &rfisnull); + + if (!rfisnull) + { + rf_context context = {0}; + Node *rfnode; + Bitmapset *bms = NULL; + + context.pubviaroot = pubviaroot; + context.parentid = publish_as_relid; + context.relid = relid; + + /* Remember columns that are part of the REPLICA IDENTITY */ + bms = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + context.bms_replident = bms; + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + result = contain_invalid_rfcolumn_walker(rfnode, &context); + + bms_free(bms); + pfree(rfnode); + } + + ReleaseSysCache(rftuple); + + return result; +} + +/* check_functions_in_node callback */ +static bool +contain_mutable_or_user_functions_checker(Oid func_id, void *context) +{ + return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE || + func_id >= FirstNormalObjectId); +} + +/* + * Check if the node contains any unallowed object. See + * check_simple_rowfilter_expr_walker. + * + * Returns the error detail message in errdetail_msg for unallowed expressions. + */ +static void +expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg) +{ + if (IsA(node, List)) + { + /* + * OK, we don't need to perform other expr checks for List nodes + * because those are undefined for List. + */ + return; + } + + if (exprType(node) >= FirstNormalObjectId) + *errdetail_msg = _("User-defined types are not allowed."); + else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker, + (void *) pstate)) + *errdetail_msg = _("User-defined or built-in mutable functions are not allowed."); + else if (exprCollation(node) >= FirstNormalObjectId || + exprInputCollation(node) >= FirstNormalObjectId) + *errdetail_msg = _("User-defined collations are not allowed."); +} + +/* + * The row filter walker checks if the row filter expression is a "simple + * expression". + * + * It allows only simple or compound expressions such as: + * - (Var Op Const) + * - (Var Op Var) + * - (Var Op Const) AND/OR (Var Op Const) + * - etc + * (where Var is a column of the table this filter belongs to) + * + * The simple expression has the following restrictions: + * - User-defined operators are not allowed; + * - User-defined functions are not allowed; + * - User-defined types are not allowed; + * - User-defined collations are not allowed; + * - Non-immutable built-in functions are not allowed; + * - System columns are not allowed. + * + * NOTES + * + * We don't allow user-defined functions/operators/types/collations because + * (a) if a user drops a user-defined object used in a row filter expression or + * if there is any other error while using it, the logical decoding + * infrastructure won't be able to recover from such an error even if the + * object is recreated again because a historic snapshot is used to evaluate + * the row filter; + * (b) a user-defined function can be used to access tables that could have + * unpleasant results because a historic snapshot is used. 
That's why only + * immutable built-in functions are allowed in row filter expressions. + * + * We don't allow system columns because currently, we don't have that + * information in the tuple passed to downstream. Also, as we don't replicate + * those to subscribers, there doesn't seem to be a need for a filter on those + * columns. + * + * We can allow other node types after more analysis and testing. + */ +static bool +check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate) +{ + char *errdetail_msg = NULL; + + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_Var: + /* System columns are not allowed. */ + if (((Var *) node)->varattno < InvalidAttrNumber) + errdetail_msg = _("System columns are not allowed."); + break; + case T_OpExpr: + case T_DistinctExpr: + case T_NullIfExpr: + /* OK, except user-defined operators are not allowed. */ + if (((OpExpr *) node)->opno >= FirstNormalObjectId) + errdetail_msg = _("User-defined operators are not allowed."); + break; + case T_ScalarArrayOpExpr: + /* OK, except user-defined operators are not allowed. */ + if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId) + errdetail_msg = _("User-defined operators are not allowed."); + + /* + * We don't need to check the hashfuncid and negfuncid of + * ScalarArrayOpExpr as those functions are only built for a + * subquery. + */ + break; + case T_RowCompareExpr: + { + ListCell *opid; + + /* OK, except user-defined operators are not allowed. */ + foreach(opid, ((RowCompareExpr *) node)->opnos) + { + if (lfirst_oid(opid) >= FirstNormalObjectId) + { + errdetail_msg = _("User-defined operators are not allowed."); + break; + } + } + } + break; + case T_Const: + case T_FuncExpr: + case T_BoolExpr: + case T_RelabelType: + case T_CollateExpr: + case T_CaseExpr: + case T_CaseTestExpr: + case T_ArrayExpr: + case T_RowExpr: + case T_CoalesceExpr: + case T_MinMaxExpr: + case T_XmlExpr: + case T_NullTest: + case T_BooleanTest: + case T_List: + /* OK, supported */ + break; + default: + errdetail_msg = _("Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations and immutable built-in functions."); + break; + } + + /* + * For all the supported nodes, check the types, functions, and collations + * used in the nodes. + */ + if (!errdetail_msg) + expr_allowed_in_node(node, pstate, &errdetail_msg); + + if (errdetail_msg) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("invalid publication WHERE expression"), + errdetail("%s", errdetail_msg), + parser_errposition(pstate, exprLocation(node)))); + + return expression_tree_walker(node, check_simple_rowfilter_expr_walker, + (void *) pstate); +} + +/* + * Check if the row filter expression is a "simple expression". + * + * See check_simple_rowfilter_expr_walker for details. + */ +static bool +check_simple_rowfilter_expr(Node *node, ParseState *pstate) +{ + return check_simple_rowfilter_expr_walker(node, pstate); +} + +/* + * Transform the publication WHERE expression for all the relations in the list, + * ensuring it is coerced to boolean and necessary collation information is + * added if required, and add a new nsitem/RTE for the associated relation to + * the ParseState's namespace list. + * + * Also check the publication row filter expression and throw an error if + * anything not permitted or unexpected is encountered. 
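Only "simple expressions" pass check_simple_rowfilter_expr_walker: columns of the table, constants, and immutable built-in operators, functions, types, and collations; system columns are rejected. A hedged SQL illustration using the same hypothetical table:

    -- Accepted: immutable built-in operators over table columns.
    CREATE PUBLICATION pub_ok FOR TABLE accounts
        WHERE (balance > 100 AND status <> 'closed');

    -- Rejected: random() is a volatile built-in function.
    CREATE PUBLICATION pub_bad FOR TABLE accounts
        WHERE (balance > random() * 100);
    -- ERROR:  invalid publication WHERE expression
    -- DETAIL: User-defined or built-in mutable functions are not allowed.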
+ */ +static void +TransformPubWhereClauses(List *tables, const char *queryString, + bool pubviaroot) +{ + ListCell *lc; + + foreach(lc, tables) + { + ParseNamespaceItem *nsitem; + Node *whereclause = NULL; + ParseState *pstate; + PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); + + if (pri->whereClause == NULL) + continue; + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's row filter will be used. So disallow using + * WHERE clause on partitioned table in this case. + */ + if (!pubviaroot && + pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use publication WHERE clause for relation \"%s\"", + RelationGetRelationName(pri->relation)), + errdetail("WHERE clause cannot be used for a partitioned table when %s is false.", + "publish_via_partition_root"))); + + pstate = make_parsestate(NULL); + pstate->p_sourcetext = queryString; + + nsitem = addRangeTableEntryForRelation(pstate, pri->relation, + AccessShareLock, NULL, + false, false); + + addNSItemToQuery(pstate, nsitem, false, true, true); + + whereclause = transformWhereClause(pstate, + copyObject(pri->whereClause), + EXPR_KIND_WHERE, + "PUBLICATION WHERE"); + + /* Fix up collation information */ + assign_expr_collations(pstate, whereclause); + + /* + * We allow only simple expressions in row filters. See + * check_simple_rowfilter_expr_walker. + */ + check_simple_rowfilter_expr(whereclause, pstate); + + free_parsestate(pstate); + + pri->whereClause = whereclause; + } +} + +/* * Create new publication. */ ObjectAddress @@ -346,6 +720,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) rels = OpenTableList(relations); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLE); + + TransformPubWhereClauses(rels, pstate->p_sourcetext, + publish_via_partition_root); + PublicationAddTables(puboid, rels, true, NULL); CloseTableList(rels); } @@ -392,6 +770,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, bool publish_via_partition_root; ObjectAddress obj; Form_pg_publication pubform; + List *root_relids = NIL; + ListCell *lc; parse_publication_options(pstate, stmt->options, @@ -399,6 +779,65 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, &publish_via_partition_root_given, &publish_via_partition_root); + pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's row filter will be used. So disallow using WHERE + * clause on partitioned table in this case. + */ + if (!pubform->puballtables && publish_via_partition_root_given && + !publish_via_partition_root) + { + /* + * Lock the publication so nobody else can do anything with it. This + * prevents concurrent alter to add partitioned table(s) with WHERE + * clause(s) which we don't allow when not publishing via root. 
+ */ + LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + AccessShareLock); + + root_relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ROOT); + + foreach(lc, root_relids) + { + HeapTuple rftuple; + Oid relid = lfirst_oid(lc); + + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubform->oid)); + + if (HeapTupleIsValid(rftuple) && + !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL)) + { + HeapTuple tuple; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + if (HeapTupleIsValid(tuple)) + { + Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple); + + if (relform->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set %s for publication \"%s\"", + "publish_via_partition_root = false", + stmt->pubname), + errdetail("The publication contains a WHERE clause for a partitioned table \"%s\" " + "which is not allowed when %s is false.", + NameStr(relform->relname), + "publish_via_partition_root"))); + + ReleaseSysCache(tuple); + } + + ReleaseSysCache(rftuple); + } + } + } + /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -450,8 +889,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + if (root_relids == NIL) + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + else + { + /* + * We already got tables explicitly mentioned in the publication. + * Now get all partitions for the partitioned table in the list. + */ + foreach(lc, root_relids) + relids = GetPubPartitionOptionRelations(relids, + PUBLICATION_PART_ALL, + lfirst_oid(lc)); + } + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, PUBLICATION_PART_ALL); relids = list_concat_unique_oid(relids, schemarelids); @@ -492,7 +944,8 @@ InvalidatePublicationRels(List *relids) */ static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *schemaidlist) + List *tables, List *schemaidlist, + const char *queryString) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -519,6 +972,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); CheckObjSchemaNotAlreadyInPublication(rels, schemas, PUBLICATIONOBJ_TABLE); + + TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + PublicationAddTables(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) @@ -533,37 +989,76 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLE); - /* Calculate which relations to drop. */ + TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + + /* + * To recreate the relation list for the publication, look for + * existing relations that do not need to be dropped. 
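TransformPubWhereClauses and the ALTER PUBLICATION ... SET path above cooperate to keep filters on partitioned tables consistent with publish_via_partition_root. A sketch with a hypothetical partitioned table:

    -- Allowed: the parent's filter is used because changes are published via the root.
    CREATE PUBLICATION pub_parts FOR TABLE measurements WHERE (city_id = 1)
        WITH (publish_via_partition_root = true);

    -- Rejected: with publish_via_partition_root = false the partitions' own filters
    -- would apply, so a WHERE clause on the partitioned parent is not accepted, and
    -- switching the option back to false later is likewise refused.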
+ */ foreach(oldlc, oldrelids) { Oid oldrelid = lfirst_oid(oldlc); ListCell *newlc; + PublicationRelInfo *oldrel; bool found = false; + HeapTuple rftuple; + bool rfisnull = true; + Node *oldrelwhereclause = NULL; + + /* look up the cache for the old relmap */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(oldrelid), + ObjectIdGetDatum(pubid)); + + if (HeapTupleIsValid(rftuple)) + { + Datum whereClauseDatum; + + whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prqual, + &rfisnull); + if (!rfisnull) + oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum)); + + ReleaseSysCache(rftuple); + } foreach(newlc, rels) { PublicationRelInfo *newpubrel; newpubrel = (PublicationRelInfo *) lfirst(newlc); + + /* + * Check if any of the new set of relations matches with the + * existing relations in the publication. Additionally, if the + * relation has an associated WHERE clause, check the WHERE + * expressions also match. Drop the rest. + */ if (RelationGetRelid(newpubrel->relation) == oldrelid) { - found = true; - break; + if (equal(oldrelwhereclause, newpubrel->whereClause)) + { + found = true; + break; + } } } - /* Not yet in the list, open it and add to the list */ - if (!found) - { - Relation oldrel; - PublicationRelInfo *pubrel; - /* Wrap relation into PublicationRelInfo */ - oldrel = table_open(oldrelid, ShareUpdateExclusiveLock); + if (oldrelwhereclause) + pfree(oldrelwhereclause); - pubrel = palloc(sizeof(PublicationRelInfo)); - pubrel->relation = oldrel; - - delrels = lappend(delrels, pubrel); + /* + * Add the non-matched relations to a list so that they can be + * dropped. + */ + if (!found) + { + oldrel = palloc(sizeof(PublicationRelInfo)); + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrelid, + ShareUpdateExclusiveLock); + delrels = lappend(delrels, oldrel); } } @@ -720,12 +1215,15 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) { List *relations = NIL; List *schemaidlist = NIL; + Oid pubid = pubform->oid; ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, &schemaidlist); CheckAlterPublication(stmt, tup, relations, schemaidlist); + heap_freetuple(tup); + /* * Lock the publication so nobody else can do anything with it. This * prevents concurrent alter to add table(s) that were already going @@ -734,22 +1232,24 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) * addition of schema(s) for which there is any corresponding table * being added by this command. */ - LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + LockDatabaseObject(PublicationRelationId, pubid, 0, AccessExclusiveLock); /* * It is possible that by the time we acquire the lock on publication, * concurrent DDL has removed it. We can test this by checking the - * existence of publication. + * existence of publication. We get the tuple again to avoid the risk + * of any publication option getting changed. 
*/ - if (!SearchSysCacheExists1(PUBLICATIONOID, - ObjectIdGetDatum(pubform->oid))) + tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(tup)) ereport(ERROR, errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationTables(stmt, tup, relations, schemaidlist, + pstate->p_sourcetext); AlterPublicationSchemas(stmt, tup, schemaidlist); } @@ -901,6 +1401,7 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + List *relids_with_rf = NIL; /* * Open, share-lock, and check all the explicitly-specified relations @@ -928,15 +1429,26 @@ OpenTableList(List *tables) */ if (list_member_oid(relids, myrelid)) { + /* Disallow duplicate tables if there are any with row filters. */ + if (t->whereClause || list_member_oid(relids_with_rf, myrelid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant WHERE clauses for table \"%s\"", + RelationGetRelationName(rel)))); + table_close(rel, ShareUpdateExclusiveLock); continue; } pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->whereClause = t->whereClause; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); + if (t->whereClause) + relids_with_rf = lappend_oid(relids_with_rf, myrelid); + /* * Add children of this rel, if requested, so that they too are added * to the publication. A partitioned table can't have any inheritance @@ -963,19 +1475,39 @@ OpenTableList(List *tables) * tables. */ if (list_member_oid(relids, childrelid)) + { + /* + * We don't allow to specify row filter for both parent + * and child table at the same time as it is not very + * clear which one should be given preference. 
+ */ + if (childrelid != myrelid && + (t->whereClause || list_member_oid(relids_with_rf, childrelid))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant WHERE clauses for table \"%s\"", + RelationGetRelationName(rel)))); + continue; + } /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + /* child inherits WHERE clause from parent */ + pub_rel->whereClause = t->whereClause; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); + + if (t->whereClause) + relids_with_rf = lappend_oid(relids_with_rf, childrelid); } } } list_free(relids); + list_free(relids_with_rf); return rels; } @@ -995,6 +1527,8 @@ CloseTableList(List *rels) pub_rel = (PublicationRelInfo *) lfirst(lc); table_close(pub_rel->relation, NoLock); } + + list_free_deep(rels); } /* @@ -1090,6 +1624,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) RelationGetRelationName(rel)))); } + if (pubrel->whereClause) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot use a WHERE clause when removing a table from a publication"))); + ObjectAddressSet(obj, PublicationRelRelationId, prid); performDeletion(&obj, DROP_CASCADE, 0); } diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 313c87398b2..de106d767d1 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -567,15 +567,43 @@ ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, void CheckCmdReplicaIdentity(Relation rel, CmdType cmd) { - PublicationActions *pubactions; + PublicationDesc pubdesc; /* We only need to do checks for UPDATE and DELETE. */ if (cmd != CMD_UPDATE && cmd != CMD_DELETE) return; + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + return; + + /* + * It is only safe to execute UPDATE/DELETE when all columns, referenced + * in the row filters from publications which the relation is in, are + * valid - i.e. when all referenced columns are part of REPLICA IDENTITY + * or the table does not publish UPDATEs or DELETEs. + * + * XXX We could optimize it by first checking whether any of the + * publications have a row filter for this relation. If not and relation + * has replica identity then we can avoid building the descriptor but as + * this happens only one time it doesn't seem worth the additional + * complexity. + */ + RelationBuildPublicationDesc(rel, &pubdesc); + if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot update table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot delete from table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + /* If relation has replica identity we are always good. */ - if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || - OidIsValid(RelationGetReplicaIndex(rel))) + if (OidIsValid(RelationGetReplicaIndex(rel))) return; /* @@ -583,14 +611,13 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) * * Check if the table publishes UPDATES or DELETES. 
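CheckCmdReplicaIdentity above enforces that, unless the replica identity is FULL, every column referenced by a filter of a publication that publishes UPDATE or DELETE is part of the replica identity. With the hypothetical table (primary key id as the default identity):

    CREATE PUBLICATION pub_bal FOR TABLE accounts WHERE (balance > 0);
    UPDATE accounts SET balance = 0 WHERE id = 1;
    -- ERROR:  cannot update table "accounts"
    -- DETAIL: Column used in the publication WHERE expression is not part of the replica identity.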
*/ - pubactions = GetRelationPublicationActions(rel); - if (cmd == CMD_UPDATE && pubactions->pubupdate) + if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates", RelationGetRelationName(rel)), errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE."))); - else if (cmd == CMD_DELETE && pubactions->pubdelete) + else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index bc0d90b4b1b..d4f8455a2bd 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4849,6 +4849,7 @@ _copyPublicationTable(const PublicationTable *from) PublicationTable *newnode = makeNode(PublicationTable); COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(whereClause); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 2e7122ad2f6..f1002afe7a0 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2321,6 +2321,7 @@ static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(whereClause); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 92f93cfc72d..a03b33b53bd 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9751,12 +9751,13 @@ CreatePublicationStmt: * relation_expr here. */ PublicationObjSpec: - TABLE relation_expr + TABLE relation_expr OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; + $$->pubtable->whereClause = $3; } | ALL TABLES IN_P SCHEMA ColId { @@ -9771,28 +9772,45 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA; $$->location = @5; } - | ColId + | ColId OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - $$->name = $1; + if ($2) + { + /* + * The OptWhereClause must be stored here but it is + * valid only for tables. For non-table objects, an + * error will be thrown later via + * preprocess_pubobj_list(). 
+ */ + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->whereClause = $2; + } + else + { + $$->name = $1; + } $$->location = @1; } - | ColId indirection + | ColId indirection OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->whereClause = $3; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr + | extended_relation_expr OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; + $$->pubtable->whereClause = $2; } | CURRENT_SCHEMA { @@ -17448,7 +17466,8 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid table name at or near"), parser_errposition(pubobj->location)); - else if (pubobj->name) + + if (pubobj->name) { /* convert it to PublicationTable */ PublicationTable *pubtable = makeNode(PublicationTable); @@ -17462,6 +17481,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA || pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA) { + /* WHERE clause is not allowed on a schema object */ + if (pubobj->pubtable && pubobj->pubtable->whereClause) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("WHERE clause not allowed for schema"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. 
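The gram.y changes attach an optional WHERE clause to each table entry of a publication object list, while preprocess_pubobj_list rejects it for schema-level entries. A sketch with hypothetical names:

    -- Per-table filters in one statement.
    CREATE PUBLICATION pub_multi
        FOR TABLE accounts WHERE (status = 'active'),
            TABLE orders WHERE (amount > 0);

    -- Attaching a WHERE clause to an ALL TABLES IN SCHEMA entry fails with:
    -- ERROR:  WHERE clause not allowed for schema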
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 953942692ce..c9b0eeefd7e 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -31,8 +31,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); - + TupleTableSlot *slot, + bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + TupleTableSlot *newslot, bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newslot, binary); } /* @@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); - if (oldtuple != NULL) + if (oldslot != NULL) { if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldslot, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newslot, binary); } /* @@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, */ void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, bool binary) + TupleTableSlot *oldslot, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldslot, binary); } /* @@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. 
*/ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, + bool binary) { TupleDesc desc; - Datum values[MaxTupleAttributeNumber]; - bool isnull[MaxTupleAttributeNumber]; + Datum *values; + bool *isnull; int i; uint16 nliveatts = 0; @@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar } pq_sendint16(out, nliveatts); - /* try to allocate enough memory from the get-go */ - enlargeStringInfo(out, tuple->t_len + - nliveatts * (1 + 4)); - - heap_deform_tuple(tuple, desc, values, isnull); + slot_getallattrs(slot); + values = slot->tts_values; + isnull = slot->tts_isnull; /* Write the values */ for (i = 0; i < desc->natts; i++) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e596b69d466..1659964571c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in the COPY command. */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[] = {TEXTOID}; bool isnull; int natt; + ListCell *lc; + bool first; lrel->nspname = nspname; lrel->relname = relname; @@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->natts = natt; walrcv_clear_result(res); + + /* + * Get relation's row filter expressions. DISTINCT avoids the same + * expression of a table in multiple publications from being included + * multiple times in the final expression. + * + * We need to copy the row even if it matches just one of the + * publications, so we later combine all the quals with OR. + * + * For initial synchronization, row filtering can be ignored in following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any row filter + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as ALL TABLES IN + * SCHEMA that includes this relation + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + StringInfoData pub_names; + + /* Build the pubname list. */ + initStringInfo(&pub_names); + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&pub_names, ", "); + + appendStringInfoString(&pub_names, quote_literal_cstr(pubname)); + } + + /* Check for row filters. 
*/ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)" + " FROM pg_publication p" + " LEFT OUTER JOIN pg_publication_rel pr" + " ON (p.oid = pr.prpubid AND pr.prrelid = %u)," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + lrel->remoteid, + pub_names.data); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + /* + * Multiple row filter expressions for the same table will be combined + * by COPY using OR. If any of the filter expressions for this table + * are null, it means the whole table will be copied. In this case it + * is not necessary to construct a unified row filter expression at + * all. + */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + else + { + /* Ignore filters and cleanup as necessary. */ + if (*qual) + { + list_free_deep(*qual); + *qual = NIL; + } + break; + } + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + } + pfree(cmd.data); } @@ -811,6 +907,7 @@ copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; + List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; @@ -819,7 +916,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -830,14 +927,18 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - if (lrel.relkind == RELKIND_RELATION) + + /* Regular table with no row filter */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL) appendStringInfo(&cmd, "COPY %s TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); else { /* - * For non-tables, we need to do COPY (SELECT ...), but we can't just - * do SELECT * because we need to not copy generated columns. + * For non-tables and tables with row filters, we need to do COPY + * (SELECT ...), but we can't just do SELECT * because we need to not + * copy generated columns. For tables with any row filters, build a + * SELECT query with OR'ed row filters for COPY. */ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) @@ -846,8 +947,33 @@ copy_table(Relation rel) if (i < lrel.natts - 1) appendStringInfoString(&cmd, ", "); } - appendStringInfo(&cmd, " FROM %s) TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + + appendStringInfoString(&cmd, " FROM "); + + /* + * For regular tables, make sure we don't copy data from a child that + * inherits the named table as those will be copied separately. 
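For initial synchronization, tablesync builds one SELECT that ORs together all DISTINCT filters the subscribed publications define for the table; if any of them has no filter (or is FOR ALL TABLES / ALL TABLES IN SCHEMA), the whole table is copied. The generated command has roughly this shape (columns and filters hypothetical):

    COPY (SELECT id, status, balance
            FROM ONLY public.accounts
           WHERE (status = 'active') OR (balance > 1000)) TO STDOUT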
+ */ + if (lrel.relkind == RELKIND_RELATION) + appendStringInfoString(&cmd, "ONLY "); + + appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* list of OR'ed filters */ + if (qual != NIL) + { + ListCell *lc; + char *q = strVal(linitial(qual)); + + appendStringInfo(&cmd, " WHERE %s", q); + for_each_from(lc, qual, 1) + { + q = strVal(lfirst(lc)); + appendStringInfo(&cmd, " OR %s", q); + } + list_free_deep(qual); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 4162bb8de7b..ea57a0477f0 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,12 +15,17 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" #include "commands/defrem.h" +#include "executor/executor.h" #include "fmgr.h" +#include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -86,6 +91,19 @@ static void send_repl_origin(LogicalDecodingContext *ctx, bool send_origin); /* + * Only 3 publication actions are used for row filtering ("insert", "update", + * "delete"). See RelationSyncEntry.exprstate[]. + */ +enum RowFilterPubAction +{ + PUBACTION_INSERT, + PUBACTION_UPDATE, + PUBACTION_DELETE +}; + +#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1) + +/* * Entry in the map used to remember which relation schemas we sent. * * The schema_sent flag determines if the current schema record for the @@ -117,6 +135,21 @@ typedef struct RelationSyncEntry PublicationActions pubactions; /* + * ExprState array for row filter. Different publication actions don't + * allow multiple expressions to always be combined into one, because + * updates or deletes restrict the column in expression to be part of the + * replica identity index whereas inserts do not have this restriction, so + * there is one ExprState per publication action. + */ + ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; + EState *estate; /* executor state used for row filter */ + MemoryContext cache_expr_cxt; /* private context for exprstate and + * estate, if any */ + + TupleTableSlot *new_slot; /* slot for storing new tuple */ + TupleTableSlot *old_slot; /* slot for storing old tuple */ + + /* * OID of the relation to publish changes as. For a partition, this may * be set to one of its ancestors whose schema will be used when * replicating changes, if publish_via_partition_root is set for the @@ -130,7 +163,7 @@ typedef struct RelationSyncEntry * same as 'relid' or if unnecessary due to partition and the ancestor * having identical TupleDesc. */ - TupleConversionMap *map; + AttrMap *attrmap; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. 
*/ @@ -138,7 +171,8 @@ static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, + Relation relation); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -146,6 +180,20 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); +static void init_tuple_slot(PGOutputData *data, Relation relation, + RelationSyncEntry *entry); + +/* row filter routines */ +static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, + List *publications, + RelationSyncEntry *entry); +static bool pgoutput_row_filter_exec_expr(ExprState *state, + ExprContext *econtext); +static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, + TupleTableSlot **new_slot_ptr, + RelationSyncEntry *entry, + ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -303,6 +351,10 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, "logical replication output context", ALLOCSET_DEFAULT_SIZES); + data->cachectx = AllocSetContextCreate(ctx->context, + "logical replication cache context", + ALLOCSET_DEFAULT_SIZES); + ctx->output_plugin_private = data; /* This plugin uses binary protocol. */ @@ -543,37 +595,14 @@ maybe_send_schema(LogicalDecodingContext *ctx, return; /* - * Nope, so send the schema. If the changes will be published using an - * ancestor's schema, not the relation's own, send that ancestor's schema - * before sending relation's own (XXX - maybe sending only the former - * suffices?). This is also a good place to set the map that will be used - * to convert the relation's tuples into the ancestor's format, if needed. + * Send the schema. If the changes will be published using an ancestor's + * schema, not the relation's own, send that ancestor's schema before + * sending relation's own (XXX - maybe sending only the former suffices?). */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - TupleDesc indesc = RelationGetDescr(relation); - TupleDesc outdesc = RelationGetDescr(ancestor); - MemoryContext oldctx; - - /* Map must live as long as the session does. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - /* - * Make copies of the TupleDescs that will live as long as the map - * does before putting into the map. - */ - indesc = CreateTupleDescCopy(indesc); - outdesc = CreateTupleDescCopy(outdesc); - relentry->map = convert_tuples_by_name(indesc, outdesc); - if (relentry->map == NULL) - { - /* Map not necessary, so free the TupleDescs too. */ - FreeTupleDesc(indesc); - FreeTupleDesc(outdesc); - } - - MemoryContextSwitchTo(oldctx); send_relation_and_attrs(ancestor, xid, ctx); RelationClose(ancestor); } @@ -625,6 +654,484 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } /* + * Executor state preparation for evaluation of row filter expressions for the + * specified relation. 
+ */ +static EState * +create_estate_for_relation(Relation rel) +{ + EState *estate; + RangeTblEntry *rte; + + estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + estate->es_output_cid = GetCurrentCommandId(false); + + return estate; +} + +/* + * Evaluates row filter. + * + * If the row filter evaluates to NULL, it is taken as false i.e. the change + * isn't replicated. + */ +static bool +pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) +{ + Datum ret; + bool isnull; + + Assert(state != NULL); + + ret = ExecEvalExprSwitchContext(state, econtext, &isnull); + + elog(DEBUG3, "row filter evaluates to %s (isnull: %s)", + isnull ? "false" : DatumGetBool(ret) ? "true" : "false", + isnull ? "true" : "false"); + + if (isnull) + return false; + + return DatumGetBool(ret); +} + +/* + * Initialize the row filter. + */ +static void +pgoutput_row_filter_init(PGOutputData *data, List *publications, + RelationSyncEntry *entry) +{ + ListCell *lc; + List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */ + bool no_filter[] = {false, false, false}; /* One per pubaction */ + MemoryContext oldctx; + int idx; + bool has_filter = true; + + /* + * Find if there are any row filters for this relation. If there are, then + * prepare the necessary ExprState and cache it in entry->exprstate. To + * build an expression state, we need to ensure the following: + * + * All the given publication-table mappings must be checked. + * + * Multiple publications might have multiple row filters for this + * relation. Since row filter usage depends on the DML operation, there + * are multiple lists (one for each operation) to which row filters will + * be appended. + * + * FOR ALL TABLES implies "don't use row filter expression" so it takes + * precedence. + */ + foreach(lc, publications) + { + Publication *pub = lfirst(lc); + HeapTuple rftuple = NULL; + Datum rfdatum = 0; + bool pub_no_filter = false; + + if (pub->alltables) + { + /* + * If the publication is FOR ALL TABLES then it is treated the + * same as if this table has no row filters (even if for other + * publications it does). + */ + pub_no_filter = true; + } + else + { + /* + * Check for the presence of a row filter in this publication. + */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(entry->publish_as_relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(rftuple)) + { + /* Null indicates no filter. */ + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prqual, + &pub_no_filter); + } + else + { + pub_no_filter = true; + } + } + + if (pub_no_filter) + { + if (rftuple) + ReleaseSysCache(rftuple); + + no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert; + no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate; + no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete; + + /* + * Quick exit if all the DML actions are publicized via this + * publication. + */ + if (no_filter[PUBACTION_INSERT] && + no_filter[PUBACTION_UPDATE] && + no_filter[PUBACTION_DELETE]) + { + has_filter = false; + break; + } + + /* No additional work for this publication. Next one. */ + continue; + } + + /* Form the per pubaction row filter lists. 
*/ + if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT]) + rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT], + TextDatumGetCString(rfdatum)); + if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE]) + rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE], + TextDatumGetCString(rfdatum)); + if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE]) + rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE], + TextDatumGetCString(rfdatum)); + + ReleaseSysCache(rftuple); + } /* loop all subscribed publications */ + + /* Clean the row filter */ + for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) + { + if (no_filter[idx]) + { + list_free_deep(rfnodes[idx]); + rfnodes[idx] = NIL; + } + } + + if (has_filter) + { + Relation relation = RelationIdGetRelation(entry->publish_as_relid); + + Assert(entry->cache_expr_cxt == NULL); + + /* Create the memory context for row filters */ + entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx, + "Row filter expressions", + ALLOCSET_DEFAULT_SIZES); + + MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt, + RelationGetRelationName(relation)); + + /* + * Now all the filters for all pubactions are known. Combine them when + * their pubactions are the same. + */ + oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt); + entry->estate = create_estate_for_relation(relation); + for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) + { + List *filters = NIL; + Expr *rfnode; + + if (rfnodes[idx] == NIL) + continue; + + foreach(lc, rfnodes[idx]) + filters = lappend(filters, stringToNode((char *) lfirst(lc))); + + /* combine the row filter and cache the ExprState */ + rfnode = make_orclause(filters); + entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate); + } /* for each pubaction */ + MemoryContextSwitchTo(oldctx); + + RelationClose(relation); + } +} + +/* + * Initialize the slot for storing new and old tuples, and build the map that + * will be used to convert the relation's tuples into the ancestor's format. + */ +static void +init_tuple_slot(PGOutputData *data, Relation relation, + RelationSyncEntry *entry) +{ + MemoryContext oldctx; + TupleDesc oldtupdesc; + TupleDesc newtupdesc; + + oldctx = MemoryContextSwitchTo(data->cachectx); + + /* + * Create tuple table slots. Create a copy of the TupleDesc as it needs to + * live as long as the cache remains. + */ + oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation)); + newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation)); + + entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple); + entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple); + + MemoryContextSwitchTo(oldctx); + + /* + * Cache the map that will be used to convert the relation's tuples into + * the ancestor's format, if needed. + */ + if (entry->publish_as_relid != RelationGetRelid(relation)) + { + Relation ancestor = RelationIdGetRelation(entry->publish_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc); + + MemoryContextSwitchTo(oldctx); + RelationClose(ancestor); + } +} + +/* + * Change is checked against the row filter if any. + * + * Returns true if the change is to be replicated, else false. + * + * For inserts, evaluate the row filter for new tuple. + * For deletes, evaluate the row filter for old tuple. 
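pgoutput_row_filter_init keeps one combined expression per publish action, ORing the filters of all subscribed publications; a publication without a filter (or FOR ALL TABLES) disables filtering for the actions it publishes. A sketch of the effective behavior:

    CREATE PUBLICATION pub_a FOR TABLE accounts WHERE (status = 'active');
    CREATE PUBLICATION pub_b FOR TABLE accounts WHERE (balance > 1000);
    -- A subscription to both replicates rows for which
    -- (status = 'active') OR (balance > 1000) holds.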
+ * For updates, evaluate the row filter for old and new tuple. + * + * For updates, if both evaluations are true, we allow sending the UPDATE and + * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if + * only one of the tuples matches the row filter expression, we transform + * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the + * following rules: + * + * Case 1: old-row (no match) new-row (no match) -> (drop change) + * Case 2: old-row (no match) new row (match) -> INSERT + * Case 3: old-row (match) new-row (no match) -> DELETE + * Case 4: old-row (match) new row (match) -> UPDATE + * + * The new action is updated in the action parameter. + * + * The new slot could be updated when transforming the UPDATE into INSERT, + * because the original new tuple might not have column values from the replica + * identity. + * + * Examples: + * Let's say the old tuple satisfies the row filter but the new tuple doesn't. + * Since the old tuple satisfies, the initial table synchronization copied this + * row (or another method was used to guarantee that there is data + * consistency). However, after the UPDATE the new tuple doesn't satisfy the + * row filter, so from a data consistency perspective, that row should be + * removed on the subscriber. The UPDATE should be transformed into a DELETE + * statement and be sent to the subscriber. Keeping this row on the subscriber + * is undesirable because it doesn't reflect what was defined in the row filter + * expression on the publisher. This row on the subscriber would likely not be + * modified by replication again. If someone inserted a new row with the same + * old identifier, replication could stop due to a constraint violation. + * + * Let's say the old tuple doesn't match the row filter but the new tuple does. + * Since the old tuple doesn't satisfy, the initial table synchronization + * probably didn't copy this row. However, after the UPDATE the new tuple does + * satisfy the row filter, so from a data consistency perspective, that row + * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE + * statements have no effect (it matches no row -- see + * apply_handle_update_internal()). So, the UPDATE should be transformed into a + * INSERT statement and be sent to the subscriber. However, this might surprise + * someone who expects the data set to satisfy the row filter expression on the + * provider. + */ +static bool +pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, + TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, + ReorderBufferChangeType *action) +{ + TupleDesc desc; + int i; + bool old_matched, + new_matched, + result; + TupleTableSlot *tmp_new_slot; + TupleTableSlot *new_slot = *new_slot_ptr; + ExprContext *ecxt; + ExprState *filter_exprstate; + + /* + * We need this map to avoid relying on ReorderBufferChangeType enums + * having specific values. 
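The four cases above turn an UPDATE into the change the subscriber actually needs. Continuing the hypothetical (status = 'active') filter:

    UPDATE accounts SET status = 'closed' WHERE id = 1;
    --   old row matched, new row does not  => replicated as DELETE (Case 3)
    UPDATE accounts SET status = 'active' WHERE id = 2;
    --   old row did not match, new row does => replicated as INSERT (Case 2)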
+ */ + static const int map_changetype_pubaction[] = { + [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT, + [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE, + [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE + }; + + Assert(*action == REORDER_BUFFER_CHANGE_INSERT || + *action == REORDER_BUFFER_CHANGE_UPDATE || + *action == REORDER_BUFFER_CHANGE_DELETE); + + Assert(new_slot || old_slot); + + /* Get the corresponding row filter */ + filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]]; + + /* Bail out if there is no row filter */ + if (!filter_exprstate) + return true; + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation)); + + ResetPerTupleExprContext(entry->estate); + + ecxt = GetPerTupleExprContext(entry->estate); + + /* + * For the following occasions where there is only one tuple, we can + * evaluate the row filter for that tuple and return. + * + * For inserts, we only have the new tuple. + * + * For updates, we can have only a new tuple when none of the replica + * identity columns changed but we still need to evaluate the row filter + * for new tuple as the existing values of those columns might not match + * the filter. Also, users can use constant expressions in the row filter, + * so we anyway need to evaluate it for the new tuple. + * + * For deletes, we only have the old tuple. + */ + if (!new_slot || !old_slot) + { + ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot; + result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt); + + return result; + } + + /* + * Both the old and new tuples must be valid only for updates and need to + * be checked against the row filter. + */ + Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE); + + slot_getallattrs(new_slot); + slot_getallattrs(old_slot); + + tmp_new_slot = NULL; + desc = RelationGetDescr(relation); + + /* + * The new tuple might not have all the replica identity columns, in which + * case it needs to be copied over from the old tuple. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* + * if the column in the new tuple or old tuple is null, nothing to do + */ + if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i]) + continue; + + /* + * Unchanged toasted replica identity columns are only logged in the + * old tuple. Copy this over to the new tuple. The changed (or WAL + * Logged) toast values are always assembled in memory and set as + * VARTAG_INDIRECT. See ReorderBufferToastReplace. + */ + if (att->attlen == -1 && + VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) && + !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])) + { + if (!tmp_new_slot) + { + tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual); + ExecClearTuple(tmp_new_slot); + + memcpy(tmp_new_slot->tts_values, new_slot->tts_values, + desc->natts * sizeof(Datum)); + memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull, + desc->natts * sizeof(bool)); + } + + tmp_new_slot->tts_values[i] = old_slot->tts_values[i]; + tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i]; + } + } + + ecxt->ecxt_scantuple = old_slot; + old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt); + + if (tmp_new_slot) + { + ExecStoreVirtualTuple(tmp_new_slot); + ecxt->ecxt_scantuple = tmp_new_slot; + } + else + ecxt->ecxt_scantuple = new_slot; + + new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt); + + /* + * Case 1: if both tuples don't match the row filter, bailout. 
Send + * nothing. + */ + if (!old_matched && !new_matched) + return false; + + /* + * Case 2: if the old tuple doesn't satisfy the row filter but the new + * tuple does, transform the UPDATE into INSERT. + * + * Use the newly transformed tuple that must contain the column values for + * all the replica identity columns. This is required to ensure that the + * while inserting the tuple in the downstream node, we have all the + * required column values. + */ + if (!old_matched && new_matched) + { + *action = REORDER_BUFFER_CHANGE_INSERT; + + if (tmp_new_slot) + *new_slot_ptr = tmp_new_slot; + } + + /* + * Case 3: if the old tuple satisfies the row filter but the new tuple + * doesn't, transform the UPDATE into DELETE. + * + * This transformation does not require another tuple. The Old tuple will + * be used for DELETE. + */ + else if (old_matched && !new_matched) + *action = REORDER_BUFFER_CHANGE_DELETE; + + /* + * Case 4: if both tuples match the row filter, transformation isn't + * required. (*action is default UPDATE). + */ + + return true; +} + +/* * Sends the decoded DML over wire. * * This is called both in streaming and non-streaming modes. @@ -638,6 +1145,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + Relation targetrel = relation; + ReorderBufferChangeType action = change->action; + TupleTableSlot *old_slot = NULL; + TupleTableSlot *new_slot = NULL; if (!is_publishable_relation(relation)) return; @@ -651,10 +1162,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = change->txn->xid; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ - switch (change->action) + switch (action) { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) @@ -675,80 +1186,149 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - maybe_send_schema(ctx, change, relation, relentry); - /* Send the data */ - switch (change->action) + switch (action) { case REORDER_BUFFER_CHANGE_INSERT: - { - HeapTuple tuple = &change->data.tp.newtuple->tuple; + new_slot = relentry->new_slot; + ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, + new_slot, false); - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + targetrel = ancestor; + /* Convert tuple if needed. */ + if (relentry->attrmap) { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - relation = ancestor; - /* Convert tuple if needed. 
*/ - if (relentry->map) - tuple = execute_attr_map_tuple(tuple, relentry->map); + TupleDesc tupdesc = RelationGetDescr(targetrel); + + new_slot = execute_attr_map_slot(relentry->attrmap, + new_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); } + } - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); - OutputPluginWrite(ctx, true); + /* Check row filter */ + if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, + &action)) break; - } + + /* + * Schema should be sent using the original relation because it + * also sends the ancestor's relation. + */ + maybe_send_schema(ctx, change, relation, relentry); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, + data->binary); + OutputPluginWrite(ctx, true); + break; case REORDER_BUFFER_CHANGE_UPDATE: + if (change->data.tp.oldtuple) { - HeapTuple oldtuple = change->data.tp.oldtuple ? - &change->data.tp.oldtuple->tuple : NULL; - HeapTuple newtuple = &change->data.tp.newtuple->tuple; + old_slot = relentry->old_slot; + ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, + old_slot, false); + } - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) + new_slot = relentry->new_slot; + ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, + new_slot, false); + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + targetrel = ancestor; + /* Convert tuples if needed. */ + if (relentry->attrmap) { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - relation = ancestor; - /* Convert tuples if needed. */ - if (relentry->map) - { - if (oldtuple) - oldtuple = execute_attr_map_tuple(oldtuple, - relentry->map); - newtuple = execute_attr_map_tuple(newtuple, - relentry->map); - } + TupleDesc tupdesc = RelationGetDescr(targetrel); + + if (old_slot) + old_slot = execute_attr_map_slot(relentry->attrmap, + old_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); + + new_slot = execute_attr_map_slot(relentry->attrmap, + new_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); } + } - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); - OutputPluginWrite(ctx, true); + /* Check row filter */ + if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, + relentry, &action)) break; + + maybe_send_schema(ctx, change, relation, relentry); + + OutputPluginPrepareWrite(ctx, true); + + /* + * Updates could be transformed to inserts or deletes based on the + * results of the row filter for old and new tuple. 
+ */ + switch (action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, targetrel, + new_slot, data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + logicalrep_write_update(ctx->out, xid, targetrel, + old_slot, new_slot, data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, targetrel, + old_slot, data->binary); + break; + default: + Assert(false); } + + OutputPluginWrite(ctx, true); + break; case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { - HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + old_slot = relentry->old_slot; + + ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, + old_slot, false); /* Switch relation if publishing via root. */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { Assert(relation->rd_rel->relispartition); ancestor = RelationIdGetRelation(relentry->publish_as_relid); - relation = ancestor; + targetrel = ancestor; /* Convert tuple if needed. */ - if (relentry->map) - oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + if (relentry->attrmap) + { + TupleDesc tupdesc = RelationGetDescr(targetrel); + + old_slot = execute_attr_map_slot(relentry->attrmap, + old_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); + } } + /* Check row filter */ + if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, + relentry, &action)) + break; + + maybe_send_schema(ctx, change, relation, relentry); + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, xid, relation, oldtuple, - data->binary); + logicalrep_write_delete(ctx->out, xid, targetrel, + old_slot, data->binary); OutputPluginWrite(ctx, true); } else @@ -798,7 +1378,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -873,8 +1453,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, /* * Shutdown the output plugin. * - * Note, we don't need to clean the data->context as it's child context - * of the ctx->context so it will be cleaned up by logical decoding machinery. + * Note, we don't need to clean the data->context and data->cachectx as + * they are child context of the ctx->context so it will be cleaned up by + * logical decoding machinery. */ static void pgoutput_shutdown(LogicalDecodingContext *ctx) @@ -1122,11 +1703,12 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) * when publishing. 
*/ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation relation) { RelationSyncEntry *entry; bool found; MemoryContext oldctx; + Oid relid = RelationGetRelid(relation); Assert(RelationSyncCache != NULL); @@ -1144,9 +1726,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->new_slot = NULL; + entry->old_slot = NULL; + memset(entry->exprstate, 0, sizeof(entry->exprstate)); + entry->cache_expr_cxt = NULL; entry->publish_as_relid = InvalidOid; - entry->map = NULL; /* will be set by maybe_send_schema() if - * needed */ + entry->attrmap = NULL; } /* Validate the entry */ @@ -1165,6 +1750,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid publish_as_relid = relid; bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); + List *rel_publications = NIL; /* Reload publications if needed before use. */ if (!publications_valid) @@ -1193,17 +1779,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - if (entry->map) - { - /* - * Must free the TupleDescs contained in the map explicitly, - * because free_conversion_map() doesn't. - */ - FreeTupleDesc(entry->map->indesc); - FreeTupleDesc(entry->map->outdesc); - free_conversion_map(entry->map); - } - entry->map = NULL; + + /* + * Tuple slots cleanups. (Will be rebuilt later if needed). + */ + if (entry->old_slot) + ExecDropSingleTupleTableSlot(entry->old_slot); + if (entry->new_slot) + ExecDropSingleTupleTableSlot(entry->new_slot); + + entry->old_slot = NULL; + entry->new_slot = NULL; + + if (entry->attrmap) + free_attrmap(entry->attrmap); + entry->attrmap = NULL; + + /* + * Row filter cache cleanups. + */ + if (entry->cache_expr_cxt) + MemoryContextDelete(entry->cache_expr_cxt); + + entry->cache_expr_cxt = NULL; + entry->estate = NULL; + memset(entry->exprstate, 0, sizeof(entry->exprstate)); /* * Build publication cache. We can't use one provided by relcache as @@ -1234,28 +1834,17 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) */ if (am_partition) { + Oid ancestor; List *ancestors = get_partition_ancestors(relid); - ListCell *lc2; - /* - * Find the "topmost" ancestor that is in this - * publication. 
- */ - foreach(lc2, ancestors) + ancestor = GetTopMostAncestorInPublication(pub->oid, + ancestors); + + if (ancestor != InvalidOid) { - Oid ancestor = lfirst_oid(lc2); - List *apubids = GetRelationPublications(ancestor); - List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); - - if (list_member_oid(apubids, pub->oid) || - list_member_oid(aschemaPubids, pub->oid)) - { - ancestor_published = true; - if (pub->pubviaroot) - publish_as_relid = ancestor; - } - list_free(apubids); - list_free(aschemaPubids); + ancestor_published = true; + if (pub->pubviaroot) + publish_as_relid = ancestor; } } @@ -1277,17 +1866,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + + rel_publications = lappend(rel_publications, pub); } + } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; + entry->publish_as_relid = publish_as_relid; + + /* + * Initialize the tuple slot, map, and row filter. These are only used + * when publishing inserts, updates, or deletes. + */ + if (entry->pubactions.pubinsert || entry->pubactions.pubupdate || + entry->pubactions.pubdelete) + { + /* Initialize the tuple slot and map */ + init_tuple_slot(data, relation, entry); + + /* Initialize the row filter */ + pgoutput_row_filter_init(data, rel_publications, entry); } list_free(pubids); list_free(schemaPubids); + list_free(rel_publications); - entry->publish_as_relid = publish_as_relid; entry->replicate_valid = true; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2707fed12f4..fccffce5729 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -66,6 +66,7 @@ #include "catalog/schemapg.h" #include "catalog/storage.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -2419,8 +2420,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc) bms_free(relation->rd_pkattr); bms_free(relation->rd_idattr); bms_free(relation->rd_hotblockingattr); - if (relation->rd_pubactions) - pfree(relation->rd_pubactions); + if (relation->rd_pubdesc) + pfree(relation->rd_pubdesc); if (relation->rd_options) pfree(relation->rd_options); if (relation->rd_indextuple) @@ -5523,38 +5524,57 @@ RelationGetExclusionInfo(Relation indexRelation, } /* - * Get publication actions for the given relation. + * Get the publication information for the given relation. + * + * Traverse all the publications which the relation is in to get the + * publication actions and validate the row filter expressions for such + * publications if any. We consider the row filter expression as invalid if it + * references any column which is not part of REPLICA IDENTITY. + * + * To avoid fetching the publication information repeatedly, we cache the + * publication actions and row filter validation information. 
*/ -struct PublicationActions * -GetRelationPublicationActions(Relation relation) +void +RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) { List *puboids; ListCell *lc; MemoryContext oldcxt; Oid schemaid; - PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); + List *ancestors = NIL; + Oid relid = RelationGetRelid(relation); /* * If not publishable, it publishes no actions. (pgoutput_change() will * ignore it.) */ if (!is_publishable_relation(relation)) - return pubactions; + { + memset(pubdesc, 0, sizeof(PublicationDesc)); + pubdesc->rf_valid_for_update = true; + pubdesc->rf_valid_for_delete = true; + return; + } + + if (relation->rd_pubdesc) + { + memcpy(pubdesc, relation->rd_pubdesc, sizeof(PublicationDesc)); + return; + } - if (relation->rd_pubactions) - return memcpy(pubactions, relation->rd_pubactions, - sizeof(PublicationActions)); + memset(pubdesc, 0, sizeof(PublicationDesc)); + pubdesc->rf_valid_for_update = true; + pubdesc->rf_valid_for_delete = true; /* Fetch the publication membership info. */ - puboids = GetRelationPublications(RelationGetRelid(relation)); + puboids = GetRelationPublications(relid); schemaid = RelationGetNamespace(relation); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ - List *ancestors = get_partition_ancestors(RelationGetRelid(relation)); - ListCell *lc; + ancestors = get_partition_ancestors(relid); foreach(lc, ancestors) { @@ -5582,35 +5602,53 @@ GetRelationPublicationActions(Relation relation) pubform = (Form_pg_publication) GETSTRUCT(tup); - pubactions->pubinsert |= pubform->pubinsert; - pubactions->pubupdate |= pubform->pubupdate; - pubactions->pubdelete |= pubform->pubdelete; - pubactions->pubtruncate |= pubform->pubtruncate; + pubdesc->pubactions.pubinsert |= pubform->pubinsert; + pubdesc->pubactions.pubupdate |= pubform->pubupdate; + pubdesc->pubactions.pubdelete |= pubform->pubdelete; + pubdesc->pubactions.pubtruncate |= pubform->pubtruncate; + + /* + * Check if all columns referenced in the filter expression are part of + * the REPLICA IDENTITY index or not. + * + * If the publication is FOR ALL TABLES then it means the table has no + * row filters and we can skip the validation. + */ + if (!pubform->puballtables && + (pubform->pubupdate || pubform->pubdelete) && + contain_invalid_rfcolumn(pubid, relation, ancestors, + pubform->pubviaroot)) + { + if (pubform->pubupdate) + pubdesc->rf_valid_for_update = false; + if (pubform->pubdelete) + pubdesc->rf_valid_for_delete = false; + } ReleaseSysCache(tup); /* - * If we know everything is replicated, there is no point to check for - * other publications. + * If we know everything is replicated and the row filter is invalid + * for update and delete, there is no point to check for other + * publications. */ - if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete && pubactions->pubtruncate) + if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate && + pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate && + !pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete) break; } - if (relation->rd_pubactions) + if (relation->rd_pubdesc) { - pfree(relation->rd_pubactions); - relation->rd_pubactions = NULL; + pfree(relation->rd_pubdesc); + relation->rd_pubdesc = NULL; } - /* Now save copy of the actions in the relcache entry. */ + /* Now save copy of the descriptor in the relcache entry. 
*/ oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - relation->rd_pubactions = palloc(sizeof(PublicationActions)); - memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions)); + relation->rd_pubdesc = palloc(sizeof(PublicationDesc)); + memcpy(relation->rd_pubdesc, pubdesc, sizeof(PublicationDesc)); MemoryContextSwitchTo(oldcxt); - - return pubactions; } /* @@ -6163,7 +6201,7 @@ load_relcache_init_file(bool shared) rel->rd_pkattr = NULL; rel->rd_idattr = NULL; rel->rd_hotblockingattr = NULL; - rel->rd_pubactions = NULL; + rel->rd_pubdesc = NULL; rel->rd_statvalid = false; rel->rd_statlist = NIL; rel->rd_fkeyvalid = false; |
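
Editor's note: the UPDATE handling added in pgoutput_row_filter() above reduces to a small decision table over whether the old and new tuples match the publication's row filter (Cases 1-4 in the patch comment). The standalone sketch below only restates that table; the SketchChangeType enum, transform_update() and the output of main() are invented for illustration and are not part of the patch, which operates on ReorderBufferChangeType values and TupleTableSlots inside pgoutput.c.

/*
 * Minimal sketch of the row-filter UPDATE transformation rules, assuming the
 * Case 1-4 table described in pgoutput_row_filter().  Names are illustrative
 * only.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
	CHANGE_INSERT,
	CHANGE_UPDATE,
	CHANGE_DELETE,
	CHANGE_DROP					/* nothing is replicated */
} SketchChangeType;

/*
 * Decide what to send for an UPDATE, given whether the old and new tuples
 * match the row filter:
 *
 * Case 1: old no match, new no match -> drop the change
 * Case 2: old no match, new match    -> publish as INSERT
 * Case 3: old match,    new no match -> publish as DELETE
 * Case 4: old match,    new match    -> publish as UPDATE
 */
static SketchChangeType
transform_update(bool old_matched, bool new_matched)
{
	if (!old_matched && !new_matched)
		return CHANGE_DROP;
	if (!old_matched && new_matched)
		return CHANGE_INSERT;
	if (old_matched && !new_matched)
		return CHANGE_DELETE;
	return CHANGE_UPDATE;
}

int
main(void)
{
	static const char *names[] = {"INSERT", "UPDATE", "DELETE", "drop"};

	/* Enumerate the four cases in the same order as the patch comment. */
	for (int old_m = 0; old_m <= 1; old_m++)
		for (int new_m = 0; new_m <= 1; new_m++)
			printf("old %-8s new %-8s -> %s\n",
				   old_m ? "match" : "no match",
				   new_m ? "match" : "no match",
				   names[transform_update(old_m, new_m)]);
	return 0;
}

Running the sketch prints one line per case, matching the table in the comment; in the real code, Case 2 additionally substitutes the tmp_new_slot so the transformed INSERT carries all replica identity column values.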

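Editor's note: pgoutput_row_filter_init() above collects the filter expressions per pubaction in rfnodes[], drops them entirely when no_filter is set for that action (pgoutput_row_filter() then returns true because no ExprState is cached), and otherwise OR-combines the collected expressions with make_orclause() before preparing a single ExprState. The sketch below models only that combination rule with plain C predicates; the SketchPublication type, the sample predicates and row_passes() are invented for illustration under that stated assumption and do not correspond to the patch's API.

/*
 * Sketch of how row filters from several publications are combined for one
 * pubaction: if filtering is disabled for the action (no_filter), every row
 * is sent; otherwise the individual filters are OR-ed together.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef bool (*RowPredicate) (int row_value);

typedef struct
{
	const char *name;
	RowPredicate filter;		/* NULL models "publication has no filter" */
} SketchPublication;

static bool greater_than_ten(int v) { return v > 10; }
static bool is_even(int v)          { return (v % 2) == 0; }

/* Returns true if the row should be replicated for this pubaction. */
static bool
row_passes(const SketchPublication *pubs, size_t npubs, int row_value)
{
	/* Any publication without a filter disables filtering entirely. */
	for (size_t i = 0; i < npubs; i++)
		if (pubs[i].filter == NULL)
			return true;

	/* Otherwise the filters are combined with OR. */
	for (size_t i = 0; i < npubs; i++)
		if (pubs[i].filter(row_value))
			return true;

	return false;
}

int
main(void)
{
	SketchPublication pubs[] = {
		{"pub_big", greater_than_ten},
		{"pub_even", is_even},
	};
	int			samples[] = {3, 4, 15};

	for (size_t i = 0; i < sizeof(samples) / sizeof(samples[0]); i++)
		printf("row %d -> %s\n", samples[i],
			   row_passes(pubs, 2, samples[i]) ? "replicated" : "filtered out");
	return 0;
}

The OR semantics mirror what the patch caches per pubaction in entry->exprstate[]; the prepared expression is then evaluated per tuple in pgoutput_row_filter_exec_expr().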