Diffstat (limited to 'src/backend/storage/lmgr')
-rw-r--r--   src/backend/storage/lmgr/Makefile.inc |   14
-rw-r--r--   src/backend/storage/lmgr/README       |   93
-rw-r--r--   src/backend/storage/lmgr/lmgr.c       |  933
-rw-r--r--   src/backend/storage/lmgr/lock.c       | 1020
-rw-r--r--   src/backend/storage/lmgr/multi.c      |  415
-rw-r--r--   src/backend/storage/lmgr/proc.c       |  826
-rw-r--r--   src/backend/storage/lmgr/single.c     |   86
7 files changed, 3387 insertions(+), 0 deletions(-)
diff --git a/src/backend/storage/lmgr/Makefile.inc b/src/backend/storage/lmgr/Makefile.inc
new file mode 100644
index 00000000000..ac507558b57
--- /dev/null
+++ b/src/backend/storage/lmgr/Makefile.inc
@@ -0,0 +1,14 @@
+#-------------------------------------------------------------------------
+#
+# Makefile.inc--
+#    Makefile for storage/lmgr
+#
+# Copyright (c) 1994, Regents of the University of California
+#
+#
+# IDENTIFICATION
+#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/Makefile.inc,v 1.1.1.1 1996/07/09 06:21:55 scrappy Exp $
+#
+#-------------------------------------------------------------------------
+
+SUBSRCS+= lmgr.c lock.c multi.c proc.c single.c
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
new file mode 100644
index 00000000000..e382003f2a4
--- /dev/null
+++ b/src/backend/storage/lmgr/README
@@ -0,0 +1,93 @@
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.1.1.1 1996/07/09 06:21:55 scrappy Exp $
+
+This file is an attempt to save me (and future code maintainers) some
+time and a lot of headaches. The existing lock manager code at the time
+of this writing (June 16 1992) can best be described as confusing. The
+complexity seems inherent in lock manager functionality, but the variable
+names chosen in the current implementation really confuse me every time
+I have to track down a bug. Also, what gets done where and by whom isn't
+always clear....
+
+Starting with the data structures the lock manager relies upon...
+
+(NOTE - these will undoubtedly change over time and it is likely
+that this file won't always be updated along with the structs.)
+
+The lock manager's LOCK:
+
+tag -
+    The key fields that are used for hashing locks in the shared memory
+    lock hash table. This is kept as a separate struct to ensure that we
+    always zero out the correct number of bytes. This matters because
+    part of the tag is an itempointer, which is 6 bytes and causes 2
+    additional bytes to be added as padding.
+
+    tag.relId -
+        Uniquely identifies the relation that the lock corresponds to.
+
+    tag.dbId -
+        Uniquely identifies the database in which the relation lives. If
+        this is a shared system relation (e.g. pg_user) the dbId should be
+        set to 0.
+
+    tag.tupleId -
+        Uniquely identifies the block/page within the relation and the
+        tuple within the block. If we are setting a table level lock,
+        both the blockId and the tupleId (in an item pointer this is
+        called the position) are set to invalid; if it is a page level
+        lock, the blockId is valid while the tupleId is still invalid.
+        Finally, if this is a tuple level lock (we currently never do
+        this) then both the blockId and tupleId are set to valid
+        specifications. This is how we get the appearance of a
+        multi-level lock table while using only a single table (see
+        Gray's paper on 2 phase locking if you are puzzled about how
+        multi-level lock tables work).
+
+mask -
+    This field indicates what types of locks are currently held in the
+    given lock. It is used (against the lock table's conflict table)
+    to determine if the new lock request will conflict with existing
+    lock types held. Conflicts are determined by bitwise AND operations
+    between the mask and the conflict table entry for the given lock type
+    to be set. The current representation is that each bit (1 through 5)
+    is set when that lock type (WRITE, READ, WRITE INTENT, READ INTENT, EXTEND)
+    has been acquired for the lock.
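
A minimal sketch of that conflict test (illustrative only; the bit
assignments and table values below are invented for the example, not
taken from the real conflict table):

    #include <stdio.h>

    #define READ_LOCK   1
    #define WRITE_LOCK  2

    int
    main()
    {
        int conflictTab[3];
        int mask;

        /* reads conflict with writes; writes conflict with everything */
        conflictTab[READ_LOCK] = (1 << WRITE_LOCK);
        conflictTab[WRITE_LOCK] = (1 << READ_LOCK) | (1 << WRITE_LOCK);

        /* suppose some backend already holds a write lock */
        mask = (1 << WRITE_LOCK);

        /* a new request conflicts iff the bitwise AND is non-zero */
        printf("read blocked: %s\n",
               (conflictTab[READ_LOCK] & mask) ? "yes" : "no");
        return 0;
    }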
+
+waitProcs -
+    This is a shared memory queue of all process structures corresponding to
+    a backend that is waiting (sleeping) until another backend releases this
+    lock. The process structure holds the information needed to determine
+    if it should be woken up when this lock is released. If, for example,
+    we are releasing a read lock and the process is sleeping trying to
+    acquire a read lock, then there is no point in waking it, since the
+    lock being released isn't what caused it to sleep in the first place.
+    There will be more on this below (when I get to releasing locks and
+    waking sleeping process routines).
+
+nHolding -
+    Keeps a count of how many times acquisition of this lock has been
+    attempted. The count includes attempts by processes which were put
+    to sleep due to conflicts. It also counts the same backend twice
+    if, for example, a backend process first acquires a read lock and
+    then acquires a write lock.
+
+holders -
+    Keeps a count of how many locks of each type have been attempted. Only
+    elements 1 through MAX_LOCK_TYPES are used, as they correspond to the
+    lock type defined constants (WRITE through EXTEND). Summing the values
+    of holders should come out equal to nHolding.
+
+nActive -
+    Keeps a count of how many times this lock has been successfully
+    acquired. This count does not include attempts that were rejected
+    due to conflicts, but can count the same backend twice (e.g. a read
+    then a write -- since it's the same transaction this won't cause a
+    conflict).
+
+activeHolders -
+    Keeps a count of how many locks of each type are currently held.
+    Once again, only elements 1 through MAX_LOCK_TYPES are used (0 is
+    not). Also, like holders, summing the values of activeHolders
+    should total to the value of nActive. (For example: if backend A
+    holds a read lock while backend B sleeps on a write request, then
+    nHolding is 2 but nActive is only 1.)
+
+
+This is all I had the stomach for right now..... I will get back to this
+someday.
-mer 17 June 1992 12:00 am diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c new file mode 100644 index 00000000000..bfc2f5b2eec --- /dev/null +++ b/src/backend/storage/lmgr/lmgr.c @@ -0,0 +1,933 @@ +/*------------------------------------------------------------------------- + * + * lmgr.c-- + * POSTGRES lock manager code + * + * Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.1.1.1 1996/07/09 06:21:56 scrappy Exp $ + * + *------------------------------------------------------------------------- + */ +/* #define LOCKDEBUGALL 1 */ +/* #define LOCKDEBUG 1 */ + +#ifdef LOCKDEBUGALL +#define LOCKDEBUG 1 +#endif /* LOCKDEBUGALL */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/htup.h" +#include "access/relscan.h" +#include "access/skey.h" +#include "utils/tqual.h" +#include "access/xact.h" + +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/itemptr.h" +#include "storage/bufpage.h" +#include "storage/multilev.h" +#include "storage/lmgr.h" + +#include "utils/elog.h" +#include "utils/palloc.h" +#include "utils/rel.h" + +#include "catalog/catname.h" +#include "catalog/catalog.h" +#include "catalog/pg_class.h" + +#include "nodes/memnodes.h" +#include "storage/bufmgr.h" +#include "access/transam.h" /* for AmiTransactionId */ + +/* ---------------- + * + * ---------------- + */ +#define MaxRetries 4 /* XXX about 1/4 minute--a hack */ + +#define IntentReadRelationLock 0x0100 +#define ReadRelationLock 0x0200 +#define IntentWriteRelationLock 0x0400 +#define WriteRelationLock 0x0800 +#define IntentReadPageLock 0x1000 +#define ReadTupleLock 0x2000 + +#define TupleLevelLockCountMask 0x000f + +#define TupleLevelLockLimit 10 + +extern Oid MyDatabaseId; + +static LRelId VariableRelationLRelId = { + RelOid_pg_variable, + InvalidOid +}; + +/* ---------------- + * RelationGetLRelId + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_10 \ +elog(NOTICE, "RelationGetLRelId(%s) invalid lockInfo", \ + RelationGetRelationName(relation)); +#else +#define LOCKDEBUG_10 +#endif /* LOCKDEBUG */ + +/* + * RelationGetLRelId -- + * Returns "lock" relation identifier for a relation. + */ +LRelId +RelationGetLRelId(Relation relation) +{ + LockInfo linfo; + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + linfo = (LockInfo) relation->lockInfo; + + /* ---------------- + * initialize lock info if necessary + * ---------------- + */ + if (! LockInfoIsValid(linfo)) { + LOCKDEBUG_10; + RelationInitLockInfo(relation); + linfo = (LockInfo) relation->lockInfo; + } + + /* ---------------- + * XXX hack to prevent problems during + * VARIABLE relation initialization + * ---------------- + */ + if (strcmp(RelationGetRelationName(relation)->data, + VariableRelationName) == 0) { + return (VariableRelationLRelId); + } + + return (linfo->lRelId); +} + +/* + * LRelIdGetDatabaseId -- + * Returns database identifier for a "lock" relation identifier. + */ +/* ---------------- + * LRelIdGetDatabaseId + * + * Note: The argument may not be correct, if it is not used soon + * after it is created. + * ---------------- + */ +Oid +LRelIdGetDatabaseId(LRelId lRelId) +{ + return (lRelId.dbId); +} + + +/* + * LRelIdGetRelationId -- + * Returns relation identifier for a "lock" relation identifier. 
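 *
 * (Illustrative usage, not part of the original source:
 *
 *	LRelId lid   = RelationGetLRelId(rel);
 *	Oid    dbId  = LRelIdGetDatabaseId(lid);
 *	Oid    relId = LRelIdGetRelationId(lid);
 *
 *  The pair identifies the relation for locking purposes, with dbId
 *  left invalid for shared system relations.)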
+ */ +Oid +LRelIdGetRelationId(LRelId lRelId) +{ + return (lRelId.relId); +} + +/* + * DatabaseIdIsMyDatabaseId -- + * True iff database object identifier is valid in my present database. + */ +bool +DatabaseIdIsMyDatabaseId(Oid databaseId) +{ + return (bool) + (!OidIsValid(databaseId) || databaseId == MyDatabaseId); +} + +/* + * LRelIdContainsMyDatabaseId -- + * True iff "lock" relation identifier is valid in my present database. + */ +bool +LRelIdContainsMyDatabaseId(LRelId lRelId) +{ + return (bool) + (!OidIsValid(lRelId.dbId) || lRelId.dbId == MyDatabaseId); +} + +/* + * RelationInitLockInfo -- + * Initializes the lock information in a relation descriptor. + */ +/* ---------------- + * RelationInitLockInfo + * + * XXX processingVariable is a hack to prevent problems during + * VARIABLE relation initialization. + * ---------------- + */ +void +RelationInitLockInfo(Relation relation) +{ + LockInfo info; + char *relname; + Oid relationid; + bool processingVariable; + extern Oid MyDatabaseId; /* XXX use include */ + extern GlobalMemory CacheCxt; + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + Assert(OidIsValid(RelationGetRelationId(relation))); + + /* ---------------- + * get information from relation descriptor + * ---------------- + */ + info = (LockInfo) relation->lockInfo; + relname = (char *) RelationGetRelationName(relation); + relationid = RelationGetRelationId(relation); + processingVariable = (strcmp(relname, VariableRelationName) == 0); + + /* ---------------- + * create a new lockinfo if not already done + * ---------------- + */ + if (! PointerIsValid(info)) + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo((MemoryContext)CacheCxt); + info = (LockInfo)palloc(sizeof(LockInfoData)); + MemoryContextSwitchTo(oldcxt); + } + else if (processingVariable) { + if (IsTransactionState()) { + TransactionIdStore(GetCurrentTransactionId(), + &info->transactionIdData); + } + info->flags = 0x0; + return; /* prevent an infinite loop--still true? */ + } + else if (info->initialized) + { + /* ------------ + * If we've already initialized we're done. + * ------------ + */ + return; + } + + /* ---------------- + * initialize lockinfo.dbId and .relId appropriately + * ---------------- + */ + if (IsSharedSystemRelationName(relname)) + LRelIdAssign(&info->lRelId, InvalidOid, relationid); + else + LRelIdAssign(&info->lRelId, MyDatabaseId, relationid); + + /* ---------------- + * store the transaction id in the lockInfo field + * ---------------- + */ + if (processingVariable) + TransactionIdStore(AmiTransactionId, + &info->transactionIdData); + else if (IsTransactionState()) + TransactionIdStore(GetCurrentTransactionId(), + &info->transactionIdData); + else + StoreInvalidTransactionId(&(info->transactionIdData)); + + /* ---------------- + * initialize rest of lockinfo + * ---------------- + */ + info->flags = 0x0; + info->initialized = (bool)true; + relation->lockInfo = (Pointer) info; +} + +/* ---------------- + * RelationDiscardLockInfo + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_20 \ +elog(DEBUG, "DiscardLockInfo: NULL relation->lockInfo") +#else +#define LOCKDEBUG_20 +#endif /* LOCKDEBUG */ + +/* + * RelationDiscardLockInfo -- + * Discards the lock information in a relation descriptor. + */ +void +RelationDiscardLockInfo(Relation relation) +{ + if (! 
LockInfoIsValid(relation->lockInfo)) { + LOCKDEBUG_20; + return; + } + + pfree(relation->lockInfo); + relation->lockInfo = NULL; +} + +/* + * RelationSetLockForDescriptorOpen -- + * Sets read locks for a relation descriptor. + */ +#ifdef LOCKDEBUGALL +#define LOCKDEBUGALL_30 \ +elog(DEBUG, "RelationSetLockForDescriptorOpen(%s[%d,%d]) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId) +#else +#define LOCKDEBUGALL_30 +#endif /* LOCKDEBUGALL*/ + +void +RelationSetLockForDescriptorOpen(Relation relation) +{ + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + LOCKDEBUGALL_30; + + /* ---------------- + * read lock catalog tuples which compose the relation descriptor + * XXX race condition? XXX For now, do nothing. + * ---------------- + */ +} + +/* ---------------- + * RelationSetLockForRead + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_40 \ +elog(DEBUG, "RelationSetLockForRead(%s[%d,%d]) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId) +#else +#define LOCKDEBUG_40 +#endif /* LOCKDEBUG*/ + +/* + * RelationSetLockForRead -- + * Sets relation level read lock. + */ +void +RelationSetLockForRead(Relation relation) +{ + LockInfo linfo; + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + LOCKDEBUG_40; + + /* ---------------- + * If we don't have lock info on the reln just go ahead and + * lock it without trying to short circuit the lock manager. + * ---------------- + */ + if (!LockInfoIsValid(relation->lockInfo)) + { + RelationInitLockInfo(relation); + linfo = (LockInfo) relation->lockInfo; + linfo->flags |= ReadRelationLock; + MultiLockReln(linfo, READ_LOCK); + return; + } + else + linfo = (LockInfo) relation->lockInfo; + + MultiLockReln(linfo, READ_LOCK); +} + +/* ---------------- + * RelationUnsetLockForRead + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_50 \ +elog(DEBUG, "RelationUnsetLockForRead(%s[%d,%d]) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId) +#else +#define LOCKDEBUG_50 +#endif /* LOCKDEBUG*/ + +/* + * RelationUnsetLockForRead -- + * Unsets relation level read lock. + */ +void +RelationUnsetLockForRead(Relation relation) +{ + LockInfo linfo; + + /* ---------------- + * sanity check + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + linfo = (LockInfo) relation->lockInfo; + + /* ---------------- + * If we don't have lock info on the reln just go ahead and + * release it. + * ---------------- + */ + if (!LockInfoIsValid(linfo)) + { + elog(WARN, + "Releasing a lock on %s with invalid lock information", + RelationGetRelationName(relation)); + } + + MultiReleaseReln(linfo, READ_LOCK); +} + +/* ---------------- + * RelationSetLockForWrite(relation) + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_60 \ +elog(DEBUG, "RelationSetLockForWrite(%s[%d,%d]) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId) +#else +#define LOCKDEBUG_60 +#endif /* LOCKDEBUG*/ + +/* + * RelationSetLockForWrite -- + * Sets relation level write lock. 
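 *
 * (Hedged usage sketch, not taken from a caller in this file: a writer
 *  typically brackets its access as
 *
 *	RelationSetLockForWrite(rel);
 *	... modify the relation ...
 *	RelationUnsetLockForWrite(rel);
 *
 *  and the unset call elog(WARN)s if the relation's lock information
 *  was never initialized.)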
+ */ +void +RelationSetLockForWrite(Relation relation) +{ + LockInfo linfo; + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + LOCKDEBUG_60; + + /* ---------------- + * If we don't have lock info on the reln just go ahead and + * lock it without trying to short circuit the lock manager. + * ---------------- + */ + if (!LockInfoIsValid(relation->lockInfo)) + { + RelationInitLockInfo(relation); + linfo = (LockInfo) relation->lockInfo; + linfo->flags |= WriteRelationLock; + MultiLockReln(linfo, WRITE_LOCK); + return; + } + else + linfo = (LockInfo) relation->lockInfo; + + MultiLockReln(linfo, WRITE_LOCK); +} + +/* ---------------- + * RelationUnsetLockForWrite + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_70 \ +elog(DEBUG, "RelationUnsetLockForWrite(%s[%d,%d]) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId); +#else +#define LOCKDEBUG_70 +#endif /* LOCKDEBUG */ + +/* + * RelationUnsetLockForWrite -- + * Unsets relation level write lock. + */ +void +RelationUnsetLockForWrite(Relation relation) +{ + LockInfo linfo; + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) { + return; + } + + linfo = (LockInfo) relation->lockInfo; + + if (!LockInfoIsValid(linfo)) + { + elog(WARN, + "Releasing a lock on %s with invalid lock information", + RelationGetRelationName(relation)); + } + + MultiReleaseReln(linfo, WRITE_LOCK); +} + +/* ---------------- + * RelationSetLockForTupleRead + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_80 \ +elog(DEBUG, "RelationSetLockForTupleRead(%s[%d,%d], 0x%x) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId, \ + itemPointer) +#define LOCKDEBUG_81 \ + elog(DEBUG, "RelationSetLockForTupleRead() escalating"); +#else +#define LOCKDEBUG_80 +#define LOCKDEBUG_81 +#endif /* LOCKDEBUG */ + +/* + * RelationSetLockForTupleRead -- + * Sets tuple level read lock. + */ +void +RelationSetLockForTupleRead(Relation relation, ItemPointer itemPointer) +{ + LockInfo linfo; + TransactionId curXact; + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + LOCKDEBUG_80; + + /* --------------------- + * If our lock info is invalid don't bother trying to short circuit + * the lock manager. 
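 *
 * (Illustrative note, not in the original: the short circuit below
 *  keeps a per-transaction count of tuple read locks in the low bits
 *  of linfo->flags (TupleLevelLockCountMask); when the count reaches
 *  TupleLevelLockLimit, the request escalates to a relation-level
 *  read lock and the count is cleared.)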
+ * --------------------- + */ + if (!LockInfoIsValid(relation->lockInfo)) + { + RelationInitLockInfo(relation); + linfo = (LockInfo) relation->lockInfo; + linfo->flags |= + IntentReadRelationLock | + IntentReadPageLock | + ReadTupleLock; + MultiLockTuple(linfo, itemPointer, READ_LOCK); + return; + } + else + linfo = (LockInfo) relation->lockInfo; + + /* ---------------- + * no need to set a lower granularity lock + * ---------------- + */ + curXact = GetCurrentTransactionId(); + if ((linfo->flags & ReadRelationLock) && + TransactionIdEquals(curXact, linfo->transactionIdData)) + { + return; + } + + /* ---------------- + * If we don't already have a tuple lock this transaction + * ---------------- + */ + if (!( (linfo->flags & ReadTupleLock) && + TransactionIdEquals(curXact, linfo->transactionIdData) )) { + + linfo->flags |= + IntentReadRelationLock | + IntentReadPageLock | + ReadTupleLock; + + /* clear count */ + linfo->flags &= ~TupleLevelLockCountMask; + + } else { + if (TupleLevelLockLimit == (TupleLevelLockCountMask & + linfo->flags)) { + LOCKDEBUG_81; + + /* escalate */ + MultiLockReln(linfo, READ_LOCK); + + /* clear count */ + linfo->flags &= ~TupleLevelLockCountMask; + return; + } + + /* increment count */ + linfo->flags = + (linfo->flags & ~TupleLevelLockCountMask) | + (1 + (TupleLevelLockCountMask & linfo->flags)); + } + + TransactionIdStore(curXact, &linfo->transactionIdData); + + /* ---------------- + * Lock the tuple. + * ---------------- + */ + MultiLockTuple(linfo, itemPointer, READ_LOCK); +} + +/* ---------------- + * RelationSetLockForReadPage + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_90 \ +elog(DEBUG, "RelationSetLockForReadPage(%s[%d,%d], @%d) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId, page); +#else +#define LOCKDEBUG_90 +#endif /* LOCKDEBUG*/ + +/* ---------------- + * RelationSetLockForWritePage + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_100 \ +elog(DEBUG, "RelationSetLockForWritePage(%s[%d,%d], @%d) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId, page); +#else +#define LOCKDEBUG_100 +#endif /* LOCKDEBUG */ + +/* + * RelationSetLockForWritePage -- + * Sets write lock on a page. + */ +void +RelationSetLockForWritePage(Relation relation, + ItemPointer itemPointer) +{ + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + /* --------------- + * Make sure linfo is initialized + * --------------- + */ + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + /* ---------------- + * attempt to set lock + * ---------------- + */ + MultiLockPage((LockInfo) relation->lockInfo, itemPointer, WRITE_LOCK); +} + +/* ---------------- + * RelationUnsetLockForReadPage + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_110 \ +elog(DEBUG, "RelationUnsetLockForReadPage(%s[%d,%d], @%d) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId, page) +#else +#define LOCKDEBUG_110 +#endif /* LOCKDEBUG */ + +/* ---------------- + * RelationUnsetLockForWritePage + * ---------------- + */ +#ifdef LOCKDEBUG +#define LOCKDEBUG_120 \ +elog(DEBUG, "RelationUnsetLockForWritePage(%s[%d,%d], @%d) called", \ + RelationGetRelationName(relation), lRelId.dbId, lRelId.relId, page) +#else +#define LOCKDEBUG_120 +#endif /* LOCKDEBUG */ + +/* + * Set a single level write page lock. Assumes that you already + * have a write intent lock on the relation. 
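 *
 * (Hedged usage sketch, an assumption based on the comment above
 *  rather than on a caller shown here:
 *
 *	RelationSetWIntentLock(rel);
 *	RelationSetSingleWLockPage(rel, &tid);
 *	... work on the page ...
 *	RelationUnsetSingleWLockPage(rel, &tid);
 *	RelationUnsetWIntentLock(rel);
 *  )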
+ */ +void +RelationSetSingleWLockPage(Relation relation, + ItemPointer itemPointer) +{ + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + SingleLockPage((LockInfo)relation->lockInfo, itemPointer, WRITE_LOCK, !UNLOCK); +} + +/* + * Unset a single level write page lock + */ +void +RelationUnsetSingleWLockPage(Relation relation, + ItemPointer itemPointer) +{ + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + elog(WARN, + "Releasing a lock on %s with invalid lock information", + RelationGetRelationName(relation)); + + SingleLockPage((LockInfo)relation->lockInfo, itemPointer, WRITE_LOCK, UNLOCK); +} + +/* + * Set a single level read page lock. Assumes you already have a read + * intent lock set on the relation. + */ +void +RelationSetSingleRLockPage(Relation relation, + ItemPointer itemPointer) +{ + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + SingleLockPage((LockInfo)relation->lockInfo, itemPointer, READ_LOCK, !UNLOCK); +} + +/* + * Unset a single level read page lock. + */ +void +RelationUnsetSingleRLockPage(Relation relation, + ItemPointer itemPointer) +{ + + /* ---------------- + * sanity checks + * ---------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + elog(WARN, + "Releasing a lock on %s with invalid lock information", + RelationGetRelationName(relation)); + + SingleLockPage((LockInfo)relation->lockInfo, itemPointer, READ_LOCK, UNLOCK); +} + +/* + * Set a read intent lock on a relation. + * + * Usually these are set in a multi-level table when you acquiring a + * page level lock. i.e. To acquire a lock on a page you first acquire + * an intent lock on the entire relation. Acquiring an intent lock along + * allows one to use the single level locking routines later. Good for + * index scans that do a lot of page level locking. + */ +void +RelationSetRIntentLock(Relation relation) +{ + /* ----------------- + * Sanity check + * ----------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + SingleLockReln((LockInfo)relation->lockInfo, READ_LOCK+INTENT, !UNLOCK); +} + +/* + * Unset a read intent lock on a relation + */ +void +RelationUnsetRIntentLock(Relation relation) +{ + /* ----------------- + * Sanity check + * ----------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + SingleLockReln((LockInfo)relation->lockInfo, READ_LOCK+INTENT, UNLOCK); +} + +/* + * Set a write intent lock on a relation. 
For a more complete explanation + * see RelationSetRIntentLock() + */ +void +RelationSetWIntentLock(Relation relation) +{ + /* ----------------- + * Sanity check + * ----------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + SingleLockReln((LockInfo)relation->lockInfo, WRITE_LOCK+INTENT, !UNLOCK); +} + +/* + * Unset a write intent lock. + */ +void +RelationUnsetWIntentLock(Relation relation) +{ + /* ----------------- + * Sanity check + * ----------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + SingleLockReln((LockInfo)relation->lockInfo, WRITE_LOCK+INTENT, UNLOCK); +} + +/* + * Extend locks are used primarily in tertiary storage devices such as + * a WORM disk jukebox. Sometimes need exclusive access to extend a + * file by a block. + */ +void +RelationSetLockForExtend(Relation relation) +{ + /* ----------------- + * Sanity check + * ----------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + MultiLockReln((LockInfo) relation->lockInfo, EXTEND_LOCK); +} + +void +RelationUnsetLockForExtend(Relation relation) +{ + /* ----------------- + * Sanity check + * ----------------- + */ + Assert(RelationIsValid(relation)); + if (LockingDisabled()) + return; + + if (!LockInfoIsValid(relation->lockInfo)) + RelationInitLockInfo(relation); + + MultiReleaseReln((LockInfo) relation->lockInfo, EXTEND_LOCK); +} + +/* + * Create an LRelid --- Why not just pass in a pointer to the storage? + */ +void +LRelIdAssign(LRelId *lRelId, Oid dbId, Oid relId) +{ + lRelId->dbId = dbId; + lRelId->relId = relId; +} diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c new file mode 100644 index 00000000000..8df898a0068 --- /dev/null +++ b/src/backend/storage/lmgr/lock.c @@ -0,0 +1,1020 @@ +/*------------------------------------------------------------------------- + * + * lock.c-- + * simple lock acquisition + * + * Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.1.1.1 1996/07/09 06:21:56 scrappy Exp $ + * + * NOTES + * Outside modules can create a lock table and acquire/release + * locks. A lock table is a shared memory hash table. When + * a process tries to acquire a lock of a type that conflicts + * with existing locks, it is put to sleep using the routines + * in storage/lmgr/proc.c. + * + * Interface: + * + * LockAcquire(), LockRelease(), LockTabInit(). + * + * LockReplace() is called only within this module and by the + * lkchain module. It releases a lock without looking + * the lock up in the lock table. + * + * NOTE: This module is used to define new lock tables. The + * multi-level lock table (multi.c) used by the heap + * access methods calls these routines. See multi.c for + * examples showing how to use this interface. 
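 *
 *	A minimal usage sketch (illustrative only; the conflict and
 *	priority vectors are invented, and lock type 1 is just an
 *	example value):
 *
 *	    MASK conflicts[] = {0, (1 << 2), (1 << 1) | (1 << 2)};
 *	    int  prios[]     = {0, 1, 2};
 *	    LockTableId t = LockTabInit("example table", conflicts,
 *	                                prios, 2);
 *	    LOCKTAG tag;
 *
 *	    memset(&tag, 0, sizeof(tag));   /* zero the padding bytes */
 *	    tag.relId = relId;
 *	    tag.dbId = dbId;
 *	    if (LockAcquire(t, &tag, 1))    /* sleeps on conflict */
 *	    {
 *	        ...
 *	        LockRelease(t, &tag, 1);
 *	    }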
+ * + *------------------------------------------------------------------------- + */ +#include <stdio.h> /* for sprintf() */ +#include "storage/shmem.h" +#include "storage/spin.h" +#include "storage/proc.h" +#include "storage/lock.h" +#include "utils/hsearch.h" +#include "utils/elog.h" +#include "utils/palloc.h" +#include "access/xact.h" + +/*#define LOCK_MGR_DEBUG*/ + +#ifndef LOCK_MGR_DEBUG + +#define LOCK_PRINT(where,tag,type) +#define LOCK_DUMP(where,lock,type) +#define XID_PRINT(where,xidentP) + +#else /* LOCK_MGR_DEBUG */ + +#define LOCK_PRINT(where,tag,type)\ + elog(NOTICE, "%s: rel (%d) dbid (%d) tid (%d,%d) type (%d)\n",where, \ + tag->relId, tag->dbId, \ + ( (tag->tupleId.ip_blkid.data[0] >= 0) ? \ + BlockIdGetBlockNumber(&tag->tupleId.ip_blkid) : -1 ), \ + tag->tupleId.ip_posid, \ + type); + +#define LOCK_DUMP(where,lock,type)\ + elog(NOTICE, "%s: rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) holders (%d,%d,%d,%d,%d) type (%d)\n",where, \ + lock->tag.relId, lock->tag.dbId, \ + ((lock->tag.tupleId.ip_blkid.data[0] >= 0) ? \ + BlockIdGetBlockNumber(&lock->tag.tupleId.ip_blkid) : -1 ), \ + lock->tag.tupleId.ip_posid, \ + lock->nHolding,\ + lock->holders[1],\ + lock->holders[2],\ + lock->holders[3],\ + lock->holders[4],\ + lock->holders[5],\ + type); + +#define XID_PRINT(where,xidentP)\ + elog(NOTICE,\ + "%s:xid (%d) pid (%d) lock (%x) nHolding (%d) holders (%d,%d,%d,%d,%d)",\ + where,\ + xidentP->tag.xid,\ + xidentP->tag.pid,\ + xidentP->tag.lock,\ + xidentP->nHolding,\ + xidentP->holders[1],\ + xidentP->holders[2],\ + xidentP->holders[3],\ + xidentP->holders[4],\ + xidentP->holders[5]); + +#endif /* LOCK_MGR_DEBUG */ + +SPINLOCK LockMgrLock; /* in Shmem or created in CreateSpinlocks() */ + +/* This is to simplify/speed up some bit arithmetic */ + +static MASK BITS_OFF[MAX_LOCKTYPES]; +static MASK BITS_ON[MAX_LOCKTYPES]; + +/* ----------------- + * XXX Want to move this to this file + * ----------------- + */ +static bool LockingIsDisabled; + +/* ------------------ + * from storage/ipc/shmem.c + * ------------------ + */ +extern HTAB *ShmemInitHash(); + +/* ------------------- + * map from tableId to the lock table structure + * ------------------- + */ +static LOCKTAB *AllTables[MAX_TABLES]; + +/* ------------------- + * no zero-th table + * ------------------- + */ +static int NumTables = 1; + +/* ------------------- + * InitLocks -- Init the lock module. Create a private data + * structure for constructing conflict masks. + * ------------------- + */ +void +InitLocks() +{ + int i; + int bit; + + bit = 1; + /* ------------------- + * remember 0th locktype is invalid + * ------------------- + */ + for (i=0;i<MAX_LOCKTYPES;i++,bit <<= 1) + { + BITS_ON[i] = bit; + BITS_OFF[i] = ~bit; + } +} + +/* ------------------- + * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE. + * ------------------ + */ +void +LockDisable(int status) +{ + LockingIsDisabled = status; +} + + +/* + * LockTypeInit -- initialize the lock table's lock type + * structures + * + * Notes: just copying. Should only be called once. + */ +static void +LockTypeInit(LOCKTAB *ltable, + MASK *conflictsP, + int *prioP, + int ntypes) +{ + int i; + + ltable->ctl->nLockTypes = ntypes; + ntypes++; + for (i=0;i<ntypes;i++,prioP++,conflictsP++) + { + ltable->ctl->conflictTab[i] = *conflictsP; + ltable->ctl->prio[i] = *prioP; + } +} + +/* + * LockTabInit -- initialize a lock table structure + * + * Notes: + * (a) a lock table has four separate entries in the binding + * table. 
This is because every shared hash table and spinlock + * has its name stored in the binding table at its creation. It + * is wasteful, in this case, but not much space is involved. + * + */ +LockTableId +LockTabInit(char *tabName, + MASK *conflictsP, + int *prioP, + int ntypes) +{ + LOCKTAB *ltable; + char *shmemName; + HASHCTL info; + int hash_flags; + bool found; + int status = TRUE; + + if (ntypes > MAX_LOCKTYPES) + { + elog(NOTICE,"LockTabInit: too many lock types %d greater than %d", + ntypes,MAX_LOCKTYPES); + return(INVALID_TABLEID); + } + + if (NumTables > MAX_TABLES) + { + elog(NOTICE, + "LockTabInit: system limit of MAX_TABLES (%d) lock tables", + MAX_TABLES); + return(INVALID_TABLEID); + } + + /* allocate a string for the binding table lookup */ + shmemName = (char *) palloc((unsigned)(strlen(tabName)+32)); + if (! shmemName) + { + elog(NOTICE,"LockTabInit: couldn't malloc string %s \n",tabName); + return(INVALID_TABLEID); + } + + /* each lock table has a non-shared header */ + ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB)); + if (! ltable) + { + elog(NOTICE,"LockTabInit: couldn't malloc lock table %s\n",tabName); + (void) pfree (shmemName); + return(INVALID_TABLEID); + } + + /* ------------------------ + * find/acquire the spinlock for the table + * ------------------------ + */ + SpinAcquire(LockMgrLock); + + + /* ----------------------- + * allocate a control structure from shared memory or attach to it + * if it already exists. + * ----------------------- + */ + sprintf(shmemName,"%s (ctl)",tabName); + ltable->ctl = (LOCKCTL *) + ShmemInitStruct(shmemName,(unsigned)sizeof(LOCKCTL),&found); + + if (! ltable->ctl) + { + elog(FATAL,"LockTabInit: couldn't initialize %s",tabName); + status = FALSE; + } + + /* ---------------- + * we're first - initialize + * ---------------- + */ + if (! found) + { + memset(ltable->ctl, 0, sizeof(LOCKCTL)); + ltable->ctl->masterLock = LockMgrLock; + ltable->ctl->tableId = NumTables; + } + + /* -------------------- + * other modules refer to the lock table by a tableId + * -------------------- + */ + AllTables[NumTables] = ltable; + NumTables++; + Assert(NumTables <= MAX_TABLES); + + /* ---------------------- + * allocate a hash table for the lock tags. This is used + * to find the different locks. + * ---------------------- + */ + info.keysize = sizeof(LOCKTAG); + info.datasize = sizeof(LOCK); + info.hash = tag_hash; + hash_flags = (HASH_ELEM | HASH_FUNCTION); + + sprintf(shmemName,"%s (lock hash)",tabName); + ltable->lockHash = (HTAB *) ShmemInitHash(shmemName, + INIT_TABLE_SIZE,MAX_TABLE_SIZE, + &info,hash_flags); + + Assert( ltable->lockHash->hash == tag_hash); + if (! ltable->lockHash) + { + elog(FATAL,"LockTabInit: couldn't initialize %s",tabName); + status = FALSE; + } + + /* ------------------------- + * allocate an xid table. When different transactions hold + * the same lock, additional information must be saved (locks per tx). + * ------------------------- + */ + info.keysize = XID_TAGSIZE; + info.datasize = sizeof(XIDLookupEnt); + info.hash = tag_hash; + hash_flags = (HASH_ELEM | HASH_FUNCTION); + + sprintf(shmemName,"%s (xid hash)",tabName); + ltable->xidHash = (HTAB *) ShmemInitHash(shmemName, + INIT_TABLE_SIZE,MAX_TABLE_SIZE, + &info,hash_flags); + + if (! 
ltable->xidHash) + { + elog(FATAL,"LockTabInit: couldn't initialize %s",tabName); + status = FALSE; + } + + /* init ctl data structures */ + LockTypeInit(ltable, conflictsP, prioP, ntypes); + + SpinRelease(LockMgrLock); + + (void) pfree (shmemName); + + if (status) + return(ltable->ctl->tableId); + else + return(INVALID_TABLEID); +} + +/* + * LockTabRename -- allocate another tableId to the same + * lock table. + * + * NOTES: Both the lock module and the lock chain (lchain.c) + * module use table id's to distinguish between different + * kinds of locks. Short term and long term locks look + * the same to the lock table, but are handled differently + * by the lock chain manager. This function allows the + * client to use different tableIds when acquiring/releasing + * short term and long term locks. + */ +LockTableId +LockTabRename(LockTableId tableId) +{ + LockTableId newTableId; + + if (NumTables >= MAX_TABLES) + { + return(INVALID_TABLEID); + } + if (AllTables[tableId] == INVALID_TABLEID) + { + return(INVALID_TABLEID); + } + + /* other modules refer to the lock table by a tableId */ + newTableId = NumTables; + NumTables++; + + AllTables[newTableId] = AllTables[tableId]; + return(newTableId); +} + +/* + * LockAcquire -- Check for lock conflicts, sleep if conflict found, + * set lock if/when no conflicts. + * + * Returns: TRUE if parameters are correct, FALSE otherwise. + * + * Side Effects: The lock is always acquired. No way to abort + * a lock acquisition other than aborting the transaction. + * Lock is recorded in the lkchain. + */ +bool +LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt) +{ + XIDLookupEnt *result,item; + HTAB *xidTable; + bool found; + LOCK *lock = NULL; + SPINLOCK masterLock; + LOCKTAB *ltable; + int status; + TransactionId myXid; + + Assert (tableId < NumTables); + ltable = AllTables[tableId]; + if (!ltable) + { + elog(NOTICE,"LockAcquire: bad lock table %d",tableId); + return (FALSE); + } + + if (LockingIsDisabled) + { + return(TRUE); + } + + LOCK_PRINT("Acquire",lockName,lockt); + masterLock = ltable->ctl->masterLock; + + SpinAcquire(masterLock); + + Assert( ltable->lockHash->hash == tag_hash); + lock = (LOCK *)hash_search(ltable->lockHash,(Pointer)lockName,HASH_ENTER,&found); + + if (! lock) + { + SpinRelease(masterLock); + elog(FATAL,"LockAcquire: lock table %d is corrupted",tableId); + return(FALSE); + } + + /* -------------------- + * if there was nothing else there, complete initialization + * -------------------- + */ + if (! found) + { + lock->mask = 0; + ProcQueueInit(&(lock->waitProcs)); + memset((char *)lock->holders, 0, sizeof(int)*MAX_LOCKTYPES); + memset((char *)lock->activeHolders, 0, sizeof(int)*MAX_LOCKTYPES); + lock->nHolding = 0; + lock->nActive = 0; + + Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid), + &(lockName->tupleId.ip_blkid))); + + } + + /* ------------------ + * add an element to the lock queue so that we can clear the + * locks at end of transaction. + * ------------------ + */ + xidTable = ltable->xidHash; + myXid = GetCurrentTransactionId(); + + /* ------------------ + * Zero out all of the tag bytes (this clears the padding bytes for long + * word alignment and ensures hashing consistency). 
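 *
 * (Why this matters, an illustrative aside: tag_hash hashes the tag as
 *  raw bytes, so two logically equal tags carrying different garbage
 *  in their padding bytes would land in different hash buckets; the
 *  memset() below makes equal tags byte-identical.)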
+ * ------------------ + */ + memset(&item, 0, XID_TAGSIZE); + TransactionIdStore(myXid, &item.tag.xid); + item.tag.lock = MAKE_OFFSET(lock); +#if 0 + item.tag.pid = MyPid; +#endif + + result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found); + if (!result) + { + elog(NOTICE,"LockAcquire: xid table corrupted"); + return(STATUS_ERROR); + } + if (!found) + { + XID_PRINT("queueing XidEnt LockAcquire:", result); + ProcAddLock(&result->queue); + result->nHolding = 0; + memset((char *)result->holders, 0, sizeof(int)*MAX_LOCKTYPES); + } + + /* ---------------- + * lock->nholding tells us how many processes have _tried_ to + * acquire this lock, Regardless of whether they succeeded or + * failed in doing so. + * ---------------- + */ + lock->nHolding++; + lock->holders[lockt]++; + + /* -------------------- + * If I'm the only one holding a lock, then there + * cannot be a conflict. Need to subtract one from the + * lock's count since we just bumped the count up by 1 + * above. + * -------------------- + */ + if (result->nHolding == lock->nActive) + { + result->holders[lockt]++; + result->nHolding++; + GrantLock(lock, lockt); + SpinRelease(masterLock); + return(TRUE); + } + + Assert(result->nHolding <= lock->nActive); + + status = LockResolveConflicts(ltable, lock, lockt, myXid); + + if (status == STATUS_OK) + { + GrantLock(lock, lockt); + } + else if (status == STATUS_FOUND) + { + status = WaitOnLock(ltable, tableId, lock, lockt); + XID_PRINT("Someone granted me the lock", result); + } + + SpinRelease(masterLock); + + return(status == STATUS_OK); +} + +/* ---------------------------- + * LockResolveConflicts -- test for lock conflicts + * + * NOTES: + * Here's what makes this complicated: one transaction's + * locks don't conflict with one another. When many processes + * hold locks, each has to subtract off the other's locks when + * determining whether or not any new lock acquired conflicts with + * the old ones. + * + * For example, if I am already holding a WRITE_INTENT lock, + * there will not be a conflict with my own READ_LOCK. If I + * don't consider the intent lock when checking for conflicts, + * I find no conflict. + * ---------------------------- + */ +int +LockResolveConflicts(LOCKTAB *ltable, + LOCK *lock, + LOCKT lockt, + TransactionId xid) +{ + XIDLookupEnt *result,item; + int *myHolders; + int nLockTypes; + HTAB *xidTable; + bool found; + int bitmask; + int i,tmpMask; + + nLockTypes = ltable->ctl->nLockTypes; + xidTable = ltable->xidHash; + + /* --------------------- + * read my own statistics from the xid table. If there + * isn't an entry, then we'll just add one. + * + * Zero out the tag, this clears the padding bytes for long + * word alignment and ensures hashing consistency. + * ------------------ + */ + memset(&item, 0, XID_TAGSIZE); + TransactionIdStore(xid, &item.tag.xid); + item.tag.lock = MAKE_OFFSET(lock); +#if 0 + item.tag.pid = pid; +#endif + + if (! (result = (XIDLookupEnt *) + hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found))) + { + elog(NOTICE,"LockResolveConflicts: xid table corrupted"); + return(STATUS_ERROR); + } + myHolders = result->holders; + + if (! found) + { + /* --------------- + * we're not holding any type of lock yet. Clear + * the lock stats. + * --------------- + */ + memset(result->holders, 0, nLockTypes * sizeof(*(lock->holders))); + result->nHolding = 0; + } + + /* ---------------------------- + * first check for global conflicts: If no locks conflict + * with mine, then I get the lock. 
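 *
 * (Illustrative numbers for the "subtract my own locks" step performed
 *  further down, using WRITE as a stand-in index: if
 *  lock->activeHolders[WRITE] == 1 but myHolders[WRITE] == 1 as well,
 *  the only writer is me, so the WRITE bit is left out of the rebuilt
 *  conflict bitmask and my READ request is granted.)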
+ * + * Checking for conflict: lock->mask represents the types of + * currently held locks. conflictTable[lockt] has a bit + * set for each type of lock that conflicts with mine. Bitwise + * compare tells if there is a conflict. + * ---------------------------- + */ + if (! (ltable->ctl->conflictTab[lockt] & lock->mask)) + { + + result->holders[lockt]++; + result->nHolding++; + + XID_PRINT("Conflict Resolved: updated xid entry stats", result); + + return(STATUS_OK); + } + + /* ------------------------ + * Rats. Something conflicts. But it could still be my own + * lock. We have to construct a conflict mask + * that does not reflect our own locks. + * ------------------------ + */ + bitmask = 0; + tmpMask = 2; + for (i=1;i<=nLockTypes;i++, tmpMask <<= 1) + { + if (lock->activeHolders[i] - myHolders[i]) + { + bitmask |= tmpMask; + } + } + + /* ------------------------ + * now check again for conflicts. 'bitmask' describes the types + * of locks held by other processes. If one of these + * conflicts with the kind of lock that I want, there is a + * conflict and I have to sleep. + * ------------------------ + */ + if (! (ltable->ctl->conflictTab[lockt] & bitmask)) + { + + /* no conflict. Get the lock and go on */ + + result->holders[lockt]++; + result->nHolding++; + + XID_PRINT("Conflict Resolved: updated xid entry stats", result); + + return(STATUS_OK); + + } + + return(STATUS_FOUND); +} + +int +WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock, LOCKT lockt) +{ + PROC_QUEUE *waitQueue = &(lock->waitProcs); + + int prio = ltable->ctl->prio[lockt]; + + /* the waitqueue is ordered by priority. I insert myself + * according to the priority of the lock I am acquiring. + * + * SYNC NOTE: I am assuming that the lock table spinlock + * is sufficient synchronization for this queue. That + * will not be true if/when people can be deleted from + * the queue by a SIGINT or something. + */ + LOCK_DUMP("WaitOnLock: sleeping on lock", lock, lockt); + if (ProcSleep(waitQueue, + ltable->ctl->masterLock, + lockt, + prio, + lock) != NO_ERROR) + { + /* ------------------- + * This could have happend as a result of a deadlock, see HandleDeadLock() + * Decrement the lock nHolding and holders fields as we are no longer + * waiting on this lock. + * ------------------- + */ + lock->nHolding--; + lock->holders[lockt]--; + LOCK_DUMP("WaitOnLock: aborting on lock", lock, lockt); + SpinRelease(ltable->ctl->masterLock); + elog(WARN,"WaitOnLock: error on wakeup - Aborting this transaction"); + } + + return(STATUS_OK); +} + +/* + * LockRelease -- look up 'lockName' in lock table 'tableId' and + * release it. + * + * Side Effects: if the lock no longer conflicts with the highest + * priority waiting process, that process is granted the lock + * and awoken. (We have to grant the lock here to avoid a + * race between the waking process and any new process to + * come along and request the lock). 
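 *
 * (Illustrative scenario, an assumption spelled out from the comment
 *  above: if the waiter were merely woken and left to re-acquire the
 *  lock itself, a third backend could request a conflicting lock in
 *  the window between wakeup and re-acquisition and starve the waiter;
 *  granting the lock on the waiter's behalf before it runs closes that
 *  window.)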
+ */ +bool +LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt) +{ + LOCK *lock = NULL; + SPINLOCK masterLock; + bool found; + LOCKTAB *ltable; + XIDLookupEnt *result,item; + HTAB *xidTable; + bool wakeupNeeded = true; + + Assert (tableId < NumTables); + ltable = AllTables[tableId]; + if (!ltable) { + elog(NOTICE, "ltable is null in LockRelease"); + return (FALSE); + } + + if (LockingIsDisabled) + { + return(TRUE); + } + + LOCK_PRINT("Release",lockName,lockt); + + masterLock = ltable->ctl->masterLock; + xidTable = ltable->xidHash; + + SpinAcquire(masterLock); + + Assert( ltable->lockHash->hash == tag_hash); + lock = (LOCK *) + hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found); + + /* let the caller print its own error message, too. + * Do not elog(WARN). + */ + if (! lock) + { + SpinRelease(masterLock); + elog(NOTICE,"LockRelease: locktable corrupted"); + return(FALSE); + } + + if (! found) + { + SpinRelease(masterLock); + elog(NOTICE,"LockRelease: locktable lookup failed, no lock"); + return(FALSE); + } + + Assert(lock->nHolding > 0); + + /* + * fix the general lock stats + */ + lock->nHolding--; + lock->holders[lockt]--; + lock->nActive--; + lock->activeHolders[lockt]--; + + Assert(lock->nActive >= 0); + + if (! lock->nHolding) + { + /* ------------------ + * if there's no one waiting in the queue, + * we just released the last lock. + * Delete it from the lock table. + * ------------------ + */ + Assert( ltable->lockHash->hash == tag_hash); + lock = (LOCK *) hash_search(ltable->lockHash, + (Pointer) &(lock->tag), + HASH_REMOVE_SAVED, + &found); + Assert(lock && found); + wakeupNeeded = false; + } + + /* ------------------ + * Zero out all of the tag bytes (this clears the padding bytes for long + * word alignment and ensures hashing consistency). + * ------------------ + */ + memset(&item, 0, XID_TAGSIZE); + + TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid); + item.tag.lock = MAKE_OFFSET(lock); +#if 0 + item.tag.pid = MyPid; +#endif + + if (! ( result = (XIDLookupEnt *) hash_search(xidTable, + (Pointer)&item, + HASH_FIND_SAVE, + &found) ) + || !found) + { + SpinRelease(masterLock); + elog(NOTICE,"LockReplace: xid table corrupted"); + return(FALSE); + } + /* + * now check to see if I have any private locks. If I do, + * decrement the counts associated with them. + */ + result->holders[lockt]--; + result->nHolding--; + + XID_PRINT("LockRelease updated xid stats", result); + + /* + * If this was my last hold on this lock, delete my entry + * in the XID table. + */ + if (! result->nHolding) + { + if (result->queue.next != INVALID_OFFSET) + SHMQueueDelete(&result->queue); + if (! (result = (XIDLookupEnt *) + hash_search(xidTable, (Pointer)&item, HASH_REMOVE_SAVED, &found)) || + ! found) + { + SpinRelease(masterLock); + elog(NOTICE,"LockReplace: xid table corrupted"); + return(FALSE); + } + } + + /* -------------------------- + * If there are still active locks of the type I just released, no one + * should be woken up. Whoever is asleep will still conflict + * with the remaining locks. + * -------------------------- + */ + if (! (lock->activeHolders[lockt])) + { + /* change the conflict mask. No more of this lock type. */ + lock->mask &= BITS_OFF[lockt]; + } + + if (wakeupNeeded) + { + /* -------------------------- + * Wake the first waiting process and grant him the lock if it + * doesn't conflict. The woken process must record the lock + * himself. 
+ * -------------------------- + */ + (void) ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock); + } + + SpinRelease(masterLock); + return(TRUE); +} + +/* + * GrantLock -- udpate the lock data structure to show + * the new lock holder. + */ +void +GrantLock(LOCK *lock, LOCKT lockt) +{ + lock->nActive++; + lock->activeHolders[lockt]++; + lock->mask |= BITS_ON[lockt]; +} + +bool +LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue) +{ + PROC_QUEUE *waitQueue; + int done; + XIDLookupEnt *xidLook = NULL; + XIDLookupEnt *tmp = NULL; + SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); + SPINLOCK masterLock; + LOCKTAB *ltable; + int i,nLockTypes; + LOCK *lock; + bool found; + + Assert (tableId < NumTables); + ltable = AllTables[tableId]; + if (!ltable) + return (FALSE); + + nLockTypes = ltable->ctl->nLockTypes; + masterLock = ltable->ctl->masterLock; + + if (SHMQueueEmpty(lockQueue)) + return TRUE; + + SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue); + + XID_PRINT("LockReleaseAll:", xidLook); + + SpinAcquire(masterLock); + for (;;) + { + /* --------------------------- + * XXX Here we assume the shared memory queue is circular and + * that we know its internal structure. Should have some sort of + * macros to allow one to walk it. mer 20 July 1991 + * --------------------------- + */ + done = (xidLook->queue.next == end); + lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); + + LOCK_PRINT("ReleaseAll",(&lock->tag),0); + + /* ------------------ + * fix the general lock stats + * ------------------ + */ + if (lock->nHolding != xidLook->nHolding) + { + lock->nHolding -= xidLook->nHolding; + lock->nActive -= xidLook->nHolding; + Assert(lock->nActive >= 0); + for (i=1; i<=nLockTypes; i++) + { + lock->holders[i] -= xidLook->holders[i]; + lock->activeHolders[i] -= xidLook->holders[i]; + if (! lock->activeHolders[i]) + lock->mask &= BITS_OFF[i]; + } + } + else + { + /* -------------- + * set nHolding to zero so that we can garbage collect the lock + * down below... + * -------------- + */ + lock->nHolding = 0; + } + /* ---------------- + * always remove the xidLookup entry, we're done with it now + * ---------------- + */ + if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found)) + || !found) + { + SpinRelease(masterLock); + elog(NOTICE,"LockReplace: xid table corrupted"); + return(FALSE); + } + + if (! lock->nHolding) + { + /* -------------------- + * if there's no one waiting in the queue, we've just released + * the last lock. + * -------------------- + */ + + Assert( ltable->lockHash->hash == tag_hash); + lock = (LOCK *) + hash_search(ltable->lockHash,(Pointer)&(lock->tag),HASH_REMOVE, &found); + if ((! lock) || (!found)) + { + SpinRelease(masterLock); + elog(NOTICE,"LockReplace: cannot remove lock from HTAB"); + return(FALSE); + } + } + else + { + /* -------------------- + * Wake the first waiting process and grant him the lock if it + * doesn't conflict. The woken process must record the lock + * him/herself. 
+ * -------------------- + */ + waitQueue = &(lock->waitProcs); + (void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock); + } + + if (done) + break; + SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue); + xidLook = tmp; + } + SpinRelease(masterLock); + SHMQueueInit(lockQueue); + return TRUE; +} + +int +LockShmemSize() +{ + int size = 0; + int nLockBuckets, nLockSegs; + int nXidBuckets, nXidSegs; + + nLockBuckets = 1 << (int)my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1); + nLockSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1); + + nXidBuckets = 1 << (int)my_log2((NLOCKS_PER_XACT-1) / DEF_FFACTOR + 1); + nXidSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1); + + size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */ + size += MAXALIGN(NBACKENDS * sizeof(LOCKCTL)); /* each ltable->ctl */ + size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */ + + size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *)); + size += MAXALIGN(sizeof(HHDR)); + size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT)); + size += NLOCKENTS * /* XXX not multiple of BUCKET_ALLOC_INCR? */ + (MAXALIGN(sizeof(BUCKET_INDEX)) + + MAXALIGN(sizeof(LOCK))); /* contains hash key */ + + size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *)); + size += MAXALIGN(sizeof(HHDR)); + size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT)); + size += NBACKENDS * /* XXX not multiple of BUCKET_ALLOC_INCR? */ + (MAXALIGN(sizeof(BUCKET_INDEX)) + + MAXALIGN(sizeof(XIDLookupEnt))); /* contains hash key */ + + return size; +} + +/* ----------------- + * Boolean function to determine current locking status + * ----------------- + */ +bool +LockingDisabled() +{ + return LockingIsDisabled; +} diff --git a/src/backend/storage/lmgr/multi.c b/src/backend/storage/lmgr/multi.c new file mode 100644 index 00000000000..c1702d18cb8 --- /dev/null +++ b/src/backend/storage/lmgr/multi.c @@ -0,0 +1,415 @@ +/*------------------------------------------------------------------------- + * + * multi.c-- + * multi level lock table manager + * + * Standard multi-level lock manager as per the Gray paper + * (at least, that is what it is supposed to be). We implement + * three levels -- RELN, PAGE, TUPLE. Tuple is actually TID + * a physical record pointer. It isn't an object id. + * + * Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.1.1.1 1996/07/09 06:21:56 scrappy Exp $ + * + * NOTES: + * (1) The lock.c module assumes that the caller here is doing + * two phase locking. + * + *------------------------------------------------------------------------- + */ +#include <stdio.h> +#include <string.h> +#include "storage/lmgr.h" +#include "storage/multilev.h" + +#include "utils/rel.h" +#include "utils/elog.h" +#include "miscadmin.h" /* MyDatabaseId */ + + +/* + * INTENT indicates to higher level that a lower level lock has been + * set. For example, a write lock on a tuple conflicts with a write + * lock on a relation. This conflict is detected as a WRITE_INTENT/ + * WRITE conflict between the tuple's intent lock and the relation's + * write lock. 
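 *
 * (Illustrative walk-through, not in the original comment: to write a
 *  tuple, MultiAcquire() takes WRITE_INTENT on the relation and the
 *  page, then WRITE on the tuple itself. If another transaction
 *  already holds a relation-level READ lock, the conflict is detected
 *  at the relation level -- MultiConflicts[WRITE_INTENT] below
 *  includes the READ_LOCK bit -- and the caller sleeps before the
 *  tuple lock is ever attempted.)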
+ */ +static int MultiConflicts[] = { + (int)NULL, + /* All reads and writes at any level conflict with a write lock */ + (1 << WRITE_LOCK)|(1 << WRITE_INTENT)|(1 << READ_LOCK)|(1 << READ_INTENT), + /* read locks conflict with write locks at curr and lower levels */ + (1 << WRITE_LOCK)| (1 << WRITE_INTENT), + /* write intent locks */ + (1 << READ_LOCK) | (1 << WRITE_LOCK), + /* read intent locks*/ + (1 << WRITE_LOCK), + /* extend locks for archive storage manager conflict only w/extend locks */ + (1 << EXTEND_LOCK) +}; + +/* + * write locks have higher priority than read locks and extend locks. May + * want to treat INTENT locks differently. + */ +static int MultiPrios[] = { + (int)NULL, + 2, + 1, + 2, + 1, + 1 +}; + +/* + * Lock table identifier for this lock table. The multi-level + * lock table is ONE lock table, not three. + */ +LockTableId MultiTableId = (LockTableId)NULL; +LockTableId ShortTermTableId = (LockTableId)NULL; + +/* + * Create the lock table described by MultiConflicts and Multiprio. + */ +LockTableId +InitMultiLevelLockm() +{ + int tableId; + + /* ----------------------- + * If we're already initialized just return the table id. + * ----------------------- + */ + if (MultiTableId) + return MultiTableId; + + tableId = LockTabInit("LockTable", MultiConflicts, MultiPrios, 5); + MultiTableId = tableId; + if (! (MultiTableId)) { + elog(WARN,"InitMultiLockm: couldnt initialize lock table"); + } + /* ----------------------- + * No short term lock table for now. -Jeff 15 July 1991 + * + * ShortTermTableId = LockTabRename(tableId); + * if (! (ShortTermTableId)) { + * elog(WARN,"InitMultiLockm: couldnt rename lock table"); + * } + * ----------------------- + */ + return MultiTableId; +} + +/* + * MultiLockReln -- lock a relation + * + * Returns: TRUE if the lock can be set, FALSE otherwise. + */ +bool +MultiLockReln(LockInfo linfo, LOCKT lockt) +{ + LOCKTAG tag; + + /* LOCKTAG has two bytes of padding, unfortunately. The + * hash function will return miss if the padding bytes aren't + * zero'd. + */ + memset(&tag,0,sizeof(tag)); + tag.relId = linfo->lRelId.relId; + tag.dbId = linfo->lRelId.dbId; + return(MultiAcquire(MultiTableId, &tag, lockt, RELN_LEVEL)); +} + +/* + * MultiLockTuple -- Lock the TID associated with a tuple + * + * Returns: TRUE if lock is set, FALSE otherwise. + * + * Side Effects: causes intention level locks to be set + * at the page and relation level. + */ +bool +MultiLockTuple(LockInfo linfo, ItemPointer tidPtr, LOCKT lockt) +{ + LOCKTAG tag; + + /* LOCKTAG has two bytes of padding, unfortunately. The + * hash function will return miss if the padding bytes aren't + * zero'd. + */ + memset(&tag,0,sizeof(tag)); + + tag.relId = linfo->lRelId.relId; + tag.dbId = linfo->lRelId.dbId; + + /* not locking any valid Tuple, just the page */ + tag.tupleId = *tidPtr; + return(MultiAcquire(MultiTableId, &tag, lockt, TUPLE_LEVEL)); +} + +/* + * same as above at page level + */ +bool +MultiLockPage(LockInfo linfo, ItemPointer tidPtr, LOCKT lockt) +{ + LOCKTAG tag; + + /* LOCKTAG has two bytes of padding, unfortunately. The + * hash function will return miss if the padding bytes aren't + * zero'd. + */ + memset(&tag,0,sizeof(tag)); + + + /* ---------------------------- + * Now we want to set the page offset to be invalid + * and lock the block. There is some confusion here as to what + * a page is. In Postgres a page is an 8k block, however this + * block may be partitioned into many subpages which are sometimes + * also called pages. 
+     * when we say lock the page we mean the 8k block. -Jeff 16 July 1991
+     * ----------------------------
+     */
+    tag.relId = linfo->lRelId.relId;
+    tag.dbId = linfo->lRelId.dbId;
+    BlockIdCopy(&(tag.tupleId.ip_blkid), &(tidPtr->ip_blkid));
+    return (MultiAcquire(MultiTableId, &tag, lockt, PAGE_LEVEL));
+}
+
+/*
+ * MultiAcquire -- acquire multi level lock at requested level
+ *
+ * Returns: TRUE if lock is set, FALSE if not
+ * Side Effects: may set INTENT locks at the higher granularities
+ */
+bool
+MultiAcquire(LockTableId tableId,
+	     LOCKTAG *tag,
+	     LOCKT lockt,
+	     LOCK_LEVEL level)
+{
+    LOCKT locks[N_LEVELS];
+    int i, status;
+    LOCKTAG xxTag, *tmpTag = &xxTag;
+    int retStatus = TRUE;
+
+    /*
+     * Three levels implemented. If we set a low level (e.g. Tuple)
+     * lock, we must set INTENT locks on the higher levels. The
+     * intent lock detects conflicts between the low level lock
+     * and an existing high level lock. For example, setting a
+     * write lock on a tuple in a relation is disallowed if there
+     * is an existing read lock on the entire relation. The
+     * write lock would set a WRITE + INTENT lock on the relation
+     * and that lock would conflict with the read.
+     */
+    switch (level) {
+    case RELN_LEVEL:
+	locks[0] = lockt;
+	locks[1] = NO_LOCK;
+	locks[2] = NO_LOCK;
+	break;
+    case PAGE_LEVEL:
+	locks[0] = lockt + INTENT;
+	locks[1] = lockt;
+	locks[2] = NO_LOCK;
+	break;
+    case TUPLE_LEVEL:
+	locks[0] = lockt + INTENT;
+	locks[1] = lockt + INTENT;
+	locks[2] = lockt;
+	break;
+    default:
+	elog(WARN, "MultiAcquire: bad lock level");
+	return (FALSE);
+    }
+
+    /*
+     * construct a new tag as we go. Always loop through all levels,
+     * but if we aren't setting a low level lock, locks[i] is set to
+     * NO_LOCK for the lower levels. Always start from the highest
+     * level and go to the lowest level.
+     */
+    memset(tmpTag, 0, sizeof(*tmpTag));
+    tmpTag->relId = tag->relId;
+    tmpTag->dbId = tag->dbId;
+
+    for (i = 0; i < N_LEVELS; i++) {
+	if (locks[i] != NO_LOCK) {
+	    switch (i) {
+	    case RELN_LEVEL:
+		/* -------------
+		 * Set the block # and offset to invalid
+		 * -------------
+		 */
+		BlockIdSet(&(tmpTag->tupleId.ip_blkid), InvalidBlockNumber);
+		tmpTag->tupleId.ip_posid = InvalidOffsetNumber;
+		break;
+	    case PAGE_LEVEL:
+		/* -------------
+		 * Copy the block #, set the offset to invalid
+		 * -------------
+		 */
+		BlockIdCopy(&(tmpTag->tupleId.ip_blkid),
+			    &(tag->tupleId.ip_blkid));
+		tmpTag->tupleId.ip_posid = InvalidOffsetNumber;
+		break;
+	    case TUPLE_LEVEL:
+		/* --------------
+		 * Copy the entire tuple id.
+		 * --------------
+		 */
+		ItemPointerCopy(&tag->tupleId, &tmpTag->tupleId);
+		break;
+	    }
+
+	    status = LockAcquire(tableId, tmpTag, locks[i]);
+	    if (! status) {
+		/* failed for some reason. Before returning we have
+		 * to release all of the locks we just acquired.
+		 * MultiRelease(xx,xx,xx, i) means release starting from
+		 * the last level lock we successfully acquired
+		 */
+		retStatus = FALSE;
+		(void) MultiRelease(tableId, tag, lockt, i);
+		/* now leave the loop. Don't try for any more locks */
+		break;
+	    }
+	}
+    }
+    return (retStatus);
+}
+
+/* ------------------
+ * Release a page in the multi-level lock table
+ * ------------------
+ */
+bool
+MultiReleasePage(LockInfo linfo, ItemPointer tidPtr, LOCKT lockt)
+{
+    LOCKTAG tag;
+
+    /* ------------------
+     * LOCKTAG has two bytes of padding, unfortunately. The
+     * hash function will return a miss if the padding bytes aren't
+     * zero'd.
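+     *
+     * To make the hazard concrete (an illustrative sketch, not extra
+     * requirements): two LOCKTAGs that are equal field-by-field can
+     * still hash differently if their pad bytes hold stack garbage,
+     *
+     *     LOCKTAG a;                  -- no memset: pad bytes undefined
+     *     a.relId = ...; a.dbId = ...;
+     *     -- a lookup with &a may miss an entry stored under a
+     *     -- zero-padded tag, since the dynahash hash function reads
+     *     -- all sizeof(LOCKTAG) raw bytes, padding included.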
+     * ------------------
+     */
+    memset(&tag, 0, sizeof(LOCKTAG));
+
+    tag.relId = linfo->lRelId.relId;
+    tag.dbId = linfo->lRelId.dbId;
+    BlockIdCopy(&(tag.tupleId.ip_blkid), &(tidPtr->ip_blkid));
+
+    return (MultiRelease(MultiTableId, &tag, lockt, PAGE_LEVEL));
+}
+
+/* ------------------
+ * Release a relation in the multi-level lock table
+ * ------------------
+ */
+bool
+MultiReleaseReln(LockInfo linfo, LOCKT lockt)
+{
+    LOCKTAG tag;
+
+    /* ------------------
+     * LOCKTAG has two bytes of padding, unfortunately. The
+     * hash function will return a miss if the padding bytes aren't
+     * zero'd.
+     * ------------------
+     */
+    memset(&tag, 0, sizeof(LOCKTAG));
+    tag.relId = linfo->lRelId.relId;
+    tag.dbId = linfo->lRelId.dbId;
+
+    return (MultiRelease(MultiTableId, &tag, lockt, RELN_LEVEL));
+}
+
+/*
+ * MultiRelease -- release a multi-level lock
+ *
+ * Returns: TRUE if successful, FALSE otherwise.
+ */
+bool
+MultiRelease(LockTableId tableId,
+	     LOCKTAG *tag,
+	     LOCKT lockt,
+	     LOCK_LEVEL level)
+{
+    LOCKT locks[N_LEVELS];
+    int i, status;
+    LOCKTAG xxTag, *tmpTag = &xxTag;
+
+    /*
+     * same level scheme as MultiAcquire().
+     */
+    switch (level) {
+    case RELN_LEVEL:
+	locks[0] = lockt;
+	locks[1] = NO_LOCK;
+	locks[2] = NO_LOCK;
+	break;
+    case PAGE_LEVEL:
+	locks[0] = lockt + INTENT;
+	locks[1] = lockt;
+	locks[2] = NO_LOCK;
+	break;
+    case TUPLE_LEVEL:
+	locks[0] = lockt + INTENT;
+	locks[1] = lockt + INTENT;
+	locks[2] = lockt;
+	break;
+    default:
+	elog(WARN, "MultiRelease: bad lock level");
+    }
+
+    /*
+     * again, construct the tag on the fly. This time, however,
+     * we release the locks in the REVERSE order -- from lowest
+     * level to highest level.
+     *
+     * Must zero out the tag to set padding bytes to zero and ensure
+     * hashing consistency.
+     */
+    memset(tmpTag, 0, sizeof(*tmpTag));
+    tmpTag->relId = tag->relId;
+    tmpTag->dbId = tag->dbId;
+
+    for (i = (N_LEVELS - 1); i >= 0; i--) {
+	if (locks[i] != NO_LOCK) {
+	    switch (i) {
+	    case RELN_LEVEL:
+		/* -------------
+		 * Set the block # and offset to invalid
+		 * -------------
+		 */
+		BlockIdSet(&(tmpTag->tupleId.ip_blkid), InvalidBlockNumber);
+		tmpTag->tupleId.ip_posid = InvalidOffsetNumber;
+		break;
+	    case PAGE_LEVEL:
+		/* -------------
+		 * Copy the block #, set the offset to invalid
+		 * -------------
+		 */
+		BlockIdCopy(&(tmpTag->tupleId.ip_blkid),
+			    &(tag->tupleId.ip_blkid));
+		tmpTag->tupleId.ip_posid = InvalidOffsetNumber;
+		break;
+	    case TUPLE_LEVEL:
+		ItemPointerCopy(&tag->tupleId, &tmpTag->tupleId);
+		break;
+	    }
+	    status = LockRelease(tableId, tmpTag, locks[i]);
+	    if (! status) {
+		elog(WARN, "MultiRelease: couldn't release after error");
+	    }
+	}
+    }
+    /* all requested levels released */
+    return TRUE;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
new file mode 100644
index 00000000000..0955cdfc2f5
--- /dev/null
+++ b/src/backend/storage/lmgr/proc.c
@@ -0,0 +1,826 @@
+/*-------------------------------------------------------------------------
+ *
+ * proc.c--
+ *    routines to manage the per-process shared memory data structure
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.1.1.1 1996/07/09 06:21:57 scrappy Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * Each postgres backend gets one of these. We'll use it to
+ * clean up after the process should the process suddenly die.
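+ * (The mechanism, in brief: InitProcess() registers the cleanup with
+ * on_exitpg(ProcKill, (caddr_t) pid), and ProcKill() then releases any
+ * spinlocks and lock-manager locks the dead backend still held -- see
+ * the routines below.)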
+ *
+ *
+ * Interface (a):
+ *    ProcSleep(), ProcWakeup(), ProcWakeupNext(),
+ *    ProcQueueAlloc() -- create a shm queue for sleeping processes
+ *    ProcQueueInit() -- create a queue without allocing memory
+ *
+ * Locking and waiting for buffers can cause the backend to be
+ * put to sleep. Whoever releases the lock, etc. wakes the
+ * process up again (and gives it an error code so it knows
+ * whether it was awoken on an error condition).
+ *
+ * Interface (b):
+ *
+ * ProcReleaseLocks -- frees the locks associated with this process,
+ * ProcKill -- destroys the shared memory state (and locks)
+ *    associated with the process.
+ *
+ * 5/15/91 -- removed the buffer pool based lock chain in favor
+ *    of a shared memory lock chain. The write-protection is
+ *    more expensive if the lock chain is in the buffer pool.
+ *    The only reason I kept the lock chain in the buffer pool
+ *    in the first place was to allow the lock table to grow larger
+ *    than available shared memory, and that isn't going to work
+ *    without a lot of unimplemented support anyway.
+ *
+ * 4/7/95 -- instead of allocating a set of 1 semaphore per process, we
+ *    allocate a semaphore from a set of PROC_NSEMS_PER_SET semaphores
+ *    shared among backends (we keep a few sets of semaphores around).
+ *    This is so that we can support more backends. (system-wide semaphore
+ *    sets run out pretty fast.) -ay 4/95
+ *
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.1.1.1 1996/07/09 06:21:57 scrappy Exp $
+ */
+#include <sys/time.h>
+#ifndef WIN32
+#include <unistd.h>
+#endif /* WIN32 */
+#include <string.h>
+#include <sys/types.h>
+#include "libpq/pqsignal.h"	/* substitute for <signal.h> */
+
+#if defined(PORTNAME_bsdi)
+/* hacka, hacka, hacka (XXX) */
+union semun {
+    int val;			/* value for SETVAL */
+    struct semid_ds *buf;	/* buffer for IPC_STAT & IPC_SET */
+    ushort *array;		/* array for GETALL & SETALL */
+};
+#endif
+
+#include "access/xact.h"
+#include "utils/hsearch.h"
+#include "utils/elog.h"
+
+#include "storage/buf.h"
+#include "storage/lock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/proc.h"
+
+/*
+ * timeout (in seconds) for resolving possible deadlock
+ */
+#ifndef DEADLOCK_TIMEOUT
+#define DEADLOCK_TIMEOUT	60
+#endif
+
+/* --------------------
+ * Spin lock for manipulating the shared process data structure:
+ * ProcGlobal.... Adding an extra spin lock seemed like the smallest
+ * hack to get around reading and updating this structure in shared
+ * memory. -mer 17 July 1991
+ * --------------------
+ */
+SPINLOCK ProcStructLock;
+
+/*
+ * For cleanup routines. Don't clean up if the initialization
+ * has not happened.
+ */
+static bool ProcInitialized = FALSE;
+
+static PROC_HDR *ProcGlobal = NULL;
+
+PROC *MyProc = NULL;
+
+static void ProcKill(int exitStatus, int pid);
+static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
+static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
+#if defined(PORTNAME_linux)
+extern int HandleDeadLock(int);
+#else
+extern int HandleDeadLock(void);
+#endif
+
+/*
+ * InitProcGlobal -
+ *    initializes the global process table. We put it here so that
+ *    the postmaster can do this initialization. (ProcFreeAllSemaphores
+ *    needs to read this table on exiting the postmaster. If we have the
+ *    first backend do this, starting up and killing the postmaster
+ *    without starting any backends will be a problem.)
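+ *
+ * As a reading aid (derived from ProcGetNewSemKeyAndNum() and
+ * ProcFreeSem() below, not a new invariant): freeSemMap is a bitmap
+ * with one int32 word per semaphore set; bit j of word i set means
+ * semaphore j of the set with IPC key currKey + i is allocated to
+ * some backend, so zeroing the map here marks every slot free.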
+ */
+void
+InitProcGlobal(IPCKey key)
+{
+    bool found = false;
+
+    /* attach to the free list */
+    ProcGlobal = (PROC_HDR *)
+	ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found);
+
+    /* --------------------
+     * We're the first - initialize.
+     * --------------------
+     */
+    if (! found)
+    {
+	int i;
+
+	ProcGlobal->numProcs = 0;
+	ProcGlobal->freeProcs = INVALID_OFFSET;
+	ProcGlobal->currKey = IPCGetProcessSemaphoreInitKey(key);
+	for (i = 0; i < MAX_PROC_SEMS/PROC_NSEMS_PER_SET; i++)
+	    ProcGlobal->freeSemMap[i] = 0;
+    }
+}
+
+/* ------------------------
+ * InitProcess -- create a per-process data structure for this process
+ * used by the lock manager on semaphore queues.
+ * ------------------------
+ */
+void
+InitProcess(IPCKey key)
+{
+    bool found = false;
+    int pid;
+    int semstat;
+    unsigned long location, myOffset;
+
+    /* ------------------
+     * Routine called if deadlock timer goes off. See ProcSleep()
+     * ------------------
+     */
+#ifndef WIN32
+    signal(SIGALRM, HandleDeadLock);
+#endif /* WIN32 we'll have to figure out how to handle this later */
+
+    SpinAcquire(ProcStructLock);
+
+    /* attach to the free list */
+    ProcGlobal = (PROC_HDR *)
+	ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found);
+    if (!found) {
+	/* this should not happen. InitProcGlobal() is called before this. */
+	elog(WARN, "InitProcess: Proc Header uninitialized");
+    }
+
+    if (MyProc != NULL)
+    {
+	SpinRelease(ProcStructLock);
+	elog(WARN, "InitProcess: you already exist");
+	return;
+    }
+
+    /* try to get a proc from the free list first */
+
+    myOffset = ProcGlobal->freeProcs;
+
+    if (myOffset != INVALID_OFFSET)
+    {
+	MyProc = (PROC *) MAKE_PTR(myOffset);
+	ProcGlobal->freeProcs = MyProc->links.next;
+    }
+    else
+    {
+	/* have to allocate one. We can't use the normal binding
+	 * table mechanism because the proc structure is stored
+	 * by PID instead of by a global name (need to look it
+	 * up by PID when we clean up dead processes).
+	 */
+
+	MyProc = (PROC *) ShmemAlloc((unsigned) sizeof(PROC));
+	if (! MyProc)
+	{
+	    SpinRelease(ProcStructLock);
+	    elog(FATAL, "cannot create new proc: out of memory");
+	}
+
+	/* this cannot be initialized until after the buffer pool */
+	SHMQueueInit(&(MyProc->lockQueue));
+	MyProc->procId = ProcGlobal->numProcs;
+	ProcGlobal->numProcs++;
+    }
+
+    /*
+     * zero out the spin lock counts and set the sLocks field for
+     * ProcStructLock to 1 as we have acquired this spinlock above but
+     * didn't record it since we didn't have MyProc until now.
+     */
+    memset(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
+    MyProc->sLocks[ProcStructLock] = 1;
+
+
+    if (IsUnderPostmaster) {
+	IPCKey semKey;
+	int semNum;
+	int semId;
+	union semun semun;
+
+	ProcGetNewSemKeyAndNum(&semKey, &semNum);
+
+	semId = IpcSemaphoreCreate(semKey,
+				   PROC_NSEMS_PER_SET,
+				   IPCProtection,
+				   IpcSemaphoreDefaultStartValue,
+				   0,
+				   &semstat);
+	/*
+	 * we might be reusing a semaphore that belongs to a dead
+	 * backend. So be careful and reinitialize its value here.
+	 */
+	semun.val = IpcSemaphoreDefaultStartValue;
+	semctl(semId, semNum, SETVAL, semun);
+
+	IpcSemaphoreLock(semId, semNum, IpcExclusiveLock);
+	MyProc->sem.semId = semId;
+	MyProc->sem.semNum = semNum;
+	MyProc->sem.semKey = semKey;
+    } else {
+	MyProc->sem.semId = -1;
+    }
+
+    /* ----------------------
+     * Release the lock.
+     * ----------------------
+     */
+    SpinRelease(ProcStructLock);
+
+    MyProc->pid = 0;
+#if 0
+    MyProc->pid = MyPid;
+#endif
+
+    /* ----------------
+     * Start keeping spin lock stats from here on.  Any botch before
+     * this initialization is forever botched.
+     * ----------------
+     */
+    memset(MyProc->sLocks, 0, MAX_SPINS * sizeof(*MyProc->sLocks));
+
+    /* -------------------------
+     * Install ourselves in the binding table. The name to
+     * use is determined by the OS-assigned process id. That
+     * allows the cleanup process to find us after any untimely
+     * exit.
+     * -------------------------
+     */
+    pid = getpid();
+    location = MAKE_OFFSET(MyProc);
+    if ((! ShmemPIDLookup(pid, &location)) || (location != MAKE_OFFSET(MyProc)))
+    {
+	elog(FATAL, "InitProcess: ShmemPID table broken");
+    }
+
+    MyProc->errType = NO_ERROR;
+    SHMQueueElemInit(&(MyProc->links));
+
+    on_exitpg(ProcKill, (caddr_t) pid);
+
+    ProcInitialized = TRUE;
+}
+
+/*
+ * ProcReleaseLocks() -- release all locks associated with this process
+ */
+void
+ProcReleaseLocks()
+{
+    if (!MyProc)
+	return;
+    LockReleaseAll(1, &MyProc->lockQueue);
+}
+
+/*
+ * ProcRemove -
+ *    used by the postmaster to clean up the global tables. This also frees
+ *    up the semaphore used for the lmgr of the process. (We have to do
+ *    this in the postmaster instead of doing an IpcSemaphoreKill on exiting
+ *    the process because the semaphore set is shared among backends and
+ *    we don't want to remove others' semaphores on exit.)
+ */
+bool
+ProcRemove(int pid)
+{
+    SHMEM_OFFSET location;
+    PROC *proc;
+
+    location = INVALID_OFFSET;
+
+    location = ShmemPIDDestroy(pid);
+    if (location == INVALID_OFFSET)
+	return (FALSE);
+    proc = (PROC *) MAKE_PTR(location);
+
+    SpinAcquire(ProcStructLock);
+
+    ProcFreeSem(proc->sem.semKey, proc->sem.semNum);
+
+    proc->links.next = ProcGlobal->freeProcs;
+    ProcGlobal->freeProcs = MAKE_OFFSET(proc);
+
+    SpinRelease(ProcStructLock);
+
+    return (TRUE);
+}
+
+/*
+ * ProcKill() -- Destroy the per-proc data structure for
+ *    this process. Release any of its held spin locks.
+ */
+static void
+ProcKill(int exitStatus, int pid)
+{
+    PROC *proc;
+    SHMEM_OFFSET location;
+
+    /* --------------------
+     * If this is a FATAL exit the postmaster will have to kill all the
+     * existing backends and reinitialize shared memory. So we don't
+     * need to do anything here.
+     * --------------------
+     */
+    if (exitStatus != 0)
+	return;
+
+    if (! pid)
+    {
+	pid = getpid();
+    }
+
+    ShmemPIDLookup(pid, &location);
+    if (location == INVALID_OFFSET)
+	return;
+
+    proc = (PROC *) MAKE_PTR(location);
+
+    if (proc != MyProc) {
+	Assert( pid != getpid() );
+    } else
+	MyProc = NULL;
+
+    /* ---------------
+     * Assume one lock table.
+     * ---------------
+     */
+    ProcReleaseSpins(proc);
+    LockReleaseAll(1, &proc->lockQueue);
+
+    /* ----------------
+     * get off the wait queue
+     * ----------------
+     */
+    LockLockTable();
+    if (proc->links.next != INVALID_OFFSET) {
+	Assert(proc->waitLock->waitProcs.size > 0);
+	SHMQueueDelete(&(proc->links));
+	--proc->waitLock->waitProcs.size;
+    }
+    SHMQueueElemInit(&(proc->links));
+    UnlockLockTable();
+
+    return;
+}
+
+/*
+ * ProcQueue package: routines for putting processes to sleep
+ *    and waking them up
+ */
+
+/*
+ * ProcQueueAlloc -- alloc/attach to a shared memory process queue
+ *
+ * Returns: a pointer to the queue or NULL
+ * Side Effects: Initializes the queue if we allocated one
+ */
+PROC_QUEUE *
+ProcQueueAlloc(char *name)
+{
+    bool found;
+    PROC_QUEUE *queue = (PROC_QUEUE *)
+	ShmemInitStruct(name, (unsigned) sizeof(PROC_QUEUE), &found);
+
+    if (! queue)
+    {
+	return (NULL);
+    }
+    if (! found)
+    {
+	ProcQueueInit(queue);
+    }
+    return (queue);
+}
+
+/*
+ * ProcQueueInit -- initialize a shared memory process queue
+ */
+void
+ProcQueueInit(PROC_QUEUE *queue)
+{
+    SHMQueueInit(&(queue->links));
+    queue->size = 0;
+}
+
+
+
+/*
+ * ProcSleep -- put a process to sleep
+ *
+ * P() on the semaphore should put us to sleep. The process
+ * semaphore is cleared by default, so the first time we try
+ * to acquire it, we sleep.
+ *
+ * ASSUME: that no one will fiddle with the queue until after
+ *    we release the spin lock.
+ *
+ * NOTES: The process queue is now a priority queue for locking.
+ */
+int
+ProcSleep(PROC_QUEUE *queue,
+	  SPINLOCK spinlock,
+	  int token,
+	  int prio,
+	  LOCK *lock)
+{
+    int i;
+    PROC *proc;
+#ifndef WIN32	/* figure this out later */
+    struct itimerval timeval, dummy;
+#endif /* WIN32 */
+
+    proc = (PROC *) MAKE_PTR(queue->links.prev);
+    for (i = 0; i < queue->size; i++)
+    {
+	if (proc->prio < prio)
+	    proc = (PROC *) MAKE_PTR(proc->links.prev);
+	else
+	    break;
+    }
+
+    MyProc->token = token;
+    MyProc->waitLock = lock;
+
+    /* -------------------
+     * currently, we only need this for the ProcWakeup routines
+     * -------------------
+     */
+    TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
+
+    /* -------------------
+     * assume that these two operations are atomic (because
+     * of the spinlock).
+     * -------------------
+     */
+    SHMQueueInsertTL(&(proc->links), &(MyProc->links));
+    queue->size++;
+
+    SpinRelease(spinlock);
+
+    /* --------------
+     * Postgres does not have any deadlock detection code and for this
+     * reason we must set a timer to wake up the process in the event of
+     * a deadlock. For now the timer is set for 1 minute and we assume that
+     * any process which sleeps for this amount of time is deadlocked and will
+     * receive a SIGALRM signal. The handler should release the process's
+     * semaphore and abort the current transaction.
+     *
+     * Need to zero out the struct to set the interval and the microsecond
+     * fields to 0.
+     * --------------
+     */
+#ifndef WIN32
+    memset(&timeval, 0, sizeof(struct itimerval));
+    timeval.it_value.tv_sec = DEADLOCK_TIMEOUT;
+
+    if (setitimer(ITIMER_REAL, &timeval, &dummy))
+	elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
+#endif /* WIN32 */
+
+    /* --------------
+     * if someone wakes us between SpinRelease and IpcSemaphoreLock,
+     * IpcSemaphoreLock will not block. The wakeup is "saved" by
+     * the semaphore implementation.
+     * --------------
+     */
+    IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+
+    /* ---------------
+     * We were awoken before a timeout - now disable the timer
+     * ---------------
+     */
+#ifndef WIN32
+    timeval.it_value.tv_sec = 0;
+
+    if (setitimer(ITIMER_REAL, &timeval, &dummy))
+	elog(FATAL, "ProcSleep: Unable to disable timer for process wakeup");
+#endif /* WIN32 */
+
+    /* ----------------
+     * We were assumed to be in a critical section when we went
+     * to sleep.
+     * ----------------
+     */
+    SpinAcquire(spinlock);
+
+    return (MyProc->errType);
+}
+
+
+/*
+ * ProcWakeup -- wake up a process by releasing its private semaphore.
+ *
+ * remove the process from the wait queue and set its links invalid.
+ * RETURN: the next process in the wait queue.
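+ *
+ * Illustrative caller's-eye sketch (this is how ProcLockWakeup()
+ * below drives it):
+ *
+ *     GrantLock(lock, proc->token);
+ *     queue->size--;
+ *     proc = ProcWakeup(proc, NO_ERROR);
+ *
+ * i.e. the caller grants the lock and fixes up waitProcs.size itself,
+ * then walks the chain via our return value.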
+ */
+PROC *
+ProcWakeup(PROC *proc, int errType)
+{
+    PROC *retProc;
+    /* assume that spinlock has been acquired */
+
+    if (proc->links.prev == INVALID_OFFSET ||
+	proc->links.next == INVALID_OFFSET)
+	return ((PROC *) NULL);
+
+    retProc = (PROC *) MAKE_PTR(proc->links.prev);
+
+    /* you have to update waitLock->waitProcs.size yourself */
+    SHMQueueDelete(&(proc->links));
+    SHMQueueElemInit(&(proc->links));
+
+    proc->errType = errType;
+
+    IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum, IpcExclusiveLock);
+
+    return retProc;
+}
+
+
+/*
+ * ProcGetId --
+ */
+int
+ProcGetId()
+{
+    return ( MyProc->procId );
+}
+
+/*
+ * ProcLockWakeup -- routine for waking up processes when a lock is
+ *    released.
+ */
+int
+ProcLockWakeup(PROC_QUEUE *queue, char *ltable, char *lock)
+{
+    PROC *proc;
+    int count;
+
+    if (! queue->size)
+	return (STATUS_NOT_FOUND);
+
+    proc = (PROC *) MAKE_PTR(queue->links.prev);
+    count = 0;
+    while ((LockResolveConflicts((LOCKTAB *) ltable,
+				 (LOCK *) lock,
+				 proc->token,
+				 proc->xid) == STATUS_OK))
+    {
+	/* there was a waiting process, grant it the lock before waking it
+	 * up. This will prevent another process from seizing the lock
+	 * between the time we release the lock master (spinlock) and
+	 * the time that the awoken process begins executing again.
+	 */
+	GrantLock((LOCK *) lock, proc->token);
+	queue->size--;
+
+	/*
+	 * ProcWakeup removes proc from the lock waiting process queue and
+	 * returns the next proc in chain. If a writer just dropped
+	 * its lock and there are several waiting readers, wake them all up.
+	 */
+	proc = ProcWakeup(proc, NO_ERROR);
+
+	count++;
+	if (!proc || queue->size == 0)
+	    break;
+    }
+
+    if (count)
+	return (STATUS_OK);
+    else
+	/* Something is still blocking us. May have deadlocked. */
+	return (STATUS_NOT_FOUND);
+}
+
+void
+ProcAddLock(SHM_QUEUE *elem)
+{
+    SHMQueueInsertTL(&MyProc->lockQueue, elem);
+}
+
+/* --------------------
+ * We only get to this routine if we got SIGALRM after DEADLOCK_TIMEOUT
+ * while waiting for a lock to be released by some other process. After
+ * the one minute deadline we assume we have a deadlock and must abort
+ * this transaction. We must also indicate that we're no longer waiting
+ * on a lock so that other processes don't try to wake us up and screw
+ * up our semaphore.
+ * --------------------
+ */
+int
+#if defined(PORTNAME_linux)
+HandleDeadLock(int i)
+#else
+HandleDeadLock()
+#endif
+{
+    LOCK *lock;
+    int size;
+
+    LockLockTable();
+
+    /* ---------------------
+     * Check to see if we've been awoken by anyone in the interim.
+     *
+     * If we have we can return and resume our transaction -- happy day.
+     * Before we are awoken the process releasing the lock grants it to
+     * us so we know that we don't have to wait anymore.
+     *
+     * Damn these names are LONG! -mer
+     * ---------------------
+     */
+    if (IpcSemaphoreGetCount(MyProc->sem.semId, MyProc->sem.semNum) ==
+	IpcSemaphoreDefaultStartValue) {
+	UnlockLockTable();
+	return 1;
+    }
+
+    /*
+     * you would think this would be unnecessary, but...
+     *
+     * this also means we've been removed already. in some ports
+     * (e.g., sparc and aix) the semop(2) implementation is such that
+     * we can actually end up in this handler after someone has removed
+     * us from the queue and bopped the semaphore *but the test above
+     * fails to detect the semaphore update* (presumably something weird
+     * having to do with the order in which the semaphore wakeup signal
+     * and SIGALRM get handled).
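+     *
+     * Sketched as a timeline (a reading of the comment above, not new
+     * behavior): the releaser may already have done SHMQueueDelete() on
+     * our links and V'ed our semaphore, yet the GetCount test above read
+     * a stale value. The links test below catches exactly that case.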
+     */
+    if (MyProc->links.prev == INVALID_OFFSET ||
+	MyProc->links.next == INVALID_OFFSET) {
+	UnlockLockTable();
+	return (1);
+    }
+
+    lock = MyProc->waitLock;
+    size = lock->waitProcs.size;	/* so we can look at this in the core */
+
+    /* ------------------------
+     * Get this process off the lock's wait queue
+     * ------------------------
+     */
+    Assert(lock->waitProcs.size > 0);
+    --lock->waitProcs.size;
+    SHMQueueDelete(&(MyProc->links));
+    SHMQueueElemInit(&(MyProc->links));
+
+    /* ------------------
+     * Unlock my semaphore so that the count is right for next time.
+     * I was awoken by a signal, not by someone unlocking my semaphore.
+     * ------------------
+     */
+    IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+
+    /* -------------
+     * Set MyProc->errType to STATUS_ERROR so that we abort after
+     * returning from this handler.
+     * -------------
+     */
+    MyProc->errType = STATUS_ERROR;
+
+    /*
+     * if this doesn't follow the IpcSemaphoreUnlock then we get lock
+     * table corruption ("LockReplace: xid table corrupted") due to
+     * race conditions. i don't claim to understand this...
+     */
+    UnlockLockTable();
+
+    elog(NOTICE, "Timeout -- possible deadlock");
+    return 0;
+}
+
+void
+ProcReleaseSpins(PROC *proc)
+{
+    int i;
+
+    if (!proc)
+	proc = MyProc;
+
+    if (!proc)
+	return;
+    for (i = 0; i < (int) MAX_SPINS; i++)
+    {
+	if (proc->sLocks[i])
+	{
+	    Assert(proc->sLocks[i] == 1);
+	    SpinRelease(i);
+	}
+    }
+}
+
+/*****************************************************************************
+ *
+ *****************************************************************************/
+
+/*
+ * ProcGetNewSemKeyAndNum -
+ *    scan the free semaphore bitmap and allocate a single semaphore from
+ *    a semaphore set. (If the semaphore set doesn't exist yet,
+ *    IpcSemaphoreCreate will create it. Otherwise, we use the existing
+ *    semaphore set.)
+ */
+static void
+ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum)
+{
+    int i;
+    int32 *freeSemMap = ProcGlobal->freeSemMap;
+    unsigned int fullmask;
+
+    /*
+     * we hold ProcStructLock when entering this routine. We scan through
+     * the bitmap to look for a free semaphore.
+     */
+    fullmask = ~0 >> (32 - PROC_NSEMS_PER_SET);
+    for (i = 0; i < MAX_PROC_SEMS/PROC_NSEMS_PER_SET; i++) {
+	int mask = 1;
+	int j;
+
+	if (freeSemMap[i] == fullmask)
+	    continue;		/* none free for this set */
+
+	for (j = 0; j < PROC_NSEMS_PER_SET; j++) {
+	    if ((freeSemMap[i] & mask) == 0) {
+		/*
+		 * a free semaphore was found. Mark it as allocated.
+		 */
+		freeSemMap[i] |= mask;
+
+		*key = ProcGlobal->currKey + i;
+		*semNum = j;
+		return;
+	    }
+	    mask <<= 1;
+	}
+    }
+
+    /* if we reach here, all the semaphores are in use. */
+    elog(WARN, "InitProcess: cannot allocate a free semaphore");
+}
+
+/*
+ * ProcFreeSem -
+ *    free up our semaphore in the semaphore set. If we're the last one
+ *    in the set, also remove the semaphore set.
+ */
+static void
+ProcFreeSem(IpcSemaphoreKey semKey, int semNum)
+{
+    int mask;
+    int i;
+    int32 *freeSemMap = ProcGlobal->freeSemMap;
+
+    i = semKey - ProcGlobal->currKey;
+    mask = ~(1 << semNum);
+    freeSemMap[i] &= mask;
+
+    if (freeSemMap[i] == 0)
+	IpcSemaphoreKill(semKey);
+}
+
+/*
+ * ProcFreeAllSemaphores -
+ *    on exiting the postmaster, we free up all the semaphores allocated
+ *    to the lmgrs of the backends.
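+ *
+ * Sketch of the loop below: a nonzero word i in freeSemMap means the
+ * semaphore set with IPC key currKey + i still exists (some backend
+ * never freed its slot), so the whole set is removed with
+ * IpcSemaphoreKill() -- the bulk analogue of ProcFreeSem() above.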
+ */
+void
+ProcFreeAllSemaphores()
+{
+    int i;
+    int32 *freeSemMap = ProcGlobal->freeSemMap;
+
+    for (i = 0; i < MAX_PROC_SEMS/PROC_NSEMS_PER_SET; i++) {
+	if (freeSemMap[i] != 0)
+	    IpcSemaphoreKill(ProcGlobal->currKey + i);
+    }
+}
diff --git a/src/backend/storage/lmgr/single.c b/src/backend/storage/lmgr/single.c
new file mode 100644
index 00000000000..8d41ea38bb6
--- /dev/null
+++ b/src/backend/storage/lmgr/single.c
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ *
+ * single.c--
+ *    set single locks in the multi-level lock hierarchy
+ *
+ *    Sometimes we don't want to set all levels of the multi-level
+ *    lock hierarchy at once. This allows us to set and release
+ *    one level at a time. It's useful in index scans where
+ *    you can set an intent lock at the beginning and thereafter
+ *    only set page locks. Tends to speed things up.
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/single.c,v 1.1.1.1 1996/07/09 06:21:57 scrappy Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <string.h>
+#include "storage/lmgr.h"	/* where the declarations go */
+#include "storage/lock.h"
+#include "storage/multilev.h"
+#include "utils/rel.h"
+
+/*
+ * SingleLockReln -- lock a relation
+ *
+ * Returns: TRUE if the lock can be set, FALSE otherwise.
+ */
+bool
+SingleLockReln(LockInfo linfo, LOCKT lockt, int action)
+{
+    LOCKTAG tag;
+
+    /*
+     * LOCKTAG has two bytes of padding, unfortunately. The
+     * hash function will return a miss if the padding bytes aren't
+     * zero'd.
+     */
+    memset(&tag, 0, sizeof(tag));
+    tag.relId = linfo->lRelId.relId;
+    tag.dbId = linfo->lRelId.dbId;
+    BlockIdSet(&(tag.tupleId.ip_blkid), InvalidBlockNumber);
+    tag.tupleId.ip_posid = InvalidOffsetNumber;
+
+    if (action == UNLOCK)
+	return (LockRelease(MultiTableId, &tag, lockt));
+    else
+	return (LockAcquire(MultiTableId, &tag, lockt));
+}
+
+/*
+ * SingleLockPage -- use the multi-level lock table, but lock
+ *    only at the page level.
+ *
+ * Assumes that an INTENT lock has already been set in the
+ * multi-level lock table.
+ *
+ */
+bool
+SingleLockPage(LockInfo linfo,
+	       ItemPointer tidPtr,
+	       LOCKT lockt,
+	       int action)
+{
+    LOCKTAG tag;
+
+    /*
+     * LOCKTAG has two bytes of padding, unfortunately. The
+     * hash function will return a miss if the padding bytes aren't
+     * zero'd.
+     */
+    memset(&tag, 0, sizeof(tag));
+    tag.relId = linfo->lRelId.relId;
+    tag.dbId = linfo->lRelId.dbId;
+    BlockIdCopy(&(tag.tupleId.ip_blkid), &(tidPtr->ip_blkid));
+    tag.tupleId.ip_posid = InvalidOffsetNumber;
+
+    if (action == UNLOCK)
+	return (LockRelease(MultiTableId, &tag, lockt));
+    else
+	return (LockAcquire(MultiTableId, &tag, lockt));
+}