distributed-process-execution-0.1.5.0: Execution Framework for The Cloud Haskell Application Platform
Copyright: (c) Tim Watson 2012 - 2014
License: BSD3 (see the file LICENSE)
Maintainer: Tim Watson <[email protected]>
Stability: experimental
Portability: non-portable (requires concurrency)
Safe Haskell: None
Language: Haskell2010

Control.Distributed.Process.Execution.Exchange

Description

Message Exchanges

The concept of a message exchange is borrowed from the world of messaging and enterprise integration. The exchange acts like a kind of mailbox, accepting inputs from producers and forwarding these messages to one or more consumers, depending on the implementation's semantics.

This module provides some basic types of message exchange and exposes an API for defining your own custom exchange types.

Broadcast Exchanges

The broadcast exchange type, started via broadcastExchange, forwards its inputs to all registered consumers (as the name suggests). This exchange type is highly optimised for local (intra-node) traffic and provides two different kinds of client binding: one causes messages to be delivered directly to the client's mailbox (viz bindToBroadcaster); the other (viz broadcastClient) provides a separate stream of messages that can be obtained using the expect and receiveX family of messaging primitives (and thus composed with other forms of input selection, such as typed channels and selective reads on the process mailbox).

Important: When a ProcessId is registered via bindToBroadcaster, only the payload of the Message (i.e., the underlying Serializable datum) is forwarded to the consumer, not the whole Message itself.
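The difference between the two binding styles can be sketched with a small pure model. This is an illustration only, not the library's implementation: Msg, Client, Delivery and broadcast are hypothetical names, the payload is a plain String rather than an encoded Cloud Haskell message, and nothing here runs in the Process monad.

```haskell
-- Pure model of broadcast delivery; none of these names come from the library.

-- Stand-in for the exchange Message, with a String in place of the real payload.
data Msg = Msg
  { key     :: String
  , headers :: [(String, String)]
  , payload :: String
  } deriving (Eq, Show)

-- How a client attached itself to the exchange.
data Client
  = MailboxClient String  -- models a process bound via bindToBroadcaster
  | StreamClient String   -- models a process reading a broadcastClient stream
  deriving (Eq, Show)

-- What a client ends up receiving.
data Delivery
  = PayloadTo String String  -- client name, bare payload
  | WholeTo String Msg       -- client name, complete message
  deriving (Eq, Show)

-- Every registered client gets every message; only the delivered form differs.
broadcast :: [Client] -> Msg -> [Delivery]
broadcast clients msg = map deliver clients
  where
    deliver (MailboxClient c) = PayloadTo c (payload msg)
    deliver (StreamClient c)  = WholeTo c msg
```

In this model a mailbox-bound client never sees the key or headers, which is the point of the note above: if a consumer needs them, the stream-based binding is the one to reach for.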

Router Exchanges

The router API provides a means to selectively route messages to one or more clients, depending on the content of the Message. Two modes of binding (and client selection) are provided out of the box: the first matches on the message key, the second on a name and value from the headers. Alternative mechanisms for content based routing can be derived by modifying the BindingSelector expression passed to router.

See messageKeyRouter and headerContentRouter for the built-in routing exchanges, and router for the extensible routing API.

Custom Exchange Types

Both the broadcast and router exchanges are implemented as custom exchange types. The mechanism for defining custom exchange behaviours such as these is very simple. Raw exchanges are started by evaluating startExchange with a specific ExchangeType record. This type is parameterised by the internal state it holds, and defines two API callbacks in its configureEx and routeEx fields. The former is evaluated whenever a client process evaluates configureExchange, the latter whenever a client evaluates post or postMessage. The configureEx callback takes a raw Message (from Control.Distributed.Process) and is responsible for decoding the message and updating its own state (if required). It is via this callback that custom exchange types can receive information about clients and handle it in their own way. The routeEx callback is evaluated with the exchange type's own internal state and the Message originally sent to the exchange process (via post) and is responsible for delivering the message to its clients in whatever way makes sense for that exchange type.
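The callback-driven lifecycle described above can be modelled without the library. The sketch below is a simplification under stated assumptions: ExchangeModel, Event, Sim, deliver, roundRobin and runModel are hypothetical names, configuration messages are plain Strings (client names) instead of raw Cloud Haskell messages, and a writer-style tuple stands in for the Process monad so that deliveries can be inspected.

```haskell
-- Pure model of a custom exchange type; not the library's ExchangeType.

-- Stand-in for the exchange Message.
data Msg = Msg
  { key     :: String
  , headers :: [(String, String)]
  , payload :: String
  } deriving (Eq, Show)

type Delivery = (String, String)  -- (client name, payload)

-- Writer-style stand-in for Process: a log of deliveries plus a result.
type Sim = (,) [Delivery]

deliver :: String -> String -> Sim ()
deliver client body = ([(client, body)], ())

-- Two callbacks, each taking the current state and returning the next one.
data ExchangeModel s = ExchangeModel
  { initialState :: s
  , configureEx  :: s -> String -> Sim s  -- models a configureExchange call
  , routeEx      :: s -> Msg -> Sim s     -- models a post/postMessage call
  }

data Event = Configure String | Post Msg

-- Example exchange type: hand each message to one client, in rotation.
roundRobin :: ExchangeModel [String]
roundRobin = ExchangeModel
  { initialState = []
  , configureEx  = \clients new -> pure (clients ++ [new])
  , routeEx      = \clients msg -> case clients of
      []       -> pure []  -- nobody registered: the message is dropped
      (c : cs) -> deliver c (payload msg) >> pure (cs ++ [c])
  }

-- Feed events through the callbacks in order, threading the state.
runModel :: ExchangeModel s -> [Event] -> [Delivery]
runModel ex events = fst (foldl step (pure (initialState ex)) events)
  where
    step acc (Configure c) = acc >>= \s -> configureEx ex s c
    step acc (Post m)      = acc >>= \s -> routeEx ex s m
```

The round-robin policy is chosen only because neither built-in exchange behaves that way; it shows that routing policy lives entirely in the two callbacks and the state they share.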

Synopsis

Fundamental API

data Message Source #

Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.

Constructors

Message 

Fields

key :: !String
headers :: ![(String, String)]
payload :: !Message

Instances

Binary Message Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Internal

Methods

put :: Message -> Put #

get :: Get Message #

putList :: [Message] -> Put #

NFData Message Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Internal

Methods

rnf :: Message -> () #

Generic Message Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Internal

Associated Types

type Rep Message 
Defined in Control.Distributed.Process.Execution.Exchange.Internal

type Rep Message = D1 ('MetaData "Message" "Control.Distributed.Process.Execution.Exchange.Internal" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "Message" 'PrefixI 'True) (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: (S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 [(String, String)]) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Message))))

Methods

from :: Message -> Rep Message x #

to :: Rep Message x -> Message #

Show Message Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Internal

Starting/Running an Exchange

startExchange :: ExchangeType s -> Process Exchange Source #

Starts an exchange process with the given ExchangeType.

startSupervised :: ExchangeType s -> SupervisorPid -> Process Exchange Source #

Starts an exchange as part of a supervision tree.

Example:

> childSpec = toChildStart $ startSupervised exType

startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source #

Starts an exchange as part of a supervision tree.

Example:

> childSpec = toChildStart $ startSupervisedRef exType

runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source #

Client Facing API

post :: Serializable a => Exchange -> a -> Process () Source #

Posts an arbitrary Serializable datum to an exchange. The raw datum is wrapped in the Message data type, with its key set to "" and its headers to [].

postMessage :: Exchange -> Message -> Process () Source #

Posts a Message to an exchange.

configureExchange :: Serializable m => Exchange -> m -> Process () Source #

Sends an arbitrary Serializable datum to an exchange, for use as a configuration change - see configureEx for details.

createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source #

Utility for creating a Message datum from its key, headers and payload.

Broadcast Exchange

broadcastExchange :: Process Exchange Source #

Start a new broadcast exchange and return a handle to the exchange.

broadcastExchangeT :: Process BroadcastExchange Source #

The ExchangeType of a broadcast exchange. Can be combined with the startSupervisedRef and startSupervised APIs.

broadcastClient :: Exchange -> Process (InputStream Message) Source #

Create a binding to the given broadcast exchange for the calling process and return an InputStream that can be used in the expect and receiveWait family of messaging primitives. This form of client interaction helps avoid cluttering the caller's mailbox with Message data, since the InputStream provides a separate input stream (in a similar fashion to a typed channel). Example:

is <- broadcastClient ex
msg <- receiveWait [ matchInputStream is ]
handleMessage (payload msg)

bindToBroadcaster :: Exchange -> Process () Source #

Bind the calling process to the given broadcast exchange. For each Message the exchange receives, only the payload will be sent to the calling process' mailbox.

Example:

(producer)
> post ex "Hello"

(consumer)
> bindToBroadcaster ex
> expect >>= liftIO . putStrLn

Routing (Content Based)

data Binding Source #

The binding key used by the built-in key and header based routers.

Instances

Binary Binding Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

put :: Binding -> Put #

get :: Get Binding #

putList :: [Binding] -> Put #

NFData Binding Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

rnf :: Binding -> () #

Generic Binding Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

Associated Types

type Rep Binding 
Defined in Control.Distributed.Process.Execution.Exchange.Router

type Rep Binding = D1 ('MetaData "Binding" "Control.Distributed.Process.Execution.Exchange.Router" "distributed-process-execution-0.1.5.0-9ft8kt71jqJFQLQ1PfEqJf" 'False) (C1 ('MetaCons "BindKey" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String)) :+: (C1 ('MetaCons "BindHeader" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: S1 ('MetaSel ('Just "headerName") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 HeaderName)) :+: C1 ('MetaCons "BindNone" 'PrefixI 'False) (U1 :: Type -> Type)))

Methods

from :: Binding -> Rep Binding x #

to :: Rep Binding x -> Binding #

Show Binding Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

Eq Binding Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

(==) :: Binding -> Binding -> Bool #

(/=) :: Binding -> Binding -> Bool #

Hashable Binding Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

Methods

hashWithSalt :: Int -> Binding -> Int #

hash :: Binding -> Int #

class (Hashable k, Eq k, Serializable k) => Bindable k Source #

Things that can be used as binding keys in a router.

Instances

(Hashable k, Eq k, Serializable k) => Bindable k Source # 
Defined in Control.Distributed.Process.Execution.Exchange.Router

type BindingSelector k = Message -> Process k Source #

Used to convert a Message into a Bindable routing key.

data RelayType Source #

Given to a router to indicate whether clients should receive Message payloads only, or the whole Message object itself.

Constructors

PayloadOnly 
WholeMessage 

Starting a Router

router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source #

Defines a router exchange. The BindingSelector is used to construct a binding (i.e., an instance of the Bindable type k) for each incoming Message. Such bindings are matched against bindings stored in the exchange. Clients of a router exchange are identified by a binding, mapped to one or more ProcessIds.

The format of the bindings, nature of their storage and mechanism for submitting new bindings is implementation dependent (i.e., will vary by exchange type). For example, the messageKeyRouter and headerContentRouter implementations both use the Binding data type, which can represent a Message key or a HeaderName and content. As with all custom exchange types, bindings should be submitted by evaluating configureExchange with a suitable data type.
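The selector-then-lookup flow described above can be sketched as a pure model. Everything below is an assumption made for illustration: Msg, Selector, Table, bind, Delivery and routeModel are hypothetical names, clients are plain Strings rather than ProcessIds, and the local Binding and RelayType types only borrow the constructor names documented on this page (the argument order of BindHeader here is this sketch's own choice).

```haskell
import Data.Maybe (fromMaybe)

-- Stand-in for the exchange Message.
data Msg = Msg
  { key     :: String
  , headers :: [(String, String)]
  , payload :: String
  } deriving (Eq, Show)

-- Local model of a binding; not the library's Binding type.
data Binding
  = BindKey String            -- routing key
  | BindHeader String String  -- header name, header value
  | BindNone                  -- message carries nothing to match on
  deriving (Eq, Show)

data RelayType = PayloadOnly | WholeMessage

-- Pure analogue of a BindingSelector: derive a binding from a message.
type Selector k = Msg -> k

keySelector :: Selector Binding
keySelector = BindKey . key

headerSelector :: String -> Selector Binding
headerSelector name msg =
  maybe BindNone (BindHeader name) (lookup name (headers msg))

-- Bindings stored in the exchange: each maps to one or more clients.
type Table k = [(k, [String])]

bind :: Eq k => k -> String -> Table k -> Table k
bind k client table =
  (k, client : fromMaybe [] (lookup k table)) : filter ((/= k) . fst) table

data Delivery = Payload String String | Whole String Msg
  deriving (Eq, Show)

-- Select a binding for the message, look it up, relay to every match.
routeModel :: Eq k => RelayType -> Selector k -> Table k -> Msg -> [Delivery]
routeModel relay select table msg =
  [ wrap client | client <- fromMaybe [] (lookup (select msg) table) ]
  where
    wrap client = case relay of
      PayloadOnly  -> Payload client (payload msg)
      WholeMessage -> Whole client msg
```

Because the table is keyed on whatever the selector returns, swapping keySelector for headerSelector changes the routing behaviour without touching routeModel, which mirrors how router is parameterised by its BindingSelector.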

supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source #

Defines a router that can be used in a supervision tree.

Routing (Publishing) API

route :: Serializable m => Exchange -> m -> Process () Source #

Send a Serializable message to the supplied Exchange. The given datum will be converted to a Message, with the key set to "" and the headers to [].

The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.

routeMessage :: Exchange -> Message -> Process () Source #

Send a Message to the supplied Exchange. The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.

Routing via message/binding keys

messageKeyRouter :: RelayType -> Process Exchange Source #

A router that matches on a Message key. To bind a client Process to such an exchange, use the bindKey function.

bindKey :: String -> Exchange -> Process () Source #

Add a binding (for the calling process) to a messageKeyRouter exchange.

Routing via message headers

headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source #

A router that matches on a specific (named) header. To bind a client Process to such an exchange, use the bindHeader function.

bindHeader :: HeaderName -> String -> Exchange -> Process () Source #

Add a binding (for the calling process) to a headerContentRouter exchange.

Defining Custom Exchange Types

data ExchangeType s Source #

Different exchange types are defined using record syntax. The configureEx and routeEx API functions are called during the exchange lifecycle when incoming traffic arrives. Configuration messages are completely arbitrary types and the exchange type author is entirely responsible for decoding them. Messages posted to the exchange (see the Message data type) are passed to the routeEx API function along with the exchange type's own internal state. Both API functions return a new (potentially updated) state and run in the Process monad.

Constructors

ExchangeType 

Fields

applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a Source #

Utility for custom exchange type authors - evaluates a list of primitive message handlers from left to right, returning the result of the first handler that yields Just a, or the initial value (the first argument) if all the handlers yield Nothing.
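The first-match semantics can be illustrated with a monad-generic model. This is a sketch of the behaviour described above, not the library's source: applyHandlersModel, evenHandler, bigHandler and classify are hypothetical names, and Identity stands in for Process so the result can be checked purely.

```haskell
import Data.Functor.Identity (Identity, runIdentity)

-- Try each handler in order; the first Just wins, otherwise keep the default.
applyHandlersModel :: Monad m => a -> msg -> [msg -> m (Maybe a)] -> m a
applyHandlersModel def _ [] = return def
applyHandlersModel def msg (h : hs) = do
  result <- h msg
  case result of
    Just a  -> return a
    Nothing -> applyHandlersModel def msg hs

-- Two toy handlers over Int "messages".
evenHandler :: Int -> Identity (Maybe String)
evenHandler n = pure (if even n then Just "even" else Nothing)

bigHandler :: Int -> Identity (Maybe String)
bigHandler n = pure (if n > 100 then Just "big" else Nothing)

-- Falls back to "unhandled" when neither handler matches.
classify :: Int -> String
classify n =
  runIdentity (applyHandlersModel "unhandled" n [evenHandler, bigHandler])
```

Handlers after the first match are never evaluated in this model, so ordering matters when several handlers could accept the same message.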