{-# LANGUAGE DeriveDataTypeable    #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE StandaloneDeriving    #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE PatternGuards         #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE EmptyDataDecls        #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE ImpredicativeTypes    #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE MultiParamTypeClasses #-}

-- | Internal Exchange Implementation
module Control.Distributed.Process.Execution.Exchange.Internal
  ( Exchange(..)
  , Message(..)
  , ExchangeType(..)
  , startExchange
  , startSupervised
  , startSupervisedRef
  , runExchange
  , post
  , postMessage
  , configureExchange
  , createMessage
  , applyHandlers
  ) where

import Control.Concurrent.MVar (MVar, takeMVar, putMVar, newEmptyMVar)
import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( Process
  , ProcessMonitorNotification(..)
  , ProcessId
  , liftIO
  , spawnLocal
  , unsafeWrapMessage
  )
import qualified Control.Distributed.Process as P (Message, link)
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Control.Distributed.Process.Extras.Internal.Types
  ( Resolvable(..)
  )
import Control.Distributed.Process.Extras.Internal.Primitives
  ( Linkable(..)
  )
import Control.Distributed.Process.ManagedProcess
  ( channelControlPort
  , handleControlChan
  , handleInfo
  , handleRaw
  , continue
  , defaultProcess
  , InitHandler
  , InitResult(..)
  , ProcessAction
  , ProcessDefinition(..)
  , ControlChannel
  , ControlPort
  )
import qualified Control.Distributed.Process.ManagedProcess as MP
  ( chanServe
  )
import Control.Distributed.Process.ManagedProcess.UnsafeClient
  ( sendControlMessage
  )
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics
import Prelude hiding (drop)

{- [design notes]

Messages are sent to exchanges and forwarded to clients. An exchange
is parameterised by its routing mechanism, which is responsible for
maintaining its own client state and selecting the clients to which
messages are forwarded.

-}

-- | Opaque handle to an exchange.
--
-- Equality is structural (derived 'Eq'): two handles are equal when all
-- three fields are equal.
data Exchange = Exchange
  { pid   :: !ProcessId
    -- ^ process id of the exchange process
  , cchan :: !(ControlPort ControlMessage)
    -- ^ control port over which 'ControlMessage's are sent to the exchange
  , xType :: !String
    -- ^ name of the exchange type this exchange was started with
  } deriving (Typeable, Generic, Eq)

-- Serialisation uses the 'Generic'-based default methods of 'Binary'.
instance Binary Exchange where
-- | Renders an exchange as its type name and its process id, separated by a
-- colon, i.e. @xType ++ ":" ++ show pid@.
instance Show Exchange where
  show Exchange{..} = xType ++ ":" ++ show pid

-- | An exchange always resolves to the process id stored in its handle; the
-- result is therefore never @Nothing@, and no liveness check is performed.
instance Resolvable Exchange where
  resolve = return . Just . pid

{-
instance Observable Exchange MonitorRef ProcessMonitorNotification where
  observe   = P.monitor . pid
  unobserve = P.unmonitor
  observableFrom ref (ProcessMonitorNotification ref' _ r) =
    return $ if ref' == ref then Just r else Nothing
-}

-- | Linking to an exchange links the caller to the exchange's process
-- (the 'pid' stored in the handle).
instance Linkable Exchange where
  linkTo = P.link . pid

-- We communicate with exchanges using control channels: every client-facing
-- operation ends up sending a 'ControlMessage' over the control port held in
-- the 'Exchange' handle.
sendCtrlMsg :: Exchange -> ControlMessage -> Process ()
sendCtrlMsg Exchange{..} = sendControlMessage cchan

-- | Messages sent to an exchange can optionally provide a routing
-- key and a list of (key, value) headers in addition to the underlying
-- payload. All three fields are strict.
data Message =
  Message { key     :: !String  -- ^ a /routing key/ for the payload
          , headers :: ![(String, String)] -- ^ arbitrary key-value headers
          , payload :: !P.Message  -- ^ the underlying @Message@ payload
          } deriving (Typeable, Generic, Show)

-- Both instances rely on the class defaults (no methods are given here).
instance Binary Message where
instance NFData Message where

-- Internal protocol spoken over an exchange's control channel.
data ControlMessage =
    Configure !P.Message
    -- ^ a configuration payload, passed to the exchange type's 'configureEx'
  | Post      !Message
    -- ^ a posted message, passed to the exchange type's 'routeEx'
    deriving (Typeable, Generic)

-- Both instances rely on the class defaults (no methods are given here).
instance Binary ControlMessage where
instance NFData ControlMessage where

-- | Different exchange types are defined using record syntax.
-- The 'configureEx' and 'routeEx' API functions are called during the exchange
-- lifecycle when incoming traffic arrives. Configuration messages are
-- completely arbitrary types and the exchange type author is entirely
-- responsible for decoding them. Messages posted to the exchange (see the
-- 'Message' data type) are passed to the 'routeEx' API function along with the
-- exchange type's own internal state. Both API functions return a new
-- (potentially updated) state and run in the @Process@ monad.
--
data ExchangeType s =
  ExchangeType { name        :: String
                 -- ^ name of the exchange type; copied into the 'xType' field
                 -- of handles to exchanges started with it
               , state       :: s
                 -- ^ the exchange type's internal state; replaced by the
                 -- result of 'configureEx' / 'routeEx' after each message
               , configureEx :: s -> P.Message -> Process s
                 -- ^ handles a configuration message, yielding the new state
               , routeEx     :: s -> Message -> Process s
                 -- ^ routes a posted 'Message', yielding the new state
               }

--------------------------------------------------------------------------------
-- Starting/Running an Exchange                                               --
--------------------------------------------------------------------------------

-- | Starts an /exchange process/ with the given 'ExchangeType'.
--
-- The exchange is not linked to any supervisor; see 'startSupervised' for the
-- supervised variant. Returns a handle to the newly started exchange.
startExchange :: forall s. ExchangeType s -> Process Exchange
startExchange = doStart Nothing

-- | Starts an exchange as part of a supervision tree.
--
-- Behaves like 'startSupervised', but returns the exchange's process id
-- together with the 'Exchange' handle wrapped as a raw message.
--
-- Example:
-- > childSpec = toChildStart $ startSupervisedRef exType
--
startSupervisedRef :: forall s . ExchangeType s
                   -> SupervisorPid
                   -> Process (ProcessId, P.Message)
startSupervisedRef t s = do
  ex <- startSupervised t s
  return (pid ex, unsafeWrapMessage ex)

-- | Starts an exchange as part of a supervision tree.
--
-- The exchange process links itself to the given supervisor before it starts
-- serving. Returns a handle to the newly started exchange.
--
-- Example:
-- > childSpec = toChildStart $ startSupervised exType
--
startSupervised :: forall s . ExchangeType s
                -> SupervisorPid
                -> Process Exchange
startSupervised t s = doStart (Just s) t

-- Shared start-up routine behind 'startExchange' and 'startSupervised'.
--
-- Spawns the exchange process locally. If a supervisor pid is given, the
-- new process links to it before running the exchange. The spawned process
-- publishes its control port through an 'MVar', which the caller waits on
-- before building the 'Exchange' handle.
--
-- NOTE(review): the caller blocks in 'takeMVar' until the spawned process has
-- published its control port; if that process terminates before doing so,
-- nothing here unblocks the caller - confirm whether that can occur.
doStart :: Maybe SupervisorPid -> ExchangeType s -> Process Exchange
doStart mSp t = do
  -- local names deliberately differ from the 'Exchange' field selectors
  portVar <- liftIO newEmptyMVar
  exPid   <- spawnLocal (maybeLink mSp >> runExchange t portVar)
  port    <- liftIO $ takeMVar portVar
  return $ Exchange exPid port (name t)
  where
    maybeLink Nothing   = return ()
    maybeLink (Just p') = P.link p'

-- | Runs an exchange in the calling process, serving it via 'MP.chanServe'.
--
-- The exchange type itself is the server's initial state (see 'exInit').
-- The supplied 'MVar' is filled with the exchange's control port while the
-- process definition is being built, so whoever holds the other end of the
-- 'MVar' can obtain the port.
runExchange :: forall s.
               ExchangeType s
            -> MVar (ControlPort ControlMessage)
            -> Process ()
runExchange t tc = MP.chanServe t exInit (processDefinition t tc)

-- Init handler for the exchange server: always succeeds, using the supplied
-- exchange type as the server state, paired with an 'Infinity' delay.
exInit :: forall s. InitHandler (ExchangeType s) (ExchangeType s)
exInit t = return $ InitOk t Infinity

--------------------------------------------------------------------------------
-- Client Facing API                                                          --
--------------------------------------------------------------------------------

-- | Posts an arbitrary 'Serializable' datum to an /exchange/. The raw datum is
-- wrapped in the 'Message' data type, with its 'key' set to @""@ and its
-- 'headers' to @[]@.
post :: Serializable a => Exchange -> a -> Process ()
post ex msg = postMessage ex $ Message "" [] (unsafeWrapMessage msg)

-- | Posts a 'Message' to an /exchange/.
--
-- The message is forced (with 'seq') before it is handed to the exchange's
-- control channel.
postMessage :: Exchange -> Message -> Process ()
postMessage ex msg = msg `seq` sendCtrlMsg ex (Post msg)

-- | Sends an arbitrary 'Serializable' datum to an /exchange/, for use as a
-- configuration change - see 'configureEx' for details.
configureExchange :: Serializable m => Exchange -> m -> Process ()
configureExchange e m = sendCtrlMsg e $ Configure (unsafeWrapMessage m)

-- | Utility for creating a 'Message' datum from its 'key', 'headers' and
-- 'payload'. The payload is wrapped using 'unsafeWrapMessage'.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
createMessage k h m = Message k h (unsafeWrapMessage m)

-- | Utility for custom exchange type authors - evaluates a set of primitive
-- message handlers from left to right, returning the result of the first one
-- which evaluates to @Just a@, or the supplied default (the first argument)
-- if all the handlers yield @Nothing@. Handlers after the first match are not
-- run.
applyHandlers :: a
              -> P.Message
              -> [P.Message -> Process (Maybe a)]
              -> Process a
applyHandlers e _ []     = return e
applyHandlers e m (f:fs) = do
  r <- f m
  case r of
    Nothing -> applyHandlers e m fs
    Just r' -> return r'

--------------------------------------------------------------------------------
-- Process Definition/State & API Handlers                                    --
--------------------------------------------------------------------------------

-- Builds the managed-process definition for an exchange.
--
-- First publishes the control port of the given control channel into the
-- 'MVar'. The first argument (the exchange type) is unused here. The
-- resulting definition installs:
--
-- * an extern handler feeding control channel traffic to
--   'handleControlMessage'
-- * an info handler passing 'ProcessMonitorNotification's to 'handleMonitor'
-- * a raw handler passing messages to 'convertToCC'
processDefinition :: forall s.
                     ExchangeType s
                  -> MVar (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition (ExchangeType s))
processDefinition _ tc cc = do
  liftIO $ putMVar tc $ channelControlPort cc
  return $
    defaultProcess {
        externHandlers = [ handleControlChan cc handleControlMessage ]
      , infoHandlers   = [ handleInfo handleMonitor
                         , handleRaw convertToCC
                         ]
      } :: Process (ProcessDefinition (ExchangeType s))

-- Monitor notifications are handed to the exchange type as configuration
-- messages: the notification is wrapped with 'unsafeWrapMessage' and processed
-- as a 'Configure' control message.
handleMonitor :: forall s.
                 ExchangeType s
              -> ProcessMonitorNotification
              -> Process (ProcessAction (ExchangeType s))
handleMonitor ex m =
  handleControlMessage ex (Configure (unsafeWrapMessage m))

-- Treats a raw message received by the exchange process as if it had been
-- posted: the message becomes the payload of a 'Message' with an empty key
-- and no headers, and is processed as a 'Post' control message.
--
-- This handler deliberately performs no console output; it can run for
-- every raw message the exchange receives.
convertToCC :: forall s.
               ExchangeType s
            -> P.Message
            -> Process (ProcessAction (ExchangeType s))
convertToCC ex msg =
  handleControlMessage ex (Post $ Message "" [] msg)

-- Core dispatch for exchange traffic.
--
-- 'Configure' payloads go to the exchange type's 'configureEx' and 'Post'
-- messages to its 'routeEx', in both cases together with the current
-- 'state'. The state returned by the callback is stored back into the
-- exchange type and the server continues with it.
handleControlMessage :: forall s.
                        ExchangeType s
                     -> ControlMessage
                     -> Process (ProcessAction (ExchangeType s))
handleControlMessage ex@ExchangeType{..} cm = do
  state' <- case cm of
    Configure msg -> configureEx state msg
    Post      msg -> routeEx     state msg
  continue $ ex { state = state' }