Diffstat (limited to 'src/backend/replication')
-rw-r--r--  src/backend/replication/logical/proto.c     |  36
-rw-r--r--  src/backend/replication/logical/tablesync.c | 142
-rw-r--r--  src/backend/replication/pgoutput/pgoutput.c | 833
3 files changed, 870 insertions(+), 141 deletions(-)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 953942692ce..c9b0eeefd7e 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -31,8 +31,8 @@
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
- HeapTuple tuple, bool binary);
-
+ TupleTableSlot *slot,
+ bool binary);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple newtuple, bool binary)
+ TupleTableSlot *newslot, bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
@@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
- if (oldtuple != NULL)
+ if (oldslot != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
@@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
*/
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, bool binary)
+ TupleTableSlot *oldslot, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary);
}
/*
@@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
+ bool binary)
{
TupleDesc desc;
- Datum values[MaxTupleAttributeNumber];
- bool isnull[MaxTupleAttributeNumber];
+ Datum *values;
+ bool *isnull;
int i;
uint16 nliveatts = 0;
@@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
}
pq_sendint16(out, nliveatts);
- /* try to allocate enough memory from the get-go */
- enlargeStringInfo(out, tuple->t_len +
- nliveatts * (1 + 4));
-
- heap_deform_tuple(tuple, desc, values, isnull);
+ slot_getallattrs(slot);
+ values = slot->tts_values;
+ isnull = slot->tts_isnull;
/* Write the values */
for (i = 0; i < desc->natts; i++)
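For context, the rewritten logicalrep_write_tuple() above drops the two MaxTupleAttributeNumber-sized stack arrays and the heap_deform_tuple() call in favor of the standard slot-deforming idiom. A condensed sketch of that idiom (illustrative only; desc is the relation's TupleDesc, and the dropped/generated-column checks mirror the nliveatts loop above):

    slot_getallattrs(slot);     /* fills slot->tts_values / slot->tts_isnull */
    for (i = 0; i < desc->natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(desc, i);

        if (att->attisdropped || att->attgenerated)
            continue;           /* not replicated */
        if (slot->tts_isnull[i])
        {
            /* ... send a null column marker ... */
            continue;
        }
        /* ... send slot->tts_values[i] in text or binary format ... */
    }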
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e596b69d466..1659964571c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in the COPY command.
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel)
+ LogicalRepRelation *lrel, List **qual)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
+ ListCell *lc;
+ bool first;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->natts = natt;
walrcv_clear_result(res);
+
+ /*
+ * Get relation's row filter expressions. DISTINCT prevents the same
+ * expression of a table in multiple publications from being included
+ * multiple times in the final expression.
+ *
+ * We need to copy the row even if it matches just one of the
+ * publications, so we later combine all the quals with OR.
+ *
+ * For initial synchronization, row filtering can be ignored in the following
+ * cases:
+ *
+ * 1) one of the subscribed publications for the table hasn't specified
+ * any row filter
+ *
+ * 2) one of the subscribed publications has puballtables set to true
+ *
+ * 3) one of the subscribed publications is declared as ALL TABLES IN
+ * SCHEMA that includes this relation
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ StringInfoData pub_names;
+
+ /* Build the pubname list. */
+ initStringInfo(&pub_names);
+ first = true;
+ foreach(lc, MySubscription->publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&pub_names, ", ");
+
+ appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
+ }
+
+ /* Check for row filters. */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
+ " FROM pg_publication p"
+ " LEFT OUTER JOIN pg_publication_rel pr"
+ " ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt"
+ " WHERE gpt.relid = %u"
+ " AND p.pubname IN ( %s )",
+ lrel->remoteid,
+ lrel->remoteid,
+ pub_names.data);
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res->err)));
+
+ /*
+ * Multiple row filter expressions for the same table will be combined
+ * by COPY using OR. If any of the filter expressions for this table
+ * are null, it means the whole table will be copied. In this case it
+ * is not necessary to construct a unified row filter expression at
+ * all.
+ */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ Datum rf = slot_getattr(slot, 1, &isnull);
+
+ if (!isnull)
+ *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+ else
+ {
+ /* Ignore filters and cleanup as necessary. */
+ if (*qual)
+ {
+ list_free_deep(*qual);
+ *qual = NIL;
+ }
+ break;
+ }
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+ }
+
pfree(cmd.data);
}
@@ -811,6 +907,7 @@ copy_table(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
+ List *qual = NIL;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
@@ -819,7 +916,7 @@ copy_table(Relation rel)
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel);
+ RelationGetRelationName(rel), &lrel, &qual);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -830,14 +927,18 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
- if (lrel.relkind == RELKIND_RELATION)
+
+ /* Regular table with no row filter */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL)
appendStringInfo(&cmd, "COPY %s TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
else
{
/*
- * For non-tables, we need to do COPY (SELECT ...), but we can't just
- * do SELECT * because we need to not copy generated columns.
+ * For non-tables and tables with row filters, we need to do COPY
+ * (SELECT ...), but we can't just do SELECT * because we must not
+ * copy generated columns. For tables with any row filters, build a
+ * SELECT query with OR'ed row filters for COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)
@@ -846,8 +947,33 @@ copy_table(Relation rel)
if (i < lrel.natts - 1)
appendStringInfoString(&cmd, ", ");
}
- appendStringInfo(&cmd, " FROM %s) TO STDOUT",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+ appendStringInfoString(&cmd, " FROM ");
+
+ /*
+ * For regular tables, make sure we don't copy data from a child that
+ * inherits the named table as those will be copied separately.
+ */
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfoString(&cmd, "ONLY ");
+
+ appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
+ /* list of OR'ed filters */
+ if (qual != NIL)
+ {
+ ListCell *lc;
+ char *q = strVal(linitial(qual));
+
+ appendStringInfo(&cmd, " WHERE %s", q);
+ for_each_from(lc, qual, 1)
+ {
+ q = strVal(lfirst(lc));
+ appendStringInfo(&cmd, " OR %s", q);
+ }
+ list_free_deep(qual);
+ }
+
+ appendStringInfoString(&cmd, ") TO STDOUT");
}
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
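To make the command construction concrete, consider a hypothetical table public.t(a, b) subscribed via publication p1 with row filter (a > 10) and p2 with row filter (b < 99). The pg_get_expr() query above returns both quals, and copy_table() then issues roughly:

    COPY (SELECT a, b FROM ONLY public.t WHERE (a > 10) OR (b < 99)) TO STDOUT

If any subscribed publication has a NULL prqual for the table, the qual list is freed and reset to NIL, and a regular table falls back to the plain COPY public.t TO STDOUT form.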
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 4162bb8de7b..ea57a0477f0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,12 +15,17 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
+#include "executor/executor.h"
#include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
+#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -86,6 +91,19 @@ static void send_repl_origin(LogicalDecodingContext *ctx,
bool send_origin);
/*
+ * Only 3 publication actions are used for row filtering ("insert", "update",
+ * "delete"). See RelationSyncEntry.exprstate[].
+ */
+enum RowFilterPubAction
+{
+ PUBACTION_INSERT,
+ PUBACTION_UPDATE,
+ PUBACTION_DELETE
+};
+
+#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
+
+/*
* Entry in the map used to remember which relation schemas we sent.
*
* The schema_sent flag determines if the current schema record for the
@@ -117,6 +135,21 @@ typedef struct RelationSyncEntry
PublicationActions pubactions;
/*
+ * ExprState array for row filter. The filters for different publication
+ * actions cannot always be combined into one expression, because updates
+ * and deletes restrict the columns in the expression to those of the
+ * replica identity index, whereas inserts have no such restriction; so
+ * there is one ExprState per publication action.
+ */
+ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
+ EState *estate; /* executor state used for row filter */
+ MemoryContext cache_expr_cxt; /* private context for exprstate and
+ * estate, if any */
+
+ TupleTableSlot *new_slot; /* slot for storing new tuple */
+ TupleTableSlot *old_slot; /* slot for storing old tuple */
+
+ /*
* OID of the relation to publish changes as. For a partition, this may
* be set to one of its ancestors whose schema will be used when
* replicating changes, if publish_via_partition_root is set for the
@@ -130,7 +163,7 @@ typedef struct RelationSyncEntry
* same as 'relid' or if unnecessary due to partition and the ancestor
* having identical TupleDesc.
*/
- TupleConversionMap *map;
+ AttrMap *attrmap;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
@@ -138,7 +171,8 @@ static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
+ Relation relation);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -146,6 +180,20 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
+static void init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry);
+
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static void pgoutput_row_filter_init(PGOutputData *data,
+ List *publications,
+ RelationSyncEntry *entry);
+static bool pgoutput_row_filter_exec_expr(ExprState *state,
+ ExprContext *econtext);
+static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr,
+ RelationSyncEntry *entry,
+ ReorderBufferChangeType *action);
/*
* Specify output plugin callbacks
@@ -303,6 +351,10 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
"logical replication output context",
ALLOCSET_DEFAULT_SIZES);
+ data->cachectx = AllocSetContextCreate(ctx->context,
+ "logical replication cache context",
+ ALLOCSET_DEFAULT_SIZES);
+
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
@@ -543,37 +595,14 @@ maybe_send_schema(LogicalDecodingContext *ctx,
return;
/*
- * Nope, so send the schema. If the changes will be published using an
- * ancestor's schema, not the relation's own, send that ancestor's schema
- * before sending relation's own (XXX - maybe sending only the former
- * suffices?). This is also a good place to set the map that will be used
- * to convert the relation's tuples into the ancestor's format, if needed.
+ * Send the schema. If the changes will be published using an ancestor's
+ * schema, not the relation's own, send that ancestor's schema before
+ * sending relation's own (XXX - maybe sending only the former suffices?).
*/
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- TupleDesc indesc = RelationGetDescr(relation);
- TupleDesc outdesc = RelationGetDescr(ancestor);
- MemoryContext oldctx;
-
- /* Map must live as long as the session does. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- /*
- * Make copies of the TupleDescs that will live as long as the map
- * does before putting into the map.
- */
- indesc = CreateTupleDescCopy(indesc);
- outdesc = CreateTupleDescCopy(outdesc);
- relentry->map = convert_tuples_by_name(indesc, outdesc);
- if (relentry->map == NULL)
- {
- /* Map not necessary, so free the TupleDescs too. */
- FreeTupleDesc(indesc);
- FreeTupleDesc(outdesc);
- }
-
- MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
@@ -625,6 +654,484 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
}
/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+ EState *estate;
+ RangeTblEntry *rte;
+
+ estate = CreateExecutorState();
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel);
+ rte->relkind = rel->rd_rel->relkind;
+ rte->rellockmode = AccessShareLock;
+ ExecInitRangeTable(estate, list_make1(rte));
+
+ estate->es_output_cid = GetCurrentCommandId(false);
+
+ return estate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+ Datum ret;
+ bool isnull;
+
+ Assert(state != NULL);
+
+ ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+ elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+ isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
+ isnull ? "true" : "false");
+
+ if (isnull)
+ return false;
+
+ return DatumGetBool(ret);
+}
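Putting the two helpers above together, the evaluation pattern used later by pgoutput_row_filter() looks roughly like this sketch (rfnode and slot stand in for a parsed row filter expression tree and a populated tuple slot):

    EState     *estate = create_estate_for_relation(rel);
    ExprState  *exprstate = ExecPrepareExpr(rfnode, estate);
    ExprContext *ecxt;

    ResetPerTupleExprContext(estate);
    ecxt = GetPerTupleExprContext(estate);
    ecxt->ecxt_scantuple = slot;    /* tuple the filter is evaluated against */

    if (pgoutput_row_filter_exec_expr(exprstate, ecxt))
    {
        /* the tuple satisfies the filter, so the change is replicated */
    }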
+
+/*
+ * Initialize the row filter.
+ */
+static void
+pgoutput_row_filter_init(PGOutputData *data, List *publications,
+ RelationSyncEntry *entry)
+{
+ ListCell *lc;
+ List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
+ bool no_filter[] = {false, false, false}; /* One per pubaction */
+ MemoryContext oldctx;
+ int idx;
+ bool has_filter = true;
+
+ /*
+ * Find if there are any row filters for this relation. If there are, then
+ * prepare the necessary ExprState and cache it in entry->exprstate. To
+ * build an expression state, we need to ensure the following:
+ *
+ * All the given publication-table mappings must be checked.
+ *
+ * Multiple publications might have multiple row filters for this
+ * relation. Since row filter usage depends on the DML operation, there
+ * are multiple lists (one for each operation) to which row filters will
+ * be appended.
+ *
+ * FOR ALL TABLES implies "don't use row filter expression" so it takes
+ * precedence.
+ */
+ foreach(lc, publications)
+ {
+ Publication *pub = lfirst(lc);
+ HeapTuple rftuple = NULL;
+ Datum rfdatum = 0;
+ bool pub_no_filter = false;
+
+ if (pub->alltables)
+ {
+ /*
+ * If the publication is FOR ALL TABLES then it is treated the
+ * same as if this table has no row filters (even if for other
+ * publications it does).
+ */
+ pub_no_filter = true;
+ }
+ else
+ {
+ /*
+ * Check for the presence of a row filter in this publication.
+ */
+ rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(entry->publish_as_relid),
+ ObjectIdGetDatum(pub->oid));
+
+ if (HeapTupleIsValid(rftuple))
+ {
+ /* Null indicates no filter. */
+ rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+ Anum_pg_publication_rel_prqual,
+ &pub_no_filter);
+ }
+ else
+ {
+ pub_no_filter = true;
+ }
+ }
+
+ if (pub_no_filter)
+ {
+ if (rftuple)
+ ReleaseSysCache(rftuple);
+
+ no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
+ no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
+ no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
+
+ /*
+ * Quick exit if all the DML actions are publicized via this
+ * publication.
+ */
+ if (no_filter[PUBACTION_INSERT] &&
+ no_filter[PUBACTION_UPDATE] &&
+ no_filter[PUBACTION_DELETE])
+ {
+ has_filter = false;
+ break;
+ }
+
+ /* No additional work for this publication. Next one. */
+ continue;
+ }
+
+ /* Form the per pubaction row filter lists. */
+ if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
+ rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
+ TextDatumGetCString(rfdatum));
+ if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
+ rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
+ TextDatumGetCString(rfdatum));
+ if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
+ rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
+ TextDatumGetCString(rfdatum));
+
+ ReleaseSysCache(rftuple);
+ } /* loop all subscribed publications */
+
+ /* Clean the row filter */
+ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+ {
+ if (no_filter[idx])
+ {
+ list_free_deep(rfnodes[idx]);
+ rfnodes[idx] = NIL;
+ }
+ }
+
+ if (has_filter)
+ {
+ Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+
+ Assert(entry->cache_expr_cxt == NULL);
+
+ /* Create the memory context for row filters */
+ entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx,
+ "Row filter expressions",
+ ALLOCSET_DEFAULT_SIZES);
+
+ MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt,
+ RelationGetRelationName(relation));
+
+ /*
+ * Now all the filters for all pubactions are known. Combine them when
+ * their pubactions are the same.
+ */
+ oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt);
+ entry->estate = create_estate_for_relation(relation);
+ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+ {
+ List *filters = NIL;
+ Expr *rfnode;
+
+ if (rfnodes[idx] == NIL)
+ continue;
+
+ foreach(lc, rfnodes[idx])
+ filters = lappend(filters, stringToNode((char *) lfirst(lc)));
+
+ /* combine the row filter and cache the ExprState */
+ rfnode = make_orclause(filters);
+ entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
+ } /* for each pubaction */
+ MemoryContextSwitchTo(oldctx);
+
+ RelationClose(relation);
+ }
+}
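As an illustration of the per-pubaction lists, take two hypothetical publications:

    CREATE PUBLICATION p1 FOR TABLE t WHERE (a > 10);
    CREATE PUBLICATION p2 FOR TABLE t WHERE (b < 99) WITH (publish = 'insert');

For a subscription to both, exprstate[PUBACTION_INSERT] is built from (a > 10) OR (b < 99), while exprstate[PUBACTION_UPDATE] and exprstate[PUBACTION_DELETE] are built from (a > 10) alone, since p2 does not publish those actions. If a third subscribed publication covered t with no filter, the corresponding no_filter[] flags would be set and those lists discarded.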
+
+/*
+ * Initialize the slots for storing new and old tuples, and build the map that
+ * will be used to convert the relation's tuples into the ancestor's format.
+ */
+static void
+init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry)
+{
+ MemoryContext oldctx;
+ TupleDesc oldtupdesc;
+ TupleDesc newtupdesc;
+
+ oldctx = MemoryContextSwitchTo(data->cachectx);
+
+ /*
+ * Create tuple table slots. Create a copy of the TupleDesc as it needs to
+ * live as long as the cache remains.
+ */
+ oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+ newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+
+ entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
+ entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
+
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Cache the map that will be used to convert the relation's tuples into
+ * the ancestor's format, if needed.
+ */
+ if (entry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
+ TupleDesc indesc = RelationGetDescr(relation);
+ TupleDesc outdesc = RelationGetDescr(ancestor);
+
+ /* Map must live as long as the session does. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+ entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
+
+ MemoryContextSwitchTo(oldctx);
+ RelationClose(ancestor);
+ }
+}
+
+/*
+ * Change is checked against the row filter if any.
+ *
+ * Returns true if the change is to be replicated, else false.
+ *
+ * For inserts, evaluate the row filter for new tuple.
+ * For deletes, evaluate the row filter for old tuple.
+ * For updates, evaluate the row filter for old and new tuple.
+ *
+ * For updates, if both evaluations are true, we send the UPDATE, and if
+ * both evaluations are false, we don't replicate the UPDATE. Now, if
+ * only one of the tuples matches the row filter expression, we transform
+ * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
+ * following rules:
+ *
+ * Case 1: old-row (no match) new-row (no match) -> (drop change)
+ * Case 2: old-row (no match) new row (match) -> INSERT
+ * Case 3: old-row (match) new-row (no match) -> DELETE
+ * Case 4: old-row (match) new row (match) -> UPDATE
+ *
+ * The new action is updated in the action parameter.
+ *
+ * The new slot could be updated when transforming the UPDATE into INSERT,
+ * because the original new tuple might not have column values from the replica
+ * identity.
+ *
+ * Examples:
+ * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
+ * Since the old tuple satisfies, the initial table synchronization copied this
+ * row (or another method was used to guarantee that there is data
+ * consistency). However, after the UPDATE the new tuple doesn't satisfy the
+ * row filter, so from a data consistency perspective, that row should be
+ * removed on the subscriber. The UPDATE should be transformed into a DELETE
+ * statement and be sent to the subscriber. Keeping this row on the subscriber
+ * is undesirable because it doesn't reflect what was defined in the row filter
+ * expression on the publisher. This row on the subscriber would likely not be
+ * modified by replication again. If someone inserted a new row with the same
+ * old identifier, replication could stop due to a constraint violation.
+ *
+ * Let's say the old tuple doesn't match the row filter but the new tuple does.
+ * Since the old tuple doesn't satisfy, the initial table synchronization
+ * probably didn't copy this row. However, after the UPDATE the new tuple does
+ * satisfy the row filter, so from a data consistency perspective, that row
+ * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
+ * statements have no effect (it matches no row -- see
+ * apply_handle_update_internal()). So, the UPDATE should be transformed into
+ * an INSERT statement and be sent to the subscriber. However, this might surprise
+ * someone who expects the data set to satisfy the row filter expression on the
+ * provider.
+ */
+static bool
+pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
+ ReorderBufferChangeType *action)
+{
+ TupleDesc desc;
+ int i;
+ bool old_matched,
+ new_matched,
+ result;
+ TupleTableSlot *tmp_new_slot;
+ TupleTableSlot *new_slot = *new_slot_ptr;
+ ExprContext *ecxt;
+ ExprState *filter_exprstate;
+
+ /*
+ * We need this map to avoid relying on ReorderBufferChangeType enums
+ * having specific values.
+ */
+ static const int map_changetype_pubaction[] = {
+ [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
+ [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
+ [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
+ };
+
+ Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
+ *action == REORDER_BUFFER_CHANGE_UPDATE ||
+ *action == REORDER_BUFFER_CHANGE_DELETE);
+
+ Assert(new_slot || old_slot);
+
+ /* Get the corresponding row filter */
+ filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+
+ /* Bail out if there is no row filter */
+ if (!filter_exprstate)
+ return true;
+
+ elog(DEBUG3, "table \"%s.%s\" has row filter",
+ get_namespace_name(RelationGetNamespace(relation)),
+ RelationGetRelationName(relation));
+
+ ResetPerTupleExprContext(entry->estate);
+
+ ecxt = GetPerTupleExprContext(entry->estate);
+
+ /*
+ * In the following cases, where there is only one tuple, we can
+ * evaluate the row filter for that tuple and return.
+ *
+ * For inserts, we only have the new tuple.
+ *
+ * For updates, we can have only a new tuple when none of the replica
+ * identity columns changed but we still need to evaluate the row filter
+ * for the new tuple as the existing values of those columns might not match
+ * the filter. Also, users can use constant expressions in the row filter,
+ * so we anyway need to evaluate it for the new tuple.
+ *
+ * For deletes, we only have the old tuple.
+ */
+ if (!new_slot || !old_slot)
+ {
+ ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+ result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ return result;
+ }
+
+ /*
+ * Only updates have both old and new tuples; both need to be checked
+ * against the row filter.
+ */
+ Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+ slot_getallattrs(new_slot);
+ slot_getallattrs(old_slot);
+
+ tmp_new_slot = NULL;
+ desc = RelationGetDescr(relation);
+
+ /*
+ * The new tuple might not have all the replica identity columns, in which
+ * case those values need to be copied over from the old tuple.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ /*
+ * if the column in the new tuple or old tuple is null, nothing to do
+ */
+ if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+ continue;
+
+ /*
+ * Unchanged toasted replica identity columns are only logged in the
+ * old tuple. Copy this over to the new tuple. The changed (or WAL
+ * Logged) toast values are always assembled in memory and set as
+ * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+ */
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+ !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+ {
+ if (!tmp_new_slot)
+ {
+ tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+ ExecClearTuple(tmp_new_slot);
+
+ memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+ desc->natts * sizeof(Datum));
+ memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+ desc->natts * sizeof(bool));
+ }
+
+ tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+ tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+ }
+ }
+
+ ecxt->ecxt_scantuple = old_slot;
+ old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ if (tmp_new_slot)
+ {
+ ExecStoreVirtualTuple(tmp_new_slot);
+ ecxt->ecxt_scantuple = tmp_new_slot;
+ }
+ else
+ ecxt->ecxt_scantuple = new_slot;
+
+ new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ /*
+ * Case 1: if neither tuple matches the row filter, bail out. Send
+ * nothing.
+ */
+ if (!old_matched && !new_matched)
+ return false;
+
+ /*
+ * Case 2: if the old tuple doesn't satisfy the row filter but the new
+ * tuple does, transform the UPDATE into INSERT.
+ *
+ * Use the newly transformed tuple, which must contain the column values
+ * for all the replica identity columns. This is required to ensure that,
+ * while inserting the tuple in the downstream node, we have all the
+ * required column values.
+ */
+ if (!old_matched && new_matched)
+ {
+ *action = REORDER_BUFFER_CHANGE_INSERT;
+
+ if (tmp_new_slot)
+ *new_slot_ptr = tmp_new_slot;
+ }
+
+ /*
+ * Case 3: if the old tuple satisfies the row filter but the new tuple
+ * doesn't, transform the UPDATE into DELETE.
+ *
+ * This transformation does not require another tuple. The old tuple will
+ * be used for the DELETE.
+ */
+ else if (old_matched && !new_matched)
+ *action = REORDER_BUFFER_CHANGE_DELETE;
+
+ /*
+ * Case 4: if both tuples match the row filter, transformation isn't
+ * required. (*action is default UPDATE).
+ */
+
+ return true;
+}
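A worked example of the four cases (hypothetical filter and values): with a row filter (a > 10), an UPDATE changing a from 5 to 20 is case 2 (old tuple fails, new tuple matches), so *action is rewritten to REORDER_BUFFER_CHANGE_INSERT; changing a from 20 to 5 is case 3 and becomes a DELETE carrying the old tuple; 5 to 7 is case 1 and the change is dropped; 20 to 30 is case 4 and is sent as an ordinary UPDATE.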
+
+/*
* Sends the decoded DML over wire.
*
* This is called both in streaming and non-streaming modes.
@@ -638,6 +1145,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;
+ Relation targetrel = relation;
+ ReorderBufferChangeType action = change->action;
+ TupleTableSlot *old_slot = NULL;
+ TupleTableSlot *new_slot = NULL;
if (!is_publishable_relation(relation))
return;
@@ -651,10 +1162,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = change->txn->xid;
- relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+ relentry = get_rel_sync_entry(data, relation);
/* First check the table filter */
- switch (change->action)
+ switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
@@ -675,80 +1186,149 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- maybe_send_schema(ctx, change, relation, relentry);
-
/* Send the data */
- switch (change->action)
+ switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
- {
- HeapTuple tuple = &change->data.tp.newtuple->tuple;
+ new_slot = relentry->new_slot;
+ ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+ new_slot, false);
- /* Switch relation if publishing via root. */
- if (relentry->publish_as_relid != RelationGetRelid(relation))
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ targetrel = ancestor;
+ /* Convert tuple if needed. */
+ if (relentry->attrmap)
{
- Assert(relation->rd_rel->relispartition);
- ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
- /* Convert tuple if needed. */
- if (relentry->map)
- tuple = execute_attr_map_tuple(tuple, relentry->map);
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ new_slot = execute_attr_map_slot(relentry->attrmap,
+ new_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
+ }
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, xid, relation, tuple,
- data->binary);
- OutputPluginWrite(ctx, true);
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
+ &action))
break;
- }
+
+ /*
+ * The schema should be sent using the original relation because
+ * maybe_send_schema() also sends the ancestor's schema.
+ */
+ maybe_send_schema(ctx, change, relation, relentry);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+ data->binary);
+ OutputPluginWrite(ctx, true);
+ break;
case REORDER_BUFFER_CHANGE_UPDATE:
+ if (change->data.tp.oldtuple)
{
- HeapTuple oldtuple = change->data.tp.oldtuple ?
- &change->data.tp.oldtuple->tuple : NULL;
- HeapTuple newtuple = &change->data.tp.newtuple->tuple;
+ old_slot = relentry->old_slot;
+ ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+ old_slot, false);
+ }
- /* Switch relation if publishing via root. */
- if (relentry->publish_as_relid != RelationGetRelid(relation))
+ new_slot = relentry->new_slot;
+ ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+ new_slot, false);
+
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ targetrel = ancestor;
+ /* Convert tuples if needed. */
+ if (relentry->attrmap)
{
- Assert(relation->rd_rel->relispartition);
- ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
- /* Convert tuples if needed. */
- if (relentry->map)
- {
- if (oldtuple)
- oldtuple = execute_attr_map_tuple(oldtuple,
- relentry->map);
- newtuple = execute_attr_map_tuple(newtuple,
- relentry->map);
- }
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ if (old_slot)
+ old_slot = execute_attr_map_slot(relentry->attrmap,
+ old_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+
+ new_slot = execute_attr_map_slot(relentry->attrmap,
+ new_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
+ }
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_update(ctx->out, xid, relation, oldtuple,
- newtuple, data->binary);
- OutputPluginWrite(ctx, true);
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
+ relentry, &action))
break;
+
+ maybe_send_schema(ctx, change, relation, relentry);
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ /*
+ * Updates could be transformed to inserts or deletes based on the
+ * results of the row filter for old and new tuple.
+ */
+ switch (action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ logicalrep_write_insert(ctx->out, xid, targetrel,
+ new_slot, data->binary);
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ logicalrep_write_update(ctx->out, xid, targetrel,
+ old_slot, new_slot, data->binary);
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ logicalrep_write_delete(ctx->out, xid, targetrel,
+ old_slot, data->binary);
+ break;
+ default:
+ Assert(false);
}
+
+ OutputPluginWrite(ctx, true);
+ break;
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
- HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
+ old_slot = relentry->old_slot;
+
+ ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+ old_slot, false);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
+ targetrel = ancestor;
/* Convert tuple if needed. */
- if (relentry->map)
- oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+ if (relentry->attrmap)
+ {
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ old_slot = execute_attr_map_slot(relentry->attrmap,
+ old_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+ }
}
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
+ relentry, &action))
+ break;
+
+ maybe_send_schema(ctx, change, relation, relentry);
+
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
- data->binary);
+ logicalrep_write_delete(ctx->out, xid, targetrel,
+ old_slot, data->binary);
OutputPluginWrite(ctx, true);
}
else
@@ -798,7 +1378,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!is_publishable_relation(relation))
continue;
- relentry = get_rel_sync_entry(data, relid);
+ relentry = get_rel_sync_entry(data, relation);
if (!relentry->pubactions.pubtruncate)
continue;
@@ -873,8 +1453,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
/*
* Shutdown the output plugin.
*
- * Note, we don't need to clean the data->context as it's child context
- * of the ctx->context so it will be cleaned up by logical decoding machinery.
+ * Note, we don't need to clean the data->context and data->cachectx as
+ * they are child contexts of the ctx->context, so they will be cleaned
+ * up by the logical decoding machinery.
*/
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
@@ -1122,11 +1703,12 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
* when publishing.
*/
static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation relation)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
+ Oid relid = RelationGetRelid(relation);
Assert(RelationSyncCache != NULL);
@@ -1144,9 +1726,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->new_slot = NULL;
+ entry->old_slot = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ entry->cache_expr_cxt = NULL;
entry->publish_as_relid = InvalidOid;
- entry->map = NULL; /* will be set by maybe_send_schema() if
- * needed */
+ entry->attrmap = NULL;
}
/* Validate the entry */
@@ -1165,6 +1750,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Oid publish_as_relid = relid;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
+ List *rel_publications = NIL;
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1193,17 +1779,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
- if (entry->map)
- {
- /*
- * Must free the TupleDescs contained in the map explicitly,
- * because free_conversion_map() doesn't.
- */
- FreeTupleDesc(entry->map->indesc);
- FreeTupleDesc(entry->map->outdesc);
- free_conversion_map(entry->map);
- }
- entry->map = NULL;
+
+ /*
+ * Clean up the tuple slots. (They will be rebuilt later if needed.)
+ */
+ if (entry->old_slot)
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+ if (entry->new_slot)
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->cache_expr_cxt)
+ MemoryContextDelete(entry->cache_expr_cxt);
+
+ entry->cache_expr_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
/*
* Build publication cache. We can't use one provided by relcache as
@@ -1234,28 +1834,17 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
*/
if (am_partition)
{
+ Oid ancestor;
List *ancestors = get_partition_ancestors(relid);
- ListCell *lc2;
- /*
- * Find the "topmost" ancestor that is in this
- * publication.
- */
- foreach(lc2, ancestors)
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors);
+
+ if (ancestor != InvalidOid)
{
- Oid ancestor = lfirst_oid(lc2);
- List *apubids = GetRelationPublications(ancestor);
- List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
-
- if (list_member_oid(apubids, pub->oid) ||
- list_member_oid(aschemaPubids, pub->oid))
- {
- ancestor_published = true;
- if (pub->pubviaroot)
- publish_as_relid = ancestor;
- }
- list_free(apubids);
- list_free(aschemaPubids);
+ ancestor_published = true;
+ if (pub->pubviaroot)
+ publish_as_relid = ancestor;
}
}
@@ -1277,17 +1866,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+
+ rel_publications = lappend(rel_publications, pub);
}
+ }
- if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
- break;
+ entry->publish_as_relid = publish_as_relid;
+
+ /*
+ * Initialize the tuple slot, map, and row filter. These are only used
+ * when publishing inserts, updates, or deletes.
+ */
+ if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
+ entry->pubactions.pubdelete)
+ {
+ /* Initialize the tuple slot and map */
+ init_tuple_slot(data, relation, entry);
+
+ /* Initialize the row filter */
+ pgoutput_row_filter_init(data, rel_publications, entry);
}
list_free(pubids);
list_free(schemaPubids);
+ list_free(rel_publications);
- entry->publish_as_relid = publish_as_relid;
entry->replicate_valid = true;
}