Copyright | (c) Tim Watson 2013 - 2014 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <[email protected]> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell2010 |
Control.Distributed.Process.Execution
Description
- Inter-Process Traffic Management
The Execution Framework provides tools for load regulation, workload shedding and remote hand-off. The current implementation provides only a subset of the plumbing required, comprising tools for event management, mailbox buffering and message routing.
Synopsis
- statistics :: Mailbox -> Process MailboxStats
- active :: Mailbox -> Filter -> Process ()
- data BufferType
- data Delivery = Delivery {}
- type Limit = Integer
- data Mailbox
- monitor :: Mailbox -> Process MonitorRef
- deliver :: Mailbox -> Process ()
- __remoteTable :: RemoteTable -> RemoteTable
- data FilterResult
- notify :: Mailbox -> Process ()
- startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
- startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
- createMailbox :: BufferType -> Limit -> Process Mailbox
- resize :: Mailbox -> Integer -> Process ()
- data MailboxStats = MailboxStats {}
- data NewMail = NewMail !Mailbox !Integer
- acceptEverything :: Closure (Message -> Process FilterResult)
- acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
- data Message = Message {}
- createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
- data ExchangeType s = ExchangeType {}
- post :: Serializable a => Exchange -> a -> Process ()
- data Exchange
- startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
- broadcastExchange :: Process Exchange
- broadcastExchangeT :: Process BroadcastExchange
- broadcastClient :: Exchange -> Process (InputStream Message)
- startExchange :: ExchangeType s -> Process Exchange
- runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
- postMessage :: Exchange -> Message -> Process ()
- configureExchange :: Serializable m => Exchange -> m -> Process ()
- bindToBroadcaster :: Exchange -> Process ()
- type BroadcastExchange = ExchangeType BroadcastEx
- type HeaderName = String
- data Binding
- = BindKey {
- bindingKey :: !String
- }
- | BindHeader {
- bindingKey :: !String
- headerName :: !HeaderName
- }
- | BindNone
- class (Hashable k, Eq k, Serializable k) => Bindable k
- type BindingSelector k = Message -> Process k
- data RelayType
- router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
- supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
- route :: Serializable m => Exchange -> m -> Process ()
- routeMessage :: Exchange -> Message -> Process ()
- messageKeyRouter :: RelayType -> Process Exchange
- bindKey :: String -> Exchange -> Process ()
- headerContentRouter :: RelayType -> HeaderName -> Process Exchange
- bindHeader :: HeaderName -> String -> Exchange -> Process ()
- applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
Mailbox Buffering
statistics :: Mailbox -> Process MailboxStats Source #
Obtain statistics (from/to anywhere) about a mailbox.
active :: Mailbox -> Filter -> Process () Source #
Instructs the mailbox to send a Delivery
as soon as any mail is
available, or immediately (if the buffer already contains data).
NB: signals are only delivered to the mailbox's owning process.
data BufferType Source #
Describes the different types of buffer.
Constructors
Queue | FIFO buffer, limiter drops the eldest message (queue head) |
Stack | unordered buffer, limiter drops the newest (top) message |
Ring | FIFO buffer, limiter refuses (i.e., drops) new messages |
Instances
Show BufferType Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> BufferType -> ShowS # show :: BufferType -> String # showList :: [BufferType] -> ShowS # | |
Eq BufferType Source # | |
Mail delivery.
Constructors
Delivery | |
Instances
Represents the maximum number of messages the internal buffer can hold.
Opaque handle to a mailbox.
Instances
Binary Mailbox Source # | |
Addressable Mailbox Source # | |
Linkable Mailbox Source # | |
Resolvable Mailbox Source # | |
Routable Mailbox Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods sendTo :: (Serializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # unsafeSendTo :: (NFSerializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # | |
Generic Mailbox Source # | |
Show Mailbox Source # | |
Eq Mailbox Source # | |
type Rep Mailbox Source # | |
deliver :: Mailbox -> Process () Source #
Instructs the mailbox to deliver all pending messages to the owner.
data FilterResult Source #
Instances
Binary FilterResult Source # | |||||
Generic FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
type Rep FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type))) |
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the supplied ProcessId
.
start = spawnLocal $ run
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox Source #
As startMailbox
, but suitable for use in supervisor child specs.
createMailbox :: BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the calling process.
create = getSelfPid >>= start
resize :: Mailbox -> Integer -> Process () Source #
Alters the mailbox's limit - this might cause messages to be dropped!
data MailboxStats Source #
Bundle of statistics data, available on request via
the statistics
API call.
Constructors
MailboxStats | |
Fields |
Instances
Binary MailboxStats Source # | |||||
Generic MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
Show MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> MailboxStats -> ShowS # show :: MailboxStats -> String # showList :: [MailboxStats] -> ShowS # | |||||
type Rep MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep MailboxStats = D1 ('MetaData "MailboxStats" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "MailboxStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "pendingMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "droppedMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer)) :*: (S1 ('MetaSel ('Just "currentLimit") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Limit) :*: S1 ('MetaSel ('Just "owningProcess") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 ProcessId)))) |
Marker message indicating to the owning process that mail has arrived.
Instances
Binary NewMail Source # | |||||
Generic NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
Show NewMail Source # | |||||
type Rep NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer))) |
acceptEverything :: Closure (Message -> Process FilterResult) Source #
A do-nothing filter that accepts all messages (i.e., returns Keep
for any input).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source #
A filter that takes a Closure (Message -> Process FilterResult)
holding
the filter function and applies it remotely (i.e., in the mailbox's own
managed process).
Message Exchanges
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Constructors
Message | |
Instances
Binary Message Source # | |||||
NFData Message Source # | |||||
Generic Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal Associated Types
| |||||
Show Message Source # | |||||
type Rep Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal type Rep Message = D1 ('MetaData "Message" "Control.Distributed.Process.Execution.Exchange.Internal" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Message" 'PrefixI 'True) (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: (S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 [(String, String)]) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Message)))) |
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source #
data ExchangeType s Source #
Different exchange types are defined using record syntax.
The configureEx
and routeEx
API functions are called during the exchange
lifecycle when incoming traffic arrives. Configuration messages are
completely arbitrary types and the exchange type author is entirely
responsible for decoding them. Messages posted to the exchange (see the
Message
data type) are passed to the routeEx
API function along with the
exchange type's own internal state. Both API functions return a new
(potentially updated) state and run in the Process
monad.
Constructors
ExchangeType | |
post :: Serializable a => Exchange -> a -> Process () Source #
Posts an arbitrary Serializable
datum to an exchange. The raw datum is
wrapped in the Message
data type, with its key
set to ""
and its
headers
to []
.
Opaque handle to an exchange.
Instances
Binary Exchange Source # | |
Linkable Exchange Source # | |
Resolvable Exchange Source # | |
Generic Exchange Source # | |
Show Exchange Source # | |
Eq Exchange Source # | |
type Rep Exchange Source # | |
startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source #
Starts an exchange as part of a supervision tree.
Example:
childSpec = toChildStart $ startSupervisedRef exType
broadcastExchange :: Process Exchange Source #
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange Source #
The ExchangeType
of a broadcast exchange. Can be combined with the
startSupervisedRef
and startSupervised
APIs.
broadcastClient :: Exchange -> Process (InputStream Message) Source #
Create a binding to the given broadcast exchange for the calling process
and return an InputStream
that can be used in the expect
and
receiveWait
family of messaging primitives. This form of client interaction
helps avoid cluttering the caller's mailbox with Message
data, since the
InputStream
provides a separate input stream (in a similar fashion to
a typed channel).
Example:
is <- broadcastClient ex
msg <- receiveWait [ matchInputStream is ]
handleMessage (payload msg)
startExchange :: ExchangeType s -> Process Exchange Source #
Starts an exchange process with the given ExchangeType
.
runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source #
configureExchange :: Serializable m => Exchange -> m -> Process () Source #
Sends an arbitrary Serializable
datum to an exchange, for use as a
configuration change - see configureEx
for details.
bindToBroadcaster :: Exchange -> Process () Source #
type BroadcastExchange = ExchangeType BroadcastEx Source #
type HeaderName = String Source #
The binding key used by the built-in key and header based routers.
Constructors
BindKey | |
Fields
| |
BindHeader | |
Fields
| |
BindNone |
Instances
Binary Binding Source # | |||||
NFData Binding Source # | |||||
Generic Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router Associated Types
| |||||
Show Binding Source # | |||||
Eq Binding Source # | |||||
Hashable Binding Source # | |||||
type Rep Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router type Rep Binding = D1 ('MetaData "Binding" "Control.Distributed.Process.Execution.Exchange.Router" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "BindKey" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String)) :+: (C1 ('MetaCons "BindHeader" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: S1 ('MetaSel ('Just "headerName") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 HeaderName)) :+: C1 ('MetaCons "BindNone" 'PrefixI 'False) (U1 :: Type -> Type))) |
class (Hashable k, Eq k, Serializable k) => Bindable k Source #
Things that can be used as binding keys in a router.
type BindingSelector k = Message -> Process k Source #
Given to a router to indicate whether clients should
receive Message
payloads only, or the whole Message
object
itself.
Constructors
PayloadOnly | |
WholeMessage |
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source #
Defines a router exchange. The BindingSelector
is used to construct
a binding (i.e., an instance of the Bindable
type k
) for each incoming
Message
. Such bindings are matched against bindings stored in the exchange.
Clients of a router exchange are identified by a binding, mapped to
one or more ProcessIds.
The format of the bindings, nature of their storage and mechanism for
submitting new bindings is implementation dependent (i.e., will vary by
exchange type). For example, the messageKeyRouter
and headerContentRouter
implementations both use the Binding
data type, which can represent a
Message
key or a HeaderName
and content. As with all custom exchange
types, bindings should be submitted by evaluating configureExchange
with
a suitable data type.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source #
Defines a router that can be used in a supervision tree.
route :: Serializable m => Exchange -> m -> Process () Source #
Send a Serializable
message to the supplied Exchange
. The given datum
will be converted to a Message
, with the key
set to ""
and the
headers
to []
.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
routeMessage :: Exchange -> Message -> Process () Source #
Send a Message
to the supplied Exchange
.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
bindKey :: String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a messageKeyRouter
exchange.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source #
A router that matches on a specific (named) header. To bind a client
Process
to such an exchange, use the bindHeader
function.
bindHeader :: HeaderName -> String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a headerContentRouter
exchange.