| Copyright | (c) Tim Watson 2013 - 2014 |
|---|---|
| License | BSD3 (see the file LICENSE) |
| Maintainer | Tim Watson <watson.timothy@gmail.com> |
| Stability | experimental |
| Portability | non-portable (requires concurrency) |
| Safe Haskell | None |
| Language | Haskell2010 |
Control.Distributed.Process.Execution
Description
- Inter-Process Traffic Management
The Execution Framework provides tools for load regulation, workload shedding and remote hand-off. The currently implementation provides only a subset of the plumbing required, comprising tools for event management, mailbox buffering and message routing.
Synopsis
- active :: Mailbox -> Filter -> Process ()
- data Delivery = Delivery {}
- statistics :: Mailbox -> Process MailboxStats
- data BufferType
- type Limit = Integer
- data Mailbox
- monitor :: Mailbox -> Process MonitorRef
- deliver :: Mailbox -> Process ()
- __remoteTable :: RemoteTable -> RemoteTable
- data FilterResult
- notify :: Mailbox -> Process ()
- startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
- startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
- createMailbox :: BufferType -> Limit -> Process Mailbox
- resize :: Mailbox -> Integer -> Process ()
- data MailboxStats = MailboxStats {}
- data NewMail = NewMail !Mailbox !Integer
- acceptEverything :: Closure (Message -> Process FilterResult)
- acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
- data Message = Message {}
- createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
- data ExchangeType s = ExchangeType {}
- post :: Serializable a => Exchange -> a -> Process ()
- data Exchange
- startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
- broadcastExchange :: Process Exchange
- broadcastExchangeT :: Process BroadcastExchange
- broadcastClient :: Exchange -> Process (InputStream Message)
- startExchange :: ExchangeType s -> Process Exchange
- runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
- postMessage :: Exchange -> Message -> Process ()
- configureExchange :: Serializable m => Exchange -> m -> Process ()
- bindToBroadcaster :: Exchange -> Process ()
- type BroadcastExchange = ExchangeType BroadcastEx
- type HeaderName = String
- data Binding
- = BindKey {
- bindingKey :: !String
- | BindHeader {
- bindingKey :: !String
- headerName :: !HeaderName
- | BindNone
- = BindKey {
- class (Hashable k, Eq k, Serializable k) => Bindable k
- type BindingSelector k = Message -> Process k
- data RelayType
- router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
- supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
- route :: Serializable m => Exchange -> m -> Process ()
- routeMessage :: Exchange -> Message -> Process ()
- messageKeyRouter :: RelayType -> Process Exchange
- bindKey :: String -> Exchange -> Process ()
- headerContentRouter :: RelayType -> HeaderName -> Process Exchange
- bindHeader :: HeaderName -> String -> Exchange -> Process ()
- applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
Mailbox Buffering
active :: Mailbox -> Filter -> Process () Source #
Instructs the mailbox to send a Delivery as soon as any mail is
available, or immediately (if the buffer already contains data).
NB: signals are only delivered to the mailbox's owning process.
Mail delivery.
Constructors
| Delivery | |
Instances
statistics :: Mailbox -> Process MailboxStats Source #
Obtain statistics (from/to anywhere) about a mailbox.
data BufferType Source #
Describes the different types of buffer.
Constructors
| Queue | FIFO buffer, limiter drops the eldest message (queue head) |
| Stack | unordered buffer, limiter drops the newest (top) message |
| Ring | FIFO buffer, limiter refuses (i.e., drops) new messages |
Instances
| Show BufferType Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> BufferType -> ShowS # show :: BufferType -> String # showList :: [BufferType] -> ShowS # | |
| Eq BufferType Source # | |
Represents the maximum number of messages the internal buffer can hold.
Opaque handle to a mailbox.
Instances
| Binary Mailbox Source # | |
| Addressable Mailbox Source # | |
| Linkable Mailbox Source # | |
| Resolvable Mailbox Source # | |
| Routable Mailbox Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods sendTo :: (Serializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # unsafeSendTo :: (NFSerializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # | |
| Generic Mailbox Source # | |
| Show Mailbox Source # | |
| Eq Mailbox Source # | |
| type Rep Mailbox Source # | |
deliver :: Mailbox -> Process () Source #
Instructs the mailbox to deliver all pending messages to the owner.
data FilterResult Source #
Instances
| Binary FilterResult Source # | |||||
| Generic FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
| type Rep FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type))) | |||||
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the supplied ProcessId.
start = spawnLocal $ run
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox Source #
As startMailbox, but suitable for use in supervisor child specs.
createMailbox :: BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the calling process.
create = getSelfPid >>= start
resize :: Mailbox -> Integer -> Process () Source #
Alters the mailbox's limit - this might cause messages to be dropped!
data MailboxStats Source #
Bundle of statistics data, available on request via
the mailboxStats API call.
Constructors
| MailboxStats | |
Fields | |
Instances
| Binary MailboxStats Source # | |||||
| Generic MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
| Show MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> MailboxStats -> ShowS # show :: MailboxStats -> String # showList :: [MailboxStats] -> ShowS # | |||||
| type Rep MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep MailboxStats = D1 ('MetaData "MailboxStats" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "MailboxStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "pendingMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "droppedMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer)) :*: (S1 ('MetaSel ('Just "currentLimit") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Limit) :*: S1 ('MetaSel ('Just "owningProcess") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 ProcessId)))) | |||||
Marker message indicating to the owning process that mail has arrived.
Instances
| Binary NewMail Source # | |||||
| Generic NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
| Show NewMail Source # | |||||
| type Rep NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer))) | |||||
acceptEverything :: Closure (Message -> Process FilterResult) Source #
A do-nothing filter that accepts all messages (i.e., returns Keep
for any input).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source #
A filter that takes a Closure (Message -> Process FilterResult) holding
the filter function and applies it remotely (i.e., in the mailbox's own
managed process).
Message Exchanges
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Constructors
| Message | |
Instances
| Binary Message Source # | |||||
| NFData Message Source # | |||||
| Generic Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal Associated Types
| |||||
| Show Message Source # | |||||
| type Rep Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal type Rep Message = D1 ('MetaData "Message" "Control.Distributed.Process.Execution.Exchange.Internal" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "Message" 'PrefixI 'True) (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: (S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 [(String, String)]) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Message)))) | |||||
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source #
data ExchangeType s Source #
Different exchange types are defined using record syntax.
The configureEx and routeEx API functions are called during the exchange
lifecycle when incoming traffic arrives. Configuration messages are
completely arbitrary types and the exchange type author is entirely
responsible for decoding them. Messages posted to the exchange (see the
Message data type) are passed to the routeEx API function along with the
exchange type's own internal state. Both API functions return a new
(potentially updated) state and run in the Process monad.
Constructors
| ExchangeType | |
post :: Serializable a => Exchange -> a -> Process () Source #
Posts an arbitrary Serializable datum to an exchange. The raw datum is
wrapped in the Message data type, with its key set to "" and its
headers to [].
Opaque handle to an exchange.
Instances
| Binary Exchange Source # | |
| Linkable Exchange Source # | |
| Resolvable Exchange Source # | |
| Generic Exchange Source # | |
| Show Exchange Source # | |
| Eq Exchange Source # | |
| type Rep Exchange Source # | |
startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source #
Starts an exchange as part of a supervision tree.
Example: > childSpec = toChildStart $ startSupervisedRef exType
broadcastExchange :: Process Exchange Source #
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange Source #
The ExchangeType of a broadcast exchange. Can be combined with the
startSupervisedRef and startSupervised APIs.
broadcastClient :: Exchange -> Process (InputStream Message) Source #
Create a binding to the given broadcast exchange for the calling process
and return an InputStream that can be used in the expect and
receiveWait family of messaging primitives. This form of client interaction
helps avoid cluttering the caller's mailbox with Message data, since the
InputChannel provides a separate input stream (in a similar fashion to
a typed channel).
Example:
is <- broadcastClient ex msg <- receiveWait [ matchInputStream is ] handleMessage (payload msg)
startExchange :: ExchangeType s -> Process Exchange Source #
Starts an exchange process with the given ExchangeType.
runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source #
configureExchange :: Serializable m => Exchange -> m -> Process () Source #
Sends an arbitrary Serializable datum to an exchange, for use as a
configuration change - see configureEx for details.
bindToBroadcaster :: Exchange -> Process () Source #
type BroadcastExchange = ExchangeType BroadcastEx Source #
type HeaderName = String Source #
The binding key used by the built-in key and header based routers.
Constructors
| BindKey | |
Fields
| |
| BindHeader | |
Fields
| |
| BindNone | |
Instances
| Binary Binding Source # | |||||
| NFData Binding Source # | |||||
| Generic Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router Associated Types
| |||||
| Show Binding Source # | |||||
| Eq Binding Source # | |||||
| Hashable Binding Source # | |||||
| type Rep Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router type Rep Binding = D1 ('MetaData "Binding" "Control.Distributed.Process.Execution.Exchange.Router" "distributed-process-execution-0.1.5.0-AioyE4oZTIc48yHL37wGQ" 'False) (C1 ('MetaCons "BindKey" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String)) :+: (C1 ('MetaCons "BindHeader" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: S1 ('MetaSel ('Just "headerName") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 HeaderName)) :+: C1 ('MetaCons "BindNone" 'PrefixI 'False) (U1 :: Type -> Type))) | |||||
class (Hashable k, Eq k, Serializable k) => Bindable k Source #
Things that can be used as binding keys in a router.
type BindingSelector k = Message -> Process k Source #
Given to a router to indicate whether clients should
receive Message payloads only, or the whole Message object
itself.
Constructors
| PayloadOnly | |
| WholeMessage |
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source #
Defines a router exchange. The BindingSelector is used to construct
a binding (i.e., an instance of the Bindable type k) for each incoming
Message. Such bindings are matched against bindings stored in the exchange.
Clients of a router exchange are identified by a binding, mapped to
one or more ProcessIds.
The format of the bindings, nature of their storage and mechanism for
submitting new bindings is implementation dependent (i.e., will vary by
exchange type). For example, the messageKeyRouter and headerContentRouter
implementations both use the Binding data type, which can represent a
Message key or a HeaderName and content. As with all custom exchange
types, bindings should be submitted by evaluating configureExchange with
a suitable data type.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source #
Defines a router that can be used in a supervision tree.
route :: Serializable m => Exchange -> m -> Process () Source #
Send a Serializable message to the supplied Exchange. The given datum
will be converted to a Message, with the key set to "" and the
headers to [].
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
routeMessage :: Exchange -> Message -> Process () Source #
Send a Message to the supplied Exchange.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
bindKey :: String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a messageKeyRouter exchange.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source #
A router that matches on a specific (named) header. To bind a client
Process to such an exchange, use the bindHeader function.
bindHeader :: HeaderName -> String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a headerContentRouter exchange.