summaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
authorTom Lane2007-06-11 01:16:30 +0000
committerTom Lane2007-06-11 01:16:30 +0000
commit6808f1b1de0ebcd4af558ba84c3226b2027f55ea (patch)
treeebd12580d3aaca6ec79b5d99563a1eff02451e88 /src/backend/executor
parent85d72f05167b87bc44464b2eabea8538f1fd1e45 (diff)
Support UPDATE/DELETE WHERE CURRENT OF cursor_name, per SQL standard.
Along the way, allow FOR UPDATE in non-WITH-HOLD cursors; there may once have been a reason to disallow that, but it seems to work now, and it's really rather necessary if you want to select a row via a cursor and then update it in a concurrent-safe fashion. Original patch by Arul Shaji, rather heavily editorialized by Tom Lane.
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/Makefile4
-rw-r--r--src/backend/executor/execCurrent.c185
-rw-r--r--src/backend/executor/execMain.c20
-rw-r--r--src/backend/executor/execQual.c43
-rw-r--r--src/backend/executor/nodeTidscan.c25
5 files changed, 270 insertions, 7 deletions
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 55256d9e469..cb4ab9dc31a 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -4,7 +4,7 @@
# Makefile for executor
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/executor/Makefile,v 1.25 2007/01/20 17:16:11 petere Exp $
+# $PostgreSQL: pgsql/src/backend/executor/Makefile,v 1.26 2007/06/11 01:16:22 tgl Exp $
#
#-------------------------------------------------------------------------
@@ -12,7 +12,7 @@ subdir = src/backend/executor
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = execAmi.o execGrouping.o execJunk.o execMain.o \
+OBJS = execAmi.o execCurrent.o execGrouping.o execJunk.o execMain.o \
execProcnode.o execQual.o execScan.o execTuples.o \
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
nodeBitmapAnd.o nodeBitmapOr.o \
diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c
new file mode 100644
index 00000000000..ce95d58b81b
--- /dev/null
+++ b/src/backend/executor/execCurrent.c
@@ -0,0 +1,185 @@
+/*-------------------------------------------------------------------------
+ *
+ * execCurrent.c
+ * executor support for WHERE CURRENT OF cursor
+ *
+ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL: pgsql/src/backend/executor/execCurrent.c,v 1.1 2007/06/11 01:16:22 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "executor/executor.h"
+#include "utils/lsyscache.h"
+#include "utils/portal.h"
+
+
+static ScanState *search_plan_tree(PlanState *node, Oid table_oid);
+
+
+/*
+ * execCurrentOf
+ *
+ * Given the name of a cursor and the OID of a table, determine which row
+ * of the table is currently being scanned by the cursor, and return its
+ * TID into *current_tid.
+ *
+ * Returns TRUE if a row was identified. Returns FALSE if the cursor is valid
+ * for the table but is not currently scanning a row of the table (this is a
+ * legal situation in inheritance cases). Raises error if cursor is not a
+ * valid updatable scan of the specified table.
+ */
+bool
+execCurrentOf(char *cursor_name, Oid table_oid,
+ ItemPointer current_tid)
+{
+ char *table_name;
+ Portal portal;
+ QueryDesc *queryDesc;
+ ScanState *scanstate;
+ HeapTuple tup;
+
+ /* Fetch table name for possible use in error messages */
+ table_name = get_rel_name(table_oid);
+ if (table_name == NULL)
+ elog(ERROR, "cache lookup failed for relation %u", table_oid);
+
+ /* Find the cursor's portal */
+ portal = GetPortalByName(cursor_name);
+ if (!PortalIsValid(portal))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_CURSOR),
+ errmsg("cursor \"%s\" does not exist", cursor_name)));
+
+ /*
+ * We have to watch out for non-SELECT queries as well as held cursors,
+ * both of which may have null queryDesc.
+ */
+ if (portal->strategy != PORTAL_ONE_SELECT)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not a SELECT query",
+ cursor_name)));
+ queryDesc = PortalGetQueryDesc(portal);
+ if (queryDesc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is held from a previous transaction",
+ cursor_name)));
+
+ /*
+ * Dig through the cursor's plan to find the scan node. Fail if it's
+ * not there or buried underneath aggregation.
+ */
+ scanstate = search_plan_tree(ExecGetActivePlanTree(queryDesc),
+ table_oid);
+ if (!scanstate)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
+ cursor_name, table_name)));
+
+ /*
+ * The cursor must have a current result row: per the SQL spec, it's
+ * an error if not. We test this at the top level, rather than at
+ * the scan node level, because in inheritance cases any one table
+ * scan could easily not be on a row. We want to return false, not
+ * raise error, if the passed-in table OID is for one of the inactive
+ * scans.
+ */
+ if (portal->atStart || portal->atEnd)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not positioned on a row",
+ cursor_name)));
+
+ /* Now OK to return false if we found an inactive scan */
+ if (TupIsNull(scanstate->ss_ScanTupleSlot))
+ return false;
+
+ tup = scanstate->ss_ScanTupleSlot->tts_tuple;
+ if (tup == NULL)
+ elog(ERROR, "CURRENT OF applied to non-materialized tuple");
+ Assert(tup->t_tableOid == table_oid);
+
+ *current_tid = tup->t_self;
+
+ return true;
+}
+
+/*
+ * search_plan_tree
+ *
+ * Search through a PlanState tree for a scan node on the specified table.
+ * Return NULL if not found or multiple candidates.
+ */
+static ScanState *
+search_plan_tree(PlanState *node, Oid table_oid)
+{
+ if (node == NULL)
+ return NULL;
+ switch (nodeTag(node))
+ {
+ /*
+ * scan nodes can all be treated alike
+ */
+ case T_SeqScanState:
+ case T_IndexScanState:
+ case T_BitmapHeapScanState:
+ case T_TidScanState:
+ {
+ ScanState *sstate = (ScanState *) node;
+
+ if (RelationGetRelid(sstate->ss_currentRelation) == table_oid)
+ return sstate;
+ break;
+ }
+
+ /*
+ * For Append, we must look through the members; watch out for
+ * multiple matches (possible if it was from UNION ALL)
+ */
+ case T_AppendState:
+ {
+ AppendState *astate = (AppendState *) node;
+ ScanState *result = NULL;
+ int i;
+
+ for (i = 0; i < astate->as_nplans; i++)
+ {
+ ScanState *elem = search_plan_tree(astate->appendplans[i],
+ table_oid);
+
+ if (!elem)
+ continue;
+ if (result)
+ return NULL; /* multiple matches */
+ result = elem;
+ }
+ return result;
+ }
+
+ /*
+ * Result and Limit can be descended through (these are safe
+ * because they always return their input's current row)
+ */
+ case T_ResultState:
+ case T_LimitState:
+ return search_plan_tree(node->lefttree, table_oid);
+
+ /*
+ * SubqueryScan too, but it keeps the child in a different place
+ */
+ case T_SubqueryScanState:
+ return search_plan_tree(((SubqueryScanState *) node)->subplan,
+ table_oid);
+
+ default:
+ /* Otherwise, assume we can't descend through it */
+ break;
+ }
+ return NULL;
+}
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 82b17b19f01..dfd1a84ab77 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -26,7 +26,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.294 2007/06/03 17:07:00 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.295 2007/06/11 01:16:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -2368,6 +2368,24 @@ EvalPlanQualStop(evalPlanQual *epq)
epq->planstate = NULL;
}
+/*
+ * ExecGetActivePlanTree --- get the active PlanState tree from a QueryDesc
+ *
+ * Ordinarily this is just the one mentioned in the QueryDesc, but if we
+ * are looking at a row returned by the EvalPlanQual machinery, we need
+ * to look at the subsidiary state instead.
+ */
+PlanState *
+ExecGetActivePlanTree(QueryDesc *queryDesc)
+{
+ EState *estate = queryDesc->estate;
+
+ if (estate && estate->es_useEvalPlan && estate->es_evalPlanQual != NULL)
+ return estate->es_evalPlanQual->planstate;
+ else
+ return queryDesc->planstate;
+}
+
/*
* Support for SELECT INTO (a/k/a CREATE TABLE AS)
diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c
index fd54b89f0a5..5549142e703 100644
--- a/src/backend/executor/execQual.c
+++ b/src/backend/executor/execQual.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/execQual.c,v 1.218 2007/06/05 21:31:04 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/execQual.c,v 1.219 2007/06/11 01:16:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -151,6 +151,8 @@ static Datum ExecEvalCoerceViaIO(CoerceViaIOState *iostate,
static Datum ExecEvalArrayCoerceExpr(ArrayCoerceExprState *astate,
ExprContext *econtext,
bool *isNull, ExprDoneCond *isDone);
+static Datum ExecEvalCurrentOfExpr(ExprState *exprstate, ExprContext *econtext,
+ bool *isNull, ExprDoneCond *isDone);
/* ----------------------------------------------------------------
@@ -3618,6 +3620,41 @@ ExecEvalArrayCoerceExpr(ArrayCoerceExprState *astate,
astate->amstate);
}
+/* ----------------------------------------------------------------
+ * ExecEvalCurrentOfExpr
+ *
+ * Normally, the planner will convert CURRENT OF into a TidScan qualification,
+ * but we have plain execQual support in case it doesn't.
+ * ----------------------------------------------------------------
+ */
+static Datum
+ExecEvalCurrentOfExpr(ExprState *exprstate, ExprContext *econtext,
+ bool *isNull, ExprDoneCond *isDone)
+{
+ CurrentOfExpr *cexpr = (CurrentOfExpr *) exprstate->expr;
+ bool result;
+ HeapTuple tup;
+ ItemPointerData cursor_tid;
+
+ if (isDone)
+ *isDone = ExprSingleResult;
+ *isNull = false;
+
+ Assert(cexpr->cvarno != INNER);
+ Assert(cexpr->cvarno != OUTER);
+ Assert(!TupIsNull(econtext->ecxt_scantuple));
+ tup = econtext->ecxt_scantuple->tts_tuple;
+ if (tup == NULL)
+ elog(ERROR, "CURRENT OF applied to non-materialized tuple");
+
+ if (execCurrentOf(cexpr->cursor_name, tup->t_tableOid, &cursor_tid))
+ result = ItemPointerEquals(&cursor_tid, &(tup->t_self));
+ else
+ result = false;
+
+ return BoolGetDatum(result);
+}
+
/*
* ExecEvalExprSwitchContext
@@ -4266,6 +4303,10 @@ ExecInitExpr(Expr *node, PlanState *parent)
state = (ExprState *) cstate;
}
break;
+ case T_CurrentOfExpr:
+ state = (ExprState *) makeNode(ExprState);
+ state->evalfunc = ExecEvalCurrentOfExpr;
+ break;
case T_TargetEntry:
{
TargetEntry *tle = (TargetEntry *) node;
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index 9b6724537e7..986ff1f3f19 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeTidscan.c,v 1.53 2007/01/05 22:19:28 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeTidscan.c,v 1.54 2007/06/11 01:16:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -61,8 +61,8 @@ TidListCreate(TidScanState *tidstate)
/*
* We initialize the array with enough slots for the case that all quals
- * are simple OpExprs. If there's any ScalarArrayOpExprs, we may have to
- * enlarge the array.
+ * are simple OpExprs or CurrentOfExprs. If there are any
+ * ScalarArrayOpExprs, we may have to enlarge the array.
*/
numAllocTids = list_length(evalList);
tidList = (ItemPointerData *)
@@ -148,6 +148,25 @@ TidListCreate(TidScanState *tidstate)
pfree(ipdatums);
pfree(ipnulls);
}
+ else if (expr && IsA(expr, CurrentOfExpr))
+ {
+ CurrentOfExpr *cexpr = (CurrentOfExpr *) expr;
+ ItemPointerData cursor_tid;
+
+ if (execCurrentOf(cexpr->cursor_name,
+ RelationGetRelid(tidstate->ss.ss_currentRelation),
+ &cursor_tid))
+ {
+ if (numTids >= numAllocTids)
+ {
+ numAllocTids *= 2;
+ tidList = (ItemPointerData *)
+ repalloc(tidList,
+ numAllocTids * sizeof(ItemPointerData));
+ }
+ tidList[numTids++] = cursor_tid;
+ }
+ }
else
elog(ERROR, "could not identify CTID expression");
}