Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--  src/backend/replication/pgoutput/pgoutput.c | 833
1 file changed, 718 insertions(+), 115 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 4162bb8de7b..ea57a0477f0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,12 +15,17 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
+#include "executor/executor.h"
#include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
+#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -86,6 +91,19 @@ static void send_repl_origin(LogicalDecodingContext *ctx,
bool send_origin);
/*
+ * Only 3 publication actions are used for row filtering ("insert", "update",
+ * "delete"). See RelationSyncEntry.exprstate[].
+ */
+enum RowFilterPubAction
+{
+ PUBACTION_INSERT,
+ PUBACTION_UPDATE,
+ PUBACTION_DELETE
+};
+
+#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
+
+/*
* Entry in the map used to remember which relation schemas we sent.
*
* The schema_sent flag determines if the current schema record for the
@@ -117,6 +135,21 @@ typedef struct RelationSyncEntry
PublicationActions pubactions;
/*
+ * ExprState array for row filter. The filters for different publication
+ * actions cannot always be combined into a single expression, because
+ * updates and deletes restrict the columns used in the expression to those
+ * of the replica identity index, whereas inserts have no such restriction,
+ * so there is one ExprState per publication action.
+ */
+ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
+ EState *estate; /* executor state used for row filter */
+ MemoryContext cache_expr_cxt; /* private context for exprstate and
+ * estate, if any */
+
+ TupleTableSlot *new_slot; /* slot for storing new tuple */
+ TupleTableSlot *old_slot; /* slot for storing old tuple */
+
+ /*
* OID of the relation to publish changes as. For a partition, this may
* be set to one of its ancestors whose schema will be used when
* replicating changes, if publish_via_partition_root is set for the
@@ -130,7 +163,7 @@ typedef struct RelationSyncEntry
* same as 'relid' or if unnecessary due to partition and the ancestor
* having identical TupleDesc.
*/
- TupleConversionMap *map;
+ AttrMap *attrmap;
} RelationSyncEntry;
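For orientation, here is a tiny sketch (hypothetical helper, not part of this patch) of how the per-action array is meant to be consulted; a NULL element means no row filter was compiled for that publication action:

/*
 * Hypothetical helper: true if any publication action of this entry has a
 * compiled row filter.  A NULL element means "no filter for that action".
 */
static bool
entry_has_row_filter(RelationSyncEntry *entry)
{
	for (int i = 0; i < NUM_ROWFILTER_PUBACTIONS; i++)
	{
		if (entry->exprstate[i] != NULL)
			return true;
	}
	return false;
}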
/* Map used to remember which relation schemas we sent. */
@@ -138,7 +171,8 @@ static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
+ Relation relation);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -146,6 +180,20 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
+static void init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry);
+
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static void pgoutput_row_filter_init(PGOutputData *data,
+ List *publications,
+ RelationSyncEntry *entry);
+static bool pgoutput_row_filter_exec_expr(ExprState *state,
+ ExprContext *econtext);
+static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr,
+ RelationSyncEntry *entry,
+ ReorderBufferChangeType *action);
/*
* Specify output plugin callbacks
@@ -303,6 +351,10 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
"logical replication output context",
ALLOCSET_DEFAULT_SIZES);
+ data->cachectx = AllocSetContextCreate(ctx->context,
+ "logical replication cache context",
+ ALLOCSET_DEFAULT_SIZES);
+
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
@@ -543,37 +595,14 @@ maybe_send_schema(LogicalDecodingContext *ctx,
return;
/*
- * Nope, so send the schema. If the changes will be published using an
- * ancestor's schema, not the relation's own, send that ancestor's schema
- * before sending relation's own (XXX - maybe sending only the former
- * suffices?). This is also a good place to set the map that will be used
- * to convert the relation's tuples into the ancestor's format, if needed.
+ * Send the schema. If the changes will be published using an ancestor's
+ * schema, not the relation's own, send that ancestor's schema before
+ * sending relation's own (XXX - maybe sending only the former suffices?).
*/
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- TupleDesc indesc = RelationGetDescr(relation);
- TupleDesc outdesc = RelationGetDescr(ancestor);
- MemoryContext oldctx;
-
- /* Map must live as long as the session does. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- /*
- * Make copies of the TupleDescs that will live as long as the map
- * does before putting into the map.
- */
- indesc = CreateTupleDescCopy(indesc);
- outdesc = CreateTupleDescCopy(outdesc);
- relentry->map = convert_tuples_by_name(indesc, outdesc);
- if (relentry->map == NULL)
- {
- /* Map not necessary, so free the TupleDescs too. */
- FreeTupleDesc(indesc);
- FreeTupleDesc(outdesc);
- }
-
- MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
@@ -625,6 +654,484 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
}
/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+ EState *estate;
+ RangeTblEntry *rte;
+
+ estate = CreateExecutorState();
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel);
+ rte->relkind = rel->rd_rel->relkind;
+ rte->rellockmode = AccessShareLock;
+ ExecInitRangeTable(estate, list_make1(rte));
+
+ estate->es_output_cid = GetCurrentCommandId(false);
+
+ return estate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+ Datum ret;
+ bool isnull;
+
+ Assert(state != NULL);
+
+ ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+ elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+ isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
+ isnull ? "true" : "false");
+
+ if (isnull)
+ return false;
+
+ return DatumGetBool(ret);
+}
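The NULL result is deliberately folded into false, matching WHERE-clause semantics: a filter that evaluates to NULL suppresses replication of the change. A minimal restatement of that convention (hypothetical helper, using the same executor calls as above):

/*
 * Hypothetical restatement of the evaluation step: a row filter result of
 * NULL is treated exactly like false, so the change is not replicated.
 */
static bool
row_filter_matches(ExprState *filter, ExprContext *econtext)
{
	bool		isnull;
	Datum		ret = ExecEvalExprSwitchContext(filter, econtext, &isnull);

	return isnull ? false : DatumGetBool(ret);
}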
+
+/*
+ * Initialize the row filter.
+ */
+static void
+pgoutput_row_filter_init(PGOutputData *data, List *publications,
+ RelationSyncEntry *entry)
+{
+ ListCell *lc;
+ List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
+ bool no_filter[] = {false, false, false}; /* One per pubaction */
+ MemoryContext oldctx;
+ int idx;
+ bool has_filter = true;
+
+ /*
+ * Find if there are any row filters for this relation. If there are, then
+ * prepare the necessary ExprState and cache it in entry->exprstate. To
+ * build an expression state, we need to ensure the following:
+ *
+ * All the given publication-table mappings must be checked.
+ *
+ * Multiple publications might have multiple row filters for this
+ * relation. Since row filter usage depends on the DML operation, there
+ * are multiple lists (one for each operation) to which row filters will
+ * be appended.
+ *
+ * FOR ALL TABLES implies "don't use row filter expression" so it takes
+ * precedence.
+ */
+ foreach(lc, publications)
+ {
+ Publication *pub = lfirst(lc);
+ HeapTuple rftuple = NULL;
+ Datum rfdatum = 0;
+ bool pub_no_filter = false;
+
+ if (pub->alltables)
+ {
+ /*
+ * If the publication is FOR ALL TABLES then it is treated the
+ * same as if this table has no row filters (even if for other
+ * publications it does).
+ */
+ pub_no_filter = true;
+ }
+ else
+ {
+ /*
+ * Check for the presence of a row filter in this publication.
+ */
+ rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(entry->publish_as_relid),
+ ObjectIdGetDatum(pub->oid));
+
+ if (HeapTupleIsValid(rftuple))
+ {
+ /* Null indicates no filter. */
+ rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+ Anum_pg_publication_rel_prqual,
+ &pub_no_filter);
+ }
+ else
+ {
+ pub_no_filter = true;
+ }
+ }
+
+ if (pub_no_filter)
+ {
+ if (rftuple)
+ ReleaseSysCache(rftuple);
+
+ no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
+ no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
+ no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
+
+ /*
+ * Quick exit if all the DML actions are published via this
+ * publication.
+ */
+ if (no_filter[PUBACTION_INSERT] &&
+ no_filter[PUBACTION_UPDATE] &&
+ no_filter[PUBACTION_DELETE])
+ {
+ has_filter = false;
+ break;
+ }
+
+ /* No additional work for this publication. Next one. */
+ continue;
+ }
+
+ /* Form the per pubaction row filter lists. */
+ if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
+ rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
+ TextDatumGetCString(rfdatum));
+ if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
+ rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
+ TextDatumGetCString(rfdatum));
+ if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
+ rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
+ TextDatumGetCString(rfdatum));
+
+ ReleaseSysCache(rftuple);
+ } /* loop all subscribed publications */
+
+ /* Clean the row filter */
+ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+ {
+ if (no_filter[idx])
+ {
+ list_free_deep(rfnodes[idx]);
+ rfnodes[idx] = NIL;
+ }
+ }
+
+ if (has_filter)
+ {
+ Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+
+ Assert(entry->cache_expr_cxt == NULL);
+
+ /* Create the memory context for row filters */
+ entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx,
+ "Row filter expressions",
+ ALLOCSET_DEFAULT_SIZES);
+
+ MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt,
+ RelationGetRelationName(relation));
+
+ /*
+ * Now all the filters for all pubactions are known. Combine them when
+ * their pubactions are the same.
+ */
+ oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt);
+ entry->estate = create_estate_for_relation(relation);
+ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+ {
+ List *filters = NIL;
+ Expr *rfnode;
+
+ if (rfnodes[idx] == NIL)
+ continue;
+
+ foreach(lc, rfnodes[idx])
+ filters = lappend(filters, stringToNode((char *) lfirst(lc)));
+
+ /* combine the row filter and cache the ExprState */
+ rfnode = make_orclause(filters);
+ entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
+ } /* for each pubaction */
+ MemoryContextSwitchTo(oldctx);
+
+ RelationClose(relation);
+ }
+}
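When a relation is published through several publications that each attach a filter for the same action, those filters are OR'ed, so a row only has to satisfy one of them. A condensed sketch of that combination step, assuming the quals arrive as nodeToString()-serialized strings as in the loop above (helper name is hypothetical):

/*
 * Sketch of the OR-combination used above: each element of 'qual_strings'
 * is a serialized expression read from pg_publication_rel.prqual, and the
 * result is a single ExprState evaluating filter1 OR filter2 OR ...
 */
static ExprState *
combine_row_filters(List *qual_strings, EState *estate)
{
	List	   *exprs = NIL;
	ListCell   *lc;

	foreach(lc, qual_strings)
		exprs = lappend(exprs, stringToNode((char *) lfirst(lc)));

	return ExecPrepareExpr(make_orclause(exprs), estate);
}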
+
+/*
+ * Initialize the slot for storing new and old tuples, and build the map that
+ * will be used to convert the relation's tuples into the ancestor's format.
+ */
+static void
+init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry)
+{
+ MemoryContext oldctx;
+ TupleDesc oldtupdesc;
+ TupleDesc newtupdesc;
+
+ oldctx = MemoryContextSwitchTo(data->cachectx);
+
+ /*
+ * Create tuple table slots. Create a copy of the TupleDesc as it needs to
+ * live as long as the cache remains.
+ */
+ oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+ newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+
+ entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
+ entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
+
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Cache the map that will be used to convert the relation's tuples into
+ * the ancestor's format, if needed.
+ */
+ if (entry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
+ TupleDesc indesc = RelationGetDescr(relation);
+ TupleDesc outdesc = RelationGetDescr(ancestor);
+
+ /* Map must live as long as the session does. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+ entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
+
+ MemoryContextSwitchTo(oldctx);
+ RelationClose(ancestor);
+ }
+}
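The attrmap cached here is consumed later, when a partition's change is published via its root: the tuple held in the partition-rowtype slot is remapped into a slot built from the ancestor's descriptor. A condensed sketch of that conversion, mirroring the pgoutput_change hunks below (helper name is hypothetical):

/*
 * Sketch: convert a tuple stored in 'slot' (partition rowtype) into the
 * ancestor's rowtype using the attribute map cached in the entry.
 */
static TupleTableSlot *
convert_to_ancestor_format(RelationSyncEntry *entry, Relation ancestor,
						   TupleTableSlot *slot)
{
	if (entry->attrmap == NULL)
		return slot;			/* descriptors already match */

	return execute_attr_map_slot(entry->attrmap, slot,
								 MakeTupleTableSlot(RelationGetDescr(ancestor),
													&TTSOpsVirtual));
}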
+
+/*
+ * Change is checked against the row filter if any.
+ *
+ * Returns true if the change is to be replicated, else false.
+ *
+ * For inserts, evaluate the row filter for new tuple.
+ * For deletes, evaluate the row filter for old tuple.
+ * For updates, evaluate the row filter for old and new tuple.
+ *
+ * For updates, if both evaluations are true, we send the UPDATE; if both are
+ * false, we don't replicate the UPDATE. If only one of the tuples matches the
+ * row filter expression, we transform the UPDATE into a DELETE or an INSERT
+ * to avoid data inconsistency, based on the following rules:
+ *
+ * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
+ * Case 2: old-row (no match)    new-row (match)     -> INSERT
+ * Case 3: old-row (match)       new-row (no match)  -> DELETE
+ * Case 4: old-row (match)       new-row (match)     -> UPDATE
+ *
+ * The new action is updated in the action parameter.
+ *
+ * The new slot could be updated when transforming the UPDATE into INSERT,
+ * because the original new tuple might not have column values from the replica
+ * identity.
+ *
+ * Examples:
+ * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
+ * Since the old tuple satisfies, the initial table synchronization copied this
+ * row (or another method was used to guarantee that there is data
+ * consistency). However, after the UPDATE the new tuple doesn't satisfy the
+ * row filter, so from a data consistency perspective, that row should be
+ * removed on the subscriber. The UPDATE should be transformed into a DELETE
+ * statement and be sent to the subscriber. Keeping this row on the subscriber
+ * is undesirable because it doesn't reflect what was defined in the row filter
+ * expression on the publisher. This row on the subscriber would likely not be
+ * modified by replication again. If someone inserted a new row with the same
+ * old identifier, replication could stop due to a constraint violation.
+ *
+ * Let's say the old tuple doesn't match the row filter but the new tuple does.
+ * Since the old tuple doesn't satisfy, the initial table synchronization
+ * probably didn't copy this row. However, after the UPDATE the new tuple does
+ * satisfy the row filter, so from a data consistency perspective, that row
+ * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
+ * statements have no effect (it matches no row -- see
+ * apply_handle_update_internal()). So, the UPDATE should be transformed into
+ * an INSERT statement and be sent to the subscriber. However, this might surprise
+ * someone who expects the data set to satisfy the row filter expression on the
+ * provider.
+ */
+static bool
+pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
+ ReorderBufferChangeType *action)
+{
+ TupleDesc desc;
+ int i;
+ bool old_matched,
+ new_matched,
+ result;
+ TupleTableSlot *tmp_new_slot;
+ TupleTableSlot *new_slot = *new_slot_ptr;
+ ExprContext *ecxt;
+ ExprState *filter_exprstate;
+
+ /*
+ * We need this map to avoid relying on ReorderBufferChangeType enums
+ * having specific values.
+ */
+ static const int map_changetype_pubaction[] = {
+ [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
+ [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
+ [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
+ };
+
+ Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
+ *action == REORDER_BUFFER_CHANGE_UPDATE ||
+ *action == REORDER_BUFFER_CHANGE_DELETE);
+
+ Assert(new_slot || old_slot);
+
+ /* Get the corresponding row filter */
+ filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+
+ /* Bail out if there is no row filter */
+ if (!filter_exprstate)
+ return true;
+
+ elog(DEBUG3, "table \"%s.%s\" has row filter",
+ get_namespace_name(RelationGetNamespace(relation)),
+ RelationGetRelationName(relation));
+
+ ResetPerTupleExprContext(entry->estate);
+
+ ecxt = GetPerTupleExprContext(entry->estate);
+
+ /*
+ * For the following cases where there is only one tuple, we can
+ * evaluate the row filter for that tuple and return.
+ *
+ * For inserts, we only have the new tuple.
+ *
+ * For updates, we can have only a new tuple when none of the replica
+ * identity columns changed, but we still need to evaluate the row filter
+ * for the new tuple, as the existing values of those columns might not
+ * match the filter. Also, users can use constant expressions in the row
+ * filter, so we need to evaluate it for the new tuple anyway.
+ *
+ * For deletes, we only have the old tuple.
+ */
+ if (!new_slot || !old_slot)
+ {
+ ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+ result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ return result;
+ }
+
+ /*
+ * Both the old and new tuples must be valid only for updates and need to
+ * be checked against the row filter.
+ */
+ Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+ slot_getallattrs(new_slot);
+ slot_getallattrs(old_slot);
+
+ tmp_new_slot = NULL;
+ desc = RelationGetDescr(relation);
+
+ /*
+ * The new tuple might not have all the replica identity columns, in which
+ * case they need to be copied over from the old tuple.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ /*
+ * If the column is null in either the new tuple or the old tuple, there
+ * is nothing to do.
+ */
+ if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+ continue;
+
+ /*
+ * Unchanged toasted replica identity columns are only logged in the
+ * old tuple. Copy them over to the new tuple. The changed (or
+ * WAL-logged) toast values are always assembled in memory and set as
+ * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+ */
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+ !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+ {
+ if (!tmp_new_slot)
+ {
+ tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+ ExecClearTuple(tmp_new_slot);
+
+ memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+ desc->natts * sizeof(Datum));
+ memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+ desc->natts * sizeof(bool));
+ }
+
+ tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+ tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+ }
+ }
+
+ ecxt->ecxt_scantuple = old_slot;
+ old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ if (tmp_new_slot)
+ {
+ ExecStoreVirtualTuple(tmp_new_slot);
+ ecxt->ecxt_scantuple = tmp_new_slot;
+ }
+ else
+ ecxt->ecxt_scantuple = new_slot;
+
+ new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ /*
+ * Case 1: if neither tuple matches the row filter, bail out. Send
+ * nothing.
+ */
+ if (!old_matched && !new_matched)
+ return false;
+
+ /*
+ * Case 2: if the old tuple doesn't satisfy the row filter but the new
+ * tuple does, transform the UPDATE into INSERT.
+ *
+ * Use the newly transformed tuple that must contain the column values for
+ * all the replica identity columns. This is required to ensure that, while
+ * inserting the tuple in the downstream node, we have all the required
+ * column values.
+ */
+ if (!old_matched && new_matched)
+ {
+ *action = REORDER_BUFFER_CHANGE_INSERT;
+
+ if (tmp_new_slot)
+ *new_slot_ptr = tmp_new_slot;
+ }
+
+ /*
+ * Case 3: if the old tuple satisfies the row filter but the new tuple
+ * doesn't, transform the UPDATE into DELETE.
+ *
+ * This transformation does not require another tuple. The old tuple will
+ * be used for the DELETE.
+ */
+ else if (old_matched && !new_matched)
+ *action = REORDER_BUFFER_CHANGE_DELETE;
+
+ /*
+ * Case 4: if both tuples match the row filter, transformation isn't
+ * required. (*action is default UPDATE).
+ */
+
+ return true;
+}
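The four UPDATE cases above boil down to a small decision table. A hypothetical helper isolating just that decision (the function above folds it into the surrounding evaluation; returning false corresponds to case 1, dropping the change):

/*
 * Hypothetical restatement of the UPDATE transformation rules, assuming the
 * old/new filter results were already computed.  Returns false for case 1
 * (drop the change); otherwise sets *action to the change type to send.
 */
static bool
decide_update_action(bool old_matched, bool new_matched,
					 ReorderBufferChangeType *action)
{
	if (!old_matched && !new_matched)
		return false;							/* case 1: drop change */

	if (!old_matched && new_matched)
		*action = REORDER_BUFFER_CHANGE_INSERT;	/* case 2 */
	else if (old_matched && !new_matched)
		*action = REORDER_BUFFER_CHANGE_DELETE;	/* case 3 */
	else
		*action = REORDER_BUFFER_CHANGE_UPDATE;	/* case 4 */

	return true;
}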
+
+/*
* Sends the decoded DML over wire.
*
* This is called both in streaming and non-streaming modes.
@@ -638,6 +1145,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;
+ Relation targetrel = relation;
+ ReorderBufferChangeType action = change->action;
+ TupleTableSlot *old_slot = NULL;
+ TupleTableSlot *new_slot = NULL;
if (!is_publishable_relation(relation))
return;
@@ -651,10 +1162,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = change->txn->xid;
- relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+ relentry = get_rel_sync_entry(data, relation);
/* First check the table filter */
- switch (change->action)
+ switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
@@ -675,80 +1186,149 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- maybe_send_schema(ctx, change, relation, relentry);
-
/* Send the data */
- switch (change->action)
+ switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
- {
- HeapTuple tuple = &change->data.tp.newtuple->tuple;
+ new_slot = relentry->new_slot;
+ ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+ new_slot, false);
- /* Switch relation if publishing via root. */
- if (relentry->publish_as_relid != RelationGetRelid(relation))
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ targetrel = ancestor;
+ /* Convert tuple if needed. */
+ if (relentry->attrmap)
{
- Assert(relation->rd_rel->relispartition);
- ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
- /* Convert tuple if needed. */
- if (relentry->map)
- tuple = execute_attr_map_tuple(tuple, relentry->map);
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ new_slot = execute_attr_map_slot(relentry->attrmap,
+ new_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
+ }
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, xid, relation, tuple,
- data->binary);
- OutputPluginWrite(ctx, true);
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
+ &action))
break;
- }
+
+ /*
+ * Schema should be sent using the original relation because it
+ * also sends the ancestor's relation.
+ */
+ maybe_send_schema(ctx, change, relation, relentry);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+ data->binary);
+ OutputPluginWrite(ctx, true);
+ break;
case REORDER_BUFFER_CHANGE_UPDATE:
+ if (change->data.tp.oldtuple)
{
- HeapTuple oldtuple = change->data.tp.oldtuple ?
- &change->data.tp.oldtuple->tuple : NULL;
- HeapTuple newtuple = &change->data.tp.newtuple->tuple;
+ old_slot = relentry->old_slot;
+ ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+ old_slot, false);
+ }
- /* Switch relation if publishing via root. */
- if (relentry->publish_as_relid != RelationGetRelid(relation))
+ new_slot = relentry->new_slot;
+ ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+ new_slot, false);
+
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ targetrel = ancestor;
+ /* Convert tuples if needed. */
+ if (relentry->attrmap)
{
- Assert(relation->rd_rel->relispartition);
- ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
- /* Convert tuples if needed. */
- if (relentry->map)
- {
- if (oldtuple)
- oldtuple = execute_attr_map_tuple(oldtuple,
- relentry->map);
- newtuple = execute_attr_map_tuple(newtuple,
- relentry->map);
- }
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ if (old_slot)
+ old_slot = execute_attr_map_slot(relentry->attrmap,
+ old_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+
+ new_slot = execute_attr_map_slot(relentry->attrmap,
+ new_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
+ }
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_update(ctx->out, xid, relation, oldtuple,
- newtuple, data->binary);
- OutputPluginWrite(ctx, true);
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
+ relentry, &action))
break;
+
+ maybe_send_schema(ctx, change, relation, relentry);
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ /*
+ * Updates could be transformed to inserts or deletes based on the
+ * results of the row filter for old and new tuple.
+ */
+ switch (action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ logicalrep_write_insert(ctx->out, xid, targetrel,
+ new_slot, data->binary);
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ logicalrep_write_update(ctx->out, xid, targetrel,
+ old_slot, new_slot, data->binary);
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ logicalrep_write_delete(ctx->out, xid, targetrel,
+ old_slot, data->binary);
+ break;
+ default:
+ Assert(false);
}
+
+ OutputPluginWrite(ctx, true);
+ break;
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
- HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
+ old_slot = relentry->old_slot;
+
+ ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+ old_slot, false);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
+ targetrel = ancestor;
/* Convert tuple if needed. */
- if (relentry->map)
- oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+ if (relentry->attrmap)
+ {
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ old_slot = execute_attr_map_slot(relentry->attrmap,
+ old_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+ }
}
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
+ relentry, &action))
+ break;
+
+ maybe_send_schema(ctx, change, relation, relentry);
+
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
- data->binary);
+ logicalrep_write_delete(ctx->out, xid, targetrel,
+ old_slot, data->binary);
OutputPluginWrite(ctx, true);
}
else
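Taken together, the rewritten pgoutput_change cases follow one sequence per change: store the tuples into the cached slots, remap them to the ancestor's rowtype when publishing via the root, run the row filter (which may drop or transform the action), and only then send the schema and the data. A compressed, hypothetical sketch of the UPDATE path shown above (the maybe_send_schema() call is omitted for brevity):

/*
 * Hypothetical compression of the UPDATE path in the hunk above.  'entry'
 * is the RelationSyncEntry returned by get_rel_sync_entry(); old_slot and
 * new_slot already hold the (possibly ancestor-converted) tuples.
 */
static void
publish_update_sketch(LogicalDecodingContext *ctx, PGOutputData *data,
					  Relation targetrel, RelationSyncEntry *entry,
					  TupleTableSlot *old_slot, TupleTableSlot *new_slot,
					  TransactionId xid)
{
	ReorderBufferChangeType action = REORDER_BUFFER_CHANGE_UPDATE;

	/* Row filter may drop the change or turn it into an INSERT/DELETE. */
	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, entry, &action))
		return;

	OutputPluginPrepareWrite(ctx, true);
	if (action == REORDER_BUFFER_CHANGE_INSERT)
		logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
								data->binary);
	else if (action == REORDER_BUFFER_CHANGE_DELETE)
		logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
								data->binary);
	else
		logicalrep_write_update(ctx->out, xid, targetrel, old_slot, new_slot,
								data->binary);
	OutputPluginWrite(ctx, true);
}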
@@ -798,7 +1378,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!is_publishable_relation(relation))
continue;
- relentry = get_rel_sync_entry(data, relid);
+ relentry = get_rel_sync_entry(data, relation);
if (!relentry->pubactions.pubtruncate)
continue;
@@ -873,8 +1453,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
/*
* Shutdown the output plugin.
*
- * Note, we don't need to clean the data->context as it's child context
- * of the ctx->context so it will be cleaned up by logical decoding machinery.
+ * Note, we don't need to clean the data->context and data->cachectx as
+ * they are child contexts of the ctx->context, so they will be cleaned up
+ * by the logical decoding machinery.
*/
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
@@ -1122,11 +1703,12 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
* when publishing.
*/
static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation relation)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
+ Oid relid = RelationGetRelid(relation);
Assert(RelationSyncCache != NULL);
@@ -1144,9 +1726,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->new_slot = NULL;
+ entry->old_slot = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ entry->cache_expr_cxt = NULL;
entry->publish_as_relid = InvalidOid;
- entry->map = NULL; /* will be set by maybe_send_schema() if
- * needed */
+ entry->attrmap = NULL;
}
/* Validate the entry */
@@ -1165,6 +1750,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Oid publish_as_relid = relid;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
+ List *rel_publications = NIL;
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1193,17 +1779,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
- if (entry->map)
- {
- /*
- * Must free the TupleDescs contained in the map explicitly,
- * because free_conversion_map() doesn't.
- */
- FreeTupleDesc(entry->map->indesc);
- FreeTupleDesc(entry->map->outdesc);
- free_conversion_map(entry->map);
- }
- entry->map = NULL;
+
+ /*
+ * Tuple slot cleanup. (The slots will be rebuilt later if needed.)
+ */
+ if (entry->old_slot)
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+ if (entry->new_slot)
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanup.
+ */
+ if (entry->cache_expr_cxt)
+ MemoryContextDelete(entry->cache_expr_cxt);
+
+ entry->cache_expr_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
/*
* Build publication cache. We can't use one provided by relcache as
@@ -1234,28 +1834,17 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
*/
if (am_partition)
{
+ Oid ancestor;
List *ancestors = get_partition_ancestors(relid);
- ListCell *lc2;
- /*
- * Find the "topmost" ancestor that is in this
- * publication.
- */
- foreach(lc2, ancestors)
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors);
+
+ if (ancestor != InvalidOid)
{
- Oid ancestor = lfirst_oid(lc2);
- List *apubids = GetRelationPublications(ancestor);
- List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
-
- if (list_member_oid(apubids, pub->oid) ||
- list_member_oid(aschemaPubids, pub->oid))
- {
- ancestor_published = true;
- if (pub->pubviaroot)
- publish_as_relid = ancestor;
- }
- list_free(apubids);
- list_free(aschemaPubids);
+ ancestor_published = true;
+ if (pub->pubviaroot)
+ publish_as_relid = ancestor;
}
}
@@ -1277,17 +1866,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+
+ rel_publications = lappend(rel_publications, pub);
}
+ }
- if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
- break;
+ entry->publish_as_relid = publish_as_relid;
+
+ /*
+ * Initialize the tuple slot, map, and row filter. These are only used
+ * when publishing inserts, updates, or deletes.
+ */
+ if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
+ entry->pubactions.pubdelete)
+ {
+ /* Initialize the tuple slot and map */
+ init_tuple_slot(data, relation, entry);
+
+ /* Initialize the row filter */
+ pgoutput_row_filter_init(data, rel_publications, entry);
}
list_free(pubids);
list_free(schemaPubids);
+ list_free(rel_publications);
- entry->publish_as_relid = publish_as_relid;
entry->replicate_valid = true;
}