Copyright | (c) Tim Watson 2012 - 2013 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <[email protected]> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell2010 |
Control.Distributed.Process.Execution.Mailbox
Description
Generic process that acts as an external mailbox and message buffer.
- Overview
For use when rate limiting is not possible (or desired), this module
provides a buffer process that receives mail via its post API, buffers
the received messages and delivers them when its owning process asks for
them. A mailbox has to be started with a maximum buffer size - the so-called
limit - and will discard messages once its internal storage reaches this
user-defined threshold.
The usual behaviour of the buffer process is to accumulate messages in
its internal memory. When a client evaluates notify, the buffer will
send a NewMail message to the (real) mailbox of its owning process as
soon as it has any message(s) ready to deliver. If the buffer already
contains undelivered mail, the NewMail message will be dispatched
immediately.
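The notification rule can be pictured as a small state machine. The sketch below is a pure model, not the library's implementation: the names Event, State, step and run are hypothetical, the buffer limit is ignored, and it assumes a notify request is one-shot (it is cleared once a signal has been sent), which the text above does not state explicitly.

```haskell
-- Pure model of the notify/NewMail rule (illustrative names only).
data Event = Post | Notify deriving (Show, Eq)

data State = State
  { pending :: Int   -- number of buffered messages
  , armed   :: Bool  -- a notify request is waiting for mail to arrive
  } deriving (Show, Eq)

-- | Handle one event. The Bool reports whether a NewMail signal is sent.
step :: State -> Event -> (State, Bool)
step st Notify
  | pending st > 0 = (st, True)                    -- mail already waiting: signal at once
  | otherwise      = (st { armed = True }, False)  -- remember the request
step st Post
  | armed st  = (st { pending = pending st + 1, armed = False }, True)
  | otherwise = (st { pending = pending st + 1 }, False)

-- | Run a sequence of events from an empty, unarmed mailbox.
run :: [Event] -> [Bool]
run = go (State 0 False)
  where
    go _ [] = []
    go st (e : es) = let (st', sig) = step st e in sig : go st' es
```

In this model, run [Notify, Post, Post] signals only on the first Post, while run [Post, Notify] signals on the Notify itself because mail is already buffered.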
When the owning process wishes to receive mail, evaluating deliver (from
any process) will cause the buffer to send its owner a Delivery message
containing the accumulated messages and additional information about the
number of messages it is delivering, the number of messages dropped since
the last delivery and a handle for the mailbox (so that processes can have
multiple mailboxes if required, and distinguish between them).
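To make the contents of a delivery concrete, here is a pure model of what the owner receives. The field names box, messages, count and totalDropped mirror the record fields of Delivery in this module; everything else (the Int handle, MailboxState, deliverModel) is hypothetical. The model assumes that a delivery empties the buffer and resets the dropped counter, as the wording "since the last delivery" suggests.

```haskell
-- Pure model of a delivery; the mailbox handle is reduced to an Int.
data Delivery a = Delivery
  { box          :: Int      -- which mailbox this delivery came from
  , messages     :: [a]      -- accumulated messages, eldest first
  , count        :: Integer  -- number of messages in this delivery
  , totalDropped :: Integer  -- messages dropped since the last delivery
  } deriving (Show, Eq)

data MailboxState a = MailboxState
  { boxId    :: Int
  , buffered :: [a]      -- eldest first
  , dropped  :: Integer  -- drops recorded since the last delivery
  } deriving (Show, Eq)

-- | Produce a delivery and the mailbox state left behind.
deliverModel :: MailboxState a -> (Delivery a, MailboxState a)
deliverModel st =
  ( Delivery
      { box          = boxId st
      , messages     = buffered st
      , count        = fromIntegral (length (buffered st))
      , totalDropped = dropped st
      }
  , st { buffered = [], dropped = 0 }  -- assumed: buffer emptied, counter reset
  )
```

An owner holding several mailboxes can compare the box field of each delivery against the handles it created to tell them apart.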
- Overflow Handling
A mailbox handles overflow - when the number of messages it is holding
reaches the limit - differently depending on the BufferType selected
when it starts. The Queue buffer will, once the limit is reached, drop
older messages first (i.e., the head of the queue) to make space for
newer ones. The Ring buffer works similarly, but blocks new messages
so as to preserve existing ones instead. Finally, the Stack buffer will
drop the last (i.e., most recently received) message to make room for new
mail.
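The three policies are easiest to compare side by side. The following is a pure list-based model of the behaviour described above, not the library's code: BufferType is redeclared locally so the snippet stands alone, and insert and feed are hypothetical helper names.

```haskell
-- Pure model of the overflow policies; buffers are lists held eldest first.
data BufferType = Queue | Stack | Ring deriving (Show, Eq)

-- | Insert one message. Returns the new buffer and how many messages
-- were dropped by this insertion (0 or 1).
insert :: BufferType -> Int -> a -> [a] -> ([a], Int)
insert bt limit msg buf
  | limit <= 0         = (buf, 1)             -- no capacity: everything is refused
  | length buf < limit = (buf ++ [msg], 0)    -- room left: just append
  | otherwise = case bt of
      Queue -> (drop 1 buf ++ [msg], 1)       -- drop the eldest (queue head)
      Stack -> (init buf ++ [msg], 1)         -- drop the newest (top), then push
      Ring  -> (buf, 1)                       -- refuse the incoming message

-- | Feed messages in arrival order through an initially empty buffer,
-- returning the surviving messages and the total number dropped.
feed :: BufferType -> Int -> [a] -> ([a], Int)
feed bt limit = foldl step ([], 0)
  where
    step (buf, dropped) msg =
      let (buf', d) = insert bt limit msg buf
      in (buf', dropped + d)
```

With a limit of 3 and the messages 1 to 5 arriving in order, this model leaves [3,4,5] in a Queue, [1,2,5] in a Stack and [1,2,3] in a Ring, with two messages dropped in each case.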
Mailboxes can be resized by evaluating resize with a new value for the
limit. If the new limit is smaller than the current/previous one, messages
are dropped as though the mailbox had previously seen a volume of mail
equal to the difference (in size) between the limits. In this situation,
the Queue will drop as many older messages as necessary to come within
the limit, whilst the other two buffer types will drop as many newer messages
as needed.
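Shrinking a full buffer can be modelled the same way. This is a pure sketch of the rule stated above, under the assumption that the buffer is a list held eldest first; resizeModel is a hypothetical name and BufferType is redeclared locally so the snippet stands alone.

```haskell
-- Pure model of resizing; not the library's implementation.
data BufferType = Queue | Stack | Ring deriving (Show, Eq)

-- | Apply a new limit to a buffer held eldest first. Returns the
-- surviving messages and the number of messages dropped.
resizeModel :: BufferType -> Int -> [a] -> ([a], Int)
resizeModel bt newLimit buf
  | excess <= 0 = (buf, 0)                 -- still within the limit
  | otherwise = case bt of
      Queue -> (drop excess buf, excess)   -- drop the eldest messages
      _     -> (take keep buf, excess)     -- Stack and Ring: drop the newest
  where
    keep   = max 0 newLimit
    excess = length buf - keep
```

Resizing a buffer holding 1 to 5 down to a limit of 2 leaves [4,5] for a Queue and [1,2] for a Stack or Ring in this model; growing the limit drops nothing.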
- Ordering Guarantees
When messages are delivered to the owner, they arrive as a list of raw
Message entries, given in descending age order (i.e., eldest first).
Whilst this approximates the FIFO ordering a process' mailbox would usually
offer, the Stack buffer will appear to offer no ordering at all, since
it always deletes the most recent message(s). The Queue and Ring buffers
will maintain a more queue-like (i.e., FIFO) view of received messages,
with the obvious constraint that newer or older data might have been deleted.
- Post API and Relaying
For messages to be properly handled by the mailbox, they can either be sent
via the post API or directly to the Mailbox. Messages sent directly to
the mailbox will still be handled via the internal buffers and subjected to
the mailbox limits. The post API is really just a means to ensure that
the conversion from Serializable a -> Message is done in the caller's
process and uses the safe wrapMessage variant.
- Acknowledgements
This API is based on the work of Erlang programmers Fred Hebert and Geoff Cant, its design closely mirroring that of the pobox library application.
Synopsis
- data Mailbox
- startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
- startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message)
- startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
- createMailbox :: BufferType -> Limit -> Process Mailbox
- resize :: Mailbox -> Integer -> Process ()
- statistics :: Mailbox -> Process MailboxStats
- monitor :: Mailbox -> Process MonitorRef
- type Limit = Integer
- data BufferType
- data MailboxStats = MailboxStats {}
- post :: Serializable a => Mailbox -> a -> Process ()
- notify :: Mailbox -> Process ()
- deliver :: Mailbox -> Process ()
- active :: Mailbox -> Filter -> Process ()
- data NewMail = NewMail !Mailbox !Integer
- data Delivery = Delivery {}
- data FilterResult
- acceptEverything :: Closure (Message -> Process FilterResult)
- acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
- __remoteTable :: RemoteTable -> RemoteTable
Creating, Starting, Configuring and Running a Mailbox
data Mailbox
Opaque handle to a mailbox.
Instances
- Binary Mailbox
- Addressable Mailbox
- Linkable Mailbox
- Resolvable Mailbox
- Routable Mailbox
  Defined in Control.Distributed.Process.Execution.Mailbox
  Methods
  sendTo :: (Serializable m, Resolvable Mailbox) => Mailbox -> m -> Process ()
  unsafeSendTo :: (NFSerializable m, Resolvable Mailbox) => Mailbox -> m -> Process ()
- Generic Mailbox
- Show Mailbox
- Eq Mailbox
- type Rep Mailbox
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
Start a mailbox for the supplied ProcessId.
start = spawnLocal $ run
startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message)
As startMailbox, but suitable for use in supervisor child specs.
This variant is for use when you want access to the underlying
Mailbox handle in your supervised child refs. See supervisor's
ChildRef data type for more information.
Example:
childSpec = toChildStart $ startSupervised pid bufferType mboxLimit
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
As startMailbox, but suitable for use in supervisor child specs.
createMailbox :: BufferType -> Limit -> Process Mailbox
Start a mailbox for the calling process.
create = getSelfPid >>= start
resize :: Mailbox -> Integer -> Process ()
Alters the mailbox's limit - this might cause messages to be dropped!
statistics :: Mailbox -> Process MailboxStats
Obtain statistics (from/to anywhere) about a mailbox.
type Limit = Integer
Represents the maximum number of messages the internal buffer can hold.
data BufferType
Describes the different types of buffer.
Constructors
Queue | FIFO buffer, limiter drops the eldest message (queue head) |
Stack | unordered buffer, limiter drops the newest (top) message |
Ring | FIFO buffer, limiter refuses (i.e., drops) new messages |
Instances
- Show BufferType
  Defined in Control.Distributed.Process.Execution.Mailbox
  Methods
  showsPrec :: Int -> BufferType -> ShowS
  show :: BufferType -> String
  showList :: [BufferType] -> ShowS
- Eq BufferType
data MailboxStats
Bundle of statistics data, available on request via
the statistics API call.
Constructors
MailboxStats | |
Fields |
Instances
Posting Mail
Obtaining Mail and Notifications
deliver :: Mailbox -> Process ()
Instructs the mailbox to deliver all pending messages to the owner.
active :: Mailbox -> Filter -> Process ()
Instructs the mailbox to send a Delivery as soon as any mail is
available, or immediately (if the buffer already contains data).
NB: signals are only delivered to the mailbox's owning process.
data NewMail = NewMail !Mailbox !Integer
Marker message indicating to the owning process that mail has arrived.
Instances
- Binary NewMail
- Generic NewMail
  Defined in Control.Distributed.Process.Execution.Mailbox
- Show NewMail
- type Rep NewMail
  Defined in Control.Distributed.Process.Execution.Mailbox
  type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer)))
data Delivery
Mail delivery.
Constructors
Delivery | |
Instances
- Binary Delivery
- Generic Delivery
  Defined in Control.Distributed.Process.Execution.Mailbox
- type Rep Delivery
  Defined in Control.Distributed.Process.Execution.Mailbox
  type Rep Delivery = D1 ('MetaData "Delivery" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "Delivery" 'PrefixI 'True) ((S1 ('MetaSel ('Just "box") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Mailbox) :*: S1 ('MetaSel ('Just "messages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [Message])) :*: (S1 ('MetaSel ('Just "count") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "totalDropped") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer))))
data FilterResult
Instances
- Binary FilterResult
- Generic FilterResult
  Defined in Control.Distributed.Process.Execution.Mailbox
- type Rep FilterResult
  Defined in Control.Distributed.Process.Execution.Mailbox
  type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type)))
acceptEverything :: Closure (Message -> Process FilterResult)
A do-nothing filter that accepts all messages (i.e., returns Keep
for any input).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
A filter that takes a Closure (Message -> Process FilterResult) holding
the filter function and applies it remotely (i.e., in the mailbox's own
managed process).