Skip to content

Commit 2255153

Browse files
vigneshwaran-cCommitfest Bot
authored and
Commitfest Bot
committed
Enhance sequence synchronization during subscription management
This patch introduces sequence synchronization: Sequences have 2 states: - INIT (needs synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. It does the following: a) Retrieves remote values of sequences with pg_sequence_state() INIT. b) Logs a warning if the sequence parameters differ between the publisher and subscriber. c) Sets the local sequence values accordingly. d) Updates the local sequence state to READY. e) Repeats until all done; Commits synchronized sequences in batches of 100 Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - (PG18 command syntax is unchanged) - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker (see above) to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - (PG18 command syntax is unchanged) - Drop published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker (see above) to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES - The patch introduces this new command to refresh all sequences - Drop published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker (see above) to synchronize all sequences.
1 parent a6b0f41 commit 2255153

33 files changed

+1540
-186
lines changed

src/backend/catalog/pg_publication.c

+46
Original file line numberDiff line numberDiff line change
@@ -1370,3 +1370,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
13701370

13711371
SRF_RETURN_DONE(funcctx);
13721372
}
1373+
1374+
/*
1375+
* Returns Oids of sequences in a publication.
1376+
*/
1377+
Datum
1378+
pg_get_publication_sequences(PG_FUNCTION_ARGS)
1379+
{
1380+
FuncCallContext *funcctx;
1381+
List *sequences = NIL;
1382+
1383+
/* stuff done only on the first call of the function */
1384+
if (SRF_IS_FIRSTCALL())
1385+
{
1386+
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1387+
Publication *publication;
1388+
MemoryContext oldcontext;
1389+
1390+
/* create a function context for cross-call persistence */
1391+
funcctx = SRF_FIRSTCALL_INIT();
1392+
1393+
/* switch to memory context appropriate for multiple function calls */
1394+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1395+
1396+
publication = GetPublicationByName(pubname, false);
1397+
1398+
if (publication->allsequences)
1399+
sequences = GetAllSequencesPublicationRelations();
1400+
1401+
funcctx->user_fctx = (void *) sequences;
1402+
1403+
MemoryContextSwitchTo(oldcontext);
1404+
}
1405+
1406+
/* stuff done on every call of the function */
1407+
funcctx = SRF_PERCALL_SETUP();
1408+
sequences = (List *) funcctx->user_fctx;
1409+
1410+
if (funcctx->call_cntr < list_length(sequences))
1411+
{
1412+
Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1413+
1414+
SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1415+
}
1416+
1417+
SRF_RETURN_DONE(funcctx);
1418+
}

src/backend/catalog/pg_subscription.c

+54-9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "utils/array.h"
2828
#include "utils/builtins.h"
2929
#include "utils/fmgroids.h"
30+
#include "utils/memutils.h"
3031
#include "utils/lsyscache.h"
3132
#include "utils/pg_lsn.h"
3233
#include "utils/rel.h"
@@ -462,7 +463,9 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
462463
* leave tablesync slots or origins in the system when the
463464
* corresponding table is dropped.
464465
*/
465-
if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
466+
if (!OidIsValid(subid) &&
467+
get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE &&
468+
subrel->srsubstate != SUBREL_STATE_READY)
466469
{
467470
ereport(ERROR,
468471
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -499,7 +502,8 @@ HasSubscriptionTables(Oid subid)
499502
Relation rel;
500503
ScanKeyData skey[1];
501504
SysScanDesc scan;
502-
bool has_subrels;
505+
HeapTuple tup;
506+
bool has_subrels = false;
503507

504508
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
505509

@@ -511,8 +515,22 @@ HasSubscriptionTables(Oid subid)
511515
scan = systable_beginscan(rel, InvalidOid, false,
512516
NULL, 1, skey);
513517

514-
/* If even a single tuple exists then the subscription has tables. */
515-
has_subrels = HeapTupleIsValid(systable_getnext(scan));
518+
while (HeapTupleIsValid(tup = systable_getnext(scan)))
519+
{
520+
Form_pg_subscription_rel subrel;
521+
522+
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523+
524+
/*
525+
* Skip sequence tuples. If even a single table tuple exists then the
526+
* subscription has tables.
527+
*/
528+
if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
529+
{
530+
has_subrels = true;
531+
break;
532+
}
533+
}
516534

517535
/* Cleanup */
518536
systable_endscan(scan);
@@ -524,12 +542,22 @@ HasSubscriptionTables(Oid subid)
524542
/*
525543
* Get the relations for the subscription.
526544
*
527-
* If not_ready is true, return only the relations that are not in a ready
528-
* state, otherwise return all the relations of the subscription. The
529-
* returned list is palloc'ed in the current memory context.
545+
* get_tables: get relations for tables of the subscription.
546+
*
547+
* get_sequences: get relations for sequences of the subscription.
548+
*
549+
* all_states:
550+
* If getting tables, if all_states is true get all tables, otherwise
551+
* only get tables that have not reached READY state.
552+
* If getting sequences, if all_states is true get all sequences,
553+
* otherwise only get sequences that have not reached READY state (i.e. are
554+
* still in INIT state).
555+
*
556+
* The returned list is palloc'ed in the current memory context.
530557
*/
531558
List *
532-
GetSubscriptionRelations(Oid subid, bool not_ready)
559+
GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
560+
bool all_states)
533561
{
534562
List *res = NIL;
535563
Relation rel;
@@ -538,14 +566,17 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
538566
ScanKeyData skey[2];
539567
SysScanDesc scan;
540568

569+
/* One or both of 'get_tables' and 'get_sequences' must be true. */
570+
Assert(get_tables || get_sequences);
571+
541572
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
542573

543574
ScanKeyInit(&skey[nkeys++],
544575
Anum_pg_subscription_rel_srsubid,
545576
BTEqualStrategyNumber, F_OIDEQ,
546577
ObjectIdGetDatum(subid));
547578

548-
if (not_ready)
579+
if (!all_states)
549580
ScanKeyInit(&skey[nkeys++],
550581
Anum_pg_subscription_rel_srsubstate,
551582
BTEqualStrategyNumber, F_CHARNE,
@@ -560,9 +591,23 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
560591
SubscriptionRelState *relstate;
561592
Datum d;
562593
bool isnull;
594+
bool issequence;
595+
bool istable;
563596

564597
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
565598

599+
/* Relation is either a sequence or a table */
600+
issequence = get_rel_relkind(subrel->srrelid) == RELKIND_SEQUENCE;
601+
istable = !issequence;
602+
603+
/* Skip sequences if they were not requested */
604+
if (!get_sequences && issequence)
605+
continue;
606+
607+
/* Skip tables if they were not requested */
608+
if (!get_tables && istable)
609+
continue;
610+
566611
relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
567612
relstate->relid = subrel->srrelid;
568613
relstate->state = subrel->srsubstate;

src/backend/catalog/system_views.sql

+10
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS
394394
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
395395
WHERE C.oid = GPT.relid;
396396

397+
CREATE VIEW pg_publication_sequences AS
398+
SELECT
399+
P.pubname AS pubname,
400+
N.nspname AS schemaname,
401+
C.relname AS sequencename
402+
FROM pg_publication P,
403+
LATERAL pg_get_publication_sequences(P.pubname) GPS,
404+
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
405+
WHERE C.oid = GPS.relid;
406+
397407
CREATE VIEW pg_locks AS
398408
SELECT * FROM pg_lock_status() AS L;
399409

src/backend/commands/sequence.c

+27-11
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity,
110110
Form_pg_sequence_data seqdataform,
111111
bool *need_seq_rewrite,
112112
List **owned_by);
113-
static void do_setval(Oid relid, int64 next, bool iscalled);
114113
static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
115114

116115

@@ -941,9 +940,12 @@ lastval(PG_FUNCTION_ARGS)
941940
* restore the state of a sequence exactly during data-only restores -
942941
* it is the only way to clear the is_called flag in an existing
943942
* sequence.
943+
*
944+
* log_cnt is currently used only by the sequence syncworker to set the
945+
* log_cnt for sequences while synchronizing values from the publisher.
944946
*/
945-
static void
946-
do_setval(Oid relid, int64 next, bool iscalled)
947+
void
948+
SetSequence(Oid relid, int64 next, bool is_called, int64 log_cnt)
947949
{
948950
SeqTable elm;
949951
Relation seqrel;
@@ -994,7 +996,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
994996
minv, maxv)));
995997

996998
/* Set the currval() state only if iscalled = true */
997-
if (iscalled)
999+
if (is_called)
9981000
{
9991001
elm->last = next; /* last returned number */
10001002
elm->last_valid = true;
@@ -1011,8 +1013,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
10111013
START_CRIT_SECTION();
10121014

10131015
seq->last_value = next; /* last fetched number */
1014-
seq->is_called = iscalled;
1015-
seq->log_cnt = 0;
1016+
seq->is_called = is_called;
1017+
seq->log_cnt = log_cnt;
10161018

10171019
MarkBufferDirty(buf);
10181020

@@ -1044,22 +1046,22 @@ do_setval(Oid relid, int64 next, bool iscalled)
10441046

10451047
/*
10461048
* Implement the 2 arg setval procedure.
1047-
* See do_setval for discussion.
1049+
* See SetSequence for discussion.
10481050
*/
10491051
Datum
10501052
setval_oid(PG_FUNCTION_ARGS)
10511053
{
10521054
Oid relid = PG_GETARG_OID(0);
10531055
int64 next = PG_GETARG_INT64(1);
10541056

1055-
do_setval(relid, next, true);
1057+
SetSequence(relid, next, true, SEQ_LOG_CNT_INVALID);
10561058

10571059
PG_RETURN_INT64(next);
10581060
}
10591061

10601062
/*
10611063
* Implement the 3 arg setval procedure.
1062-
* See do_setval for discussion.
1064+
* See SetSequence for discussion.
10631065
*/
10641066
Datum
10651067
setval3_oid(PG_FUNCTION_ARGS)
@@ -1068,7 +1070,7 @@ setval3_oid(PG_FUNCTION_ARGS)
10681070
int64 next = PG_GETARG_INT64(1);
10691071
bool iscalled = PG_GETARG_BOOL(2);
10701072

1071-
do_setval(relid, next, iscalled);
1073+
SetSequence(relid, next, iscalled, SEQ_LOG_CNT_INVALID);
10721074

10731075
PG_RETURN_INT64(next);
10741076
}
@@ -1889,13 +1891,19 @@ pg_sequence_last_value(PG_FUNCTION_ARGS)
18891891
/*
18901892
* Return the current on-disk state of the sequence.
18911893
*
1894+
* The page LSN will be used in logical replication of sequences to record the
1895+
* LSN of the sequence page in the pg_subscription_rel system catalog. It
1896+
* reflects the LSN of the remote sequence at the time it was synchronized.
1897+
*
18921898
* Note: This is roughly equivalent to selecting the data from the sequence,
18931899
* except that it also returns the page LSN.
18941900
*/
18951901
Datum
18961902
pg_sequence_state(PG_FUNCTION_ARGS)
18971903
{
1898-
Oid seq_relid = PG_GETARG_OID(0);
1904+
char *schema_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
1905+
char *sequence_name = text_to_cstring(PG_GETARG_TEXT_PP(1));
1906+
Oid seq_relid;
18991907
SeqTable elm;
19001908
Relation seqrel;
19011909
Buffer buf;
@@ -1917,6 +1925,14 @@ pg_sequence_state(PG_FUNCTION_ARGS)
19171925
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
19181926
elog(ERROR, "return type must be a row type");
19191927

1928+
seq_relid = RangeVarGetRelid(makeRangeVar(schema_name, sequence_name, -1),
1929+
NoLock, true);
1930+
if (!OidIsValid(seq_relid))
1931+
ereport(ERROR,
1932+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1933+
errmsg("logical replication sequence \"%s.%s\" does not exist",
1934+
schema_name, sequence_name));
1935+
19201936
/* open and lock sequence */
19211937
init_sequence(seq_relid, &elm, &seqrel);
19221938

0 commit comments

Comments
 (0)