distributed-process-execution-0.1.5.0: Execution Framework for The Cloud Haskell Application Platform
Copyright: (c) Tim Watson 2013 - 2014
License: BSD3 (see the file LICENSE)
Maintainer: Tim Watson <[email protected]>
Stability: experimental
Portability: non-portable (requires concurrency)
Safe Haskell: None
Language: Haskell2010

Control.Distributed.Process.Execution

Description

Inter-Process Traffic Management

The Execution Framework provides tools for load regulation, workload shedding and remote hand-off. The current implementation provides only a subset of the plumbing required, comprising tools for event management, mailbox buffering and message routing.

Synopsis

Mailbox Buffering

statistics :: Mailbox -> Process MailboxStats Source #

Obtain statistics (from/to anywhere) about a mailbox.

active :: Mailbox -> Filter -> Process () Source #

Instructs the mailbox to send a Delivery as soon as any mail is available, or immediately (if the buffer already contains data).

NB: signals are only delivered to the mailbox's owning process.

data BufferType Source #

Describes the different types of buffer.

Constructors

Queue

FIFO buffer, limiter drops the eldest message (queue head)

Stack

unordered buffer, limiter drops the newest (top) message

Ring

FIFO buffer, limiter refuses (i.e., drops) new messages
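
The three limiter policies can be compared with a small, library-independent model. This is only a sketch: the buffer is a plain list (oldest message first), and the names Policy and offer are hypothetical, not part of the package. The reading of Stack as "discard the current top, then push the new message" is an assumption based on the constructor descriptions above.

```haskell
-- Hypothetical stand-ins for Queue, Stack and Ring.
data Policy = QueueP | StackP | RingP
  deriving (Eq, Show)

-- | Offer a new message to a buffer that may hold at most 'limit' items.
-- Returns the updated buffer and the message that was dropped, if any.
offer :: Policy -> Int -> [a] -> a -> ([a], Maybe a)
offer policy limit buf new
  | length buf < limit = (buf ++ [new], Nothing)   -- room left: just append
  | otherwise =
      case (policy, buf) of
        (_, [])           -> ([], Just new)                        -- limit <= 0: nothing fits
        (QueueP, old : xs) -> (xs ++ [new], Just old)              -- drop the eldest (head)
        (StackP, _)       -> (init buf ++ [new], Just (last buf))  -- drop the newest (top)
        (RingP, _)        -> (buf, Just new)                       -- refuse the new message
```

Loaded in GHCi, offer QueueP 2 "ab" 'c' evaluates to ("bc", Just 'a'), offer StackP 2 "ab" 'c' to ("ac", Just 'b'), and offer RingP 2 "ab" 'c' to ("ab", Just 'c').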

data Delivery Source #

Mail delivery.

Constructors

Delivery 

Fields

box :: Mailbox
messages :: [Message]
count :: Integer
totalDropped :: Integer

Instances

Binary Delivery Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

put :: Delivery -> Put #

get :: Get Delivery #

putList :: [Delivery] -> Put #

Generic Delivery Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep Delivery 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep Delivery = D1 ('MetaData "Delivery" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Delivery" 'PrefixI 'True) ((S1 ('MetaSel ('Just "box") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Mailbox) :*: S1 ('MetaSel ('Just "messages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 [Message])) :*: (S1 ('MetaSel ('Just "count") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "totalDropped") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer))))

Methods

from :: Delivery -> Rep Delivery x #

to :: Rep Delivery x -> Delivery #

type Limit = Integer Source #

Represents the maximum number of messages the internal buffer can hold.

data Mailbox Source #

Opaque handle to a mailbox.

Instances

Binary Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

put :: Mailbox -> Put #

get :: Get Mailbox #

putList :: [Mailbox] -> Put #

Addressable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Linkable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

linkTo :: Mailbox -> Process () #

Resolvable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Routable Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Generic Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep Mailbox 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

from :: Mailbox -> Rep Mailbox x #

to :: Rep Mailbox x -> Mailbox #

Show Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Eq Mailbox Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

(==) :: Mailbox -> Mailbox -> Bool #

(/=) :: Mailbox -> Mailbox -> Bool #

monitor :: Mailbox -> Process MonitorRef Source #

Monitor a mailbox.

deliver :: Mailbox -> Process () Source #

Instructs the mailbox to deliver all pending messages to the owner.

data FilterResult Source #

Constructors

Keep 
Skip 
Send 

Instances

Binary FilterResult Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Generic FilterResult Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep FilterResult 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type)))

notify :: Mailbox -> Process () Source #

Instructs the mailbox to send a NewMail signal as soon as any mail is available for delivery. Once the signal is sent, it will not be resent, even when further mail arrives, until notify is called again.

NB: signals are only delivered to the mailbox's owning process.
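
The one-shot behaviour of notify can be sketched as a tiny state machine. The model below is independent of the library; Notifier, arm and mailArrived are hypothetical names, and the Bool in each result says whether a signal would be sent. That arming with mail already buffered signals at once is an assumption drawn from "as soon as any mail is available".

```haskell
data Notifier = Notifier
  { armed   :: Bool  -- has the owner asked for a signal that is still owed?
  , pending :: Int   -- number of messages currently buffered
  } deriving (Eq, Show)

-- | The owner calls notify. If mail is already buffered the signal is
-- sent immediately; otherwise the request is remembered.
arm :: Notifier -> (Notifier, Bool)
arm n
  | pending n > 0 = (n { armed = False }, True)
  | otherwise     = (n { armed = True }, False)

-- | A message arrives. A signal is sent only if one is owed, and the
-- request is then cleared, so later arrivals stay silent until the
-- owner arms the notifier again.
mailArrived :: Notifier -> (Notifier, Bool)
mailArrived n
  | armed n   = (n' { armed = False }, True)
  | otherwise = (n', False)
  where
    n' = n { pending = pending n + 1 }
```

Starting from Notifier False 0, the sequence arm, mailArrived, mailArrived yields the signals False, True, False: the second arrival is silent because the request was consumed by the first.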

startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source #

Start a mailbox for the supplied ProcessId.

start = spawnLocal $ run

createMailbox :: BufferType -> Limit -> Process Mailbox Source #

Start a mailbox for the calling process.

create = getSelfPid >>= start

resize :: Mailbox -> Integer -> Process () Source #

Alters the mailbox's limit - this might cause messages to be dropped!

data MailboxStats Source #

Bundle of statistics data, available on request via the statistics API call.

Instances

Binary MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Generic MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep MailboxStats 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep MailboxStats = D1 ('MetaData "MailboxStats" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "MailboxStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "pendingMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "droppedMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer)) :*: (S1 ('MetaSel ('Just "currentLimit") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Limit) :*: S1 ('MetaSel ('Just "owningProcess") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 ProcessId))))
Show MailboxStats Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

data NewMail Source #

Marker message indicating to the owning process that mail has arrived.

Constructors

NewMail !Mailbox !Integer 

Instances

Binary NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Methods

put :: NewMail -> Put #

get :: Get NewMail #

putList :: [NewMail] -> Put #

Generic NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

Associated Types

type Rep NewMail 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer)))

Methods

from :: NewMail -> Rep NewMail x #

to :: Rep NewMail x -> NewMail #

Show NewMail Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Mailbox

acceptEverything :: Closure (Message -> Process FilterResult) Source #

A do-nothing filter that accepts all messages (i.e., returns Keep for any input).

acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source #

A filter that takes a Closure (Message -> Process FilterResult) holding the filter function and applies it remotely (i.e., in the mailbox's own managed process).

Message Exchanges

data Message Source #

Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.

Constructors

Message 

Fields

key :: !String
headers :: ![(String, String)]
payload :: !Message

Instances

Binary Message Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Internal

Methods

put :: Message -> Put #

get :: Get Message #

putList :: [Message] -> Put #

NFData Message Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Internal

Methods

rnf :: Message -> () #

Generic Message Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Internal

Associated Types

type Rep Message 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Internal

type Rep Message = D1 ('MetaData "Message" "Control.Distributed.Process.Execution.Exchange.Internal" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Message" 'PrefixI 'True) (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: (S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 [(String, String)]) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Message))))

Methods

from :: Message -> Rep Message x #

to :: Rep Message x -> Message #

Show Message Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Internal

createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source #

Utility for creating a Message datum from its key, headers and payload.

data ExchangeType s Source #

Different exchange types are defined using record syntax. The configureEx and routeEx API functions are called during the exchange lifecycle when incoming traffic arrives. Configuration messages are completely arbitrary types and the exchange type author is entirely responsible for decoding them. Messages posted to the exchange (see the Message data type) are passed to the routeEx API function along with the exchange type's own internal state. Both API functions return a new (potentially updated) state and run in the Process monad.
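
The state-threading lifecycle described above can be sketched without the library. In this model IO stands in for Process, and ExchangeModel, Traffic, runModel and tagging are hypothetical names. Unlike the real configureEx, which receives an arbitrary message that the author must decode, the model uses an already-typed configuration value to stay short.

```haskell
import Control.Monad (foldM)

-- | Two callbacks, each taking the current state and returning the next.
data ExchangeModel s cfg msg = ExchangeModel
  { configureModel :: s -> cfg -> IO s
  , routeModel     :: s -> msg -> IO s
  }

-- | Incoming traffic is either a configuration change or a posted message.
data Traffic cfg msg = Configure cfg | Post msg

-- | Feed traffic to the exchange in order, threading the state through.
runModel :: ExchangeModel s cfg msg -> s -> [Traffic cfg msg] -> IO s
runModel ex = foldM step
  where
    step st (Configure c) = configureModel ex st c
    step st (Post m)      = routeModel ex st m

-- | Example exchange: the state is (prefix, number of messages routed).
tagging :: ExchangeModel (String, Int) String String
tagging = ExchangeModel
  { configureModel = \(_, n) prefix -> pure (prefix, n)
  , routeModel     = \(prefix, n) m -> do
      putStrLn (prefix ++ m)   -- "routing" here is just printing
      pure (prefix, n + 1)
  }
```

Running runModel tagging ("", 0) [Post "a", Configure "> ", Post "b"] prints a and then > b, and returns the final state ("> ", 2).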

Constructors

ExchangeType 

Fields

post :: Serializable a => Exchange -> a -> Process () Source #

Posts an arbitrary Serializable datum to an exchange. The raw datum is wrapped in the Message data type, with its key set to "" and its headers to [].

startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source #

Starts an exchange as part of a supervision tree.

Example:

childSpec = toChildStart $ startSupervisedRef exType

broadcastExchange :: Process Exchange Source #

Start a new broadcast exchange and return a handle to the exchange.

broadcastExchangeT :: Process BroadcastExchange Source #

The ExchangeType of a broadcast exchange. Can be combined with the startSupervisedRef and startSupervised APIs.

broadcastClient :: Exchange -> Process (InputStream Message) Source #

Create a binding to the given broadcast exchange for the calling process and return an InputStream that can be used in the expect and receiveWait family of messaging primitives. This form of client interaction helps avoid cluttering the caller's mailbox with Message data, since the InputStream provides a separate channel of input (in a similar fashion to a typed channel). Example:

is <- broadcastClient ex
msg <- receiveWait [ matchInputStream is ]
handleMessage (payload msg)

startExchange :: ExchangeType s -> Process Exchange Source #

Starts an exchange process with the given ExchangeType.

runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source #

postMessage :: Exchange -> Message -> Process () Source #

Posts a Message to an exchange.

configureExchange :: Serializable m => Exchange -> m -> Process () Source #

Sends an arbitrary Serializable datum to an exchange, for use as a configuration change - see configureEx for details.

bindToBroadcaster :: Exchange -> Process () Source #

Bind the calling process to the given broadcast exchange. For each Message the exchange receives, only the payload will be sent to the calling process' mailbox.

Example:

(producer)
post ex "Hello"

(consumer)
bindToBroadcaster ex
expect >>= liftIO . putStrLn

data Binding Source #

The binding key used by the built-in key and header based routers.

Instances

Binary Binding Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

put :: Binding -> Put #

get :: Get Binding #

putList :: [Binding] -> Put #

NFData Binding Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

rnf :: Binding -> () #

Generic Binding Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

Associated Types

type Rep Binding 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

type Rep Binding = D1 ('MetaData "Binding" "Control.Distributed.Process.Execution.Exchange.Router" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "BindKey" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String)) :+: (C1 ('MetaCons "BindHeader" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: S1 ('MetaSel ('Just "headerName") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 HeaderName)) :+: C1 ('MetaCons "BindNone" 'PrefixI 'False) (U1 :: Type -> Type)))

Methods

from :: Binding -> Rep Binding x #

to :: Rep Binding x -> Binding #

Show Binding Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

Eq Binding Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

(==) :: Binding -> Binding -> Bool #

(/=) :: Binding -> Binding -> Bool #

Hashable Binding Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

hashWithSalt :: Int -> Binding -> Int #

hash :: Binding -> Int #

class (Hashable k, Eq k, Serializable k) => Bindable k Source #

Things that can be used as binding keys in a router.

Instances

(Hashable k, Eq k, Serializable k) => Bindable k Source # 
Instance details

Defined in Control.Distributed.Process.Execution.Exchange.Router

type BindingSelector k = Message -> Process k Source #

Used to convert a Message into a Bindable routing key.

data RelayType Source #

Given to a router to indicate whether clients should receive Message payloads only, or the whole Message object itself.

Constructors

PayloadOnly 
WholeMessage 

router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source #

Defines a router exchange. The BindingSelector is used to construct a binding (i.e., an instance of the Bindable type k) for each incoming Message. Such bindings are matched against bindings stored in the exchange. Clients of a router exchange are identified by a binding, mapped to one or more ProcessIds.

The format of the bindings, nature of their storage and mechanism for submitting new bindings is implementation dependent (i.e., will vary by exchange type). For example, the messageKeyRouter and headerContentRouter implementations both use the Binding data type, which can represent a Message key or a HeaderName and content. As with all custom exchange types, bindings should be submitted by evaluating configureExchange with a suitable data type.
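
The matching step can be illustrated with a pure, library-independent model. Everything below is hypothetical (Msg, Relay, Delivered, Bindings, dispatch): clients are plain strings rather than ProcessIds, the selector is a pure function rather than a Process action, and bindings live in an association list. It is a sketch of the idea, not of how the package stores or matches bindings.

```haskell
import Data.Maybe (fromMaybe)

data Msg = Msg
  { key     :: String
  , headers :: [(String, String)]
  , payload :: String
  } deriving (Eq, Show)

-- | Stand-in for RelayType.
data Relay = PayloadOnlyM | WholeMessageM

-- | What a bound client ends up receiving.
data Delivered = Payload String | Whole Msg
  deriving (Eq, Show)

type Client     = String              -- stands in for a ProcessId
type Bindings k = [(k, [Client])]     -- binding key -> bound clients

-- | Derive a binding key from the message, look up the clients bound to
-- that key, and relay either the payload or the whole message to each.
dispatch :: Eq k => Relay -> (Msg -> k) -> Bindings k -> Msg -> [(Client, Delivered)]
dispatch relay selector bindings msg =
  [ (client, relayed) | client <- fromMaybe [] (lookup (selector msg) bindings) ]
  where
    relayed = case relay of
      PayloadOnlyM  -> Payload (payload msg)
      WholeMessageM -> Whole msg
```

A key-based router corresponds to the selector key, and a header-based one to lookup "region" . headers (with Maybe String as the binding key). A message whose derived key has no binding is delivered to nobody.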

supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source #

Defines a router that can be used in a supervision tree.

route :: Serializable m => Exchange -> m -> Process () Source #

Send a Serializable message to the supplied Exchange. The given datum will be converted to a Message, with the key set to "" and the headers to [].

The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.

routeMessage :: Exchange -> Message -> Process () Source #

Send a Message to the supplied Exchange. The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.

messageKeyRouter :: RelayType -> Process Exchange Source #

A router that matches on a Message key. To bind a client Process to such an exchange, use the bindKey function.

bindKey :: String -> Exchange -> Process () Source #

Add a binding (for the calling process) to a messageKeyRouter exchange.

headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source #

A router that matches on a specific (named) header. To bind a client Process to such an exchange, use the bindHeader function.

bindHeader :: HeaderName -> String -> Exchange -> Process () Source #

Add a binding (for the calling process) to a headerContentRouter exchange.

applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a Source #

Utility for custom exchange type authors - evaluates a set of primitive message handlers from left to right, returning the result of the first handler that evaluates to Just a, or the initial value (the first argument) if all the handlers yield Nothing.
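
The left-to-right evaluation can be modelled for any monad. The function below is a sketch of the described behaviour, not the package's source; the name applyFirst is hypothetical, and the message type is left polymorphic so the model runs without the library.

```haskell
-- | Try each handler in turn on the same message. The first 'Just'
-- result wins and the remaining handlers are not run; if every handler
-- yields 'Nothing', the default value is returned.
applyFirst :: Monad m => a -> msg -> [msg -> m (Maybe a)] -> m a
applyFirst def _   []       = pure def
applyFirst def msg (h : hs) = do
  r <- h msg
  case r of
    Just a  -> pure a
    Nothing -> applyFirst def msg hs
```

For instance, in IO, applyFirst 0 "x" [\_ -> pure Nothing, \m -> pure (Just (length m)), \_ -> pure (Just 99)] returns 1, because the second handler succeeds and the third is never evaluated.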