distributed-process-execution-0.1.5.0: Execution Framework for The Cloud Haskell Application Platform
Copyright: (c) Tim Watson 2012 - 2013
License: BSD3 (see the file LICENSE)
Maintainer: Tim Watson <[email protected]>
Stability: experimental
Portability: non-portable (requires concurrency)
Safe Haskell: None
Language: Haskell2010

Control.Distributed.Process.Execution.Mailbox

Description

Generic process that acts as an external mailbox and message buffer.

Overview

For use when rate limiting is not possible (or desired), this module provides a buffer process that receives mail via its post API, buffers the received messages and delivers them when its owning process asks for them. A mailbox has to be started with a maximum buffer size - the so-called limit - and will discard messages once its internal storage reaches this user-defined threshold.

The usual behaviour of the buffer process is to accumulate messages in its internal memory. When a client evaluates notify, the buffer will send a NewMail message to the (real) mailbox of its owning process as soon as it has any message(s) ready to deliver. If the buffer already contains undelivered mail, the NewMail message will be dispatched immediately.

When the owning process wishes to receive mail, evaluating deliver (from any process) will cause the buffer to send its owner a Delivery message containing the accumulated messages and additional information about the number of messages it is delivering, the number of messages dropped since the last delivery and a handle for the mailbox (so that processes can have multiple mailboxes if required, and distinguish between them).
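The delivery step described above can be pictured with a small pure model. This is only a sketch and not the library's code: `Box`, `DeliveryModel` and `deliverModel` are hypothetical names, the mailbox handle is left out, and the model assumes that a delivery empties the buffer and resets the dropped-message counter.

```haskell
-- Hypothetical model state: buffered messages (oldest first) and the
-- number of messages dropped since the previous delivery.
data Box a = Box
  { held         :: [a]
  , droppedSoFar :: Int
  } deriving (Eq, Show)

-- Hypothetical stand-in for a delivery: the messages, how many there
-- are, and how many were dropped since the last delivery.
data DeliveryModel a = DeliveryModel
  { messagesM     :: [a]
  , countM        :: Int
  , totalDroppedM :: Int
  } deriving (Eq, Show)

-- | Hand over everything that is buffered and start again from an
-- empty buffer with a zeroed drop counter (an assumption of this model).
deliverModel :: Box a -> (DeliveryModel a, Box a)
deliverModel (Box ms d) = (DeliveryModel ms (length ms) d, Box [] 0)
```

Under these assumptions, `deliverModel (Box "abc" 2)` yields a delivery holding three messages with two reported as dropped, and leaves an empty box behind.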

Overflow Handling

A mailbox handles overflow - when the number of messages it is holding reaches the limit - differently depending on the BufferType selected when it starts. The Queue buffer will, once the limit is reached, drop older messages first (i.e., the head of the queue) to make space for newer ones. The Ring buffer works similarly, but blocks new messages so as to preserve existing ones instead. Finally, the Stack buffer will drop the last (i.e., most recently received) message to make room for new mail.
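The three overflow policies can be compared with a small pure model. This is a sketch, not the library's implementation: the buffer is a plain list held oldest first, the limit is assumed to be positive, and `insertMsg` and `runBuffer` are hypothetical helper names. The `BufferType` declared here is a local copy so that the example stands alone.

```haskell
-- Local stand-in for the module's BufferType, so the sketch is self-contained.
data BufferType = Queue | Stack | Ring
  deriving (Eq, Show)

-- | Add one message to an oldest-first buffer, returning the new buffer
-- and the number of messages dropped by this insertion (0 or 1).
insertMsg :: BufferType -> Int -> [a] -> a -> ([a], Int)
insertMsg bt limit buf msg
  | length buf < limit = (buf ++ [msg], 0)         -- room left: just append
  | otherwise = case bt of
      Queue -> (drop 1 buf ++ [msg], 1)            -- drop the eldest (queue head)
      Stack -> (take (limit - 1) buf ++ [msg], 1)  -- drop the newest held message
      Ring  -> (buf, 1)                            -- refuse the incoming message

-- | Feed a sequence of messages through an initially empty buffer,
-- returning the final contents and the total number of drops.
runBuffer :: BufferType -> Int -> [a] -> ([a], Int)
runBuffer bt limit = foldl step ([], 0)
  where
    step (buf, dropped) msg =
      let (buf', d) = insertMsg bt limit buf msg
      in (buf', dropped + d)
```

With a limit of 3 and the messages 1 to 5, the model leaves `[3,4,5]` in a Queue, `[1,2,5]` in a Stack and `[1,2,3]` in a Ring, with two drops in each case.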

Mailboxes can be resized by evaluating resize with a new value for the limit. If the new limit is smaller than the current one, messages are dropped as though the mailbox had previously seen a volume of mail equal to the difference (in size) between the limits. In this situation, the Queue will drop as many older messages as necessary to come within the limit, whilst the other two buffer types will drop as many newer messages as needed.
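The effect of lowering the limit can be sketched as a pure function over an oldest-first list. This is an illustrative model under stated assumptions rather than the library's code: `shrinkTo` is a hypothetical name, negative limits are treated as zero, and `BufferType` is redeclared locally so the example stands alone.

```haskell
-- Local stand-in for the module's BufferType, so the sketch is self-contained.
data BufferType = Queue | Stack | Ring
  deriving (Eq, Show)

-- | Trim an oldest-first buffer to a new limit, returning the surviving
-- messages and the number of messages dropped.
shrinkTo :: BufferType -> Int -> [a] -> ([a], Int)
shrinkTo bt newLimit buf
  | excess <= 0 = (buf, 0)                  -- already within the new limit
  | otherwise = case bt of
      Queue -> (drop excess buf, excess)    -- discard the oldest messages
      _     -> (take n buf, excess)         -- Stack and Ring discard the newest
  where
    n      = max 0 newLimit                 -- assumption: negative limits act as 0
    excess = length buf - n
```

Shrinking a buffer holding 1 to 5 down to a limit of 2 leaves `[4,5]` for a Queue and `[1,2]` for the other two buffer types, with three messages dropped either way.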

Ordering Guarantees

When messages are delivered to the owner, they arrive as a list of raw Message entries, given in descending age order (i.e., eldest first). Whilst this approximates the FIFO ordering a process' mailbox would usually offer, the Stack buffer will appear to offer no ordering at all, since it always deletes the most recent message(s). The Queue and Ring buffers will maintain a more queue-like (i.e., FIFO) view of received messages, with the obvious constraint that newer or older data might have been deleted.

Post API and Relaying

For messages to be properly handled by the mailbox, they can either be sent via the post API or directly to the Mailbox. Messages sent directly to the mailbox will still be handled via the internal buffers and subjected to the mailbox limits. The post API is really just a means to ensure that the conversion from Serializable a -> Message is done in the caller's process and uses the safe wrapMessage variant.

Acknowledgements

This API is based on the work of Erlang programmers Fred Hebert and Geoff Cant, its design closely mirroring that of the pobox library application.

Synopsis

Creating, Starting, Configuring and Running a Mailbox

data Mailbox Source #

Opaque handle to a mailbox.

Instances

Binary Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

put :: Mailbox -> Put #

get :: Get Mailbox #

putList :: [Mailbox] -> Put #

Addressable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Linkable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

linkTo :: Mailbox -> Process () #

Resolvable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Routable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Generic Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep Mailbox 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

from :: Mailbox -> Rep Mailbox x #

to :: Rep Mailbox x -> Mailbox #

Show Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Eq Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

(==) :: Mailbox -> Mailbox -> Bool #

(/=) :: Mailbox -> Mailbox -> Bool #

type Rep Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source #

Start a mailbox for the supplied ProcessId.

start = spawnLocal $ run

startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message) Source #

As startMailbox, but suitable for use in supervisor child specs. This variant is for use when you want access to the underlying Mailbox handle in your supervised child refs. See supervisor's ChildRef data type for more information.

Example:

childSpec = toChildStart $ startSupervised pid bufferType mboxLimit

See Control.Distributed.Process.Supervisor

createMailbox :: BufferType -> Limit -> Process Mailbox Source #

Start a mailbox for the calling process.

create = getSelfPid >>= start

resize :: Mailbox -> Integer -> Process () Source #

Alters the mailbox's limit - this might cause messages to be dropped!

statistics :: Mailbox -> Process MailboxStats Source #

Obtain statistics (from/to anywhere) about a mailbox.

monitor :: Mailbox -> Process MonitorRef Source #

Monitor a mailbox.

type Limit = Integer Source #

Represents the maximum number of messages the internal buffer can hold.

data BufferType Source #

Describes the different types of buffer.

Constructors

Queue

FIFO buffer, limiter drops the eldest message (queue head)

Stack

unordered buffer, limiter drops the newest (top) message

Ring

FIFO buffer, limiter refuses (i.e., drops) new messages

data MailboxStats Source #

Bundle of statistics data, available on request via the statistics API call.

Instances

Binary MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Generic MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep MailboxStats 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep MailboxStats = D1 ('MetaData "MailboxStats" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "MailboxStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "pendingMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "droppedMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer)) :*: (S1 ('MetaSel ('Just "currentLimit") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Limit) :*: S1 ('MetaSel ('Just "owningProcess") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 ProcessId))))
Show MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox


Posting Mail

post :: Serializable a => Mailbox -> a -> Process () Source #

Posts a message to someone's mailbox.

Obtaining Mail and Notifications

notify :: Mailbox -> Process () Source #

Instructs the mailbox to send a NewMail signal as soon as any mail is available for delivery. Once the signal is sent, it will not be resent, even when further mail arrives, until notify is called again.

NB: signals are only delivered to the mailbox's owning process.
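The one-shot behaviour of notify can be pictured as a tiny state machine. This is a simplified model and not the library's code: `Model`, `Event`, `step` and `signals` are hypothetical names, and the model ignores the buffer limit and deliveries. It assumes that a notify request is satisfied immediately when mail is already pending, and is otherwise remembered until the next message is posted.

```haskell
-- Hypothetical model state: number of pending messages and whether a
-- notify request is waiting to be satisfied.
data Model = Model
  { pending :: Int
  , armed   :: Bool
  } deriving (Eq, Show)

-- The two events this model understands.
data Event = Post | Notify
  deriving (Eq, Show)

-- | Process one event, returning the new state and whether a NewMail
-- signal would be sent to the owner as a result.
step :: Model -> Event -> (Model, Bool)
step m Post
  | armed m   = (m { pending = pending m + 1, armed = False }, True)
  | otherwise = (m { pending = pending m + 1 }, False)
step m Notify
  | pending m > 0 = (m { armed = False }, True)  -- mail already waiting: signal now
  | otherwise     = (m { armed = True }, False)  -- remember the request

-- | Run a sequence of events from an empty mailbox, recording for each
-- event whether it triggered a signal.
signals :: [Event] -> [Bool]
signals = go (Model 0 False)
  where
    go _ []       = []
    go m (e : es) = let (m', s) = step m e in s : go m' es
```

In this model, `signals [Notify, Post, Post, Notify]` evaluates to `[False, True, False, True]`: the first post satisfies the outstanding request, the second post stays silent, and the second notify fires at once because mail is already pending.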

deliver :: Mailbox -> Process () Source #

Instructs the mailbox to deliver all pending messages to the owner.

active :: Mailbox -> Filter -> Process () Source #

Instructs the mailbox to send a Delivery as soon as any mail is available, or immediately (if the buffer already contains data).

NB: signals are only delivered to the mailbox's owning process.

data NewMail Source #

Marker message indicating to the owning process that mail has arrived.

Constructors

NewMail !Mailbox !Integer 

Instances

Binary NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

put :: NewMail -> Put #

get :: Get NewMail #

putList :: [NewMail] -> Put #

Generic NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep NewMail 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer)))

Methods

from :: NewMail -> Rep NewMail x #

to :: Rep NewMail x -> NewMail #

Show NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox


data Delivery Source #

Mail delivery.

Constructors

Delivery 

Fields

box :: Mailbox
messages :: [Message]
count :: Integer
totalDropped :: Integer

Instances

Binary Delivery Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

put :: Delivery -> Put #

get :: Get Delivery #

putList :: [Delivery] -> Put #

Generic Delivery Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep Delivery 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep Delivery = D1 ('MetaData "Delivery" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Delivery" 'PrefixI 'True) ((S1 ('MetaSel ('Just "box") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Mailbox) :*: S1 ('MetaSel ('Just "messages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [Message])) :*: (S1 ('MetaSel ('Just "count") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "totalDropped") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer))))

Methods

from :: Delivery -> Rep Delivery x #

to :: Rep Delivery x -> Delivery #

type Rep Delivery Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox


data FilterResult Source #

Constructors

Keep 
Skip 
Send 

Instances

Binary FilterResult Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Generic FilterResult Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep FilterResult 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type)))
type Rep FilterResult Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox


acceptEverything :: Closure (Message -> Process FilterResult) Source #

A do-nothing filter that accepts all messages (i.e., returns Keep for any input).

acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source #

A filter that takes a Closure (Message -> Process FilterResult) holding the filter function and applies it remotely (i.e., in the mailbox's own managed process).

Remote Table