PostgreSQL Source Code git master
worker.c
/*-------------------------------------------------------------------------
 * worker.c
 *    PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/logical/worker.c
 *
 * NOTES
 *    This file contains the worker which applies logical changes as they come
 *    from the remote logical replication stream.
 *
 *    The main worker (apply) is started by the logical replication worker
 *    launcher for every enabled subscription in a database. It uses the
 *    walsender protocol to communicate with the publisher.
 *
 *    This module includes server-facing code and shares the libpqwalreceiver
 *    module with walreceiver for providing the libpq-specific functionality.
 *
 *
 * STREAMED TRANSACTIONS
 * ---------------------
 * Streamed transactions (large transactions exceeding a memory limit on the
 * upstream) are applied using one of two approaches:
 *
 * 1) Write to temporary files and apply when the final commit arrives
 *
 * This approach is used when the user has set the subscription's streaming
 * option to on.
 *
 * Unlike the regular (non-streamed) case, streamed transactions must handle
 * aborts of both the toplevel transaction and subtransactions. This is
 * achieved by tracking offsets for subtransactions, which are then used to
 * truncate the file with serialized changes.
 *
 * The files are placed in the temporary file directory by default, and the
 * filenames include both the XID of the toplevel transaction and the OID of
 * the subscription. This is necessary so that different workers processing a
 * remote transaction with the same XID do not interfere with each other.
 *
 * We use BufFiles instead of normal temporary files because the BufFile
 * infrastructure (a) supports temporary files that exceed the OS file size
 * limit, (b) provides automatic cleanup on error, and (c) lets these files
 * survive across local transactions, so that they can be opened and closed
 * at stream start and stop. We decided to use the FileSet infrastructure
 * because without it the files are deleted when they are closed, and keeping
 * the stream files open across stream start/stop would consume a lot of
 * memory (more than 8K for each BufFile, and there could be multiple such
 * BufFiles as the subscriber could receive multiple start/stop streams for
 * different transactions before getting the commit). Moreover, without
 * FileSet we would also need to invent a new way to pass filenames to the
 * BufFile APIs so that we can open the desired file across multiple
 * stream-open calls for the same transaction.
 *
 * 2) Parallel apply workers.
 *
 * This approach is used when the user has set the subscription's streaming
 * option to parallel. See logical/applyparallelworker.c for information about
 * this approach.
 *
 * TWO_PHASE TRANSACTIONS
 * ----------------------
 * Two-phase transactions are replayed at prepare and then committed or
 * rolled back at commit prepared and rollback prepared respectively. It is
 * possible for a prepared transaction to arrive at the apply worker while
 * the tablesync is busy doing the initial copy. In this case, the apply
 * worker skips all the prepared operations [e.g. inserts] while the tablesync
 * is still busy (see the condition of should_apply_changes_for_rel). The
 * tablesync worker might not get such a prepared transaction because, say, it
 * was prior to the initial consistent point, but it might have got some later
 * commits. The tablesync worker will then exit without doing anything for the
 * prepared transaction skipped by the apply worker, as its sync location will
 * already be ahead of the apply worker's current location. This would lead
 * to an "empty prepare", because later when the apply worker does the commit
 * prepared, there is nothing in it (the inserts were skipped earlier).
 *
 * To avoid this and similar prepare confusions, the subscription's two_phase
 * commit is enabled only after the initial sync is over. The two_phase option
 * has been implemented as a tri-state with values DISABLED, PENDING, and
 * ENABLED.
 *
 * Even if the user specifies they want a subscription with two_phase = on,
 * internally it will start with a tri-state of PENDING which only becomes
 * ENABLED after all tablesync initializations are completed, i.e. when all
 * tablesync workers have reached their READY state. In other words, the value
 * PENDING is only a temporary state for subscription start-up.
 *
 * Until two_phase is properly available (ENABLED) the subscription will
 * behave as if two_phase = off. When the apply worker detects that all
 * tablesyncs have become READY (while the tri-state was PENDING) it will
 * restart the apply worker process. This happens in
 * process_syncing_tables_for_apply.
 *
 * When the (re-started) apply worker finds that all tablesyncs are READY for a
 * two_phase tri-state of PENDING, it starts streaming messages with the
 * two_phase option, which in turn enables the decoding of two-phase commits at
 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
 * It is possible that while two_phase was not yet enabled, the publisher
 * (replication server) skipped some prepares, but we ensure that such
 * prepares are sent along with commit prepared; see
 * ReorderBufferFinishPrepared.
 *
 * If the subscription has no tables then a two_phase tri-state PENDING is
 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
 * PUBLICATION which might otherwise be disallowed (see below).
 *
 * If a user ever needs to be aware of the tri-state value, they can fetch it
 * from the pg_subscription catalog (see column subtwophasestate).
 *
 * We don't allow toggling the two_phase option of a subscription because it
 * can lead to an inconsistent replica. Consider that initially it was on and
 * we have received some prepare, and then we turn it off: now at commit time
 * the server will send the entire transaction data along with the commit.
 * With some more analysis, we could allow changing this option from off to on,
 * but it is not clear that this alone would be useful.
 *
 * Finally, to avoid the problems mentioned in previous paragraphs from any
 * subsequent (not READY) tablesyncs (which would need to toggle the two_phase
 * option from 'on' to 'off' and then back to 'on'), there is a restriction
 * for ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted
 * when the two_phase tri-state is ENABLED, except when copy_data = false.
 *
 * We can get a prepare of the same GID more than once in genuine cases
 * where we have defined multiple subscriptions for publications on the same
 * server and the prepared transaction has operations on tables subscribed to
 * by those subscriptions. For such cases, if we used the GID sent by the
 * publisher, one of the prepares would succeed and the others would fail, in
 * which case the server would send them again. This can lead to a deadlock if
 * the user has set synchronous_standby_names for all the subscriptions on the
 * subscriber. To avoid such deadlocks, we generate a unique GID (consisting
 * of the subscription oid and the xid of the prepared transaction) for each
 * prepared transaction on the subscriber.
 *
 * FAILOVER
 * --------
 * The logical slot on the primary can be synced to the standby by specifying
 * failover = true when creating the subscription. Enabling failover allows us
 * to smoothly transition to the promoted standby, ensuring that we can
 * subscribe to the new primary without losing any data.
 *-------------------------------------------------------------------------
 */
144
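/*
 * Illustrative sketch, not part of worker.c: the two_phase tri-state from the
 * header comment, modeled as a tiny state machine. The DemoTwoPhaseState enum
 * and the demo_* helpers are hypothetical names made up for this example; the
 * real state lives in pg_subscription.subtwophasestate and is advanced by the
 * apply worker.
 */

```c
#include <stdbool.h>

/* Hypothetical stand-in for the catalog tri-state. */
typedef enum DemoTwoPhaseState
{
	DEMO_TWOPHASE_DISABLED,
	DEMO_TWOPHASE_PENDING,
	DEMO_TWOPHASE_ENABLED
} DemoTwoPhaseState;

/* two_phase = on starts out as PENDING, never directly as ENABLED. */
static DemoTwoPhaseState
demo_twophase_initial(bool requested)
{
	return requested ? DEMO_TWOPHASE_PENDING : DEMO_TWOPHASE_DISABLED;
}

/*
 * PENDING becomes ENABLED only once every table is READY. A subscription
 * with no tables stays PENDING.
 */
static DemoTwoPhaseState
demo_twophase_advance(DemoTwoPhaseState cur, int ntables, int nready)
{
	if (cur == DEMO_TWOPHASE_PENDING && ntables > 0 && nready == ntables)
		return DEMO_TWOPHASE_ENABLED;
	return cur;
}

/* Until ENABLED, the subscription behaves as if two_phase = off. */
static bool
demo_twophase_active(DemoTwoPhaseState state)
{
	return state == DEMO_TWOPHASE_ENABLED;
}
```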
145#include "postgres.h"
146
147#include <sys/stat.h>
148#include <unistd.h>
149
150#include "access/table.h"
151#include "access/tableam.h"
152#include "access/twophase.h"
153#include "access/xact.h"
154#include "catalog/indexing.h"
155#include "catalog/pg_inherits.h"
158#include "commands/tablecmds.h"
159#include "commands/trigger.h"
160#include "executor/executor.h"
162#include "libpq/pqformat.h"
163#include "miscadmin.h"
164#include "optimizer/optimizer.h"
166#include "pgstat.h"
167#include "postmaster/bgworker.h"
168#include "postmaster/interrupt.h"
169#include "postmaster/walwriter.h"
170#include "replication/conflict.h"
175#include "replication/origin.h"
179#include "storage/buffile.h"
180#include "storage/ipc.h"
181#include "storage/lmgr.h"
182#include "tcop/tcopprot.h"
183#include "utils/acl.h"
184#include "utils/dynahash.h"
185#include "utils/guc.h"
186#include "utils/inval.h"
187#include "utils/lsyscache.h"
188#include "utils/memutils.h"
189#include "utils/pg_lsn.h"
190#include "utils/rel.h"
191#include "utils/rls.h"
192#include "utils/snapmgr.h"
193#include "utils/syscache.h"
194#include "utils/usercontext.h"
195
196#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197
198typedef struct FlushPosition
199{
204
206
207typedef struct ApplyExecutionData
208{
209 EState *estate; /* executor state, used to track resources */
210
211 LogicalRepRelMapEntry *targetRel; /* replication target rel */
212 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213
214 /* These fields are used when the target relation is partitioned: */
215 ModifyTableState *mtstate; /* dummy ModifyTable state */
216 PartitionTupleRouting *proute; /* partition routing info */
218
219/* Struct for saving and restoring apply errcontext information */
221{
222 LogicalRepMsgType command; /* 0 if invalid */
224
225 /* Remote node information */
226 int remote_attnum; /* -1 if invalid */
231
232/*
233 * The action to be taken for the changes in the transaction.
234 *
235 * TRANS_LEADER_APPLY:
236 * This action means that we are in the leader apply worker or table sync
237 * worker. The changes of the transaction are either directly applied or
238 * are read from temporary files (for streaming transactions) and then
239 * applied by the worker.
240 *
241 * TRANS_LEADER_SERIALIZE:
242 * This action means that we are in the leader apply worker or table sync
243 * worker. Changes are written to temporary files and then applied when the
244 * final commit arrives.
245 *
246 * TRANS_LEADER_SEND_TO_PARALLEL:
247 * This action means that we are in the leader apply worker and need to send
248 * the changes to the parallel apply worker.
249 *
250 * TRANS_LEADER_PARTIAL_SERIALIZE:
251 * This action means that we are in the leader apply worker and have sent some
252 * changes directly to the parallel apply worker and the remaining changes are
253 * serialized to a file, due to timeout while sending data. The parallel apply
254 * worker will apply these serialized changes when the final commit arrives.
255 *
256 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
257 * serializing changes, the leader worker also needs to serialize the
258 * STREAM_XXX message to a file, and wait for the parallel apply worker to
259 * finish the transaction when processing the transaction finish command. So
260 * this new action was introduced to keep the code and logic clear.
261 *
262 * TRANS_PARALLEL_APPLY:
263 * This action means that we are in the parallel apply worker and changes of
264 * the transaction are applied directly by the worker.
265 */
266typedef enum
267{
268 /* The action for non-streaming transactions. */
270
271 /* Actions for streaming transactions. */
277
278/* errcontext tracker */
280{
281 .command = 0,
282 .rel = NULL,
283 .remote_attnum = -1,
284 .remote_xid = InvalidTransactionId,
285 .finish_lsn = InvalidXLogRecPtr,
286 .origin_name = NULL,
287};
288
290
293
294/* per stream context for streaming transactions */
296
298
300static bool MySubscriptionValid = false;
301
303
306
307/* fields valid only when processing streamed transaction */
308static bool in_streamed_transaction = false;
309
311
312/*
313 * The number of changes applied by parallel apply worker during one streaming
314 * block.
315 */
317
318/* Are we initializing an apply worker? */
320
/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the
 * subskiplsn. Once we start skipping changes, we don't stop until we have
 * skipped all changes of the transaction, even if pg_subscription is updated
 * and MySubscription->skiplsn gets changed or reset in the meantime. Also, in
 * streaming transaction cases (streaming = on), we don't skip receiving and
 * spooling the changes, since we decide whether or not to skip applying the
 * changes when starting to apply them. The subskiplsn is cleared after
 * successfully skipping the transaction or applying a non-empty transaction.
 * The latter prevents a mistakenly specified subskiplsn from being left in
 * place. Note that we cannot skip streaming transactions when using parallel
 * apply workers because we cannot get the finish LSN before applying the
 * changes. So, we don't start a parallel apply worker when the finish LSN is
 * set by the user.
 */
337#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
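
/*
 * Illustrative sketch, not part of worker.c: the "start skipping on a
 * matching finish LSN, keep skipping until told to stop" behavior described
 * above. DemoLsn and the demo_* functions are hypothetical stand-ins; the
 * value 0 is assumed to play the role of an invalid LSN.
 */

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t DemoLsn;
#define DEMO_INVALID_LSN ((DemoLsn) 0)

/* Finish LSN of the transaction being skipped, or invalid if not skipping. */
static DemoLsn demo_skip_finish_lsn = DEMO_INVALID_LSN;

static bool
demo_is_skipping(void)
{
	return demo_skip_finish_lsn != DEMO_INVALID_LSN;
}

/* Start skipping only if the configured skip LSN matches this transaction. */
static void
demo_maybe_start_skipping(DemoLsn configured_skiplsn, DemoLsn finish_lsn)
{
	if (configured_skiplsn == DEMO_INVALID_LSN ||
		configured_skiplsn != finish_lsn)
		return;

	demo_skip_finish_lsn = finish_lsn;
}

/* Called once all changes of the skipped transaction have been consumed. */
static void
demo_stop_skipping(void)
{
	demo_skip_finish_lsn = DEMO_INVALID_LSN;
}
```

/*
 * Note that a later call with a reset (invalid) configured LSN leaves the
 * skipping state untouched; only demo_stop_skipping() ends it.
 */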
338
339/* BufFile handle of the current streaming file */
340static BufFile *stream_fd = NULL;
341
342typedef struct SubXactInfo
343{
344 TransactionId xid; /* XID of the subxact */
345 int fileno; /* file number in the buffile */
346 off_t offset; /* offset in the file */
348
349/* Sub-transaction data for the current streaming transaction */
350typedef struct ApplySubXactData
351{
352 uint32 nsubxacts; /* number of sub-transactions */
353 uint32 nsubxacts_max; /* current capacity of subxacts */
354 TransactionId subxact_last; /* xid of the last sub-transaction */
355 SubXactInfo *subxacts; /* sub-xact offset in changes file */
357
359
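/*
 * Illustrative sketch, not part of worker.c: how per-subtransaction offsets
 * can be used to find the truncation point when a subtransaction aborts.
 * DemoSubXact mirrors the fields of SubXactInfo above, and
 * demo_subxact_abort() is a hypothetical helper; the actual truncation of
 * the BufFile is left out.
 */

```c
#include <stddef.h>
#include <stdint.h>

typedef struct DemoSubXact
{
	uint32_t	xid;		/* XID of the subxact */
	int			fileno;		/* file number where its changes start */
	long		offset;		/* offset in that file */
} DemoSubXact;

/*
 * On abort of subtransaction "xid", look up where its changes started and
 * drop that entry together with all later ones. Returns 1 and reports the
 * truncation point through "fileno" and "offset", or returns 0 if the
 * subtransaction never wrote any changes.
 */
static int
demo_subxact_abort(DemoSubXact *subxacts, size_t *nsubxacts, uint32_t xid,
				   int *fileno, long *offset)
{
	size_t		i;

	/* Search backwards: aborts usually target recently started subxacts. */
	for (i = *nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == xid)
		{
			*fileno = subxacts[i - 1].fileno;
			*offset = subxacts[i - 1].offset;
			*nsubxacts = i - 1;
			return 1;
		}
	}

	return 0;
}
```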
360static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
361static inline void changes_filename(char *path, Oid subid, TransactionId xid);
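
/*
 * Illustrative sketch, not part of worker.c: why the spool file name carries
 * both the subscription OID and the toplevel XID. The "%u-%u.changes" layout
 * and the demo_changes_filename() helper are assumptions for this example.
 */

```c
#include <stddef.h>
#include <stdio.h>

/* Build a per-subscription, per-transaction file name into "path". */
static int
demo_changes_filename(char *path, size_t szpath,
					  unsigned int subid, unsigned int xid)
{
	return snprintf(path, szpath, "%u-%u.changes", subid, xid);
}
```

/*
 * Two subscriptions that receive a remote transaction with the same XID get
 * different names, so their workers do not touch each other's files.
 */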
362
363/*
364 * Information about subtransactions of a given toplevel transaction.
365 */
366static void subxact_info_write(Oid subid, TransactionId xid);
367static void subxact_info_read(Oid subid, TransactionId xid);
368static void subxact_info_add(TransactionId xid);
369static inline void cleanup_subxact_info(void);
370
371/*
372 * Serialize and deserialize changes for a toplevel transaction.
373 */
374static void stream_open_file(Oid subid, TransactionId xid,
375 bool first_segment);
376static void stream_write_change(char action, StringInfo s);
378static void stream_close_file(void);
379
380static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
381
382static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
384 ResultRelInfo *relinfo,
385 TupleTableSlot *remoteslot);
387 ResultRelInfo *relinfo,
388 TupleTableSlot *remoteslot,
389 LogicalRepTupleData *newtup,
390 Oid localindexoid);
392 ResultRelInfo *relinfo,
393 TupleTableSlot *remoteslot,
394 Oid localindexoid);
395static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
396 LogicalRepRelation *remoterel,
397 Oid localidxoid,
398 TupleTableSlot *remoteslot,
399 TupleTableSlot **localslot);
401 TupleTableSlot *remoteslot,
402 LogicalRepTupleData *newtup,
403 CmdType operation);
404
405/* Functions for skipping changes */
406static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
407static void stop_skipping_changes(void);
408static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
409
410/* Functions for apply error callback */
411static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
412static inline void reset_apply_error_context_info(void);
413
416
417static void replorigin_reset(int code, Datum arg);
418
419/*
420 * Form the origin name for the subscription.
421 *
422 * This is a common function for tablesync and other workers. Tablesync workers
423 * must pass a valid relid. Other callers must pass relid = InvalidOid.
424 *
425 * Return the name in the supplied buffer.
426 */
427void
429 char *originname, Size szoriginname)
430{
431 if (OidIsValid(relid))
432 {
433 /* Replication origin name for tablesync workers. */
434 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
435 }
436 else
437 {
438 /* Replication origin name for non-tablesync workers. */
439 snprintf(originname, szoriginname, "pg_%u", suboid);
440 }
441}
442
/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, which runs in a
 * separate worker process in parallel, so we need some way to skip changes
 * coming to the leader apply worker during the sync of a table.
 *
 * Note that we need a less-than-or-equal comparison for the SYNCDONE state
 * because it might hold the position of the end of the initial slot
 * consistent point WAL record + 1 (i.e. the start of the next record), and
 * the next record can be the COMMIT of the transaction we are now processing
 * (which is what we set remote_final_lsn to in apply_handle_begin).
 *
 * Note that for streaming transactions that are being applied in the parallel
 * apply worker, we disallow applying changes if the target table in the
 * subscription is not in the READY state, because we cannot decide whether to
 * apply the change as we won't know remote_final_lsn by that time.
 *
 * We already checked this in pa_can_start() before assigning the
 * streaming transaction to the parallel worker, but it also needs to be
 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
 * PUBLICATION in parallel, a new table can be added to pg_subscription_rel
 * while applying this transaction.
 */
467static bool
469{
470 switch (MyLogicalRepWorker->type)
471 {
473 return MyLogicalRepWorker->relid == rel->localreloid;
474
476 /* We don't synchronize rel's that are in unknown state. */
477 if (rel->state != SUBREL_STATE_READY &&
478 rel->state != SUBREL_STATE_UNKNOWN)
480 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
481 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
483 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
484
485 return rel->state == SUBREL_STATE_READY;
486
487 case WORKERTYPE_APPLY:
488 return (rel->state == SUBREL_STATE_READY ||
489 (rel->state == SUBREL_STATE_SYNCDONE &&
490 rel->statelsn <= remote_final_lsn));
491
493 /* Should never happen. */
494 elog(ERROR, "Unknown worker type");
495 }
496
497 return false; /* dummy for compiler */
498}
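
/*
 * Illustrative sketch, not part of worker.c: the leader apply worker's rule
 * from the function above, reduced to plain values. DemoRelState and
 * demo_leader_should_apply() are hypothetical names; only the READY and
 * SYNCDONE cases come from the code above.
 */

```c
#include <stdbool.h>
#include <stdint.h>

typedef enum DemoRelState
{
	DEMO_REL_INIT,
	DEMO_REL_DATASYNC,
	DEMO_REL_SYNCDONE,
	DEMO_REL_READY
} DemoRelState;

/*
 * Apply if the table is READY, or if it is SYNCDONE and the sync finished at
 * or before the commit LSN of the transaction being applied.
 */
static bool
demo_leader_should_apply(DemoRelState state, uint64_t statelsn,
						 uint64_t remote_final_lsn)
{
	return state == DEMO_REL_READY ||
		(state == DEMO_REL_SYNCDONE && statelsn <= remote_final_lsn);
}
```

/*
 * The equality case matters: a SYNCDONE position equal to the commit LSN of
 * the current transaction must still apply the change.
 */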
499
500/*
501 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
502 *
503 * Start a transaction, if this is the first step (else we keep using the
504 * existing transaction).
505 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
506 */
507static void
509{
511
512 if (!IsTransactionState())
513 {
516 }
517
519
521}
522
523/*
524 * Finish up one step of a replication transaction.
525 * Callers of begin_replication_step() must also call this.
526 *
527 * We don't close out the transaction here, but we should increment
528 * the command counter to make the effects of this step visible.
529 */
530static void
532{
534
536}
537
538/*
539 * Handle streamed transactions for both the leader apply worker and the
540 * parallel apply workers.
541 *
542 * In the streaming case (receiving a block of the streamed transaction), for
543 * serialize mode, simply redirect it to a file for the proper toplevel
544 * transaction, and for parallel mode, the leader apply worker will send the
545 * changes to parallel apply workers and the parallel apply worker will define
546 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
547 * messages will be applied by both leader apply worker and parallel apply
548 * workers).
549 *
550 * Returns true for streamed transactions (when the change is either serialized
551 * to file or sent to parallel apply worker), false otherwise (regular mode or
552 * needs to be processed by parallel apply worker).
553 *
554 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
555 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
556 * to a parallel apply worker.
557 */
558static bool
560{
561 TransactionId current_xid;
563 TransApplyAction apply_action;
564 StringInfoData original_msg;
565
566 apply_action = get_transaction_apply_action(stream_xid, &winfo);
567
568 /* not in streaming mode */
569 if (apply_action == TRANS_LEADER_APPLY)
570 return false;
571
573
574 /*
575 * The parallel apply worker needs the xid in this message to decide
576 * whether to define a savepoint, so save the original message that has
577 * not moved the cursor after the xid. We will serialize this message to a
578 * file in PARTIAL_SERIALIZE mode.
579 */
580 original_msg = *s;
581
582 /*
583 * We should have received XID of the subxact as the first part of the
584 * message, so extract it.
585 */
586 current_xid = pq_getmsgint(s, 4);
587
588 if (!TransactionIdIsValid(current_xid))
590 (errcode(ERRCODE_PROTOCOL_VIOLATION),
591 errmsg_internal("invalid transaction ID in streamed replication transaction")));
592
593 switch (apply_action)
594 {
597
598 /* Add the new subxact to the array (unless already there). */
599 subxact_info_add(current_xid);
600
601 /* Write the change to the current file */
603 return true;
604
606 Assert(winfo);
607
608 /*
609 * XXX The publisher side doesn't always send relation/type update
610 * messages after the streaming transaction, so also update the
611 * relation/type in leader apply worker. See function
612 * cleanup_rel_sync_cache.
613 */
614 if (pa_send_data(winfo, s->len, s->data))
615 return (action != LOGICAL_REP_MSG_RELATION &&
617
618 /*
619 * Switch to serialize mode when we are not able to send the
620 * change to parallel apply worker.
621 */
622 pa_switch_to_partial_serialize(winfo, false);
623
624 /* fall through */
626 stream_write_change(action, &original_msg);
627
628 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
629 return (action != LOGICAL_REP_MSG_RELATION &&
631
634
635 /* Define a savepoint for a subxact if needed. */
636 pa_start_subtrans(current_xid, stream_xid);
637 return false;
638
639 default:
640 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
641 return false; /* silence compiler warning */
642 }
643}
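
/*
 * Illustrative sketch, not part of worker.c: the return-value contract of the
 * function above as a pure function of the apply action and message type.
 * DemoAction and demo_change_is_consumed() are hypothetical; the message type
 * bytes 'R' (relation) and 'Y' (type) are assumptions for this example. The
 * side effects (writing to a file, sending to a worker) are omitted.
 */

```c
#include <stdbool.h>

typedef enum DemoAction
{
	DEMO_LEADER_APPLY,
	DEMO_LEADER_SERIALIZE,
	DEMO_LEADER_SEND_TO_PARALLEL,
	DEMO_LEADER_PARTIAL_SERIALIZE,
	DEMO_PARALLEL_APPLY
} DemoAction;

/* Returns true if the caller has nothing left to do for this change. */
static bool
demo_change_is_consumed(DemoAction action, char msgtype)
{
	/* Relation/type messages must also be applied by the leader itself. */
	bool		is_metadata = (msgtype == 'R' || msgtype == 'Y');

	switch (action)
	{
		case DEMO_LEADER_APPLY:
			return false;		/* not in streaming mode */
		case DEMO_LEADER_SERIALIZE:
			return true;		/* spooled to a file */
		case DEMO_LEADER_SEND_TO_PARALLEL:
		case DEMO_LEADER_PARTIAL_SERIALIZE:
			return !is_metadata;
		case DEMO_PARALLEL_APPLY:
			return false;		/* the parallel worker applies it */
	}

	return false;				/* keep the compiler quiet */
}
```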
644
645/*
646 * Executor state preparation for evaluation of constraint expressions,
647 * indexes and triggers for the specified relation.
648 *
649 * Note that the caller must open and close any indexes to be updated.
650 */
651static ApplyExecutionData *
653{
654 ApplyExecutionData *edata;
655 EState *estate;
656 RangeTblEntry *rte;
657 List *perminfos = NIL;
658 ResultRelInfo *resultRelInfo;
659
660 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
661 edata->targetRel = rel;
662
663 edata->estate = estate = CreateExecutorState();
664
665 rte = makeNode(RangeTblEntry);
666 rte->rtekind = RTE_RELATION;
667 rte->relid = RelationGetRelid(rel->localrel);
668 rte->relkind = rel->localrel->rd_rel->relkind;
669 rte->rellockmode = AccessShareLock;
670
671 addRTEPermissionInfo(&perminfos, rte);
672
673 ExecInitRangeTable(estate, list_make1(rte), perminfos,
675
676 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
677
678 /*
679 * Use Relation opened by logicalrep_rel_open() instead of opening it
680 * again.
681 */
682 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
683
684 /*
685 * We put the ResultRelInfo in the es_opened_result_relations list, even
686 * though we don't populate the es_result_relations array. That's a bit
687 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
688 *
689 * ExecOpenIndices() is not called here either, each execution path doing
690 * an apply operation being responsible for that.
691 */
693 lappend(estate->es_opened_result_relations, resultRelInfo);
694
695 estate->es_output_cid = GetCurrentCommandId(true);
696
697 /* Prepare to catch AFTER triggers. */
699
700 /* other fields of edata remain NULL for now */
701
702 return edata;
703}
704
705/*
706 * Finish any operations related to the executor state created by
707 * create_edata_for_relation().
708 */
709static void
711{
712 EState *estate = edata->estate;
713
714 /* Handle any queued AFTER triggers. */
715 AfterTriggerEndQuery(estate);
716
717 /* Shut down tuple routing, if any was done. */
718 if (edata->proute)
719 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
720
721 /*
722 * Cleanup. It might seem that we should call ExecCloseResultRelations()
723 * here, but we intentionally don't. It would close the rel we added to
724 * es_opened_result_relations above, which is wrong because we took no
725 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
726 * any other relations opened during execution.
727 */
728 ExecResetTupleTable(estate->es_tupleTable, false);
729 FreeExecutorState(estate);
730 pfree(edata);
731}
732
/*
 * Evaluate default values for local columns that cannot be mapped to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 */
740static void
742 TupleTableSlot *slot)
743{
745 int num_phys_attrs = desc->natts;
746 int i;
747 int attnum,
748 num_defaults = 0;
749 int *defmap;
750 ExprState **defexprs;
751 ExprContext *econtext;
752
753 econtext = GetPerTupleExprContext(estate);
754
755 /* We got all the data via replication, no need to evaluate anything. */
756 if (num_phys_attrs == rel->remoterel.natts)
757 return;
758
759 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
760 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
761
762 Assert(rel->attrmap->maplen == num_phys_attrs);
763 for (attnum = 0; attnum < num_phys_attrs; attnum++)
764 {
765 Expr *defexpr;
766
767 if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
768 continue;
769
770 if (rel->attrmap->attnums[attnum] >= 0)
771 continue;
772
773 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
774
775 if (defexpr != NULL)
776 {
777 /* Run the expression through planner */
778 defexpr = expression_planner(defexpr);
779
780 /* Initialize executable expression in copycontext */
781 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
782 defmap[num_defaults] = attnum;
783 num_defaults++;
784 }
785 }
786
787 for (i = 0; i < num_defaults; i++)
788 slot->tts_values[defmap[i]] =
789 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
790}
791
792/*
793 * Store tuple data into slot.
794 *
795 * Incoming data can be either text or binary format.
796 */
797static void
799 LogicalRepTupleData *tupleData)
800{
801 int natts = slot->tts_tupleDescriptor->natts;
802 int i;
803
804 ExecClearTuple(slot);
805
806 /* Call the "in" function for each non-dropped, non-null attribute */
807 Assert(natts == rel->attrmap->maplen);
808 for (i = 0; i < natts; i++)
809 {
811 int remoteattnum = rel->attrmap->attnums[i];
812
813 if (!att->attisdropped && remoteattnum >= 0)
814 {
815 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
816
817 Assert(remoteattnum < tupleData->ncols);
818
819 /* Set attnum for error callback */
821
822 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
823 {
824 Oid typinput;
825 Oid typioparam;
826
827 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
828 slot->tts_values[i] =
829 OidInputFunctionCall(typinput, colvalue->data,
830 typioparam, att->atttypmod);
831 slot->tts_isnull[i] = false;
832 }
833 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
834 {
835 Oid typreceive;
836 Oid typioparam;
837
838 /*
839 * In some code paths we may be asked to re-parse the same
840 * tuple data. Reset the StringInfo's cursor so that works.
841 */
842 colvalue->cursor = 0;
843
844 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
845 slot->tts_values[i] =
846 OidReceiveFunctionCall(typreceive, colvalue,
847 typioparam, att->atttypmod);
848
849 /* Trouble if it didn't eat the whole buffer */
850 if (colvalue->cursor != colvalue->len)
852 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
853 errmsg("incorrect binary data format in logical replication column %d",
854 remoteattnum + 1)));
855 slot->tts_isnull[i] = false;
856 }
857 else
858 {
859 /*
860 * NULL value from remote. (We don't expect to see
861 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
862 * NULL.)
863 */
864 slot->tts_values[i] = (Datum) 0;
865 slot->tts_isnull[i] = true;
866 }
867
868 /* Reset attnum for error callback */
870 }
871 else
872 {
873 /*
874 * We assign NULL to dropped attributes and missing values
875 * (missing values should be later filled using
876 * slot_fill_defaults).
877 */
878 slot->tts_values[i] = (Datum) 0;
879 slot->tts_isnull[i] = true;
880 }
881 }
882
884}
885
886/*
887 * Replace updated columns with data from the LogicalRepTupleData struct.
888 * This is somewhat similar to heap_modify_tuple but also calls the type
889 * input functions on the user data.
890 *
891 * "slot" is filled with a copy of the tuple in "srcslot", replacing
892 * columns provided in "tupleData" and leaving others as-is.
893 *
894 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
895 * storage for "srcslot". This is OK for current usage, but someday we may
896 * need to materialize "slot" at the end to make it independent of "srcslot".
897 */
898static void
901 LogicalRepTupleData *tupleData)
902{
903 int natts = slot->tts_tupleDescriptor->natts;
904 int i;
905
906 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
907 ExecClearTuple(slot);
908
909 /*
910 * Copy all the column data from srcslot, so that we'll have valid values
911 * for unreplaced columns.
912 */
913 Assert(natts == srcslot->tts_tupleDescriptor->natts);
914 slot_getallattrs(srcslot);
915 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
916 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
917
918 /* Call the "in" function for each replaced attribute */
919 Assert(natts == rel->attrmap->maplen);
920 for (i = 0; i < natts; i++)
921 {
923 int remoteattnum = rel->attrmap->attnums[i];
924
925 if (remoteattnum < 0)
926 continue;
927
928 Assert(remoteattnum < tupleData->ncols);
929
930 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
931 {
932 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
933
934 /* Set attnum for error callback */
936
937 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
938 {
939 Oid typinput;
940 Oid typioparam;
941
942 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
943 slot->tts_values[i] =
944 OidInputFunctionCall(typinput, colvalue->data,
945 typioparam, att->atttypmod);
946 slot->tts_isnull[i] = false;
947 }
948 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
949 {
950 Oid typreceive;
951 Oid typioparam;
952
953 /*
954 * In some code paths we may be asked to re-parse the same
955 * tuple data. Reset the StringInfo's cursor so that works.
956 */
957 colvalue->cursor = 0;
958
959 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
960 slot->tts_values[i] =
961 OidReceiveFunctionCall(typreceive, colvalue,
962 typioparam, att->atttypmod);
963
964 /* Trouble if it didn't eat the whole buffer */
965 if (colvalue->cursor != colvalue->len)
966 ereport(ERROR,
967 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
968 errmsg("incorrect binary data format in logical replication column %d",
969 remoteattnum + 1)));
970 slot->tts_isnull[i] = false;
971 }
972 else
973 {
974 /* must be LOGICALREP_COLUMN_NULL */
975 slot->tts_values[i] = (Datum) 0;
976 slot->tts_isnull[i] = true;
977 }
978
979 /* Reset attnum for error callback */
981 }
982 }
983
984 /* And finally, declare that "slot" contains a valid virtual tuple */
986}
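
/*
 * Illustrative sketch, not part of worker.c: the merge performed above can
 * be reduced to plain arrays. The old row is copied first, then only the
 * local columns whose mapped remote column was actually sent are
 * overwritten. ColStatus, merge_changed_columns and the string-valued
 * columns are hypothetical stand-ins for the LOGICALREP_COLUMN_* codes,
 * the slot arrays and the type input functions.
 */
```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-ins for the per-column status codes. */
typedef enum { COL_NULL, COL_UNCHANGED, COL_TEXT } ColStatus;

/*
 * attrmap[i] is the remote column index for local column i, or -1 when the
 * local column has no remote counterpart.
 */
static void
merge_changed_columns(const char **dst, bool *dst_isnull,
                      const char *const *old, const bool *old_isnull,
                      const int *attrmap, int natts,
                      const ColStatus *status,
                      const char *const *newvals, int ncols)
{
    /* Start with the old values so unreplaced columns stay valid. */
    memcpy(dst, old, natts * sizeof(*dst));
    memcpy(dst_isnull, old_isnull, natts * sizeof(*dst_isnull));

    for (int i = 0; i < natts; i++)
    {
        int r = attrmap[i];

        if (r < 0)
            continue;               /* not replicated at all */
        assert(r < ncols);

        if (status[r] == COL_UNCHANGED)
            continue;               /* keep the copied old value */

        if (status[r] == COL_NULL)
        {
            dst[i] = NULL;
            dst_isnull[i] = true;
        }
        else
        {
            dst[i] = newvals[r];    /* the real code calls the input function */
            dst_isnull[i] = false;
        }
    }
}
```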
987
988/*
989 * Handle BEGIN message.
990 */
991static void
993{
994 LogicalRepBeginData begin_data;
995
996 /* There must not be an active streaming transaction. */
998
999 logicalrep_read_begin(s, &begin_data);
1000 set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1001
1002 remote_final_lsn = begin_data.final_lsn;
1003
1005
1006 in_remote_transaction = true;
1007
1009}
1010
1011/*
1012 * Handle COMMIT message.
1013 *
1014 * TODO, support tracking of multiple origins
1015 */
1016static void
1018{
1019 LogicalRepCommitData commit_data;
1020
1021 logicalrep_read_commit(s, &commit_data);
1022
1023 if (commit_data.commit_lsn != remote_final_lsn)
1024 ereport(ERROR,
1025 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1026 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1027 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1029
1030 apply_handle_commit_internal(&commit_data);
1031
1032 /* Process any tables that are being synchronized in parallel. */
1033 process_syncing_tables(commit_data.end_lsn);
1034
1037}
1038
1039/*
1040 * Handle BEGIN PREPARE message.
1041 */
1042static void
1044{
1045 LogicalRepPreparedTxnData begin_data;
1046
1047 /* Tablesync should never receive prepare. */
1048 if (am_tablesync_worker())
1049 ereport(ERROR,
1050 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1051 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1052
1053 /* There must not be an active streaming transaction. */
1055
1056 logicalrep_read_begin_prepare(s, &begin_data);
1057 set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1058
1059 remote_final_lsn = begin_data.prepare_lsn;
1060
1062
1063 in_remote_transaction = true;
1064
1066}
1067
1068/*
1069 * Common function to prepare the GID.
1070 */
1071static void
1072apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1073{
1074 char gid[GIDSIZE];
1075
1076 /*
1077 * Compute a unique GID for two_phase transactions. We don't use the GID
1078 * of the prepared transaction sent by the server, as that can lead to
1079 * deadlock when multiple subscriptions from the same node point to
1080 * publications on the same node. See comments atop worker.c.
1081 */
1083 gid, sizeof(gid));
1084
1085 /*
1086 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1087 * called within the PrepareTransactionBlock below.
1088 */
1089 if (!IsTransactionBlock())
1090 {
1092 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1093 }
1094
1095 /*
1096 * Update origin state so we can restart streaming from correct position
1097 * in case of crash.
1098 */
1099 replorigin_session_origin_lsn = prepare_data->end_lsn;
1101
1103}
1104
1105/*
1106 * Handle PREPARE message.
1107 */
1108static void
1110{
1111 LogicalRepPreparedTxnData prepare_data;
1112
1113 logicalrep_read_prepare(s, &prepare_data);
1114
1115 if (prepare_data.prepare_lsn != remote_final_lsn)
1116 ereport(ERROR,
1117 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1118 errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1119 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1121
1122 /*
1123 * Unlike commit, here, we always prepare the transaction even though no
1124 * change has happened in this transaction or all changes are skipped. It
1125 * is done this way because at commit prepared time, we won't know whether
1126 * we have skipped preparing a transaction because of those reasons.
1127 *
1128 * XXX, We can optimize such that at commit prepared time, we first check
1129 * whether we have prepared the transaction or not but that doesn't seem
1130 * worthwhile because such cases shouldn't be common.
1131 */
1133
1134 apply_handle_prepare_internal(&prepare_data);
1135
1138 pgstat_report_stat(false);
1139
1140 /*
1141 * It is okay not to set the local_end LSN for the prepare because we
1142 * always flush the prepare record. So, we can send the acknowledgment of
1143 * the remote_end LSN as soon as prepare is finished.
1144 *
1145 * XXX For the sake of consistency with commit, we could have set it with
1146 * the LSN of prepare but as of now we don't track that value similar to
1147 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1148 * it.
1149 */
1151
1152 in_remote_transaction = false;
1153
1154 /* Process any tables that are being synchronized in parallel. */
1155 process_syncing_tables(prepare_data.end_lsn);
1156
1157 /*
1158 * Since we have already prepared the transaction, in a case where the
1159 * server crashes before clearing the subskiplsn, it will be left but the
1160 * transaction won't be resent. But that's okay because it's a rare case
1161 * and the subskiplsn will be cleared when finishing the next transaction.
1162 */
1165
1168}
1169
1170/*
1171 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1172 *
1173 * Note that we don't need to wait here if the transaction was prepared in a
1174 * parallel apply worker. In that case, we have already waited for the prepare
1175 * to finish in apply_handle_stream_prepare() which will ensure all the
1176 * operations in that transaction have happened in the subscriber, so no
1177 * concurrent transaction can cause deadlock or transaction dependency issues.
1178 */
1179static void
1181{
1183 char gid[GIDSIZE];
1184
1185 logicalrep_read_commit_prepared(s, &prepare_data);
1186 set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1187
1188 /* Compute GID for two_phase transactions. */
1190 gid, sizeof(gid));
1191
1192 /* There is no transaction when COMMIT PREPARED is called */
1194
1195 /*
1196 * Update origin state so we can restart streaming from correct position
1197 * in case of crash.
1198 */
1201
1202 FinishPreparedTransaction(gid, true);
1205 pgstat_report_stat(false);
1206
1208 in_remote_transaction = false;
1209
1210 /* Process any tables that are being synchronized in parallel. */
1211 process_syncing_tables(prepare_data.end_lsn);
1212
1214
1217}
1218
1219/*
1220 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1221 *
1222 * Note that we don't need to wait here if the transaction was prepared in a
1223 * parallel apply worker. In that case, we have already waited for the prepare
1224 * to finish in apply_handle_stream_prepare() which will ensure all the
1225 * operations in that transaction have happened in the subscriber, so no
1226 * concurrent transaction can cause deadlock or transaction dependency issues.
1227 */
1228static void
1230{
1232 char gid[GIDSIZE];
1233
1234 logicalrep_read_rollback_prepared(s, &rollback_data);
1235 set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1236
1237 /* Compute GID for two_phase transactions. */
1239 gid, sizeof(gid));
1240
1241 /*
1242 * It is possible that we haven't received prepare because it occurred
1243 * before walsender reached a consistent point or the two_phase was still
1244 * not enabled by that time, so in such cases, we need to skip rollback
1245 * prepared.
1246 */
1247 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1248 rollback_data.prepare_time))
1249 {
1250 /*
1251 * Update origin state so we can restart streaming from correct
1252 * position in case of crash.
1253 */
1256
1257 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1259 FinishPreparedTransaction(gid, false);
1262
1264 }
1265
1266 pgstat_report_stat(false);
1267
1268 /*
1269 * It is okay not to set the local_end LSN for the rollback of prepared
1270 * transaction because we always flush the WAL record for it. See
1271 * apply_handle_prepare.
1272 */
1274 in_remote_transaction = false;
1275
1276 /* Process any tables that are being synchronized in parallel. */
1278
1281}
1282
1283/*
1284 * Handle STREAM PREPARE.
1285 */
1286static void
1288{
1289 LogicalRepPreparedTxnData prepare_data;
1291 TransApplyAction apply_action;
1292
1293 /* Save the message before it is consumed. */
1294 StringInfoData original_msg = *s;
1295
1297 ereport(ERROR,
1298 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1299 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1300
1301 /* Tablesync should never receive prepare. */
1302 if (am_tablesync_worker())
1303 ereport(ERROR,
1304 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1305 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1306
1307 logicalrep_read_stream_prepare(s, &prepare_data);
1308 set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1309
1310 apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1311
1312 switch (apply_action)
1313 {
1314 case TRANS_LEADER_APPLY:
1315
1316 /*
1317 * The transaction has been serialized to file, so replay all the
1318 * spooled operations.
1319 */
1321 prepare_data.xid, prepare_data.prepare_lsn);
1322
1323 /* Mark the transaction as prepared. */
1324 apply_handle_prepare_internal(&prepare_data);
1325
1327
1328 /*
1329 * It is okay not to set the local_end LSN for the prepare because
1330 * we always flush the prepare record. See apply_handle_prepare.
1331 */
1333
1334 in_remote_transaction = false;
1335
1336 /* Unlink the files with serialized changes and subxact info. */
1338
1339 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1340 break;
1341
1343 Assert(winfo);
1344
1345 if (pa_send_data(winfo, s->len, s->data))
1346 {
1347 /* Finish processing the streaming transaction. */
1348 pa_xact_finish(winfo, prepare_data.end_lsn);
1349 break;
1350 }
1351
1352 /*
1353 * Switch to serialize mode when we are not able to send the
1354 * change to parallel apply worker.
1355 */
1356 pa_switch_to_partial_serialize(winfo, true);
1357
1358 /* fall through */
1360 Assert(winfo);
1361
1362 stream_open_and_write_change(prepare_data.xid,
1364 &original_msg);
1365
1367
1368 /* Finish processing the streaming transaction. */
1369 pa_xact_finish(winfo, prepare_data.end_lsn);
1370 break;
1371
1373
1374 /*
1375 * If the parallel apply worker is applying spooled messages then
1376 * close the file before preparing.
1377 */
1378 if (stream_fd)
1380
1382
1383 /* Mark the transaction as prepared. */
1384 apply_handle_prepare_internal(&prepare_data);
1385
1387
1389
1390 /*
1391 * It is okay not to set the local_end LSN for the prepare because
1392 * we always flush the prepare record. See apply_handle_prepare.
1393 */
1395
1398
1400
1401 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1402 break;
1403
1404 default:
1405 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1406 break;
1407 }
1408
1409 pgstat_report_stat(false);
1410
1411 /* Process any tables that are being synchronized in parallel. */
1412 process_syncing_tables(prepare_data.end_lsn);
1413
1414 /*
1415 * Similar to the prepare case, the subskiplsn could be left behind in case
1416 * of a server crash, but that's okay. See the comments in apply_handle_prepare().
1417 */
1420
1422
1424}
1425
1426/*
1427 * Handle ORIGIN message.
1428 *
1429 * TODO, support tracking of multiple origins
1430 */
1431static void
1433{
1434 /*
1435 * ORIGIN message can only come inside streaming transaction or inside
1436 * remote transaction and before any actual writes.
1437 */
1441 ereport(ERROR,
1442 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1443 errmsg_internal("ORIGIN message sent out of order")));
1444}
1445
1446/*
1447 * Initialize fileset (if not already done).
1448 *
1449 * Create a new file when first_segment is true, otherwise open the existing
1450 * file.
1451 */
1452void
1453stream_start_internal(TransactionId xid, bool first_segment)
1454{
1456
1457 /*
1458 * Initialize the worker's stream_fileset if we haven't yet. This will be
1459 * used for the entire duration of the worker so create it in a permanent
1460 * context. We create this on the very first streaming message from any
1461 * transaction and then use it for this and other streaming transactions.
1462 * Now, we could create a fileset at the start of the worker as well but
1463 * then we won't be sure that it will ever be used.
1464 */
1466 {
1467 MemoryContext oldctx;
1468
1470
1473
1474 MemoryContextSwitchTo(oldctx);
1475 }
1476
1477 /* Open the spool file for this transaction. */
1478 stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1479
1480 /* If this is not the first segment, open existing subxact file. */
1481 if (!first_segment)
1483
1485}
1486
1487/*
1488 * Handle STREAM START message.
1489 */
1490static void
1492{
1493 bool first_segment;
1495 TransApplyAction apply_action;
1496
1497 /* Save the message before it is consumed. */
1498 StringInfoData original_msg = *s;
1499
1501 ereport(ERROR,
1502 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1503 errmsg_internal("duplicate STREAM START message")));
1504
1505 /* There must not be an active streaming transaction. */
1507
1508 /* notify handle methods we're processing a remote transaction */
1510
1511 /* extract XID of the top-level transaction */
1512 stream_xid = logicalrep_read_stream_start(s, &first_segment);
1513
1515 ereport(ERROR,
1516 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1517 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1518
1520
1521 /* Try to allocate a worker for the streaming transaction. */
1522 if (first_segment)
1524
1525 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1526
1527 switch (apply_action)
1528 {
1530
1531 /*
1532 * Function stream_start_internal starts a transaction. This
1533 * transaction will be committed on the stream stop unless it is a
1534 * tablesync worker in which case it will be committed after
1535 * processing all the messages. We need this transaction for
1536 * handling the BufFile, used for serializing the streaming data
1537 * and subxact info.
1538 */
1539 stream_start_internal(stream_xid, first_segment);
1540 break;
1541
1543 Assert(winfo);
1544
1545 /*
1546 * Once we start serializing the changes, the parallel apply
1547 * worker will wait for the leader to release the stream lock
1548 * until the end of the transaction. So, we don't need to release
1549 * the lock or increment the stream count in that case.
1550 */
1551 if (pa_send_data(winfo, s->len, s->data))
1552 {
1553 /*
1554 * Unlock the shared object lock so that the parallel apply
1555 * worker can continue to receive changes.
1556 */
1557 if (!first_segment)
1559
1560 /*
1561 * Increment the number of streaming blocks waiting to be
1562 * processed by parallel apply worker.
1563 */
1565
1566 /* Cache the parallel apply worker for this transaction. */
1568 break;
1569 }
1570
1571 /*
1572 * Switch to serialize mode when we are not able to send the
1573 * change to parallel apply worker.
1574 */
1575 pa_switch_to_partial_serialize(winfo, !first_segment);
1576
1577 /* fall through */
1579 Assert(winfo);
1580
1581 /*
1582 * Open the spool file unless it was already opened when switching
1583 * to serialize mode. The transaction started in
1584 * stream_start_internal will be committed on the stream stop.
1585 */
1586 if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1587 stream_start_internal(stream_xid, first_segment);
1588
1590
1591 /* Cache the parallel apply worker for this transaction. */
1593 break;
1594
1596 if (first_segment)
1597 {
1598 /* Hold the lock until the end of the transaction. */
1601
1602 /*
1603 * Signal the leader apply worker, as it may be waiting for
1604 * us.
1605 */
1607 }
1608
1610 break;
1611
1612 default:
1613 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1614 break;
1615 }
1616
1618}
1619
1620/*
1621 * Update the information about subxacts and close the file.
1622 *
1623 * This function should be called when the stream_start_internal function has
1624 * been called.
1625 */
1626void
1628{
1629 /*
1630 * Serialize information about subxacts for the toplevel transaction, then
1631 * close the stream messages spool file.
1632 */
1635
1636 /* We must be in a valid transaction state */
1638
1639 /* Commit the per-stream transaction */
1641
1642 /* Reset per-stream context */
1644}
1645
1646/*
1647 * Handle STREAM STOP message.
1648 */
1649static void
1651{
1653 TransApplyAction apply_action;
1654
1656 ereport(ERROR,
1657 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1658 errmsg_internal("STREAM STOP message without STREAM START")));
1659
1660 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1661
1662 switch (apply_action)
1663 {
1666 break;
1667
1669 Assert(winfo);
1670
1671 /*
1672 * Lock before sending the STREAM_STOP message so that the leader
1673 * can hold the lock first and the parallel apply worker will wait
1674 * for leader to release the lock. See Locking Considerations atop
1675 * applyparallelworker.c.
1676 */
1678
1679 if (pa_send_data(winfo, s->len, s->data))
1680 {
1682 break;
1683 }
1684
1685 /*
1686 * Switch to serialize mode when we are not able to send the
1687 * change to parallel apply worker.
1688 */
1689 pa_switch_to_partial_serialize(winfo, true);
1690
1691 /* fall through */
1696 break;
1697
1699 elog(DEBUG1, "applied %u changes in the streaming chunk",
1701
1702 /*
1703 * By the time the parallel apply worker is processing the changes
1704 * in the current streaming block, the leader apply worker may have
1705 * sent multiple streaming blocks. This can lead to the parallel
1706 * apply worker starting to wait even when there are more chunks of
1707 * streams in the queue. So, try to lock only if there is no message left
1708 * in the queue. See Locking Considerations atop
1709 * applyparallelworker.c.
1710 *
1711 * Note that here we have a race condition where we can start
1712 * waiting even when there are pending streaming chunks. This can
1713 * happen if the leader sends another streaming block and acquires
1714 * the stream lock again after the parallel apply worker checks
1715 * that there is no pending streaming block and before it actually
1716 * starts waiting on a lock. We can handle this case by not
1717 * allowing the leader to increment the stream block count during
1718 * the time parallel apply worker acquires the lock but it is not
1719 * clear whether that is worth the complexity.
1720 *
1721 * Now, if this missed chunk contains rollback to savepoint, then
1722 * there is a risk of deadlock which probably shouldn't happen
1723 * after restart.
1724 */
1726 break;
1727
1728 default:
1729 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1730 break;
1731 }
1732
1735
1736 /*
1737 * The parallel apply worker could be in a transaction in which case we
1738 * need to report the state as STATE_IDLEINTRANSACTION.
1739 */
1742 else
1744
1746}
1747
1748/*
1749 * Helper function to handle STREAM ABORT message when the transaction was
1750 * serialized to file.
1751 */
1752static void
1753stream_abort_internal(TransactionId xid, TransactionId subxid)
1754{
1755 /*
1756 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1757 * just delete the files with serialized info.
1758 */
1759 if (xid == subxid)
1761 else
1762 {
1763 /*
1764 * OK, so it's a subxact. We need to read the subxact file for the
1765 * toplevel transaction, determine the offset tracked for the subxact,
1766 * and truncate the file with changes. We also remove the subxacts
1767 * with higher offsets (or rather higher XIDs).
1768 *
1769 * We intentionally scan the array from the tail, because we're likely
1770 * aborting a change for the most recent subtransactions.
1771 *
1772 * We can't use the binary search here as subxact XIDs won't
1773 * necessarily arrive in sorted order, consider the case where we have
1774 * released the savepoint for multiple subtransactions and then
1775 * performed rollback to savepoint for one of the earlier
1776 * sub-transactions.
1777 */
1778 int64 i;
1779 int64 subidx;
1780 BufFile *fd;
1781 bool found = false;
1782 char path[MAXPGPATH];
1783
1784 subidx = -1;
1787
1788 for (i = subxact_data.nsubxacts; i > 0; i--)
1789 {
1790 if (subxact_data.subxacts[i - 1].xid == subxid)
1791 {
1792 subidx = (i - 1);
1793 found = true;
1794 break;
1795 }
1796 }
1797
1798 /*
1799 * If it's an empty sub-transaction then we will not find the subxid
1800 * here so just clean up the subxact info and return.
1801 */
1802 if (!found)
1803 {
1804 /* Cleanup the subxact info */
1808 return;
1809 }
1810
1811 /* open the changes file */
1814 O_RDWR, false);
1815
1816 /* OK, truncate the file at the right offset */
1818 subxact_data.subxacts[subidx].offset);
1820
1821 /* discard the subxacts added later */
1822 subxact_data.nsubxacts = subidx;
1823
1824 /* write the updated subxact list */
1826
1829 }
1830}
1831
1832/*
1833 * Handle STREAM ABORT message.
1834 */
1835static void
1837{
1838 TransactionId xid;
1839 TransactionId subxid;
1840 LogicalRepStreamAbortData abort_data;
1842 TransApplyAction apply_action;
1843
1844 /* Save the message before it is consumed. */
1845 StringInfoData original_msg = *s;
1846 bool toplevel_xact;
1847
1849 ereport(ERROR,
1850 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1851 errmsg_internal("STREAM ABORT message without STREAM STOP")));
1852
1853 /* We receive abort information only when we can apply in parallel. */
1854 logicalrep_read_stream_abort(s, &abort_data,
1856
1857 xid = abort_data.xid;
1858 subxid = abort_data.subxid;
1859 toplevel_xact = (xid == subxid);
1860
1861 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1862
1863 apply_action = get_transaction_apply_action(xid, &winfo);
1864
1865 switch (apply_action)
1866 {
1867 case TRANS_LEADER_APPLY:
1868
1869 /*
1870 * We are in the leader apply worker and the transaction has been
1871 * serialized to file.
1872 */
1873 stream_abort_internal(xid, subxid);
1874
1875 elog(DEBUG1, "finished processing the STREAM ABORT command");
1876 break;
1877
1879 Assert(winfo);
1880
1881 /*
1882 * For the case of aborting the subtransaction, we increment the
1883 * number of streaming blocks and take the lock again before
1884 * sending the STREAM_ABORT to ensure that the parallel apply
1885 * worker will wait on the lock for the next set of changes after
1886 * processing the STREAM_ABORT message if it is not already
1887 * waiting for STREAM_STOP message.
1888 *
1889 * It is important to perform this locking before sending the
1890 * STREAM_ABORT message so that the leader can hold the lock first
1891 * and the parallel apply worker will wait for the leader to
1892 * release the lock. This is the same as what we do in
1893 * apply_handle_stream_stop. See Locking Considerations atop
1894 * applyparallelworker.c.
1895 */
1896 if (!toplevel_xact)
1897 {
1901 }
1902
1903 if (pa_send_data(winfo, s->len, s->data))
1904 {
1905 /*
1906 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1907 * wait here for the parallel apply worker to finish as that
1908 * is not required to maintain the commit order and won't have
1909 * the risk of failures due to transaction dependencies and
1910 * deadlocks. However, it is possible that before the parallel
1911 * worker finishes and we clear the worker info, the xid
1912 * wraparound happens on the upstream and a new transaction
1913 * with the same xid can appear and that can lead to duplicate
1914 * entries in ParallelApplyTxnHash. Yet another problem could
1915 * be that we may have serialized the changes in partial
1916 * serialize mode and the file containing xact changes may
1917 * already exist, and after xid wraparound trying to create
1918 * the file for the same xid can lead to an error. To avoid
1919 * these problems, we decide to wait for the aborts to finish.
1920 *
1921 * Note, it is okay to not update the flush location position
1922 * for aborts as in worst case that means such a transaction
1923 * won't be sent again after restart.
1924 */
1925 if (toplevel_xact)
1927
1928 break;
1929 }
1930
1931 /*
1932 * Switch to serialize mode when we are not able to send the
1933 * change to parallel apply worker.
1934 */
1935 pa_switch_to_partial_serialize(winfo, true);
1936
1937 /* fall through */
1939 Assert(winfo);
1940
1941 /*
1942 * Parallel apply worker might have applied some changes, so write
1943 * the STREAM_ABORT message so that it can rollback the
1944 * subtransaction if needed.
1945 */
1947 &original_msg);
1948
1949 if (toplevel_xact)
1950 {
1953 }
1954 break;
1955
1957
1958 /*
1959 * If the parallel apply worker is applying spooled messages then
1960 * close the file before aborting.
1961 */
1962 if (toplevel_xact && stream_fd)
1964
1965 pa_stream_abort(&abort_data);
1966
1967 /*
1968 * We need to wait after processing rollback to savepoint for the
1969 * next set of changes.
1970 *
1971 * We have a race condition here due to which we can start waiting
1972 * here when there are more chunks of streams in the queue. See
1973 * apply_handle_stream_stop.
1974 */
1975 if (!toplevel_xact)
1977
1978 elog(DEBUG1, "finished processing the STREAM ABORT command");
1979 break;
1980
1981 default:
1982 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1983 break;
1984 }
1985
1987}
1988
1989/*
1990 * Ensure that the passed location is fileset's end.
1991 */
1992static void
1993ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1994 off_t offset)
1995{
1996 char path[MAXPGPATH];
1997 BufFile *fd;
1998 int last_fileno;
1999 off_t last_offset;
2000
2002
2004
2006
2007 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2008
2009 BufFileSeek(fd, 0, 0, SEEK_END);
2010 BufFileTell(fd, &last_fileno, &last_offset);
2011
2013
2015
2016 if (last_fileno != fileno || last_offset != offset)
2017 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2018 path);
2019}
2020
2021/*
2022 * Common spoolfile processing.
2023 */
2024void
2026 XLogRecPtr lsn)
2027{
2028 int nchanges;
2029 char path[MAXPGPATH];
2030 char *buffer = NULL;
2031 MemoryContext oldcxt;
2032 ResourceOwner oldowner;
2033 int fileno;
2034 off_t offset;
2035
2038
2039 /* Make sure we have an open transaction */
2041
2042 /*
2043 * Allocate file handle and memory required to process all the messages in
2044 * TopTransactionContext to avoid them getting reset after each message is
2045 * processed.
2046 */
2048
2049 /* Open the spool file for the committed/prepared transaction */
2051 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2052
2053 /*
2054 * Make sure the file is owned by the toplevel transaction so that the
2055 * file will not be accidentally closed when aborting a subtransaction.
2056 */
2057 oldowner = CurrentResourceOwner;
2059
2060 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2061
2062 CurrentResourceOwner = oldowner;
2063
2064 buffer = palloc(BLCKSZ);
2065
2066 MemoryContextSwitchTo(oldcxt);
2067
2068 remote_final_lsn = lsn;
2069
2070 /*
2071 * Make sure the handle apply_dispatch methods are aware we're in a remote
2072 * transaction.
2073 */
2074 in_remote_transaction = true;
2076
2078
2079 /*
2080 * Read the entries one by one and pass them through the same logic as in
2081 * apply_dispatch.
2082 */
2083 nchanges = 0;
2084 while (true)
2085 {
2087 size_t nbytes;
2088 int len;
2089
2091
2092 /* read length of the on-disk record */
2093 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2094
2095 /* have we reached end of the file? */
2096 if (nbytes == 0)
2097 break;
2098
2099 /* do we have a correct length? */
2100 if (len <= 0)
2101 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2102 len, path);
2103
2104 /* make sure we have sufficiently large buffer */
2105 buffer = repalloc(buffer, len);
2106
2107 /* and finally read the data into the buffer */
2108 BufFileReadExact(stream_fd, buffer, len);
2109
2110 BufFileTell(stream_fd, &fileno, &offset);
2111
2112 /* init a stringinfo using the buffer and call apply_dispatch */
2113 initReadOnlyStringInfo(&s2, buffer, len);
2114
2115 /* Ensure we are reading the data into our memory context. */
2117
2119
2121
2122 MemoryContextSwitchTo(oldcxt);
2123
2124 nchanges++;
2125
2126 /*
2127 * It is possible the file has been closed because we have processed
2128 * the transaction end message like stream_commit in which case that
2129 * must be the last message.
2130 */
2131 if (!stream_fd)
2132 {
2133 ensure_last_message(stream_fileset, xid, fileno, offset);
2134 break;
2135 }
2136
2137 if (nchanges % 1000 == 0)
2138 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2139 nchanges, path);
2140 }
2141
2142 if (stream_fd)
2144
2145 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2146 nchanges, path);
2147
2148 return;
2149}
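
/*
 * Illustrative sketch, not part of worker.c: the replay loop above reads
 * length-prefixed records until a clean end of file. The same framing is
 * shown here with stdio. spool_write, spool_replay and sum_lengths are
 * hypothetical names, and the callback stands in for apply_dispatch.
 */
```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Append one record: an int length followed by the payload bytes. */
static int
spool_write(FILE *fp, const void *data, int len)
{
    assert(len > 0);
    if (fwrite(&len, sizeof(len), 1, fp) != 1)
        return -1;
    if (fwrite(data, 1, (size_t) len, fp) != (size_t) len)
        return -1;
    return 0;
}

/*
 * Replay every record from the start of the file through apply().
 * Returns the number of records, or -1 on a short read, a non-positive
 * length, or an allocation failure.
 */
static int
spool_replay(FILE *fp, void (*apply) (const char *buf, int len, void *arg),
             void *arg)
{
    char *buffer = NULL;
    int   nchanges = 0;
    int   len;

    rewind(fp);
    for (;;)
    {
        size_t n = fread(&len, 1, sizeof(len), fp);
        char  *tmp;

        if (n == 0)
            break;                  /* end of file at a record boundary */
        if (n != sizeof(len) || len <= 0)
            goto fail;

        /* make sure the buffer is large enough for this record */
        tmp = realloc(buffer, (size_t) len);
        if (tmp == NULL)
            goto fail;
        buffer = tmp;

        if (fread(buffer, 1, (size_t) len, fp) != (size_t) len)
            goto fail;

        apply(buffer, len, arg);
        nchanges++;
    }
    free(buffer);
    return nchanges;

fail:
    free(buffer);
    return -1;
}

/* Example callback: adds each record's length to the int that arg points to. */
static void
sum_lengths(const char *buf, int len, void *arg)
{
    (void) buf;
    *(int *) arg += len;
}
```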
2150
2151/*
2152 * Handle STREAM COMMIT message.
2153 */
2154static void
2156{
2157 TransactionId xid;
2158 LogicalRepCommitData commit_data;
2160 TransApplyAction apply_action;
2161
2162 /* Save the message before it is consumed. */
2163 StringInfoData original_msg = *s;
2164
2166 ereport(ERROR,
2167 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2168 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2169
2170 xid = logicalrep_read_stream_commit(s, &commit_data);
2171 set_apply_error_context_xact(xid, commit_data.commit_lsn);
2172
2173 apply_action = get_transaction_apply_action(xid, &winfo);
2174
2175 switch (apply_action)
2176 {
2177 case TRANS_LEADER_APPLY:
2178
2179 /*
2180 * The transaction has been serialized to file, so replay all the
2181 * spooled operations.
2182 */
2184 commit_data.commit_lsn);
2185
2186 apply_handle_commit_internal(&commit_data);
2187
2188 /* Unlink the files with serialized changes and subxact info. */
2190
2191 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2192 break;
2193
2195 Assert(winfo);
2196
2197 if (pa_send_data(winfo, s->len, s->data))
2198 {
2199 /* Finish processing the streaming transaction. */
2200 pa_xact_finish(winfo, commit_data.end_lsn);
2201 break;
2202 }
2203
2204 /*
2205 * Switch to serialize mode when we are not able to send the
2206 * change to parallel apply worker.
2207 */
2208 pa_switch_to_partial_serialize(winfo, true);
2209
2210 /* fall through */
2212 Assert(winfo);
2213
2215 &original_msg);
2216
2218
2219 /* Finish processing the streaming transaction. */
2220 pa_xact_finish(winfo, commit_data.end_lsn);
2221 break;
2222
2224
2225 /*
2226 * If the parallel apply worker is applying spooled messages then
2227 * close the file before committing.
2228 */
2229 if (stream_fd)
2231
2232 apply_handle_commit_internal(&commit_data);
2233
2235
2236 /*
2237 * It is important to set the transaction state as finished before
2238 * releasing the lock. See pa_wait_for_xact_finish.
2239 */
2242
2244
2245 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2246 break;
2247
2248 default:
2249 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2250 break;
2251 }
2252
2253 /* Process any tables that are being synchronized in parallel. */
2254 process_syncing_tables(commit_data.end_lsn);
2255
2257
2259}
2260
2261/*
2262 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2263 */
2264static void
2266{
2267 if (is_skipping_changes())
2268 {
2270
2271 /*
2272 * Start a new transaction to clear the subskiplsn, if not started
2273 * yet.
2274 */
2275 if (!IsTransactionState())
2277 }
2278
2279 if (IsTransactionState())
2280 {
2281 /*
2282 * The transaction is either non-empty or skipped, so we clear the
2283 * subskiplsn.
2284 */
2286
2287 /*
2288 * Update origin state so we can restart streaming from correct
2289 * position in case of crash.
2290 */
2293
2295
2296 if (IsTransactionBlock())
2297 {
2298 EndTransactionBlock(false);
2300 }
2301
2302 pgstat_report_stat(false);
2303
2305 }
2306 else
2307 {
2308 /* Process any invalidation messages that might have accumulated. */
2311 }
2312
2313 in_remote_transaction = false;
2314}
2315
2316/*
2317 * Handle RELATION message.
2318 *
2319 * Note that we don't validate against the local schema here. Validation is
2320 * postponed until the first change for the given relation arrives, because
2321 * we only care about it when applying changes to that relation, and we do
2322 * less locking this way.
2323 */
2324static void
2326{
2327 LogicalRepRelation *rel;
2328
2330 return;
2331
2332 rel = logicalrep_read_rel(s);
2334
2335 /* Also reset all entries in the partition map that refer to remoterel. */
2337}
2338
2339/*
2340 * Handle TYPE message.
2341 *
2342 * This implementation pays no attention to TYPE messages; we expect the user
2343 * to have set things up so that the incoming data is acceptable to the input
2344 * functions for the locally subscribed tables. Hence, we just read and
2345 * discard the message.
2346 */
2347static void
2349{
2350 LogicalRepTyp typ;
2351
2353 return;
2354
2355 logicalrep_read_typ(s, &typ);
2356}
2357
2358/*
2359 * Check that we (the subscription owner) have sufficient privileges on the
2360 * target relation to perform the given operation.
2361 */
2362static void
2364{
2365 Oid relid;
2366 AclResult aclresult;
2367
2368 relid = RelationGetRelid(rel);
2369 aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2370 if (aclresult != ACLCHECK_OK)
2371 aclcheck_error(aclresult,
2372 get_relkind_objtype(rel->rd_rel->relkind),
2373 get_rel_name(relid));
2374
2375 /*
2376 * We lack the infrastructure to honor RLS policies. It might be possible
2377 * to add such infrastructure here, but tablesync workers lack it, too, so
2378 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2379 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2380 * replicate subsequent INSERTs, so we forbid all commands the same.
2381 */
2382 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2383 ereport(ERROR,
2384 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2385 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2388}
2389
2390/*
2391 * Handle INSERT message.
2392 */
2393
2394static void
2396{
2398 LogicalRepTupleData newtup;
2399 LogicalRepRelId relid;
2400 UserContext ucxt;
2401 ApplyExecutionData *edata;
2402 EState *estate;
2403 TupleTableSlot *remoteslot;
2404 MemoryContext oldctx;
2405 bool run_as_owner;
2406
2407 /*
2408 * Quick return if we are skipping data modification changes or handling
2409 * streamed transactions.
2410 */
2411 if (is_skipping_changes() ||
2413 return;
2414
2416
2417 relid = logicalrep_read_insert(s, &newtup);
2420 {
2421 /*
2422 * The relation can't become interesting in the middle of the
2423 * transaction so it's safe to unlock it.
2424 */
2427 return;
2428 }
2429
2430 /*
2431 * Make sure that any user-supplied code runs as the table owner, unless
2432 * the user has opted out of that behavior.
2433 */
2434 run_as_owner = MySubscription->runasowner;
2435 if (!run_as_owner)
2436 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2437
2438 /* Set relation for error callback */
2440
2441 /* Initialize the executor state. */
2442 edata = create_edata_for_relation(rel);
2443 estate = edata->estate;
2444 remoteslot = ExecInitExtraTupleSlot(estate,
2446 &TTSOpsVirtual);
2447
2448 /* Process and store remote tuple in the slot */
2450 slot_store_data(remoteslot, rel, &newtup);
2451 slot_fill_defaults(rel, estate, remoteslot);
2452 MemoryContextSwitchTo(oldctx);
2453
2454 /* For a partitioned table, insert the tuple into a partition. */
2455 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2457 remoteslot, NULL, CMD_INSERT);
2458 else
2459 {
2460 ResultRelInfo *relinfo = edata->targetRelInfo;
2461
2462 ExecOpenIndices(relinfo, false);
2463 apply_handle_insert_internal(edata, relinfo, remoteslot);
2464 ExecCloseIndices(relinfo);
2465 }
2466
2467 finish_edata(edata);
2468
2469 /* Reset relation for error callback */
2471
2472 if (!run_as_owner)
2473 RestoreUserContext(&ucxt);
2474
2476
2478}
2479
2480/*
2481 * Workhorse for apply_handle_insert()
2482 * relinfo is for the relation we're actually inserting into
2483 * (could be a child partition of edata->targetRelInfo)
2484 */
2485static void
2487 ResultRelInfo *relinfo,
2488 TupleTableSlot *remoteslot)
2489{
2490 EState *estate = edata->estate;
2491
2492 /* Caller should have opened indexes already. */
2493 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2494 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2496
2497 /* Caller will not have done this bit. */
2499 InitConflictIndexes(relinfo);
2500
2501 /* Do the insert. */
2503 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2504}
2505
2506/*
2507 * Check if the logical replication relation is updatable and throw
2508 * appropriate error if it isn't.
2509 */
2510static void
2512{
2513 /*
2514 * For partitioned tables, we only need to care if the target partition is
2515 * updatable (i.e., has a PK or replica identity defined for it).
2516 */
2517 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2518 return;
2519
2520 /* Updatable, no error. */
2521 if (rel->updatable)
2522 return;
2523
2524 /*
2525 * We are on an error path, so it's fine that this is somewhat slow. It's
2526 * better to give the user an accurate error.
2527 */
2529 {
2530 ereport(ERROR,
2531 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2532 errmsg("publisher did not send replica identity column "
2533 "expected by the logical replication target relation \"%s.%s\"",
2534 rel->remoterel.nspname, rel->remoterel.relname)));
2535 }
2536
2537 ereport(ERROR,
2538 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2539 errmsg("logical replication target relation \"%s.%s\" has "
2540 "neither REPLICA IDENTITY index nor PRIMARY "
2541 "KEY and published relation does not have "
2542 "REPLICA IDENTITY FULL",
2543 rel->remoterel.nspname, rel->remoterel.relname)));
2544}
2545
2546/*
2547 * Handle UPDATE message.
2548 *
2549 * TODO: FDW support
2550 */
2551static void
2553{
2555 LogicalRepRelId relid;
2556 UserContext ucxt;
2557 ApplyExecutionData *edata;
2558 EState *estate;
2559 LogicalRepTupleData oldtup;
2560 LogicalRepTupleData newtup;
2561 bool has_oldtup;
2562 TupleTableSlot *remoteslot;
2563 RTEPermissionInfo *target_perminfo;
2564 MemoryContext oldctx;
2565 bool run_as_owner;
2566
2567 /*
2568 * Quick return if we are skipping data modification changes or handling
2569 * streamed transactions.
2570 */
2571 if (is_skipping_changes() ||
2573 return;
2574
2576
2577 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2578 &newtup);
2581 {
2582 /*
2583 * The relation can't become interesting in the middle of the
2584 * transaction so it's safe to unlock it.
2585 */
2588 return;
2589 }
2590
2591 /* Set relation for error callback */
2593
2594 /* Check if we can do the update. */
2596
2597 /*
2598 * Make sure that any user-supplied code runs as the table owner, unless
2599 * the user has opted out of that behavior.
2600 */
2601 run_as_owner = MySubscription->runasowner;
2602 if (!run_as_owner)
2603 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2604
2605 /* Initialize the executor state. */
2606 edata = create_edata_for_relation(rel);
2607 estate = edata->estate;
2608 remoteslot = ExecInitExtraTupleSlot(estate,
2610 &TTSOpsVirtual);
2611
2612 /*
2613 * Populate updatedCols so that per-column triggers can fire, and so
2614 * executor can correctly pass down indexUnchanged hint. This could
2615 * include more columns than were actually changed on the publisher
2616 * because the logical replication protocol doesn't contain that
2617 * information. But it would for example exclude columns that only exist
2618 * on the subscriber, since we are not touching those.
2619 */
2620 target_perminfo = list_nth(estate->es_rteperminfos, 0);
2621 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2622 {
2624 int remoteattnum = rel->attrmap->attnums[i];
2625
2626 if (!att->attisdropped && remoteattnum >= 0)
2627 {
2628 Assert(remoteattnum < newtup.ncols);
2629 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2630 target_perminfo->updatedCols =
2631 bms_add_member(target_perminfo->updatedCols,
2633 }
2634 }
2635
2636 /* Build the search tuple. */
2638 slot_store_data(remoteslot, rel,
2639 has_oldtup ? &oldtup : &newtup);
2640 MemoryContextSwitchTo(oldctx);
2641
2642 /* For a partitioned table, apply update to correct partition. */
2643 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2645 remoteslot, &newtup, CMD_UPDATE);
2646 else
2648 remoteslot, &newtup, rel->localindexoid);
2649
2650 finish_edata(edata);
2651
2652 /* Reset relation for error callback */
2654
2655 if (!run_as_owner)
2656 RestoreUserContext(&ucxt);
2657
2659
2661}
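
The updatedCols loop in apply_handle_update() can be illustrated in isolation. The following is a minimal sketch, not the PostgreSQL implementation: it assumes a plain 64-bit mask instead of a Bitmapset, uses the local column index as the bit position, and the names collect_updated_cols and COL_UNCHANGED are hypothetical stand-ins. It shows why dropped columns and subscriber-only columns (mapped to a negative remote attribute number) never end up in the set, while every column the publisher did not mark as unchanged does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for LOGICALREP_COLUMN_UNCHANGED. */
#define COL_UNCHANGED 'u'

/*
 * Build a bitmask of local columns that the remote UPDATE may have changed.
 *
 * natts         number of local columns (only the first 64 are considered)
 * dropped       per-local-column "is dropped" flags
 * remote_attnum per-local-column index into the remote tuple, or -1 when the
 *               column exists only on the subscriber
 * colstatus     per-remote-column status bytes from the incoming tuple
 * ncols         number of remote columns in the incoming tuple
 */
static uint64_t
collect_updated_cols(int natts, const bool *dropped,
					 const int *remote_attnum,
					 const char *colstatus, int ncols)
{
	uint64_t	updated = 0;

	for (int i = 0; i < natts && i < 64; i++)
	{
		int			r = remote_attnum[i];

		/* Dropped and subscriber-only columns are never touched. */
		if (dropped[i] || r < 0)
			continue;

		assert(r < ncols);

		/* Anything not explicitly "unchanged" counts as updated. */
		if (colstatus[r] != COL_UNCHANGED)
			updated |= UINT64_C(1) << i;
	}

	return updated;
}
```

With four local columns, where column 1 is subscriber-only and column 2 is dropped, only a mapped column whose remote status differs from COL_UNCHANGED sets its bit. The result can be a superset of what really changed on the publisher, which matches the caveat in the comment above the loop.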
2662
2663/*
2664 * Workhorse for apply_handle_update()
2665 * relinfo is for the relation we're actually updating in
2666 * (could be a child partition of edata->targetRelInfo)
2667 */
2668static void
2670 ResultRelInfo *relinfo,
2671 TupleTableSlot *remoteslot,
2672 LogicalRepTupleData *newtup,
2673 Oid localindexoid)
2674{
2675 EState *estate = edata->estate;
2676 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2677 Relation localrel = relinfo->ri_RelationDesc;
2678 EPQState epqstate;
2679 TupleTableSlot *localslot = NULL;
2680 ConflictTupleInfo conflicttuple = {0};
2681 bool found;
2682 MemoryContext oldctx;
2683
2684 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2685 ExecOpenIndices(relinfo, false);
2686
2687 found = FindReplTupleInLocalRel(edata, localrel,
2688 &relmapentry->remoterel,
2689 localindexoid,
2690 remoteslot, &localslot);
2691
2692 /*
2693 * Tuple found.
2694 *
2695 * Note this will fail if there are other conflicting unique indexes.
2696 */
2697 if (found)
2698 {
2699 /*
2700 * Report the conflict if the tuple was modified by a different
2701 * origin.
2702 */
2703 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2704 &conflicttuple.origin, &conflicttuple.ts) &&
2705 conflicttuple.origin != replorigin_session_origin)
2706 {
2707 TupleTableSlot *newslot;
2708
2709 /* Store the new tuple for conflict reporting */
2710 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2711 slot_store_data(newslot, relmapentry, newtup);
2712
2713 conflicttuple.slot = localslot;
2714
2716 remoteslot, newslot,
2717 list_make1(&conflicttuple));
2718 }
2719
2720 /* Process and store remote tuple in the slot */
2722 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2723 MemoryContextSwitchTo(oldctx);
2724
2725 EvalPlanQualSetSlot(&epqstate, remoteslot);
2726
2727 InitConflictIndexes(relinfo);
2728
2729 /* Do the actual update. */
2731 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2732 remoteslot);
2733 }
2734 else
2735 {
2736 TupleTableSlot *newslot = localslot;
2737
2738 /* Store the new tuple for conflict reporting */
2739 slot_store_data(newslot, relmapentry, newtup);
2740
2741 /*
2742 * The tuple to be updated could not be found. Do nothing except for
2743 * emitting a log message.
2744 */
2745 ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2746 remoteslot, newslot, list_make1(&conflicttuple));
2747 }
2748
2749 /* Cleanup. */
2750 ExecCloseIndices(relinfo);
2751 EvalPlanQualEnd(&epqstate);
2752}
2753
2754/*
2755 * Handle DELETE message.
2756 *
2757 * TODO: FDW support
2758 */
2759static void
2761{
2763 LogicalRepTupleData oldtup;
2764 LogicalRepRelId relid;
2765 UserContext ucxt;
2766 ApplyExecutionData *edata;
2767 EState *estate;
2768 TupleTableSlot *remoteslot;
2769 MemoryContext oldctx;
2770 bool run_as_owner;
2771
2772 /*
2773 * Quick return if we are skipping data modification changes or handling
2774 * streamed transactions.
2775 */
2776 if (is_skipping_changes() ||
2778 return;
2779
2781
2782 relid = logicalrep_read_delete(s, &oldtup);
2785 {
2786 /*
2787 * The relation can't become interesting in the middle of the
2788 * transaction so it's safe to unlock it.
2789 */
2792 return;
2793 }
2794
2795 /* Set relation for error callback */
2797
2798 /* Check if we can do the delete. */
2800
2801 /*
2802 * Make sure that any user-supplied code runs as the table owner, unless
2803 * the user has opted out of that behavior.
2804 */
2805 run_as_owner = MySubscription->runasowner;
2806 if (!run_as_owner)
2807 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2808
2809 /* Initialize the executor state. */
2810 edata = create_edata_for_relation(rel);
2811 estate = edata->estate;
2812 remoteslot = ExecInitExtraTupleSlot(estate,
2814 &TTSOpsVirtual);
2815
2816 /* Build the search tuple. */
2818 slot_store_data(remoteslot, rel, &oldtup);
2819 MemoryContextSwitchTo(oldctx);
2820
2821 /* For a partitioned table, apply delete to correct partition. */
2822 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2824 remoteslot, NULL, CMD_DELETE);
2825 else
2826 {
2827 ResultRelInfo *relinfo = edata->targetRelInfo;
2828
2829 ExecOpenIndices(relinfo, false);
2830 apply_handle_delete_internal(edata, relinfo,
2831 remoteslot, rel->localindexoid);
2832 ExecCloseIndices(relinfo);
2833 }
2834
2835 finish_edata(edata);
2836
2837 /* Reset relation for error callback */
2839
2840 if (!run_as_owner)
2841 RestoreUserContext(&ucxt);
2842
2844
2846}
2847
2848/*
2849 * Workhorse for apply_handle_delete()
2850 * relinfo is for the relation we're actually deleting from
2851 * (could be a child partition of edata->targetRelInfo)
2852 */
2853static void
2855 ResultRelInfo *relinfo,
2856 TupleTableSlot *remoteslot,
2857 Oid localindexoid)
2858{
2859 EState *estate = edata->estate;
2860 Relation localrel = relinfo->ri_RelationDesc;
2861 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2862 EPQState epqstate;
2863 TupleTableSlot *localslot;
2864 ConflictTupleInfo conflicttuple = {0};
2865 bool found;
2866
2867 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2868
2869 /* Caller should have opened indexes already. */
2870 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2871 !localrel->rd_rel->relhasindex ||
2872 RelationGetIndexList(localrel) == NIL);
2873
2874 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2875 remoteslot, &localslot);
2876
2877 /* If found delete it. */
2878 if (found)
2879 {
2880 /*
2881 * Report the conflict if the tuple was modified by a different
2882 * origin.
2883 */
2884 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2885 &conflicttuple.origin, &conflicttuple.ts) &&
2886 conflicttuple.origin != replorigin_session_origin)
2887 {
2888 conflicttuple.slot = localslot;
2890 remoteslot, NULL,
2891 list_make1(&conflicttuple));
2892 }
2893
2894 EvalPlanQualSetSlot(&epqstate, localslot);
2895
2896 /* Do the actual delete. */
2898 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2899 }
2900 else
2901 {
2902 /*
2903 * The tuple to be deleted could not be found. Do nothing except for
2904 * emitting a log message.
2905 */
2906 ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2907 remoteslot, NULL, list_make1(&conflicttuple));
2908 }
2909
2910 /* Cleanup. */
2911 EvalPlanQualEnd(&epqstate);
2912}
2913
2914/*
2915 * Try to find a tuple received from the publication side (in 'remoteslot') in
2916 * the corresponding local relation using the replica identity index, the
2917 * primary key, another usable index or, if needed, a sequential scan.
2918 *
2919 * Local tuple, if found, is returned in '*localslot'.
2920 */
2921static bool
2923 LogicalRepRelation *remoterel,
2924 Oid localidxoid,
2925 TupleTableSlot *remoteslot,
2926 TupleTableSlot **localslot)
2927{
2928 EState *estate = edata->estate;
2929 bool found;
2930
2931 /*
2932 * Regardless of the top-level operation, we're performing a read here, so
2933 * check for SELECT privileges.
2934 */
2936
2937 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2938
2939 Assert(OidIsValid(localidxoid) ||
2940 (remoterel->replident == REPLICA_IDENTITY_FULL));
2941
2942 if (OidIsValid(localidxoid))
2943 {
2944#ifdef USE_ASSERT_CHECKING
2945 Relation idxrel = index_open(localidxoid, AccessShareLock);
2946
2947 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2948 Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2949 (remoterel->replident == REPLICA_IDENTITY_FULL &&
2951 edata->targetRel->attrmap)));
2953#endif
2954
2955 found = RelationFindReplTupleByIndex(localrel, localidxoid,
2957 remoteslot, *localslot);
2958 }
2959 else
2961 remoteslot, *localslot);
2962
2963 return found;
2964}
2965
2966/*
2967 * This handles insert, update, delete on a partitioned table.
2968 */
2969static void
2971 TupleTableSlot *remoteslot,
2972 LogicalRepTupleData *newtup,
2973 CmdType operation)
2974{
2975 EState *estate = edata->estate;
2976 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2977 ResultRelInfo *relinfo = edata->targetRelInfo;
2978 Relation parentrel = relinfo->ri_RelationDesc;
2979 ModifyTableState *mtstate;
2980 PartitionTupleRouting *proute;
2981 ResultRelInfo *partrelinfo;
2982 Relation partrel;
2983 TupleTableSlot *remoteslot_part;
2984 TupleConversionMap *map;
2985 MemoryContext oldctx;
2986 LogicalRepRelMapEntry *part_entry = NULL;
2987 AttrMap *attrmap = NULL;
2988
2989 /* ModifyTableState is needed for ExecFindPartition(). */
2990 edata->mtstate = mtstate = makeNode(ModifyTableState);
2991 mtstate->ps.plan = NULL;
2992 mtstate->ps.state = estate;
2993 mtstate->operation = operation;
2994 mtstate->resultRelInfo = relinfo;
2995
2996 /* ... as is PartitionTupleRouting. */
2997 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2998
2999 /*
3000 * Find the partition to which the "search tuple" belongs.
3001 */
3002 Assert(remoteslot != NULL);
3004 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3005 remoteslot, estate);
3006 Assert(partrelinfo != NULL);
3007 partrel = partrelinfo->ri_RelationDesc;
3008
3009 /*
3010 * Check for supported relkind. We need this since partitions might be of
3011 * unsupported relkinds; and the set of partitions can change, so checking
3012 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3013 */
3014 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3016 RelationGetRelationName(partrel));
3017
3018 /*
3019 * To perform any of the operations below, the tuple must match the
3020 * partition's rowtype. Convert if needed or just copy, using a dedicated
3021 * slot to store the tuple in any case.
3022 */
3023 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3024 if (remoteslot_part == NULL)
3025 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3026 map = ExecGetRootToChildMap(partrelinfo, estate);
3027 if (map != NULL)
3028 {
3029 attrmap = map->attrMap;
3030 remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3031 remoteslot_part);
3032 }
3033 else
3034 {
3035 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3036 slot_getallattrs(remoteslot_part);
3037 }
3038 MemoryContextSwitchTo(oldctx);
3039
3040 /* Check if we can do the update or delete on the leaf partition. */
3041 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3042 {
3043 part_entry = logicalrep_partition_open(relmapentry, partrel,
3044 attrmap);
3045 check_relation_updatable(part_entry);
3046 }
3047
3048 switch (operation)
3049 {
3050 case CMD_INSERT:
3051 apply_handle_insert_internal(edata, partrelinfo,
3052 remoteslot_part);
3053 break;
3054
3055 case CMD_DELETE:
3056 apply_handle_delete_internal(edata, partrelinfo,
3057 remoteslot_part,
3058 part_entry->localindexoid);
3059 break;
3060
3061 case CMD_UPDATE:
3062
3063 /*
3064 * For UPDATE, depending on whether or not the updated tuple
3065 * satisfies the partition's constraint, perform a simple UPDATE
3066 * of the partition or move the updated tuple into a different
3067 * suitable partition.
3068 */
3069 {
3070 TupleTableSlot *localslot;
3071 ResultRelInfo *partrelinfo_new;
3072 Relation partrel_new;
3073 bool found;
3074 EPQState epqstate;
3075 ConflictTupleInfo conflicttuple = {0};
3076
3077 /* Get the matching local tuple from the partition. */
3078 found = FindReplTupleInLocalRel(edata, partrel,
3079 &part_entry->remoterel,
3080 part_entry->localindexoid,
3081 remoteslot_part, &localslot);
3082 if (!found)
3083 {
3084 TupleTableSlot *newslot = localslot;
3085
3086 /* Store the new tuple for conflict reporting */
3087 slot_store_data(newslot, part_entry, newtup);
3088
3089 /*
3090 * The tuple to be updated could not be found. Do nothing
3091 * except for emitting a log message.
3092 */
3093 ReportApplyConflict(estate, partrelinfo, LOG,
3094 CT_UPDATE_MISSING, remoteslot_part,
3095 newslot, list_make1(&conflicttuple));
3096
3097 return;
3098 }
3099
3100 /*
3101 * Report the conflict if the tuple was modified by a
3102 * different origin.
3103 */
3104 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3105 &conflicttuple.origin,
3106 &conflicttuple.ts) &&
3107 conflicttuple.origin != replorigin_session_origin)
3108 {
3109 TupleTableSlot *newslot;
3110
3111 /* Store the new tuple for conflict reporting */
3112 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3113 slot_store_data(newslot, part_entry, newtup);
3114
3115 conflicttuple.slot = localslot;
3116
3117 ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3118 remoteslot_part, newslot,
3119 list_make1(&conflicttuple));
3120 }
3121
3122 /*
3123 * Apply the update to the local tuple, putting the result in
3124 * remoteslot_part.
3125 */
3127 slot_modify_data(remoteslot_part, localslot, part_entry,
3128 newtup);
3129 MemoryContextSwitchTo(oldctx);
3130
3131 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3132
3133 /*
3134 * Does the updated tuple still satisfy the current
3135 * partition's constraint?
3136 */
3137 if (!partrel->rd_rel->relispartition ||
3138 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3139 false))
3140 {
3141 /*
3142 * Yes, so simply UPDATE the partition. We don't call
3143 * apply_handle_update_internal() here, which would
3144 * normally do the following work, to avoid repeating some
3145 * work already done above to find the local tuple in the
3146 * partition.
3147 */
3148 InitConflictIndexes(partrelinfo);
3149
3150 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3152 ACL_UPDATE);
3153 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3154 localslot, remoteslot_part);
3155 }
3156 else
3157 {
3158 /* Move the tuple into the new partition. */
3159
3160 /*
3161 * New partition will be found using tuple routing, which
3162 * can only occur via the parent table. We might need to
3163 * convert the tuple to the parent's rowtype. Note that
3164 * this is the tuple found in the partition, not the
3165 * original search tuple received by this function.
3166 */
3167 if (map)
3168 {
3169 TupleConversionMap *PartitionToRootMap =
3171 RelationGetDescr(parentrel));
3172
3173 remoteslot =
3174 execute_attr_map_slot(PartitionToRootMap->attrMap,
3175 remoteslot_part, remoteslot);
3176 }
3177 else
3178 {
3179 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3180 slot_getallattrs(remoteslot);
3181 }
3182
3183 /* Find the new partition. */
3185 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3186 proute, remoteslot,
3187 estate);
3188 MemoryContextSwitchTo(oldctx);
3189 Assert(partrelinfo_new != partrelinfo);
3190 partrel_new = partrelinfo_new->ri_RelationDesc;
3191
3192 /* Check that new partition also has supported relkind. */
3193 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3195 RelationGetRelationName(partrel_new));
3196
3197 /* DELETE old tuple found in the old partition. */
3198 EvalPlanQualSetSlot(&epqstate, localslot);
3200 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3201
3202 /* INSERT new tuple into the new partition. */
3203
3204 /*
3205 * Convert the replacement tuple to match the destination
3206 * partition rowtype.
3207 */
3209 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3210 if (remoteslot_part == NULL)
3211 remoteslot_part = table_slot_create(partrel_new,
3212 &estate->es_tupleTable);
3213 map = ExecGetRootToChildMap(partrelinfo_new, estate);
3214 if (map != NULL)
3215 {
3216 remoteslot_part = execute_attr_map_slot(map->attrMap,
3217 remoteslot,
3218 remoteslot_part);
3219 }
3220 else
3221 {
3222 remoteslot_part = ExecCopySlot(remoteslot_part,
3223 remoteslot);
3224 slot_getallattrs(remoteslot);
3225 }
3226 MemoryContextSwitchTo(oldctx);
3227 apply_handle_insert_internal(edata, partrelinfo_new,
3228 remoteslot_part);
3229 }
3230
3231 EvalPlanQualEnd(&epqstate);
3232 }
3233 break;
3234
3235 default:
3236 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3237 break;
3238 }
3239}
3240
3241/*
3242 * Handle TRUNCATE message.
3243 *
3244 * TODO: FDW support
3245 */
3246static void
3248{
3249 bool cascade = false;
3250 bool restart_seqs = false;
3251 List *remote_relids = NIL;
3252 List *remote_rels = NIL;
3253 List *rels = NIL;
3254 List *part_rels = NIL;
3255 List *relids = NIL;
3256 List *relids_logged = NIL;
3257 ListCell *lc;
3258 LOCKMODE lockmode = AccessExclusiveLock;
3259
3260 /*
3261 * Quick return if we are skipping data modification changes or handling
3262 * streamed transactions.
3263 */
3264 if (is_skipping_changes() ||
3266 return;
3267
3269
3270 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3271
3272 foreach(lc, remote_relids)
3273 {
3274 LogicalRepRelId relid = lfirst_oid(lc);
3276
3277 rel = logicalrep_rel_open(relid, lockmode);
3279 {
3280 /*
3281 * The relation can't become interesting in the middle of the
3282 * transaction so it's safe to unlock it.
3283 */
3284 logicalrep_rel_close(rel, lockmode);
3285 continue;
3286 }
3287
3288 remote_rels = lappend(remote_rels, rel);
3290 rels = lappend(rels, rel->localrel);
3291 relids = lappend_oid(relids, rel->localreloid);
3293 relids_logged = lappend_oid(relids_logged, rel->localreloid);
3294
3295 /*
3296 * Truncate partitions if we got a message to truncate a partitioned
3297 * table.
3298 */
3299 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3300 {
3301 ListCell *child;
3302 List *children = find_all_inheritors(rel->localreloid,
3303 lockmode,
3304 NULL);
3305
3306 foreach(child, children)
3307 {
3308 Oid childrelid = lfirst_oid(child);
3309 Relation childrel;
3310
3311 if (list_member_oid(relids, childrelid))
3312 continue;
3313
3314 /* find_all_inheritors already got lock */
3315 childrel = table_open(childrelid, NoLock);
3316
3317 /*
3318 * Ignore temp tables of other backends. See similar code in
3319 * ExecuteTruncate().
3320 */
3321 if (RELATION_IS_OTHER_TEMP(childrel))
3322 {
3323 table_close(childrel, lockmode);
3324 continue;
3325 }
3326
3328 rels = lappend(rels, childrel);
3329 part_rels = lappend(part_rels, childrel);
3330 relids = lappend_oid(relids, childrelid);
3331 /* Log this relation only if needed for logical decoding */
3332 if (RelationIsLogicallyLogged(childrel))
3333 relids_logged = lappend_oid(relids_logged, childrelid);
3334 }
3335 }
3336 }
3337
3338 /*
3339 * Even if CASCADE was used on the upstream primary, we explicitly default
3340 * to replaying changes without further cascading. This might later be made
3341 * changeable with a user-specified option.
3342 *
3343 * MySubscription->runasowner tells us whether we want to execute
3344 * replication actions as the subscription owner; the last argument to
3345 * TruncateGuts tells it whether we want to switch to the table owner.
3346 * Those are exactly opposite conditions.
3347 */
3349 relids,
3350 relids_logged,
3352 restart_seqs,
3354 foreach(lc, remote_rels)
3355 {
3356 LogicalRepRelMapEntry *rel = lfirst(lc);
3357
3359 }
3360 foreach(lc, part_rels)
3361 {
3362 Relation rel = lfirst(lc);
3363
3364 table_close(rel, NoLock);
3365 }
3366
3368}
3369
3370
3371/*
3372 * Logical replication protocol message dispatcher.
3373 */
3374void
3376{
3378 LogicalRepMsgType saved_command;
3379
3380 /*
3381 * Set the current command being applied. Since this function can be
3382 * called recursively when applying spooled changes, save the current
3383 * command.
3384 */
3385 saved_command = apply_error_callback_arg.command;
3387
3388 switch (action)
3389 {
3392 break;
3393
3396 break;
3397
3400 break;
3401
3404 break;
3405
3408 break;
3409
3412 break;
3413
3416 break;
3417
3420 break;
3421
3424 break;
3425
3427
3428 /*
3429 * Logical replication does not use generic logical messages yet.
3430 * However, they could be used by other applications that use this
3431 * output plugin.
3432 */
3433 break;
3434
3437 break;
3438
3441 break;
3442
3445 break;
3446
3449 break;
3450
3453 break;
3454
3457 break;
3458
3461 break;
3462
3465 break;
3466
3469 break;
3470
3471 default:
3472 ereport(ERROR,
3473 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3474 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3475 }
3476
3477 /* Reset the current command */
3478 apply_error_callback_arg.command = saved_command;
3479}
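
The save/restore of the current command around the dispatch switch matters because the dispatcher can re-enter itself while replaying spooled changes. A minimal sketch of that pattern follows, assuming a single global in place of apply_error_callback_arg.command; the names dispatch, MSG_INSERT, MSG_STREAM_COMMIT and command_seen_after_replay are hypothetical and exist only for illustration.

```c
#include <assert.h>

/* Hypothetical message codes; the real protocol defines its own. */
enum
{
	MSG_NONE = 0,
	MSG_INSERT = 'I',
	MSG_STREAM_COMMIT = 'c'
};

/* Stand-in for the command recorded for error-context reporting. */
static int	current_command = MSG_NONE;

/* What the outer call observed after the nested call returned. */
static int	command_seen_after_replay = MSG_NONE;

static void
dispatch(int action)
{
	/* Remember the caller's command so a nested call cannot clobber it. */
	int			saved_command = current_command;

	current_command = action;

	switch (action)
	{
		case MSG_STREAM_COMMIT:
			/* Replaying a spooled change re-enters the dispatcher. */
			dispatch(MSG_INSERT);
			/* The nested call restored our command on its way out. */
			command_seen_after_replay = current_command;
			break;

		case MSG_INSERT:
		default:
			break;
	}

	/* Restore the caller's command. */
	current_command = saved_command;
}
```

Without the restore step, an error raised after the nested call returned would be attributed to the replayed command rather than to the command that triggered the replay.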
3480
3481/*
3482 * Figure out which write/flush positions to report to the walsender process.
3483 *
3484 * We can't simply report back the last LSN the walsender sent us because the
3485 * local transaction might not yet be flushed to disk locally. Instead we
3486 * build a list that associates local with remote LSNs for every commit. When
3487 * reporting back the flush position to the sender we iterate that list and
3488 * check which entries on it are already locally flushed. Those we can report
3489 * as having been flushed.
3490 *
3491 * *have_pending_txes is set to true if there are outstanding transactions
3492 * that still need to be flushed.
3493 */
3494static void
3496 bool *have_pending_txes)
3497{
3498 dlist_mutable_iter iter;
3499 XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3500
3502 *flush = InvalidXLogRecPtr;
3503
3505 {
3506 FlushPosition *pos =
3507 dlist_container(FlushPosition, node, iter.cur);
3508
3509 *write = pos->remote_end;
3510
3511 if (pos->local_end <= local_flush)
3512 {
3513 *flush = pos->remote_end;
3514 dlist_delete(iter.cur);
3515 pfree(pos);
3516 }
3517 else
3518 {
3519 /*
3520 * Don't want to uselessly iterate over the rest of the list which
3521 * could potentially be long. Instead get the last element and
3522 * grab the write position from there.
3523 */
3525 &lsn_mapping);
3526 *write = pos->remote_end;
3527 *have_pending_txes = true;
3528 return;
3529 }
3530 }
3531
3532 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3533}
3534
3535/*
3536 * Store current remote/local lsn pair in the tracking list.
3537 */
3538void
3540{
3541 FlushPosition *flushpos;
3542
3543 /*
3544 * Skip for parallel apply workers, because the lsn_mapping is maintained
3545 * by the leader apply worker.
3546 */
3548 return;
3549
3550 /* Need to do this in permanent context */
3552
3553 /* Track commit lsn */
3554 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3555 flushpos->local_end = local_lsn;
3556 flushpos->remote_end = remote_lsn;
3557
3558 dlist_push_tail(&lsn_mapping, &flushpos->node);
3560}
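
The interplay between store_flush_position() and get_flush_position() can be sketched without any PostgreSQL infrastructure. This is an illustrative approximation only: it assumes a hand-rolled singly linked queue instead of dlist, malloc/free instead of palloc/pfree in a permanent memory context, and a uint64_t with 0 as the "invalid" value instead of XLogRecPtr. All type and function names (Lsn, FlushPos, LsnQueue, queue_push, queue_get_flush_position) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Stand-in for XLogRecPtr; 0 plays the role of the invalid LSN. */
typedef uint64_t Lsn;

/* One applied commit: its local end LSN and the publisher's end LSN. */
typedef struct FlushPos
{
	Lsn			local_end;
	Lsn			remote_end;
	struct FlushPos *next;
} FlushPos;

/* Queue ordered by commit time, oldest entry at the head. */
typedef struct LsnQueue
{
	FlushPos   *head;
	FlushPos   *tail;
} LsnQueue;

/* Append a (local, remote) pair; returns false on allocation failure. */
static bool
queue_push(LsnQueue *q, Lsn local_lsn, Lsn remote_lsn)
{
	FlushPos   *pos = malloc(sizeof(FlushPos));

	if (pos == NULL)
		return false;
	pos->local_end = local_lsn;
	pos->remote_end = remote_lsn;
	pos->next = NULL;
	if (q->tail != NULL)
		q->tail->next = pos;
	else
		q->head = pos;
	q->tail = pos;
	return true;
}

/*
 * Compute the positions to report, given how far local WAL is flushed.
 * Entries that are durable locally are popped and advance *flush; the
 * first entry that is not yet durable ends the scan.
 */
static void
queue_get_flush_position(LsnQueue *q, Lsn local_flush,
						 Lsn *write, Lsn *flush, bool *have_pending)
{
	*write = 0;
	*flush = 0;

	while (q->head != NULL)
	{
		FlushPos   *pos = q->head;

		if (pos->local_end > local_flush)
		{
			/* Skip the rest: the tail holds the newest remote position. */
			*write = q->tail->remote_end;
			*have_pending = true;
			return;
		}

		*write = pos->remote_end;
		*flush = pos->remote_end;
		q->head = pos->next;
		if (q->head == NULL)
			q->tail = NULL;
		free(pos);
	}

	*have_pending = false;
}
```

For example, with pairs (100, 1000), (200, 2000) and (300, 3000) queued and a local flush position of 250, the sketch reports a flush position of 2000, a write position of 3000, and pending work. Reporting the remote LSN only after the matching local commit is durable is what keeps the publisher from discarding WAL the subscriber could still need after a crash.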
3561
3562
3563/* Update statistics of the worker. */
3564static void
3565UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3566{
3567 MyLogicalRepWorker->last_lsn = last_lsn;
3570 if (reply)
3571 {
3572 MyLogicalRepWorker->reply_lsn = last_lsn;
3573 MyLogicalRepWorker->reply_time = send_time;
3574 }
3575}
3576
3577/*
3578 * Apply main loop.
3579 */
3580static void
3582{
3583 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3584 bool ping_sent = false;
3585 TimeLineID tli;
3586 ErrorContextCallback errcallback;
3587
3588 /*
3589 * Init the ApplyMessageContext which we clean up after each replication
3590 * protocol message.
3591 */
3593 "ApplyMessageContext",
3595
3596 /*
3597 * This memory context is used for per-stream data when the streaming mode
3598 * is enabled. This context is reset on each stream stop.
3599 */
3601 "LogicalStreamingContext",
3603
3604 /* mark as idle, before starting to loop */
3606
3607 /*
3608 * Push apply error context callback. Fields will be filled while applying
3609 * a change.
3610 */
3611 errcallback.callback = apply_error_callback;
3612 errcallback.previous = error_context_stack;
3613 error_context_stack = &errcallback;
3615
3616 /* This outer loop iterates once per wait. */
3617 for (;;)
3618 {
3620 int rc;
3621 int len;
3622 char *buf = NULL;
3623 bool endofstream = false;
3624 long wait_time;
3625
3627
3629
3631
3632 if (len != 0)
3633 {
3634 /* Loop to process all available data (without blocking). */
3635 for (;;)
3636 {
3638
3639 if (len == 0)
3640 {
3641 break;
3642 }
3643 else if (len < 0)
3644 {
3645 ereport(LOG,
3646 (errmsg("data stream from publisher has ended")));
3647 endofstream = true;
3648 break;
3649 }
3650 else
3651 {
3652 int c;
3654
3656 {
3657 ConfigReloadPending = false;
3659 }
3660
3661 /* Reset timeout. */
3662 last_recv_timestamp = GetCurrentTimestamp();
3663 ping_sent = false;
3664
3665 /* Ensure we are reading the data into our memory context. */
3667
3669
3670 c = pq_getmsgbyte(&s);
3671
3672 if (c == 'w')
3673 {
3674 XLogRecPtr start_lsn;
3675 XLogRecPtr end_lsn;
3676 TimestampTz send_time;
3677
3678 start_lsn = pq_getmsgint64(&s);
3679 end_lsn = pq_getmsgint64(&s);
3680 send_time = pq_getmsgint64(&s);
3681
3682 if (last_received < start_lsn)
3683 last_received = start_lsn;
3684
3685 if (last_received < end_lsn)
3686 last_received = end_lsn;
3687
3688 UpdateWorkerStats(last_received, send_time, false);
3689
3690 apply_dispatch(&s);
3691 }
3692 else if (c == 'k')
3693 {
3694 XLogRecPtr end_lsn;
3696 bool reply_requested;
3697
3698 end_lsn = pq_getmsgint64(&s);
3700 reply_requested = pq_getmsgbyte(&s);
3701
3702 if (last_received < end_lsn)
3703 last_received = end_lsn;
3704
3705 send_feedback(last_received, reply_requested, false);
3706 UpdateWorkerStats(last_received, timestamp, true);
3707 }
3708 /* other message types are purposefully ignored */
3709
3711 }
3712
3714 }
3715 }
3716
3717 /* confirm all writes so far */
3718 send_feedback(last_received, false, false);
3719
3721 {
3722 /*
3723 * If we didn't get any transactions for a while there might be
3724 * unconsumed invalidation messages in the queue, consume them
3725 * now.
3726 */
3729
3730 /* Process any table synchronization changes. */
3731 process_syncing_tables(last_received);
3732 }
3733
3734 /* Cleanup the memory. */
3737
3738 /* Check if we need to exit the streaming loop. */
3739 if (endofstream)
3740 break;
3741
3742 /*
3743 * Wait for more data or latch. If we have unflushed transactions,
3744 * wake up after WalWriterDelay to see if they've been flushed yet (in
3745 * which case we should send a feedback message). Otherwise, there's
3746 * no particular urgency about waking up unless we get data or a
3747 * signal.
3748 */
3750 wait_time = WalWriterDelay;
3751 else
3752 wait_time = NAPTIME_PER_CYCLE;
3753
3757 fd, wait_time,
3758 WAIT_EVENT_LOGICAL_APPLY_MAIN);
3759
3760 if (rc & WL_LATCH_SET)
3761 {
3764 }
3765
3767 {
3768 ConfigReloadPending = false;
3770 }
3771
3772 if (rc & WL_TIMEOUT)
3773 {
3774 /*
3775 * We didn't receive anything new. If we haven't heard anything
3776 * from the server for more than wal_receiver_timeout / 2, ping
3777 * the server. Also, if it's been longer than
3778 * wal_receiver_status_interval since the last update we sent,
3779 * send a status update to the primary anyway, to report any
3780 * progress in applying WAL.
3781 */
3782 bool requestReply = false;
3783
3784 /*
3785 * Check if time since last receive from primary has reached the
3786 * configured limit.
3787 */
3788 if (wal_receiver_timeout > 0)
3789 {
3791 TimestampTz timeout;
3792
3793 timeout =
3794 TimestampTzPlusMilliseconds(last_recv_timestamp,
3796
3797 if (now >= timeout)
3798 ereport(ERROR,
3799 (errcode(ERRCODE_CONNECTION_FAILURE),
3800 errmsg("terminating logical replication worker due to timeout")));
3801
3802 /* Check to see if it's time for a ping. */
3803 if (!ping_sent)
3804 {
3805 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3806 (wal_receiver_timeout / 2));
3807 if (now >= timeout)
3808 {
3809 requestReply = true;
3810 ping_sent = true;
3811 }
3812 }
3813 }
3814
3815 send_feedback(last_received, requestReply, requestReply);
3816
3817 /*
3818 * Force reporting to ensure long idle periods don't lead to
3819 * arbitrarily delayed stats. Stats can only be reported outside
3820 * of (implicit or explicit) transactions. That shouldn't lead to
3821 * stats being delayed for long, because transactions are either
3822 * sent as a whole on commit or streamed. Streamed transactions
3823 * are spilled to disk and applied on commit.
3824 */
3825 if (!IsTransactionState())
3826 pgstat_report_stat(true);
3827 }
3828 }
3829
3830 /* Pop the error context stack */
3831 error_context_stack = errcallback.previous;
3833
3834 /* All done */
3836}
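
The timeout branch of the loop above makes a three-way decision: keep waiting, ping the publisher, or give up. A minimal sketch of that decision follows, assuming plain millisecond timestamps rather than TimestampTz; `IdleAction` and `idle_action_sketch` are hypothetical names that do not exist in worker.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum IdleAction
{
	IDLE_WAIT,					/* nothing to do yet */
	IDLE_PING,					/* ask the publisher for a reply */
	IDLE_TERMINATE				/* full timeout elapsed, report an error */
} IdleAction;

/*
 * Decide what to do after a wait timed out. timeout_ms plays the role of
 * wal_receiver_timeout; a value <= 0 disables the check entirely.
 */
static IdleAction
idle_action_sketch(int64_t last_recv_ms, int64_t now_ms, int timeout_ms,
				   bool *ping_sent)
{
	if (timeout_ms <= 0)
		return IDLE_WAIT;

	if (now_ms >= last_recv_ms + timeout_ms)
		return IDLE_TERMINATE;

	/* Ping once when half of the timeout has passed without data. */
	if (!*ping_sent && now_ms >= last_recv_ms + timeout_ms / 2)
	{
		*ping_sent = true;
		return IDLE_PING;
	}

	return IDLE_WAIT;
}
```

In the real loop, receiving any message resets both the timestamp and the ping flag, so a ping is sent at most once per silent period.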
3837
3838/*
3839 * Send a Standby Status Update message to server.
3840 *
3841 * 'recvpos' is the latest LSN up to which we've received data; 'force' is set
3842 * if we need to send a response to avoid timeouts.
3843 */
3844static void
3845send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3846{
3847 static StringInfo reply_message = NULL;
3848 static TimestampTz send_time = 0;
3849
3850 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3851 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3852 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3853
3854 XLogRecPtr writepos;
3855 XLogRecPtr flushpos;
3857 bool have_pending_txes;
3858
3859 /*
3860 * If the user doesn't want status to be reported to the publisher, be
3861 * sure to exit before doing anything at all.
3862 */
3863 if (!force && wal_receiver_status_interval <= 0)
3864 return;
3865
3866 /* It's legal to not pass a recvpos */
3867 if (recvpos < last_recvpos)
3868 recvpos = last_recvpos;
3869
3870 get_flush_position(&writepos, &flushpos, &have_pending_txes);
3871
3872 /*
3873 * No outstanding transactions to flush, we can report the latest received
3874 * position. This is important for synchronous replication.
3875 */
3876 if (!have_pending_txes)
3877 flushpos = writepos = recvpos;
3878
3879 if (writepos < last_writepos)
3880 writepos = last_writepos;
3881
3882 if (flushpos < last_flushpos)
3883 flushpos = last_flushpos;
3884
3886
3887 /* if we've already reported everything we're good */
3888 if (!force &&
3889 writepos == last_writepos &&
3890 flushpos == last_flushpos &&
3891 !TimestampDifferenceExceeds(send_time, now,
3893 return;
3894 send_time = now;
3895
3896 if (!reply_message)
3897 {
3899
3901 MemoryContextSwitchTo(oldctx);
3902 }
3903 else
3905
3907 pq_sendint64(reply_message, recvpos); /* write */
3908 pq_sendint64(reply_message, flushpos); /* flush */
3909 pq_sendint64(reply_message, writepos); /* apply */
3910 pq_sendint64(reply_message, now); /* sendTime */
3911 pq_sendbyte(reply_message, requestReply); /* replyRequested */
3912
3913 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3914 force,
3915 LSN_FORMAT_ARGS(recvpos),
3916 LSN_FORMAT_ARGS(writepos),
3917 LSN_FORMAT_ARGS(flushpos));
3918
3921
3922 if (recvpos > last_recvpos)
3923 last_recvpos = recvpos;
3924 if (writepos > last_writepos)
3925 last_writepos = writepos;
3926 if (flushpos > last_flushpos)
3927 last_flushpos = flushpos;
3928}
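
For illustration, the wire layout of the message built above can be sketched without the pq_send* helpers. This assumes the Standby Status Update layout from the streaming replication protocol: a type byte 'r', three 64-bit positions, a 64-bit send time and a one-byte reply flag, all in network byte order. `put_int64_be` and `encode_status_update` are hypothetical helpers, not server functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define STATUS_UPDATE_LEN 34	/* 1 + 4 * 8 + 1 bytes */

/* Store v at dst in big-endian (network) byte order. */
static size_t
put_int64_be(uint8_t *dst, uint64_t v)
{
	for (int i = 0; i < 8; i++)
		dst[i] = (uint8_t) (v >> (56 - 8 * i));
	return 8;
}

/* Fill out[] with one status update and return the number of bytes used. */
static size_t
encode_status_update(uint8_t out[STATUS_UPDATE_LEN],
					 uint64_t write_lsn, uint64_t flush_lsn,
					 uint64_t apply_lsn, int64_t send_time,
					 int reply_requested)
{
	size_t		n = 0;

	out[n++] = 'r';
	n += put_int64_be(out + n, write_lsn);
	n += put_int64_be(out + n, flush_lsn);
	n += put_int64_be(out + n, apply_lsn);
	n += put_int64_be(out + n, (uint64_t) send_time);
	out[n++] = reply_requested ? 1 : 0;

	return n;
}
```

Note that send_feedback passes recvpos in the write slot and writepos in the apply slot, as the trailing comments on its pq_sendint64 calls indicate.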
3929
3930/*
3931 * Exit routine for apply workers due to subscription parameter changes.
3932 */
3933static void
3935{
3937 {
3938 /*
3939 * Don't stop the parallel apply worker as the leader will detect the
3940 * subscription parameter change and restart logical replication later
3941 * anyway. This also prevents the leader from reporting errors when
3942 * trying to communicate with a stopped parallel apply worker, which
3943 * would accidentally disable subscriptions if disable_on_error was
3944 * set.
3945 */
3946 return;
3947 }
3948
3949 /*
3950 * Reset the last-start time for this apply worker so that the launcher
3951 * will restart it without waiting for wal_retrieve_retry_interval if the
3952 * subscription is still active, and so that we won't leak that hash table
3953 * entry if it isn't.
3954 */
3957
3958 proc_exit(0);
3959}
3960
3961/*
3962 * Reread subscription info if needed.
3963 *
3964 * For significant changes, we react by exiting the current process; a new
3965 * one will be launched afterwards if needed.
3966 */
3967void
3969{
3970 MemoryContext oldctx;
3972 bool started_tx = false;
3973
3974 /* When cache state is valid there is nothing to do here. */
3976 return;
3977
3978 /* This function might be called inside or outside of transaction. */
3979 if (!IsTransactionState())
3980 {
3982 started_tx = true;
3983 }
3984
3985 /* Ensure allocations in permanent context. */
3987
3989
3990 /*
3991 * Exit if the subscription was removed. This normally should not happen
3992 * as the worker gets killed during DROP SUBSCRIPTION.
3993 */
3994 if (!newsub)
3995 {
3996 ereport(LOG,
3997 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3998 MySubscription->name)));
3999
4000 /* Ensure we remove no-longer-useful entry for worker's start time */
4003
4004 proc_exit(0);
4005 }
4006
4007 /* Exit if the subscription was disabled. */
4008 if (!newsub->enabled)
4009 {
4010 ereport(LOG,
4011 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4012 MySubscription->name)));
4013
4015 }
4016
4017 /* !slotname should never happen when enabled is true. */
4018 Assert(newsub->slotname);
4019
4020 /* two-phase cannot be altered while the worker is running */
4021 Assert(newsub->twophasestate == MySubscription->twophasestate);
4022
4023 /*
4024 * Exit if any parameter that affects the remote connection was changed.
4025 * The launcher will start a new worker but note that the parallel apply
4026 * worker won't restart if the streaming option's value is changed from
4027 * 'parallel' to any other value or the server decides not to stream the
4028 * in-progress transaction.
4029 */
4030 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4031 strcmp(newsub->name, MySubscription->name) != 0 ||
4032 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4033 newsub->binary != MySubscription->binary ||
4034 newsub->stream != MySubscription->stream ||
4035 newsub->passwordrequired != MySubscription->passwordrequired ||
4036 strcmp(newsub->origin, MySubscription->origin) != 0 ||
4037 newsub->owner != MySubscription->owner ||
4038 !equal(newsub->publications, MySubscription->publications))
4039 {
4041 ereport(LOG,
4042 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4043 MySubscription->name)));
4044 else
4045 ereport(LOG,
4046 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4047 MySubscription->name)));
4048
4050 }
4051
4052 /*
4053 * Exit if the subscription owner's superuser privileges have been
4054 * revoked.
4055 */
4056 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4057 {
4059 ereport(LOG,
4060 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4062 else
4063 ereport(LOG,
4064 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4066
4068 }
4069
4070 /* Check for other changes that should never happen too. */
4071 if (newsub->dbid != MySubscription->dbid)
4072 {
4073 elog(ERROR, "subscription %u changed unexpectedly",
4075 }
4076
4077 /* Clean old subscription info and switch to new one. */
4080
4081 MemoryContextSwitchTo(oldctx);
4082
4083 /* Change synchronous commit according to the user's wishes */
4084 SetConfigOption("synchronous_commit", MySubscription->synccommit,
4086
4087 if (started_tx)
4089
4090 MySubscriptionValid = true;
4091}
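
The restart decision above boils down to a field-by-field comparison of the cached and the freshly loaded subscription. A minimal sketch, assuming a simplified struct: `SubSketch` and `needs_restart_sketch` are hypothetical, and the publication list comparison done with equal() in the real code is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical subset of the subscription fields compared above. */
typedef struct SubSketch
{
	const char *conninfo;
	const char *name;
	const char *slotname;
	const char *origin;
	bool		binary;
	char		stream;
	bool		passwordrequired;
	unsigned	owner;
} SubSketch;

/* True if any field that affects the remote connection differs. */
static bool
needs_restart_sketch(const SubSketch *cur, const SubSketch *next)
{
	return strcmp(next->conninfo, cur->conninfo) != 0 ||
		strcmp(next->name, cur->name) != 0 ||
		strcmp(next->slotname, cur->slotname) != 0 ||
		strcmp(next->origin, cur->origin) != 0 ||
		next->binary != cur->binary ||
		next->stream != cur->stream ||
		next->passwordrequired != cur->passwordrequired ||
		next->owner != cur->owner;
}
```

Strings are compared by content, never by pointer, because the reloaded subscription lives in freshly allocated memory.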
4092
4093/*
4094 * Callback from subscription syscache invalidation.
4095 */
4096static void
4097subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4098{
4099 MySubscriptionValid = false;
4100}
4101
4102/*
4103 * subxact_info_write
4104 * Store information about subxacts for a toplevel transaction.
4105 *
4106 * For each subxact we store offset of its first change in the main file.
4107 * The file is always over-written as a whole.
4108 *
4109 * XXX We should only store subxacts that were not aborted yet.
4110 */
4111static void
4113{
4114 char path[MAXPGPATH];
4115 Size len;
4116 BufFile *fd;
4117
4119
4120 /* construct the subxact filename */
4121 subxact_filename(path, subid, xid);
4122
4123 /* Delete the subxacts file, if it exists. */
4124 if (subxact_data.nsubxacts == 0)
4125 {
4128
4129 return;
4130 }
4131
4132 /*
4133 * Create the subxact file if it is not already created, otherwise open the
4134 * existing file.
4135 */
4137 true);
4138 if (fd == NULL)
4140
4142
4143 /* Write the subxact count and subxact info */
4146
4148
4149 /* free the memory allocated for subxact info */
4151}
4152
4153/*
4154 * subxact_info_read
4155 * Restore information about subxacts of a streamed transaction.
4156 *
4157 * Read information about subxacts into the structure subxact_data that can be
4158 * used later.
4159 */
4160static void
4162{
4163 char path[MAXPGPATH];
4164 Size len;
4165 BufFile *fd;
4166 MemoryContext oldctx;
4167
4171
4172 /*
4173 * If the subxact file doesn't exist that means we don't have any subxact
4174 * info.
4175 */
4176 subxact_filename(path, subid, xid);
4178 true);
4179 if (fd == NULL)
4180 return;
4181
4182 /* read number of subxact items */
4184
4186
4187 /* we keep the maximum as a power of 2 */
4189
4190 /*
4191 * Allocate subxact information in the logical streaming context. We need
4192 * this information during the complete stream so that we can add the sub
4193 * transaction info to this. On stream stop we will flush this information
4194 * to the subxact file and reset the logical streaming context.
4195 */
4198 sizeof(SubXactInfo));
4199 MemoryContextSwitchTo(oldctx);
4200
4201 if (len > 0)
4203
4205}
4206
4207/*
4208 * subxact_info_add
4209 * Add information about a subxact (offset in the main file).
4210 */
4211static void
4213{
4214 SubXactInfo *subxacts = subxact_data.subxacts;
4215 int64 i;
4216
4217 /* We must have a valid top level stream xid and a stream fd. */
4219 Assert(stream_fd != NULL);
4220
4221 /*
4222 * If the XID matches the toplevel transaction, we don't want to add it.
4223 */
4224 if (stream_xid == xid)
4225 return;
4226
4227 /*
4228 * In most cases we're checking the same subxact as we've already seen in
4229 * the last call, so make sure to ignore it (this change comes later).
4230 */
4231 if (subxact_data.subxact_last == xid)
4232 return;
4233
4234 /* OK, remember we're processing this XID. */
4236
4237 /*
4238 * Check if the transaction is already present in the array of subxacts. We
4239 * intentionally scan the array from the tail, because we're likely adding
4240 * a change for the most recent subtransactions.
4241 *
4242 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4243 * would allow us to use binary search here.
4244 */
4245 for (i = subxact_data.nsubxacts; i > 0; i--)
4246 {
4247 /* found, so we're done */
4248 if (subxacts[i - 1].xid == xid)
4249 return;
4250 }
4251
4252 /* This is a new subxact, so we need to add it to the array. */
4253 if (subxact_data.nsubxacts == 0)
4254 {
4255 MemoryContext oldctx;
4256
4258
4259 /*
4260 * Allocate this memory for subxacts in per-stream context, see
4261 * subxact_info_read.
4262 */
4264 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4265 MemoryContextSwitchTo(oldctx);
4266 }
4268 {
4270 subxacts = repalloc(subxacts,
4272 }
4273
4274 subxacts[subxact_data.nsubxacts].xid = xid;
4275
4276 /*
4277 * Get the current offset of the stream file and store it as offset of
4278 * this subxact.
4279 */
4281 &subxacts[subxact_data.nsubxacts].fileno,
4282 &subxacts[subxact_data.nsubxacts].offset);
4283
4285 subxact_data.subxacts = subxacts;
4286}
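
The bookkeeping in subxact_info_add can be sketched as a small growable array with a "last seen" shortcut and a scan from the tail. This is a simplified illustration: `SubxactList` and `subxact_add_sketch` are hypothetical, the file offsets are omitted, realloc stands in for the memory-context allocation, and the initial capacity of 4 is an arbitrary choice for the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;			/* hypothetical stand-in for TransactionId */

typedef struct SubxactList
{
	Xid		   *xids;			/* known subtransaction XIDs */
	uint32_t	n;				/* entries in use */
	uint32_t	max;			/* allocated entries, kept a power of 2 */
	Xid			last;			/* XID seen by the previous call, 0 if none */
} SubxactList;

/* Returns true if xid was newly recorded, false if it was ignored. */
static bool
subxact_add_sketch(SubxactList *l, Xid toplevel, Xid xid)
{
	/* The toplevel transaction itself is never recorded. */
	if (xid == toplevel || xid == l->last)
		return false;

	l->last = xid;

	/* Scan from the tail: recent subxacts are the likely matches. */
	for (uint32_t i = l->n; i > 0; i--)
	{
		if (l->xids[i - 1] == xid)
			return false;
	}

	/* Grow by doubling when the array is full. */
	if (l->n == l->max)
	{
		uint32_t	newmax = l->max ? l->max * 2 : 4;
		Xid		   *p = realloc(l->xids, newmax * sizeof(Xid));

		if (p == NULL)
			abort();
		l->xids = p;
		l->max = newmax;
	}

	l->xids[l->n++] = xid;
	return true;
}
```

The "last seen" check makes the common case, many consecutive changes from the same subtransaction, cost a single comparison.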
4287
4288/* format filename for file containing the info about subxacts */
4289static inline void
4290subxact_filename(char *path, Oid subid, TransactionId xid)
4291{
4292 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4293}
4294
4295/* format filename for file containing serialized changes */
4296static inline void
4297changes_filename(char *path, Oid subid, TransactionId xid)
4298{
4299 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4300}
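
As a quick illustration of the naming scheme used by the two helpers above, the sketch below formats both names for one transaction. `stream_filenames_sketch` is a hypothetical wrapper, and the OID and XID values in the usage note are made up.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build the names of the changes file and the subxact file for one
 * (subscription OID, toplevel XID) pair, using the same format strings
 * as changes_filename and subxact_filename.
 */
static void
stream_filenames_sketch(unsigned subid, unsigned xid,
						char *changes, char *subxacts, size_t len)
{
	snprintf(changes, len, "%u-%u.changes", subid, xid);
	snprintf(subxacts, len, "%u-%u.subxacts", subid, xid);
}
```

For subscription 16394 and XID 745 this yields "16394-745.changes" and "16394-745.subxacts". Including the subscription OID keeps two subscriptions that receive the same remote XID from colliding.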
4301
4302/*
4303 * stream_cleanup_files
4304 * Cleanup files for a subscription / toplevel transaction.
4305 *
4306 * Remove files with serialized changes and subxact info for a particular
4307 * toplevel transaction. Each subscription has a separate set of files
4308 * for any toplevel transaction.
4309 */
4310void
4312{
4313 char path[MAXPGPATH];
4314
4315 /* Delete the changes file. */
4316 changes_filename(path, subid, xid);
4318
4319 /* Delete the subxact file, if it exists. */
4320 subxact_filename(path, subid, xid);
4322}
4323
4324/*
4325 * stream_open_file
4326 * Open a file that we'll use to serialize changes for a toplevel
4327 * transaction.
4328 *
4329 * Open a file for streamed changes from a toplevel transaction identified
4330 * by stream_xid (global variable). If it's the first chunk of streamed
4331 * changes for this transaction, create the buffile, otherwise open the
4332 * previously created file.
4333 */
4334static void
4335stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4336{
4337 char path[MAXPGPATH];
4338 MemoryContext oldcxt;
4339
4340 Assert(OidIsValid(subid));
4342 Assert(stream_fd == NULL);
4343
4344
4345 changes_filename(path, subid, xid);
4346 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4347
4348 /*
4349 * Create/open the buffiles under the logical streaming context so that we
4350 * have those files until stream stop.
4351 */
4353
4354 /*
4355 * If this is the first streamed segment, create the changes file.
4356 * Otherwise, just open the file for writing, in append mode.
4357 */
4358 if (first_segment)
4360 path);
4361 else
4362 {
4363 /*
4364 * Open the file and seek to the end of the file because we always
4365 * append the changes file.
4366 */
4368 path, O_RDWR, false);
4369 BufFileSeek(stream_fd, 0, 0, SEEK_END);
4370 }
4371
4372 MemoryContextSwitchTo(oldcxt);
4373}
4374
4375/*
4376 * stream_close_file
4377 * Close the currently open file with streamed changes.
4378 */
4379static void
4380stream_close_file(void)
4381{
4382 Assert(stream_fd != NULL);
4383
4384 BufFileClose(stream_fd);
4385
4386 stream_fd = NULL;
4387}
4388
4389/*
4390 * stream_write_change
4391 * Serialize a change to a file for the current toplevel transaction.
4392 *
4393 * The change is serialized in a simple format, with length (not including
4394 * the length), action code (identifying the message type) and message
4395 * contents (without the subxact TransactionId value).
4396 */
4397static void
4398stream_write_change(char action, StringInfo s)
4399{
4400 int len;
4401
4402 Assert(stream_fd != NULL);
4403
4404 /* total on-disk size, including the action type character */
4405 len = (s->len - s->cursor) + sizeof(char);
4406
4407 /* first write the size */
4408 BufFileWrite(stream_fd, &len, sizeof(len));
4409
4410 /* then the action */
4411 BufFileWrite(stream_fd, &action, sizeof(action));
4412
4413 /* and finally the remaining part of the buffer (after the XID) */
4414 len = (s->len - s->cursor);
4415
4416 BufFileWrite(stream_fd, &s->data[s->cursor], len);
4417}
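
The on-disk record format described above (length, action byte, payload) can be sketched with a memory buffer in place of a BufFile. `write_change_sketch` and `read_change_sketch` are hypothetical helpers; as in the original, the length is a host-order int that counts the action byte but not itself.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Append one record to out and return the number of bytes written. */
static size_t
write_change_sketch(unsigned char *out, char action,
					const char *payload, int payload_len)
{
	int			len = payload_len + (int) sizeof(char);
	size_t		off = 0;

	memcpy(out + off, &len, sizeof(len));		/* size first */
	off += sizeof(len);
	memcpy(out + off, &action, sizeof(action));	/* then the action */
	off += sizeof(action);
	memcpy(out + off, payload, (size_t) payload_len);	/* then the data */
	off += (size_t) payload_len;

	return off;
}

/* Decode one record from in and return the number of bytes consumed. */
static size_t
read_change_sketch(const unsigned char *in, char *action,
				   const unsigned char **payload, int *payload_len)
{
	int			len;

	memcpy(&len, in, sizeof(len));
	*action = (char) in[sizeof(len)];
	*payload = in + sizeof(len) + sizeof(char);
	*payload_len = len - (int) sizeof(char);

	return sizeof(len) + (size_t) len;
}
```

Because every record starts with its own length, a reader can replay the file sequentially without any index.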
4418
4419/*
4420 * stream_open_and_write_change
4421 * Serialize a message to a file for the given transaction.
4422 *
4423 * This function is similar to stream_write_change except that it will open the
4424 * target file if it is not already open before writing the message and close
4425 * the file at the end.
4426 */
4427static void
4429{
4431
4432 if (!stream_fd)
4433 stream_start_internal(xid, false);
4434
4437}
4438
4439/*
4440 * Sets streaming options including replication slot name and origin start
4441 * position. Workers need these options for logical replication.
4442 */
4443void
4445 char *slotname,
4446 XLogRecPtr *origin_startpos)
4447{
4448 int server_version;
4449
4450 options->logical = true;
4451 options->startpoint = *origin_startpos;
4452 options->slotname = slotname;
4453
4455 options->proto.logical.proto_version =
4460
4461 options->proto.logical.publication_names = MySubscription->publications;
4462 options->proto.logical.binary = MySubscription->binary;
4463
4464 /*
4465 * Assign the appropriate option value for streaming option according to
4466 * the 'streaming' mode and the publisher's ability to support that mode.
4467 */
4468 if (server_version >= 160000 &&
4469 MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4470 {
4471 options->proto.logical.streaming_str = "parallel";
4473 }
4474 else if (server_version >= 140000 &&
4475 MySubscription->stream != LOGICALREP_STREAM_OFF)
4476 {
4477 options->proto.logical.streaming_str = "on";
4479 }
4480 else
4481 {
4482 options->proto.logical.streaming_str = NULL;
4484 }
4485
4486 options->proto.logical.twophase = false;
4487 options->proto.logical.origin = pstrdup(MySubscription->origin);
4488}
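
The streaming-mode fallback above depends only on the publisher version and the subscription setting. A minimal sketch of that choice follows; `StreamMode` and `streaming_option_sketch` are hypothetical stand-ins for the LOGICALREP_STREAM_* values, and the protocol version negotiation is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum StreamMode
{
	STREAM_OFF,
	STREAM_ON,
	STREAM_PARALLEL
} StreamMode;

/*
 * Pick the value for the "streaming" option, or NULL to omit it.
 * server_version uses the numeric form, e.g. 160000 for version 16.
 */
static const char *
streaming_option_sketch(int server_version, StreamMode mode)
{
	if (server_version >= 160000 && mode == STREAM_PARALLEL)
		return "parallel";

	/* Older publishers get plain streaming if any streaming was asked for. */
	if (server_version >= 140000 && mode != STREAM_OFF)
		return "on";

	return NULL;
}
```

A subscription set to parallel streaming therefore degrades to "on" against a version 14 or 15 publisher, and to no streaming at all against anything older.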
4489
4490/*
4491 * Cleanup the memory for subxacts and reset the related variables.
4492 */
4493static inline void
4495{
4498
4499 subxact_data.subxacts = NULL;
4503}
4504
4505/*
4506 * Common function to run the apply loop with error handling. Disable the
4507 * subscription, if necessary.
4508 *
4509 * Note that we don't handle FATAL errors, which are probably caused by
4510 * system resource errors and are not repeatable.
4511 */
4512void
4513start_apply(XLogRecPtr origin_startpos)
4514{
4515 PG_TRY();
4516 {
4517 LogicalRepApplyLoop(origin_startpos);
4518 }
4519 PG_CATCH();
4520 {
4521 /*
4522 * Reset the origin state to prevent the advancement of origin
4523 * progress if we fail to apply. Otherwise, this will result in
4524 * transaction loss as that transaction won't be sent again by the
4525 * server.
4526 */
4527 replorigin_reset(0, (Datum) 0);
4528
4531 else
4532 {
4533 /*
4534 * Report the worker failed while applying changes. Abort the
4535 * current transaction so that the stats message is sent in an
4536 * idle state.
4537 */
4540
4541 PG_RE_THROW();
4542 }
4543 }
4544 PG_END_TRY();
4545}
4546
4547/*
4548 * Runs the leader apply worker.
4549 *
4550 * It sets up replication origin, streaming options and then starts streaming.
4551 */
4552static void
4554{
4555 char originname[NAMEDATALEN];
4556 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4557 char *slotname = NULL;
4559 RepOriginId originid;
4560 TimeLineID startpointTLI;
4561 char *err;
4562 bool must_use_password;
4563
4564 slotname = MySubscription->slotname;
4565
4566 /*
4567 * This shouldn't happen if the subscription is enabled, but guard against
4568 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4569 * slot is NULL.)
4570 */
4571 if (!slotname)
4572 ereport(ERROR,
4573 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4574 errmsg("subscription has no replication slot set")));
4575
4576 /* Setup replication origin tracking. */
4578 originname, sizeof(originname));
4580 originid = replorigin_by_name(originname, true);
4581 if (!OidIsValid(originid))
4582 originid = replorigin_create(originname);
4583 replorigin_session_setup(originid, 0);
4584 replorigin_session_origin = originid;
4585 origin_startpos = replorigin_session_get_progress(false);
4587
4588 /* Is the use of a password mandatory? */
4589 must_use_password = MySubscription->passwordrequired &&
4591
4593 true, must_use_password,
4595
4596 if (LogRepWorkerWalRcvConn == NULL)
4597 ereport(ERROR,
4598 (errcode(ERRCODE_CONNECTION_FAILURE),
4599 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4600 MySubscription->name, err)));
4601
4602 /*
4603 * We don't really use the output of identify_system for anything but it does
4604 * some initializations on the upstream so let's still call it.
4605 */
4606 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4607
4609
4610 set_stream_options(&options, slotname, &origin_startpos);
4611
4612 /*
4613 * Even when the two_phase mode is requested by the user, it remains as
4614 * the tri-state PENDING until all tablesyncs have reached READY state.
4615 * Only then can it become ENABLED.
4616 *
4617 * Note: If the subscription has no tables then leave the state as
4618 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4619 * work.
4620 */
4621 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4623 {
4624 /* Start streaming with two_phase enabled */
4625 options.proto.logical.twophase = true;
4627
4629 UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4630 MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4632 }
4633 else
4634 {
4636 }
4637
4639 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4641 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4642 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4643 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4644 "?")));
4645
4646 /* Run the main loop. */
4647 start_apply(origin_startpos);
4648}
4649
4650/*
4651 * Common initialization for leader apply worker, parallel apply worker and
4652 * tablesync worker.
4653 *
4654 * Initialize the database connection, in-memory subscription and necessary
4655 * config options.
4656 */
4657void
4659{
4660 MemoryContext oldctx;
4661
4662 /* Run as replica session replication role. */
4663 SetConfigOption("session_replication_role", "replica",
4665
4666 /* Connect to our database. */
4669 0);
4670
4671 /*
4672 * Set always-secure search path, so malicious users can't redirect user
4673 * code (e.g. pg_index.indexprs).
4674 */
4675 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4676
4677 /* Load the subscription into persistent memory context. */
4679 "ApplyContext",
4683
4685 if (!MySubscription)
4686 {
4687 ereport(LOG,
4688 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4690
4691 /* Ensure we remove no-longer-useful entry for worker's start time */
4694
4695 proc_exit(0);
4696 }
4697
4698 MySubscriptionValid = true;
4699 MemoryContextSwitchTo(oldctx);
4700
4701 if (!MySubscription->enabled)
4702 {
4703 ereport(LOG,
4704 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4705 MySubscription->name)));
4706
4708 }
4709
4710 /* Setup synchronous commit according to the user's wishes */
4711 SetConfigOption("synchronous_commit", MySubscription->synccommit,
4713
4714 /*
4715 * Keep us informed about subscription or role changes. Note that the
4716 * role's superuser privilege can be revoked.
4717 */
4718 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4720 (Datum) 0);
4721
4724 (Datum) 0);
4725
4726 if (am_tablesync_worker())
4727 ereport(LOG,
4728 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4731 else
4732 ereport(LOG,
4733 (errmsg("logical replication apply worker for subscription \"%s\" has started",
4734 MySubscription->name)));
4735
4737}
4738
4739/*
4740 * Reset the origin state.
4741 */
4742static void
4744{
4748}
4749
4750/* Common function to setup the leader apply or tablesync worker. */
4751void
4753{
4754 /* Attach to slot */
4755 logicalrep_worker_attach(worker_slot);
4756
4758
4759 /* Setup signal handling */
4761 pqsignal(SIGTERM, die);
4763
4764 /*
4765 * We don't currently need any ResourceOwner in a walreceiver process, but
4766 * if we did, we could call CreateAuxProcessResourceOwner here.
4767 */
4768
4769 /* Initialise stats to a sanish value */
4772
4773 /* Load the libpq-specific functions */
4774 load_file("libpqwalreceiver", false);
4775
4777
4778 /*
4779 * Register a callback to reset the origin state before aborting any
4780 * pending transaction during shutdown (see ShutdownPostgres()). This will
4781 * avoid origin advancement for an incomplete transaction which could
4782 * otherwise lead to its loss as such a transaction won't be sent by the
4783 * server again.
4784 *
4785 * Note that even a LOG or DEBUG statement placed after setting the origin
4786 * state may process a shutdown signal before committing the current apply
4787 * operation. So, it is important to register such a callback here.
4788 */
4790
4791 /* Connect to the origin and start the replication. */
4792 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4794
4795 /*
4796 * Setup callback for syscache so that we know when something changes in
4797 * the subscription relation state.
4798 */
4799 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4801 (Datum) 0);
4802}
4803
4804/* Logical Replication Apply worker entry point */
4805void
4807{
4808 int worker_slot = DatumGetInt32(main_arg);
4809
4811
4812 SetupApplyOrSyncWorker(worker_slot);
4813
4815
4817
4818 proc_exit(0);
4819}
4820
4821/*
4822 * After error recovery, disable the subscription in a new transaction
4823 * and exit cleanly.
4824 */
4825void
4827{
4828 /*
4829 * Emit the error message, and recover from the error state to an idle
4830 * state.
4831 */
4833
4837
4839
4840 /* Report the worker failed during either table synchronization or apply */
4843
4844 /* Disable the subscription */
4848
4849 /* Ensure we remove no-longer-useful entry for worker's start time */
4852
4853 /* Notify the subscription has been disabled and exit */
4854 ereport(LOG,
4855 errmsg("subscription \"%s\" has been disabled because of an error",
4857
4858 proc_exit(0);
4859}
4860
4861/*
4862 * Is current process a logical replication worker?
4863 */
4864bool
4866{
4867 return MyLogicalRepWorker != NULL;
4868}
4869
4870/*
4871 * Is current process a logical replication parallel apply worker?
4872 */
4873bool
4875{
4877}
4878
4879/*
4880 * Start skipping changes of the transaction if the given LSN matches the
4881 * LSN specified by subscription's skiplsn.
4882 */
4883static void
4885{
4889
4890 /*
4891 * Quick return if it's not requested to skip this transaction. This
4892 * function is called for every remote transaction and we assume that
4893 * skipping the transaction is not used often.
4894 */
4896 MySubscription->skiplsn != finish_lsn))
4897 return;
4898
4899 /* Start skipping all changes of this transaction */
4900 skip_xact_finish_lsn = finish_lsn;
4901
4902 ereport(LOG,
4903 errmsg("logical replication starts skipping transaction at LSN %X/%X",
4905}
4906
4907/*
4908 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4909 */
4910static void
4912{
4913 if (!is_skipping_changes())
4914 return;
4915
4916 ereport(LOG,
4917 (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4919
4920 /* Stop skipping changes */
4922}
4923
4924/*
4925 * Clear subskiplsn of pg_subscription catalog.
4926 *
4927 * finish_lsn is the transaction's finish LSN that is used to check if the
4928 * subskiplsn matches it. If not matched, we raise a warning when clearing the
4929 * subskiplsn in order to inform users of cases where, e.g., the user mistakenly
4930 * specified the wrong subskiplsn.
4931 */
4932static void
4934{
4935 Relation rel;
4936 Form_pg_subscription subform;
4937 HeapTuple tup;
4938 XLogRecPtr myskiplsn = MySubscription->skiplsn;
4939 bool started_tx = false;
4940
4942 return;
4943
4944 if (!IsTransactionState())
4945 {
4947 started_tx = true;
4948 }
4949
4950 /*
4951 * Protect subskiplsn of pg_subscription from being concurrently updated
4952 * while clearing it.
4953 */
4954 LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4956
4957 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4958
4959 /* Fetch the existing tuple. */
4960 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4962
4963 if (!HeapTupleIsValid(tup))
4964 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4965
4966 subform = (Form_pg_subscription) GETSTRUCT(tup);
4967
4968 /*
4969 * Clear the subskiplsn. If the user has already changed subskiplsn before
4970 * clearing it we don't update the catalog and the replication origin
4971 * state won't get advanced. So in the worst case, if the server crashes
4972 * before sending an acknowledgment of the flush position the transaction
4973 * will be sent again and the user needs to set subskiplsn again. We can
4974 * reduce the possibility by logging a replication origin WAL record to
4975 * advance the origin LSN instead but there is no way to advance the
4976 * origin timestamp and it doesn't seem to be worth doing anything about
4977 * it since it's a very rare case.
4978 */
4979 if (subform->subskiplsn == myskiplsn)
4980 {
4981 bool nulls[Natts_pg_subscription];
4982 bool replaces[Natts_pg_subscription];
4983 Datum values[Natts_pg_subscription];
4984
4985 memset(values, 0, sizeof(values));
4986 memset(nulls, false, sizeof(nulls));
4987 memset(replaces, false, sizeof(replaces));
4988
4989 /* reset subskiplsn */
4990 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4991 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4992
4993 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4994 replaces);
4995 CatalogTupleUpdate(rel, &tup->t_self, tup);
4996
4997 if (myskiplsn != finish_lsn)
4999 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
5000 errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
5001 LSN_FORMAT_ARGS(finish_lsn),
5002 LSN_FORMAT_ARGS(myskiplsn)));
5003 }
5004
5005 heap_freetuple(tup);
5006 table_close(rel, NoLock);
5007
5008 if (started_tx)
5010}
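
/*
 * Illustration only, not part of worker.c: a minimal standalone sketch of
 * the compare-then-clear rule described in the comments above. The stored
 * skip LSN is reset only if it still equals the value this worker read
 * earlier, and a warning is flagged when that value differs from the
 * transaction's finish LSN. Catalog access, locking and transaction
 * handling are left out. DemoLsn, DemoClearResult and demo_clear_skip_lsn
 * are hypothetical names, and 0 is assumed to be the invalid LSN.
 */

```c
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr: a 64-bit WAL position, 0 = invalid. */
typedef uint64_t DemoLsn;

typedef enum DemoClearResult
{
	DEMO_SKIP_UNCHANGED,			/* nothing was written */
	DEMO_SKIP_CLEARED,				/* reset, LSNs matched */
	DEMO_SKIP_CLEARED_WITH_WARNING	/* reset, but finish LSN differed */
} DemoClearResult;

/*
 * catalog_skiplsn points at the stored value (the catalog column in the
 * real code); my_skiplsn is the value the worker cached earlier.
 */
static DemoClearResult
demo_clear_skip_lsn(DemoLsn *catalog_skiplsn, DemoLsn my_skiplsn,
					DemoLsn finish_lsn)
{
	/* Quick return: no skip was requested. */
	if (my_skiplsn == 0)
		return DEMO_SKIP_UNCHANGED;

	/* The user changed the setting in the meantime: leave it alone. */
	if (*catalog_skiplsn != my_skiplsn)
		return DEMO_SKIP_UNCHANGED;

	*catalog_skiplsn = 0;

	return (my_skiplsn != finish_lsn)
		? DEMO_SKIP_CLEARED_WITH_WARNING
		: DEMO_SKIP_CLEARED;
}
```

/*
 * Design note: comparing against the cached value before writing means a
 * newer user-supplied skip LSN is never overwritten by a stale worker.
 */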

/* Error callback to give more context info about the change being applied */
void
apply_error_callback(void *arg)
{
	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;

	if (apply_error_callback_arg.command == 0)
		return;

	Assert(errarg->origin_name);

	if (errarg->rel == NULL)
	{
		if (!TransactionIdIsValid(errarg->remote_xid))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command));
		else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid);
		else
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid,
					   LSN_FORMAT_ARGS(errarg->finish_lsn));
	}
	else
	{
		if (errarg->remote_attnum < 0)
		{
			if (XLogRecPtrIsInvalid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
		else
		{
			if (XLogRecPtrIsInvalid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
	}
}
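
/* Set transaction information of apply error callback */
static inline void
set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
	apply_error_callback_arg.remote_xid = xid;
	apply_error_callback_arg.finish_lsn = lsn;
}

/* Reset all information of apply error callback */
static inline void
reset_apply_error_context_info(void)
{
	apply_error_callback_arg.command = 0;
	apply_error_callback_arg.rel = NULL;
	apply_error_callback_arg.remote_attnum = -1;
	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}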

/*
 * Request wakeup of the workers for the given subscription OID
 * at commit of the current transaction.
 *
 * This is used to ensure that the workers process assorted changes
 * as soon as possible.
 */
void
LogicalRepWorkersWakeupAtCommit(Oid subid)
{
	MemoryContext oldcxt;

	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
	on_commit_wakeup_workers_subids =
		list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
	MemoryContextSwitchTo(oldcxt);
}

/*
 * Wake up the workers of any subscriptions that were changed in this xact.
 */
void
AtEOXact_LogicalRepWorkers(bool isCommit)
{
	if (isCommit && on_commit_wakeup_workers_subids != NIL)
	{
		ListCell   *lc;

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		foreach(lc, on_commit_wakeup_workers_subids)
		{
			Oid			subid = lfirst_oid(lc);
			List	   *workers;
			ListCell   *lc2;

			workers = logicalrep_workers_find(subid, true, false);
			foreach(lc2, workers)
			{
				LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);

				logicalrep_worker_wakeup_ptr(worker);
			}
		}
		LWLockRelease(LogicalRepWorkerLock);
	}

	/* The List storage will be reclaimed automatically in xact cleanup. */
	on_commit_wakeup_workers_subids = NIL;
}
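
/*
 * Illustration only, not part of worker.c: a minimal standalone sketch of
 * the "remember now, wake at commit" pattern used by the two functions
 * above. Subscription IDs are recorded without duplicates during the
 * transaction; at end of transaction they are processed only on commit,
 * and the pending list is emptied either way. A fixed-size array stands in
 * for the List, and the callback is invoked once per subscription ID
 * instead of once per worker. DemoWakeupList, demo_request_wakeup and
 * demo_at_eoxact are hypothetical names.
 */

```c
#include <stdbool.h>
#include <stddef.h>

#define DEMO_MAX_SUBIDS 16

typedef struct DemoWakeupList
{
	unsigned int subids[DEMO_MAX_SUBIDS];
	size_t		count;
} DemoWakeupList;

/* Remember subid unless it is already pending; false if the list is full. */
static bool
demo_request_wakeup(DemoWakeupList *list, unsigned int subid)
{
	for (size_t i = 0; i < list->count; i++)
	{
		if (list->subids[i] == subid)
			return true;
	}
	if (list->count == DEMO_MAX_SUBIDS)
		return false;
	list->subids[list->count++] = subid;
	return true;
}

/*
 * End-of-transaction hook: on commit, call wake (if not NULL) for each
 * pending ID. The list is cleared on commit and on abort. Returns the
 * number of IDs processed.
 */
static size_t
demo_at_eoxact(DemoWakeupList *list, bool is_commit,
			   void (*wake) (unsigned int subid))
{
	size_t		woken = 0;

	if (is_commit)
	{
		for (size_t i = 0; i < list->count; i++)
		{
			if (wake != NULL)
				wake(list->subids[i]);
			woken++;
		}
	}
	list->count = 0;
	return woken;
}
```

/*
 * Design note: deferring the wakeup to commit time avoids signalling
 * workers about changes that may still be rolled back.
 */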

/*
 * Allocate the origin name in long-lived context for error context message.
 */
void
set_apply_error_context_origin(char *originname)
{
	apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
															   originname);
}

/*
 * Return the action to be taken for the given transaction. See
 * TransApplyAction for information on each of the actions.
 *
 * *winfo is assigned to the destination parallel worker info when the leader
 * apply worker has to pass all the transaction's changes to the parallel
 * apply worker.
 */
static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
	*winfo = NULL;

	if (am_parallel_apply_worker())
	{
		return TRANS_PARALLEL_APPLY;
	}

	/*
	 * If we are processing this transaction using a parallel apply worker
	 * then either we send the changes to the parallel worker or if the worker
	 * is busy then serialize the changes to the file which will later be
	 * processed by the parallel worker.
	 */
	*winfo = pa_find_worker(xid);

	if (*winfo && (*winfo)->serialize_changes)
	{
		return TRANS_LEADER_PARTIAL_SERIALIZE;
	}
	else if (*winfo)
	{
		return TRANS_LEADER_SEND_TO_PARALLEL;
	}

	/*
	 * If there is no parallel worker involved to process this transaction
	 * then we either directly apply the change or serialize it to a file
	 * which will later be applied when the transaction finish message is
	 * processed.
	 */
	else if (in_streamed_transaction)
	{
		return TRANS_LEADER_SERIALIZE;
	}
	else
	{
		return TRANS_LEADER_APPLY;
	}
}
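
/*
 * Illustration only, not part of worker.c: a minimal standalone sketch of
 * the decision order in get_transaction_apply_action(), with the worker
 * state passed in as plain booleans instead of being read from globals and
 * from pa_find_worker(). DemoApplyAction and demo_apply_action are
 * hypothetical names, and the enum values only mirror the TransApplyAction
 * members referenced above.
 */

```c
#include <stdbool.h>

typedef enum DemoApplyAction
{
	DEMO_LEADER_APPLY,				/* apply the change directly */
	DEMO_LEADER_SERIALIZE,			/* spool the change to a file */
	DEMO_LEADER_SEND_TO_PARALLEL,	/* hand the change to a parallel worker */
	DEMO_LEADER_PARTIAL_SERIALIZE,	/* worker busy: spool for it instead */
	DEMO_PARALLEL_APPLY				/* we are the parallel worker */
} DemoApplyAction;

/*
 * is_parallel_worker:  the current process is a parallel apply worker
 * has_parallel_worker: a parallel worker is assigned to this transaction
 * worker_serializing:  that worker has switched to reading from a file
 * in_streamed_xact:    the leader is inside a streamed transaction
 */
static DemoApplyAction
demo_apply_action(bool is_parallel_worker, bool has_parallel_worker,
				  bool worker_serializing, bool in_streamed_xact)
{
	if (is_parallel_worker)
		return DEMO_PARALLEL_APPLY;

	if (has_parallel_worker && worker_serializing)
		return DEMO_LEADER_PARTIAL_SERIALIZE;
	if (has_parallel_worker)
		return DEMO_LEADER_SEND_TO_PARALLEL;

	if (in_streamed_xact)
		return DEMO_LEADER_SERIALIZE;

	return DEMO_LEADER_APPLY;
}
```

/*
 * Design note: the checks are ordered from most to least specific, so a
 * parallel worker never consults leader-only state, and the leader applies
 * a change directly only when none of the streaming paths is active.
 */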
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2639
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4024
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:424
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2511
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4290
static void begin_replication_step(void)
Definition: worker.c:508
static void end_replication_step(void)
Definition: worker.c:531
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:652
static void cleanup_subxact_info(void)
Definition: worker.c:4494
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4444
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1287
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2486
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4212
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4311
MemoryContext ApplyMessageContext
Definition: worker.c:291
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:468
static void apply_handle_type(StringInfo s)
Definition: worker.c:2348
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3247
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3565
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:4097
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5171
TransApplyAction
Definition: worker.c:267
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:272
@ TRANS_PARALLEL_APPLY
Definition: worker.c:275
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:273
@ TRANS_LEADER_APPLY
Definition: worker.c:269
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:274
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:559
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4428
struct ApplyExecutionData ApplyExecutionData
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4297
bool InitializingApplyWorker
Definition: worker.c:319
static void apply_worker_exit(void)
Definition: worker.c:3934
static BufFile * stream_fd
Definition: worker.c:340
static void apply_handle_update(StringInfo s)
Definition: worker.c:2552
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1627
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2155
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4513
static void stop_skipping_changes(void)
Definition: worker.c:4911
struct ApplySubXactData ApplySubXactData
#define NAPTIME_PER_CYCLE
Definition: worker.c:196
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2922
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:3495
static uint32 parallel_stream_nchanges
Definition: worker.c:316
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1180
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3581
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition: worker.c:5110
bool IsLogicalWorker(void)
Definition: worker.c:4865
static ApplySubXactData subxact_data
Definition: worker.c:358
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2970
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:279
bool in_remote_transaction
Definition: worker.c:304
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:336
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:4335
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2760
void apply_dispatch(StringInfo s)
Definition: worker.c:3375
#define is_skipping_changes()
Definition: worker.c:337
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4398
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4933
static void replorigin_reset(int code, Datum arg)
Definition: worker.c:4743
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2669
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1993
static void apply_handle_begin(StringInfo s)
Definition: worker.c:992
void DisableSubscriptionAndExit(void)
Definition: worker.c:4826
static dlist_head lsn_mapping
Definition: worker.c:205
bool IsLogicalParallelApplyWorker(void)
Definition: worker.c:4874
void AtEOXact_LogicalRepWorkers(bool isCommit)
Definition: worker.c:5124
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:798
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:428
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:710
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:899
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:5086
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:289
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1753
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1017
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1453
static List * on_commit_wakeup_workers_subids
Definition: worker.c:302
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1836
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2325
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5156
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
MemoryContext ApplyContext
Definition: worker.c:292
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:4112
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2363
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1109
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1229
static void run_apply_worker()
Definition: worker.c:4553
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4752
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1650
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1432
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3845
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:297
static XLogRecPtr remote_final_lsn
Definition: worker.c:305
static bool MySubscriptionValid
Definition: worker.c:300
void apply_error_callback(void *arg)
Definition: worker.c:5014
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3539
static MemoryContext LogicalStreamingContext
Definition: worker.c:295
void maybe_reread_subscription(void)
Definition: worker.c:3968
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2265
void InitializeLogRepWorker(void)
Definition: worker.c:4658
static bool in_streamed_transaction
Definition: worker.c:308
struct SubXactInfo SubXactInfo
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1043
struct FlushPosition FlushPosition
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:4806
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2025
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1491
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4884
Subscription * MySubscription
Definition: worker.c:299
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1072
static void stream_close_file(void)
Definition: worker.c:4380
static TransactionId stream_xid
Definition: worker.c:310
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2395
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:741
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:4161
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2854
static void reset_apply_error_context_info(void)
Definition: worker.c:5094
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_IDLEINTRANSACTION
@ STATE_RUNNING
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:926
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:886
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:216
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:151
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:928
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364
#define likely(x)
Definition: c.h:346
int64_t int64
Definition: c.h:499
uint32_t uint32
Definition: c.h:502
uint32 TransactionId
Definition: c.h:623
#define OidIsValid(objectId)
Definition: c.h:746
size_t Size
Definition: c.h:576
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition: conflict.c:103
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition: conflict.c:138
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:62
@ CT_DELETE_MISSING
Definition: conflict.h:42
@ CT_UPDATE_ORIGIN_DIFFERS
Definition: conflict.h:30
@ CT_UPDATE_MISSING
Definition: conflict.h:36
@ CT_DELETE_ORIGIN_DIFFERS
Definition: conflict.h:39
int64 TimestampTz
Definition: timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:134
int my_log2(long num)
Definition: dynahash.c:1795
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
void EmitErrorReport(void)
Definition: elog.c:1692
int errdetail(const char *fmt,...)
Definition: elog.c:1204
ErrorContextCallback * error_context_stack
Definition: elog.c:95
void FlushErrorState(void)
Definition: elog.c:1872
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define PG_RE_THROW()
Definition: elog.h:404
#define errcontext
Definition: elog.h:197
#define PG_TRY(...)
Definition: elog.h:371
#define WARNING
Definition: elog.h:36
#define DEBUG2
Definition: elog.h:29
#define PG_END_TRY(...)
Definition: elog.h:396
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition: err.c:43
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:143
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:238
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:160
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1932
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2794
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1329
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3249
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1380
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1741
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2020
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1327
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition: execUtils.c:774
void FreeExecutorState(EState *estate)
Definition: execUtils.c:193
EState * CreateExecutorState(void)
Definition: execUtils.c:88
#define GetPerTupleExprContext(estate)
Definition: executor.h:678
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:683
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:287
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:415
void FileSetInit(FileSet *fileset)
Definition: fileset.c:52
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1754
struct Latch * MyLatch
Definition: globals.c:64
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4332
@ PGC_S_OVERRIDE
Definition: guc.h:123
@ PGC_SUSET
Definition: guc.h:78
@ PGC_SIGHUP
Definition: guc.h:75
@ PGC_BACKEND
Definition: guc.h:77
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:281
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
#define write(a, b, c)
Definition: win32.h:14
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
void AcceptInvalidationMessages(void)
Definition: inval.c:930
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1812
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:221
void ResetLatch(Latch *latch)
Definition: latch.c:372
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:266
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:693
void logicalrep_worker_attach(int slot)
Definition: launcher.c:704
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:673
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1072
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1380
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1082
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockTupleExclusive
Definition: lockoptions.h:58
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
uint32 LogicalRepRelId
Definition: logicalproto.h:101
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2068
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:3014
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3506
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:3080
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:115
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:2314
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
MemoryContext TopTransactionContext
Definition: mcxt.c:170
char * pstrdup(const char *in)
Definition: mcxt.c:2327
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:2172
void pfree(void *pointer)
Definition: mcxt.c:2152
void * palloc0(Size size)
Definition: mcxt.c:1975
MemoryContext TopMemoryContext
Definition: mcxt.c:165
void * palloc(Size size)
Definition: mcxt.c:1945
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
Oid GetUserId(void)
Definition: miscinit.c:520
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:1039
CmdType
Definition: nodes.h:269
@ CMD_INSERT
Definition: nodes.h:273
@ CMD_DELETE
Definition: nodes.h:274
@ CMD_UPDATE
Definition: nodes.h:272
#define makeNode(_type_)
Definition: nodes.h:161
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1097
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1237
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
#define InvalidRepOriginId
Definition: origin.h:33
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
#define ACL_DELETE
Definition: parsenodes.h:79
uint64 AclMode
Definition: parsenodes.h:74
#define ACL_INSERT
Definition: parsenodes.h:76
#define ACL_UPDATE
Definition: parsenodes.h:78
@ RTE_RELATION
Definition: parsenodes.h:1026
@ DROP_RESTRICT
Definition: parsenodes.h:2390
#define ACL_SELECT
Definition: parsenodes.h:77
#define ACL_TRUNCATE
Definition: parsenodes.h:80
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
void * arg
#define NAMEDATALEN
#define MAXPGPATH
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
#define lfirst_oid(lc)
Definition: pg_list.h:174
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static char ** options
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
long pgstat_report_stat(bool force)
Definition: pgstat.c:691
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition: planner.c:6688
#define pqsignal
Definition: port.h:531
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:239
#define PGINVALID_SOCKET
Definition: port.h:31
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207
#define InvalidOid
Definition: postgres_ext.h:35
unsigned int Oid
Definition: postgres_ext.h:30
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:98
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:561
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:325
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:134
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:754
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:487
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1184
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:63
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:698
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1209
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:365
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1129
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:228
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1079
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:712
#define RelationGetDescr(relation)
Definition: rel.h:542
#define RelationGetRelationName(relation)
Definition: rel.h:550
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:669
#define RelationGetNamespace(relation)
Definition: rel.h:557
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4833
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:175
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:271
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:669
void PopActiveSnapshot(void)
Definition: snapmgr.c:762
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:571
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:633
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition: relation.c:821
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:891
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:504
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:349
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
TransactionId remote_xid
Definition: worker.c:227
LogicalRepMsgType command
Definition: worker.c:222
XLogRecPtr finish_lsn
Definition: worker.c:228
LogicalRepRelMapEntry * rel
Definition: worker.c:223
ResultRelInfo * targetRelInfo
Definition: worker.c:212
EState * estate
Definition: worker.c:209
PartitionTupleRouting * proute
Definition: worker.c:216
ModifyTableState * mtstate
Definition: worker.c:215
LogicalRepRelMapEntry * targetRel
Definition: worker.c:211
uint32 nsubxacts
Definition: worker.c:352
uint32 nsubxacts_max
Definition: worker.c:353
SubXactInfo * subxacts
Definition: worker.c:355
TransactionId subxact_last
Definition: worker.c:354
AttrMap
Definition: attmap.h:35
int maplen
Definition: attmap.h:37
AttrNumber * attnums
Definition: attmap.h:36
TimestampTz ts
Definition: conflict.h:68
RepOriginId origin
Definition: conflict.h:67
TransactionId xmin
Definition: conflict.h:65
TupleTableSlot * slot
Definition: conflict.h:61
List * es_rteperminfos
Definition: execnodes.h:665
List * es_tupleTable
Definition: execnodes.h:710
List * es_opened_result_relations
Definition: execnodes.h:686
CommandId es_output_cid
Definition: execnodes.h:680
struct ErrorContextCallback * previous
Definition: elog.h:296
void(* callback)(void *arg)
Definition: elog.h:297
dlist_node node
Definition: worker.c:200
XLogRecPtr remote_end
Definition: worker.c:202
XLogRecPtr local_end
Definition: worker.c:201
ItemPointerData t_self
Definition: htup.h:65
List
Definition: pg_list.h:54
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:138
LogicalRepRelation remoterel
StringInfoData * colvalues
Definition: logicalproto.h:87
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
CmdType operation
Definition: execnodes.h:1398
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1402
PlanState ps
Definition: execnodes.h:1397
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
Plan * plan
Definition: execnodes.h:1159
EState * state
Definition: execnodes.h:1161
Bitmapset * updatedCols
Definition: parsenodes.h:1309
RTEKind rtekind
Definition: parsenodes.h:1061
Form_pg_class rd_rel
Definition: rel.h:111
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:616
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:575
Relation ri_RelationDesc
Definition: execnodes.h:475
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:481
off_t offset
Definition: worker.c:346
TransactionId xid
Definition: worker.c:344
int fileno
Definition: worker.c:345
XLogRecPtr skiplsn
AttrMap * attrMap
Definition: tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
dlist_node * cur
Definition: ilist.h:200
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1975
bool AllTablesyncsReady(void)
Definition: tablesync.c:1738
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:666
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1763
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5088
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5053
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:458
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:372
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:525
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2682
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2623
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1487
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
static StringInfoData reply_message
Definition: walreceiver.c:132
int wal_receiver_status_interval
Definition: walreceiver.c:88
int wal_receiver_timeout
Definition: walreceiver.c:89
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
int WalWriterDelay
Definition: walwriter.c:70
#define SIGHUP
Definition: win32_port.h:158
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
@ FS_SERIALIZE_DONE
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3992
bool IsTransactionState(void)
Definition: xact.c:387
void CommandCounterIncrement(void)
Definition: xact.c:1100
void StartTransactionCommand(void)
Definition: xact.c:3059
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:914
bool IsTransactionBlock(void)
Definition: xact.c:4971
void BeginTransactionBlock(void)
Definition: xact.c:3924
void CommitTransactionCommand(void)
Definition: xact.c:3157
bool EndTransactionBlock(bool chain)
Definition: xact.c:4044
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4862
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:829
#define GIDSIZE
Definition: xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6687
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:255
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59