summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/proto.c36
-rw-r--r--src/backend/replication/logical/tablesync.c142
2 files changed, 152 insertions, 26 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 953942692ce..c9b0eeefd7e 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -31,8 +31,8 @@
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
- HeapTuple tuple, bool binary);
-
+ TupleTableSlot *slot,
+ bool binary);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple newtuple, bool binary)
+ TupleTableSlot *newslot, bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
@@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
- if (oldtuple != NULL)
+ if (oldslot != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
@@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
*/
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, bool binary)
+ TupleTableSlot *oldslot, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary);
}
/*
@@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
+ bool binary)
{
TupleDesc desc;
- Datum values[MaxTupleAttributeNumber];
- bool isnull[MaxTupleAttributeNumber];
+ Datum *values;
+ bool *isnull;
int i;
uint16 nliveatts = 0;
@@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
}
pq_sendint16(out, nliveatts);
- /* try to allocate enough memory from the get-go */
- enlargeStringInfo(out, tuple->t_len +
- nliveatts * (1 + 4));
-
- heap_deform_tuple(tuple, desc, values, isnull);
+ slot_getallattrs(slot);
+ values = slot->tts_values;
+ isnull = slot->tts_isnull;
/* Write the values */
for (i = 0; i < desc->natts; i++)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e596b69d466..1659964571c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in the COPY command.
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel)
+ LogicalRepRelation *lrel, List **qual)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
+ ListCell *lc;
+ bool first;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->natts = natt;
walrcv_clear_result(res);
+
+ /*
+ * Get relation's row filter expressions. DISTINCT avoids the same
+ * expression of a table in multiple publications from being included
+ * multiple times in the final expression.
+ *
+ * We need to copy the row even if it matches just one of the
+ * publications, so we later combine all the quals with OR.
+ *
+ * For initial synchronization, row filtering can be ignored in following
+ * cases:
+ *
+ * 1) one of the subscribed publications for the table hasn't specified
+ * any row filter
+ *
+ * 2) one of the subscribed publications has puballtables set to true
+ *
+ * 3) one of the subscribed publications is declared as ALL TABLES IN
+ * SCHEMA that includes this relation
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ StringInfoData pub_names;
+
+ /* Build the pubname list. */
+ initStringInfo(&pub_names);
+ first = true;
+ foreach(lc, MySubscription->publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&pub_names, ", ");
+
+ appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
+ }
+
+ /* Check for row filters. */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
+ " FROM pg_publication p"
+ " LEFT OUTER JOIN pg_publication_rel pr"
+ " ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt"
+ " WHERE gpt.relid = %u"
+ " AND p.pubname IN ( %s )",
+ lrel->remoteid,
+ lrel->remoteid,
+ pub_names.data);
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res->err)));
+
+ /*
+ * Multiple row filter expressions for the same table will be combined
+ * by COPY using OR. If any of the filter expressions for this table
+ * are null, it means the whole table will be copied. In this case it
+ * is not necessary to construct a unified row filter expression at
+ * all.
+ */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ Datum rf = slot_getattr(slot, 1, &isnull);
+
+ if (!isnull)
+ *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+ else
+ {
+ /* Ignore filters and cleanup as necessary. */
+ if (*qual)
+ {
+ list_free_deep(*qual);
+ *qual = NIL;
+ }
+ break;
+ }
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+ }
+
pfree(cmd.data);
}
@@ -811,6 +907,7 @@ copy_table(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
+ List *qual = NIL;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
@@ -819,7 +916,7 @@ copy_table(Relation rel)
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel);
+ RelationGetRelationName(rel), &lrel, &qual);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -830,14 +927,18 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
- if (lrel.relkind == RELKIND_RELATION)
+
+ /* Regular table with no row filter */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL)
appendStringInfo(&cmd, "COPY %s TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
else
{
/*
- * For non-tables, we need to do COPY (SELECT ...), but we can't just
- * do SELECT * because we need to not copy generated columns.
+ * For non-tables and tables with row filters, we need to do COPY
+ * (SELECT ...), but we can't just do SELECT * because we need to not
+ * copy generated columns. For tables with any row filters, build a
+ * SELECT query with OR'ed row filters for COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)
@@ -846,8 +947,33 @@ copy_table(Relation rel)
if (i < lrel.natts - 1)
appendStringInfoString(&cmd, ", ");
}
- appendStringInfo(&cmd, " FROM %s) TO STDOUT",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+ appendStringInfoString(&cmd, " FROM ");
+
+ /*
+ * For regular tables, make sure we don't copy data from a child that
+ * inherits the named table as those will be copied separately.
+ */
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfoString(&cmd, "ONLY ");
+
+ appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
+ /* list of OR'ed filters */
+ if (qual != NIL)
+ {
+ ListCell *lc;
+ char *q = strVal(linitial(qual));
+
+ appendStringInfo(&cmd, " WHERE %s", q);
+ for_each_from(lc, qual, 1)
+ {
+ q = strVal(lfirst(lc));
+ appendStringInfo(&cmd, " OR %s", q);
+ }
+ list_free_deep(qual);
+ }
+
+ appendStringInfoString(&cmd, ") TO STDOUT");
}
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);