diff options
Diffstat (limited to 'src/backend/commands/publicationcmds.c')
| -rw-r--r-- | src/backend/commands/publicationcmds.c | 424 |
1 files changed, 357 insertions, 67 deletions
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 1aad2e769cb..f890d3f0baa 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -16,6 +16,7 @@ #include "access/genam.h" #include "access/htup_details.h" +#include "access/relation.h" #include "access/table.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -67,15 +68,17 @@ typedef struct rf_context } rf_context; static List *OpenRelIdList(List *relids); -static List *OpenTableList(List *tables); -static void CloseTableList(List *rels); +static List *OpenRelationList(List *rels, char objectType); +static void CloseRelationList(List *rels); static void LockSchemaList(List *schemalist); -static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, +static void PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); -static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); -static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, - AlterPublicationStmt *stmt); -static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); +static void PublicationDropRelations(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, char objectType, + bool if_not_exists, AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, char objectType, + bool missing_ok); + static void parse_publication_options(ParseState *pstate, @@ -95,6 +98,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = true; pubactions->pubdelete = true; pubactions->pubtruncate = true; + pubactions->pubsequence = true; *publish_via_partition_root = false; /* Parse options */ @@ -119,6 +123,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubupdate = false; pubactions->pubdelete = false; pubactions->pubtruncate = false; + pubactions->pubsequence = false; *publish_given = true; publish = defGetString(defel); @@ -141,6 +146,8 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) pubactions->pubtruncate = true; + else if (strcmp(publish_opt, "sequence") == 0) + pubactions->pubsequence = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -167,7 +174,9 @@ parse_publication_options(ParseState *pstate, */ static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, - List **rels, List **schemas) + List **tables, List **sequences, + List **tables_schemas, List **sequences_schemas, + List **schemas) { ListCell *cell; PublicationObjSpec *pubobj; @@ -185,12 +194,23 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { case PUBLICATIONOBJ_TABLE: - *rels = lappend(*rels, pubobj->pubtable); + *tables = lappend(*tables, pubobj->pubtable); + break; + case PUBLICATIONOBJ_SEQUENCE: + *sequences = lappend(*sequences, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: schemaid = get_namespace_oid(pubobj->name, false); /* Filter out duplicates if user specifies "sch1, sch1" */ + *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA: + schemaid = get_namespace_oid(pubobj->name, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); *schemas = list_append_unique_oid(*schemas, schemaid); break; case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA: @@ -204,6 +224,21 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, list_free(search_path); /* Filter out duplicates if user specifies "sch1, sch1" */ + *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid); + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA: + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid); *schemas = list_append_unique_oid(*schemas, schemaid); break; default: @@ -240,6 +275,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", RelationGetRelationName(rel), get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Sequence \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); else if (checkobjtype == PUBLICATIONOBJ_TABLE) ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -248,6 +291,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, RelationGetRelationName(rel)), errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_SEQUENCE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Sequence's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); } } } @@ -615,6 +666,7 @@ TransformPubWhereClauses(List *tables, const char *queryString, ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) { + ListCell *lc; Relation rel; ObjectAddress myself; Oid puboid; @@ -626,9 +678,25 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; - List *relations = NIL; + List *tables = NIL; + List *sequences = NIL; + List *tables_schemaidlist = NIL; + List *sequences_schemaidlist = NIL; List *schemaidlist = NIL; + bool for_all_tables = false; + bool for_all_sequences = false; + + /* Translate the list of object types (represented by strings) to bool flags. */ + foreach (lc, stmt->for_all_objects) + { + char *val = strVal(lfirst(lc)); + if (strcmp(val, "tables") == 0) + for_all_tables = true; + else if (strcmp(val, "sequences") == 0) + for_all_sequences = true; + } + /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) @@ -636,7 +704,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) get_database_name(MyDatabaseId)); /* FOR ALL TABLES requires superuser */ - if (stmt->for_all_tables && !superuser()) + if (for_all_tables && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES publication"))); @@ -672,7 +740,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) Anum_pg_publication_oid); values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid); values[Anum_pg_publication_puballtables - 1] = - BoolGetDatum(stmt->for_all_tables); + BoolGetDatum(for_all_tables); + values[Anum_pg_publication_puballsequences - 1] = + BoolGetDatum(for_all_sequences); values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = @@ -681,6 +751,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubsequence - 1] = + BoolGetDatum(pubactions.pubsequence); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); @@ -698,45 +770,88 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CommandCounterIncrement(); /* Associate objects with the publication. */ - if (stmt->for_all_tables) + if (for_all_tables || for_all_sequences) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } - else + + /* + * If the publication might have either tables or sequences (directly or + * through a schema), process that. + */ + if (!for_all_tables || !for_all_sequences) { - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + ObjectsInPublicationToOids(stmt->pubobjects, pstate, + &tables, &sequences, + &tables_schemaidlist, + &sequences_schemaidlist, &schemaidlist); /* FOR ALL TABLES IN SCHEMA requires superuser */ - if (list_length(schemaidlist) > 0 && !superuser()) + if (list_length(tables_schemaidlist) > 0 && !superuser()) ereport(ERROR, errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); - if (list_length(relations) > 0) + /* FOR ALL SEQUENCES IN SCHEMA requires superuser */ + if (list_length(sequences_schemaidlist) > 0 && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL SEQUENCES IN SCHEMA publication")); + + /* tables added directly */ + if (list_length(tables) > 0) { List *rels; - rels = OpenTableList(relations); - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); + CheckObjSchemaNotAlreadyInPublication(rels, tables_schemaidlist, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); + PublicationAddRelations(puboid, rels, true, NULL); + CloseRelationList(rels); + } + + /* sequences added directly */ + if (list_length(sequences) > 0) + { + List *rels; + + rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); + CheckObjSchemaNotAlreadyInPublication(rels, sequences_schemaidlist, + PUBLICATIONOBJ_SEQUENCE); + PublicationAddRelations(puboid, rels, true, NULL); + CloseRelationList(rels); + } + + /* tables added through a schema */ + if (list_length(tables_schemaidlist) > 0) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(tables_schemaidlist); + PublicationAddSchemas(puboid, + tables_schemaidlist, PUB_OBJTYPE_TABLE, + true, NULL); } - if (list_length(schemaidlist) > 0) + /* sequences added through a schema */ + if (list_length(sequences_schemaidlist) > 0) { /* * Schema lock is held until the publication is created to prevent * concurrent schema deletion. */ - LockSchemaList(schemaidlist); - PublicationAddSchemas(puboid, schemaidlist, true, NULL); + LockSchemaList(sequences_schemaidlist); + PublicationAddSchemas(puboid, + sequences_schemaidlist, PUB_OBJTYPE_SEQUENCE, + true, NULL); } } @@ -799,6 +914,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, AccessShareLock); root_relids = GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); foreach(lc, root_relids) @@ -857,6 +973,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; + + values[Anum_pg_publication_pubsequence - 1] = BoolGetDatum(pubactions.pubsequence); + replaces[Anum_pg_publication_pubsequence - 1] = true; } if (publish_via_partition_root_given) @@ -876,7 +995,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate the relcache. */ - if (pubform->puballtables) + if (pubform->puballtables || pubform->puballsequences) { CacheInvalidateRelcacheAll(); } @@ -892,6 +1011,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, */ if (root_relids == NIL) relids = GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ALL); else { @@ -905,7 +1025,20 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, lfirst_oid(lc)); } + /* tables */ + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUB_OBJTYPE_TABLE, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); + + /* sequences */ + relids = list_concat_unique_oid(relids, + GetPublicationRelations(pubform->oid, + PUB_OBJTYPE_SEQUENCE, + PUBLICATION_PART_ALL)); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUB_OBJTYPE_SEQUENCE, PUBLICATION_PART_ALL); relids = list_concat_unique_oid(relids, schemarelids); @@ -960,7 +1093,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (!tables && stmt->action != AP_SetObjects) return; - rels = OpenTableList(tables); + rels = OpenRelationList(tables, PUB_OBJTYPE_TABLE); if (stmt->action == AP_AddObjects) { @@ -970,19 +1103,22 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, * Check if the relation is member of the existing schema in the * publication or member of the schema list specified. */ - schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + schemas = list_concat_copy(schemaidlist, + GetPublicationSchemas(pubid, + PUB_OBJTYPE_TABLE)); CheckObjSchemaNotAlreadyInPublication(rels, schemas, PUBLICATIONOBJ_TABLE); TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); - PublicationAddTables(pubid, rels, false, stmt); + PublicationAddRelations(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropTables(pubid, rels, false); + PublicationDropRelations(pubid, rels, false); else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, + PUB_OBJTYPE_TABLE, PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -1064,18 +1200,18 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, } /* And drop them. */ - PublicationDropTables(pubid, delrels, true); + PublicationDropRelations(pubid, delrels, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddTables(pubid, rels, true, stmt); + PublicationAddRelations(pubid, rels, true, stmt); - CloseTableList(delrels); + CloseRelationList(delrels); } - CloseTableList(rels); + CloseRelationList(rels); } /* @@ -1085,7 +1221,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, */ static void AlterPublicationSchemas(AlterPublicationStmt *stmt, - HeapTuple tup, List *schemaidlist) + HeapTuple tup, List *schemaidlist, + char objectType) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -1107,20 +1244,20 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, List *rels; List *reloids; - reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + reloids = GetPublicationRelations(pubform->oid, objectType, PUBLICATION_PART_ROOT); rels = OpenRelIdList(reloids); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLES_IN_SCHEMA); - CloseTableList(rels); - PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + CloseRelationList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, objectType, false, stmt); } else if (stmt->action == AP_DropObjects) - PublicationDropSchemas(pubform->oid, schemaidlist, false); + PublicationDropSchemas(pubform->oid, schemaidlist, objectType, false); else /* AP_SetObjects */ { - List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *oldschemaids = GetPublicationSchemas(pubform->oid, objectType); List *delschemas = NIL; /* Identify which schemas should be dropped */ @@ -1133,13 +1270,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, LockSchemaList(delschemas); /* And drop them */ - PublicationDropSchemas(pubform->oid, delschemas, true); + PublicationDropSchemas(pubform->oid, delschemas, objectType, true); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + PublicationAddSchemas(pubform->oid, schemaidlist, objectType, true, stmt); } } @@ -1149,12 +1286,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, */ static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *schemaidlist) + List *tables, List *tables_schemaidlist, + List *sequences, List *sequences_schemaidlist) { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) && - schemaidlist && !superuser()) + (tables_schemaidlist || sequences_schemaidlist) && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to add or set schemas"))); @@ -1163,13 +1301,24 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, * Check that user is allowed to manipulate the publication tables in * schema */ - if (schemaidlist && pubform->puballtables) + if (tables_schemaidlist && pubform->puballtables) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + /* + * Check that user is allowed to manipulate the publication sequences in + * schema + */ + if (sequences_schemaidlist && pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications."))); + /* Check that user is allowed to manipulate the publication tables. */ if (tables && pubform->puballtables) ereport(ERROR, @@ -1177,6 +1326,107 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (sequences && pubform->puballsequences) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES", + NameStr(pubform->pubname)), + errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."))); +} + +/* + * Add or remove sequence to/from publication. + */ +static void +AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, + List *sequences, List *schemaidlist) +{ + List *rels = NIL; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + Oid pubid = pubform->oid; + + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ + if (!sequences && stmt->action != AP_SetObjects) + return; + + rels = OpenRelationList(sequences, PUB_OBJTYPE_SEQUENCE); + + if (stmt->action == AP_AddObjects) + { + List *schemas = NIL; + + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, + GetPublicationSchemas(pubid, + PUB_OBJTYPE_SEQUENCE)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_SEQUENCE); + PublicationAddRelations(pubid, rels, false, stmt); + } + else if (stmt->action == AP_DropObjects) + PublicationDropRelations(pubid, rels, false); + else /* DEFELEM_SET */ + { + List *oldrelids = GetPublicationRelations(pubid, + PUB_OBJTYPE_SEQUENCE, + PUBLICATION_PART_ROOT); + List *delrels = NIL; + ListCell *oldlc; + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_SEQUENCE); + + /* Calculate which relations to drop. */ + foreach(oldlc, oldrelids) + { + Oid oldrelid = lfirst_oid(oldlc); + ListCell *newlc; + PublicationRelInfo *oldrel; + bool found = false; + + foreach(newlc, rels) + { + PublicationRelInfo *newpubrel; + + newpubrel = (PublicationRelInfo *) lfirst(newlc); + if (RelationGetRelid(newpubrel->relation) == oldrelid) + { + found = true; + break; + } + } + /* Not yet in the list, open it and add to the list */ + if (!found) + { + oldrel = palloc(sizeof(PublicationRelInfo)); + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrelid, + ShareUpdateExclusiveLock); + delrels = lappend(delrels, oldrel); + } + } + + /* And drop them. */ + PublicationDropRelations(pubid, delrels, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddRelations(pubid, rels, true, stmt); + + CloseRelationList(delrels); + } + + CloseRelationList(rels); } /* @@ -1214,14 +1464,22 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) AlterPublicationOptions(pstate, stmt, rel, tup); else { - List *relations = NIL; + List *tables = NIL; + List *sequences = NIL; + List *tables_schemaidlist = NIL; + List *sequences_schemaidlist = NIL; List *schemaidlist = NIL; Oid pubid = pubform->oid; - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + ObjectsInPublicationToOids(stmt->pubobjects, pstate, + &tables, &sequences, + &tables_schemaidlist, + &sequences_schemaidlist, &schemaidlist); - CheckAlterPublication(stmt, tup, relations, schemaidlist); + CheckAlterPublication(stmt, tup, + tables, tables_schemaidlist, + sequences, sequences_schemaidlist); heap_freetuple(tup); @@ -1249,9 +1507,16 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, relations, schemaidlist, + AlterPublicationTables(stmt, tup, tables, tables_schemaidlist, pstate->p_sourcetext); - AlterPublicationSchemas(stmt, tup, schemaidlist); + + AlterPublicationSequences(stmt, tup, sequences, sequences_schemaidlist); + + AlterPublicationSchemas(stmt, tup, tables_schemaidlist, + PUB_OBJTYPE_TABLE); + + AlterPublicationSchemas(stmt, tup, sequences_schemaidlist, + PUB_OBJTYPE_SEQUENCE); } /* Cleanup. */ @@ -1319,7 +1584,7 @@ RemovePublicationById(Oid pubid) pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate relcache so that publication info is rebuilt. */ - if (pubform->puballtables) + if (pubform->puballtables || pubform->puballsequences) CacheInvalidateRelcacheAll(); CatalogTupleDelete(rel, &tup->t_self); @@ -1355,6 +1620,7 @@ RemovePublicationSchemaById(Oid psoid) * partitions. */ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + pubsch->pntype, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -1397,29 +1663,45 @@ OpenRelIdList(List *relids) * add them to a publication. */ static List * -OpenTableList(List *tables) +OpenRelationList(List *rels, char objectType) { List *relids = NIL; - List *rels = NIL; + List *result = NIL; ListCell *lc; List *relids_with_rf = NIL; /* * Open, share-lock, and check all the explicitly-specified relations */ - foreach(lc, tables) + foreach(lc, rels) { PublicationTable *t = lfirst_node(PublicationTable, lc); bool recurse = t->relation->inh; Relation rel; Oid myrelid; PublicationRelInfo *pub_rel; + char myrelkind; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); rel = table_openrv(t->relation, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); + myrelkind = get_rel_relkind(myrelid); + + /* + * Make sure the relkind matches the expected object type. This may + * happen e.g. when adding a sequence using ADD TABLE or a table + * using ADD SEQUENCE). + * + * XXX We let through unsupported object types (views etc.). Those + * will be caught later in check_publication_add_relation. + */ + if (pub_get_object_type_for_relkind(myrelkind) != PUB_OBJTYPE_UNSUPPORTED && + pub_get_object_type_for_relkind(myrelkind) != objectType) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("object type does not match type expected by command")); /* * Filter out duplicates if user specifies "foo, foo". @@ -1444,7 +1726,7 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; - rels = lappend(rels, pub_rel); + result = lappend(result, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) @@ -1498,7 +1780,7 @@ OpenTableList(List *tables) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; - rels = lappend(rels, pub_rel); + result = lappend(result, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) @@ -1510,14 +1792,14 @@ OpenTableList(List *tables) list_free(relids); list_free(relids_with_rf); - return rels; + return result; } /* * Close all relations in the list. */ static void -CloseTableList(List *rels) +CloseRelationList(List *rels) { ListCell *lc; @@ -1565,12 +1847,12 @@ LockSchemaList(List *schemalist) * Add listed tables to the publication. */ static void -PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, +PublicationAddRelations(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || !stmt->for_all_objects); foreach(lc, rels) { @@ -1599,7 +1881,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, * Remove listed tables from the publication. */ static void -PublicationDropTables(Oid pubid, List *rels, bool missing_ok) +PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -1639,19 +1921,19 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) * Add listed schemas to the publication. */ static void -PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, - AlterPublicationStmt *stmt) +PublicationAddSchemas(Oid pubid, List *schemas, char objectType, + bool if_not_exists, AlterPublicationStmt *stmt) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || !stmt->for_all_objects); foreach(lc, schemas) { Oid schemaid = lfirst_oid(lc); ObjectAddress obj; - obj = publication_add_schema(pubid, schemaid, if_not_exists); + obj = publication_add_schema(pubid, schemaid, objectType, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -1667,7 +1949,7 @@ PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, * Remove listed schemas from the publication. */ static void -PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +PublicationDropSchemas(Oid pubid, List *schemas, char objectType, bool missing_ok) { ObjectAddress obj; ListCell *lc; @@ -1677,10 +1959,11 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) { Oid schemaid = lfirst_oid(lc); - psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + psid = GetSysCacheOid3(PUBLICATIONNAMESPACEMAP, Anum_pg_publication_namespace_oid, ObjectIdGetDatum(schemaid), - ObjectIdGetDatum(pubid)); + ObjectIdGetDatum(pubid), + CharGetDatum(objectType)); if (!OidIsValid(psid)) { if (missing_ok) @@ -1735,6 +2018,13 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) NameStr(form->pubname)), errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); + if (form->puballsequences && !superuser_arg(newOwnerId)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to change owner of publication \"%s\"", + NameStr(form->pubname)), + errhint("The owner of a FOR ALL SEQUENCES publication must be a superuser."))); + if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
