Skip to content
This repository was archived by the owner on Sep 3, 2024. It is now read-only.

Commit 061135b

Browse files
authored
Handle arbitrary STM actions
Provide an API for for communication between CH and non-CH code
2 parents e78ac26 + 4a7efad commit 061135b

14 files changed

+546
-150
lines changed

.travis.yml

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,37 @@
1-
language: haskell
1+
language: c
22

3-
ghc:
4-
- 7.8
5-
- 7.6
6-
- 7.4
3+
sudo: false
4+
5+
matrix:
6+
include:
7+
- env: ARGS="--resolver nightly" COVER="" GHCVER=latest
8+
addons: {apt: {packages: [libgmp-dev]}}
9+
10+
cache:
11+
directories:
12+
- $HOME/.stack
13+
- $HOME/.local
714

815
before_install:
9-
- cabal sandbox init
10-
- for i in `cat REPOS`; do git clone http://github.com/haskell-distributed/$i; done
11-
- for i in `cat REPOS`; do cabal sandbox add-source $i; done
12-
- sudo apt-get update -qq
13-
- sudo apt-get install -qq binutils-dev
16+
- export PATH=$HOME/.local/bin:$HOME/.cabal/bin:$PATH
17+
- mkdir -p ~/.local/bin
18+
- travis_retry curl -L https://www.stackage.org/stack/linux-x86_64 | tar xz --wildcards --strip-components=1 -C ~/.local/bin '*/stack'
19+
- stack --version
1420

1521
install:
16-
# Don't run tests for dependencies.
17-
- cabal install --only-dependencies
18-
- cabal install --only-dependencies distributed-process-tests
22+
- stack ${ARGS} setup --no-terminal
1923

2024
script:
21-
- cabal install
22-
- cabal install --enable-tests -j1 distributed-process-tests
25+
- case "$COVER" in
26+
true)
27+
stack ${ARGS} test --coverage --no-terminal;
28+
./coverage.sh;
29+
;;
30+
*)
31+
stack ${ARGS} test --test-arguments='--plain'
32+
;;
33+
esac
34+
35+
notifications:
36+
slack:
37+
secure: g0NP1tkOe3+kI6O0Q1mgT/jPaLjxQ31J26MWouicu2F1Y3p73qTvv/QsOkafRMZDn07HlzgviCP25r7Ytg32pUAFvOh4U4MT2MpO0jUVVGPi4ZiwB+W5AH+HlDtJSickeSZ0AjXZSaGv8nQNegWkeaLQgLBIzrTHU8s0Y9K+whQ=

distributed-process-client-server.cabal

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ library
2828
build-depends:
2929
base >= 4.4 && < 5,
3030
data-accessor >= 0.2.2.3,
31-
distributed-process >= 0.5.2 && < 0.7,
32-
distributed-process-extras >= 0.2.0 && < 0.3,
33-
distributed-process-async >= 0.2.1 && < 0.3,
31+
distributed-process >= 0.6.6 && < 0.7,
32+
distributed-process-extras >= 0.3.0 && < 0.4,
33+
distributed-process-async >= 0.2.3 && < 0.3,
3434
binary >= 0.6.3.0 && < 0.9,
3535
deepseq >= 1.3.0.1 && < 1.6,
3636
mtl,
@@ -68,11 +68,11 @@ test-suite ManagedProcessTests
6868
base >= 4.4 && < 5,
6969
ansi-terminal >= 0.5 && < 0.7,
7070
containers,
71-
distributed-process >= 0.5.2 && < 0.7,
72-
distributed-process-extras >= 0.2.0 && < 0.3,
73-
distributed-process-async >= 0.2.1 && < 0.3,
71+
distributed-process >= 0.6.6 && < 0.7,
72+
distributed-process-extras >= 0.3.0 && < 0.4,
73+
distributed-process-async >= 0.2.3 && < 0.3,
7474
distributed-process-client-server,
75-
distributed-process-tests >= 0.4.2 && < 0.5,
75+
distributed-process-systest >= 0.1.1 && < 0.2,
7676
network-transport >= 0.4 && < 0.5,
7777
mtl,
7878
fingertree < 0.2,
@@ -101,11 +101,11 @@ test-suite PrioritisedProcessTests
101101
base >= 4.4 && < 5,
102102
ansi-terminal >= 0.5 && < 0.7,
103103
containers,
104-
distributed-process >= 0.5.2 && < 0.7,
105-
distributed-process-extras >= 0.2.0 && < 0.3,
106-
distributed-process-async >= 0.2.1 && < 0.3,
104+
distributed-process >= 0.6.6 && < 0.7,
105+
distributed-process-extras >= 0.3.0 && < 0.4,
106+
distributed-process-async >= 0.2.3 && < 0.3,
107107
distributed-process-client-server,
108-
distributed-process-tests >= 0.4.2 && < 0.5,
108+
distributed-process-systest >= 0.1.1 && < 0.2,
109109
network-transport >= 0.4 && < 0.5,
110110
mtl,
111111
fingertree < 0.2,

src/Control/Distributed/Process/ManagedProcess.hs

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
{-# LANGUAGE DeriveDataTypeable #-}
21
{-# LANGUAGE ExistentialQuantification #-}
32
{-# LANGUAGE ScopedTypeVariables #-}
4-
{-# LANGUAGE TemplateHaskell #-}
5-
{-# LANGUAGE RecordWildCards #-}
63

74
-----------------------------------------------------------------------------
85
-- |
@@ -229,6 +226,13 @@
229226
-- Using a prioritised process is as simple as calling 'pserve' instead of
230227
-- 'serve', and passing an initialised 'PrioritisedProcessDefinition'.
231228
--
229+
-- Note that prioritised process definitions cannot utilise control channels,
230+
-- nor can the @handleExternal@ family of expressions be used with them. This
231+
-- constraint is currenly not enforced by the compiler, and calling @pserve@
232+
-- with a @ProcessDefinition@ containing any of these items will fail with
233+
-- either @ExitOther "IllegalControlChannel"@ or @ExitOther "IllegalSTMAction"@
234+
-- at runtime.
235+
--
232236
-- [Control Channels]
233237
--
234238
-- For advanced users and those requiring very low latency, a prioritised
@@ -301,6 +305,98 @@
301305
-- > sendControlMessage cp $ Request str sp
302306
-- > receiveChan rp
303307
--
308+
-- [External (STM) Input Channels]
309+
--
310+
-- Both client and server APIs provide a mechanism for interacting with a running
311+
-- server process via STM. This is primarily intended for code that runs outside
312+
-- of Cloud Haskell's /Process/ monad, but can also be used as a channel for
313+
-- sending and/or receiving non-serializable data to or from a managed process.
314+
-- Obviously if you attempt to do this across a remote boundary, things will go
315+
-- spectacularly wrong. The APIs provided do not attempt to restrain this, or
316+
-- to impose any particular scheme on the programmer, therefore you're on your
317+
-- own when it comes to writing the /STM/ code for reading and writing data
318+
-- between client and server.
319+
--
320+
-- For code running inside the /Process/ monad and passing Serializable thunks,
321+
-- there is no real advantage to this approach, and indeed there are several
322+
-- serious disadvantages - none of Cloud Haskell's ordering guarantees will hold
323+
-- when passing data to and from server processes in this fashion, nor are there
324+
-- any guarantees the runtime system can make with regards interleaving between
325+
-- messages passed across Cloud Haskell's communication fabric vs. data shared
326+
-- via STM. This is true even when client(s) and server(s) reside on the same
327+
-- local node.
328+
--
329+
--
330+
-- A server wishing to receive data via STM can do so using the @handleExternal@
331+
-- API. By way of example, here is a simple echo server implemented using STM:
332+
--
333+
-- > demoExternal = do
334+
-- > inChan <- liftIO newTQueueIO
335+
-- > replyQ <- liftIO newTQueueIO
336+
-- > let procDef = statelessProcess {
337+
-- > apiHandlers = [
338+
-- > handleExternal
339+
-- > (readTQueue inChan)
340+
-- > (\s (m :: String) -> do
341+
-- > liftIO $ atomically $ writeTQueue replyQ m
342+
-- > continue s)
343+
-- > ]
344+
-- > }
345+
-- > let txt = "hello 2-way stm foo"
346+
-- > pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
347+
-- > echoTxt <- liftIO $ do
348+
-- > -- firstly we write something that the server can receive
349+
-- > atomically $ writeTQueue inChan txt
350+
-- > -- then sit and wait for it to write something back to us
351+
-- > atomically $ readTQueue replyQ
352+
-- >
353+
-- > say (show $ echoTxt == txt)
354+
--
355+
-- For request/reply channels such as this, a convenience based on the call API
356+
-- is also provided, which allows the server author to write an ordinary call
357+
-- handler, and the client author to utilise an API that monitors the server and
358+
-- does the usual stuff you'd expect an RPC style client to do. Here is another
359+
-- example of this in use, demonstrating the @callSTM@ and @handleCallExternal@
360+
-- APIs in practise.
361+
--
362+
-- > data StmServer = StmServer { serverPid :: ProcessId
363+
-- > , writerChan :: TQueue String
364+
-- > , readerChan :: TQueue String
365+
-- > }
366+
-- >
367+
-- > instance Resolvable StmServer where
368+
-- > resolve = return . Just . serverPid
369+
-- >
370+
-- > echoStm :: StmServer -> String -> Process (Either ExitReason String)
371+
-- > echoStm StmServer{..} = callSTM serverPid
372+
-- > (writeTQueue writerChan)
373+
-- > (readTQueue readerChan)
374+
-- >
375+
-- > launchEchoServer :: CallHandler () String String -> Process StmServer
376+
-- > launchEchoServer handler = do
377+
-- > (inQ, replyQ) <- liftIO $ do
378+
-- > cIn <- newTQueueIO
379+
-- > cOut <- newTQueueIO
380+
-- > return (cIn, cOut)
381+
-- >
382+
-- > let procDef = statelessProcess {
383+
-- > apiHandlers = [
384+
-- > handleCallExternal
385+
-- > (readTQueue inQ)
386+
-- > (writeTQueue replyQ)
387+
-- > handler
388+
-- > ]
389+
-- > }
390+
-- >
391+
-- > pid <- spawnLocal $ serve () (statelessInit Infinity) procDef
392+
-- > return $ StmServer pid inQ replyQ
393+
-- >
394+
-- > testExternalCall :: TestResult Bool -> Process ()
395+
-- > testExternalCall result = do
396+
-- > let txt = "hello stm-call foo"
397+
-- > srv <- launchEchoServer (\st (msg :: String) -> reply msg st)
398+
-- > echoStm srv txt >>= stash result . (== Right txt)
399+
--
304400
-- [Performance Considerations]
305401
--
306402
-- The various server loops are fairly optimised, but there /is/ a definite
@@ -386,6 +482,10 @@ module Control.Distributed.Process.ManagedProcess
386482
, channelControlPort
387483
, handleControlChan
388484
, handleControlChan_
485+
-- * Arbitrary STM actions
486+
, handleExternal
487+
, handleExternal_
488+
, handleCallExternal
389489
-- * Prioritised mailboxes
390490
, module Control.Distributed.Process.ManagedProcess.Server.Priority
391491
-- * Constructing handler results
@@ -522,4 +622,3 @@ statelessProcess = defaultProcess :: ProcessDefinition ()
522622
-- state (i.e., unit) and the given 'Delay'.
523623
statelessInit :: Delay -> InitHandler () ()
524624
statelessInit d () = return $ InitOk () d
525-

src/Control/Distributed/Process/ManagedProcess/Client.hs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,19 @@ module Control.Distributed.Process.ManagedProcess.Client
2828
, callChan
2929
, syncCallChan
3030
, syncSafeCallChan
31+
, callSTM
3132
) where
3233

33-
import Control.Distributed.Process hiding (call)
34+
import Control.Concurrent.STM (atomically, STM)
35+
import Control.Distributed.Process hiding (call, finally)
3436
import Control.Distributed.Process.Serializable
3537
import Control.Distributed.Process.Async hiding (check)
3638
import Control.Distributed.Process.ManagedProcess.Internal.Types
3739
import qualified Control.Distributed.Process.ManagedProcess.Internal.Types as T
40+
import Control.Distributed.Process.Extras.Internal.Types (resolveOrDie)
3841
import Control.Distributed.Process.Extras hiding (monitor, sendChan)
3942
import Control.Distributed.Process.Extras.Time
43+
import Control.Monad.Catch (finally)
4044
import Data.Maybe (fromJust)
4145

4246
import Prelude hiding (init)
@@ -72,7 +76,7 @@ call sid msg = initCall sid msg >>= waitResponse Nothing >>= decodeResult
7276
-- will be stashed away as @(ExitOther String)@.
7377
safeCall :: forall s a b . (Addressable s, Serializable a, Serializable b)
7478
=> s -> a -> Process (Either ExitReason b)
75-
safeCall s m = initCall s m >>= waitResponse Nothing >>= return . fromJust
79+
safeCall s m = fmap fromJust (initCall s m >>= waitResponse Nothing)
7680

7781
-- | Version of 'safeCall' that returns 'Nothing' if the operation fails. If
7882
-- you need information about *why* a call has failed then you should use
@@ -115,7 +119,7 @@ flushPendingCalls :: forall b . (Serializable b)
115119
=> TimeInterval
116120
-> (b -> Process b)
117121
-> Process (Maybe b)
118-
flushPendingCalls d proc = do
122+
flushPendingCalls d proc =
119123
receiveTimeout (asTimeout d) [
120124
match (\(CallResponse (m :: b) _) -> proc m)
121125
]
@@ -133,7 +137,7 @@ callAsync server msg = async $ task $ call server msg
133137
--
134138
cast :: forall a m . (Addressable a, Serializable m)
135139
=> a -> m -> Process ()
136-
cast server msg = sendTo server ((CastMessage msg) :: T.Message m ())
140+
cast server msg = sendTo server (CastMessage msg :: T.Message m ())
137141

138142
-- | Sends a /channel/ message to the server and returns a @ReceivePort@ on
139143
-- which the reponse can be delivered, if the server so chooses (i.e., the
@@ -142,7 +146,7 @@ callChan :: forall s a b . (Addressable s, Serializable a, Serializable b)
142146
=> s -> a -> Process (ReceivePort b)
143147
callChan server msg = do
144148
(sp, rp) <- newChan
145-
sendTo server ((ChanMessage msg sp) :: T.Message a b)
149+
sendTo server (ChanMessage msg sp :: T.Message a b)
146150
return rp
147151

148152
-- | A synchronous version of 'callChan'.
@@ -162,3 +166,58 @@ syncSafeCallChan server msg = do
162166
rp <- callChan server msg
163167
awaitResponse server [ matchChan rp (return . Right) ]
164168

169+
-- | Manages an rpc-style interaction with a server process, using @STM@ actions
170+
-- to read/write data. The server process is monitored for the duration of the
171+
-- /call/. The stm write expression is passed the input, and the read expression
172+
-- is evaluated and the result given as @Right b@ or @Left ExitReason@ if a
173+
-- monitor signal is detected whilst waiting.
174+
--
175+
-- Note that the caller will exit (with @ExitOther String@) if the server
176+
-- address is un-resolvable.
177+
--
178+
-- A note about scheduling and timing guarantees (or lack thereof): It is not
179+
-- possibly to guarantee the contents of @ExitReason@ in cases where this API
180+
-- fails due to server exits/crashes. We establish a monitor prior to evaluating
181+
-- the stm writer action, however @monitor@ is asychronous and we've no way to
182+
-- know whether or not the scheduler will allow monitor establishment to proceed
183+
-- first, or the stm transaction. As a result, assuming that your server process
184+
-- can die/fail/exit on evaluating the read end of the STM write we perform here
185+
-- (and we assume this is very likely, since we apply no safety rules and do not
186+
-- even worry about serializing thunks passed from the client's thread), it is
187+
-- just as likely that in the case of failure you will see a reason such as
188+
-- @ExitOther "DiedUnknownId"@ due to the server process crashing before the node
189+
-- controller can establish a monitor.
190+
--
191+
-- As unpleasant as this is, there's little we can do about it without making
192+
-- false assumptions about the runtime. Cloud Haskell's semantics guarantee us
193+
-- only that we will see /some/ monitor signal in the event of a failure here.
194+
-- To provide a more robust error handling, you can catch/trap failures in the
195+
-- server process and return a wrapper reponse datum here instead. This will
196+
-- /still/ be subject to the failure modes described above in cases where the
197+
-- server process exits abnormally, but that will at least allow the caller to
198+
-- differentiate between expected and exceptional failure conditions.
199+
--
200+
callSTM :: forall s a b . (Addressable s)
201+
=> s
202+
-> (a -> STM ())
203+
-> STM b
204+
-> a
205+
-> Process (Either ExitReason b)
206+
callSTM server writeAction readAction input = do
207+
-- NB: we must establish the monitor before writing, to ensure we have
208+
-- a valid ref such that server failure gets reported properly
209+
pid <- resolveOrDie server "callSTM: unresolveable address "
210+
mRef <- monitor pid
211+
212+
liftIO $ atomically $ writeAction input
213+
214+
finally (receiveWait [ matchRef mRef
215+
, matchSTM readAction (return . Right)
216+
])
217+
(unmonitor mRef)
218+
219+
where
220+
matchRef :: MonitorRef -> Match (Either ExitReason b)
221+
matchRef r = matchIf (\(ProcessMonitorNotification r' _ _) -> r == r')
222+
(\(ProcessMonitorNotification _ _ d) ->
223+
return (Left (ExitOther (show d))))

src/Control/Distributed/Process/ManagedProcess/Internal/GenProcess.hs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
{-# LANGUAGE ExistentialQuantification #-}
22
{-# LANGUAGE ScopedTypeVariables #-}
3-
{-# LANGUAGE ViewPatterns #-}
43
{-# LANGUAGE PatternGuards #-}
54

65
-- | This is the @Process@ implementation of a /managed process/
@@ -40,6 +39,9 @@ import Prelude hiding (init)
4039
-- Priority Mailbox Handling --
4140
--------------------------------------------------------------------------------
4241

42+
-- TODO: we need to actually utilise recvTimeout on the prioritised pdef, such
43+
-- that a busy mailbox can't prevent us from operating normally.
44+
4345
type Queue = PriorityQ Int P.Message
4446
type TimeoutSpec = (Delay, Maybe (TimerRef, (STM ())))
4547
data TimeoutAction s = Stop s ExitReason | Go Delay s
@@ -52,8 +54,10 @@ precvLoop ppDef pState recvDelay = do
5254
where
5355
verify pDef = mapM_ disallowCC $ apiHandlers pDef
5456

55-
disallowCC (DispatchCC _ _) = die $ ExitOther "IllegalControlChannel"
56-
disallowCC _ = return ()
57+
-- TODO: better failure messages here!
58+
disallowCC (DispatchCC _ _) = die $ ExitOther "IllegalControlChannel"
59+
disallowCC (DispatchSTM _ _) = die $ ExitOther "IllegalSTMAction"
60+
disallowCC _ = return ()
5761

5862
recvQueue :: PrioritisedProcessDefinition s
5963
-> s
@@ -325,4 +329,3 @@ applyPolicy p s m =
325329
where
326330
logIt =
327331
Log.report Log.info Log.logChannel $ "Unhandled Gen Input Message: " ++ (show m)
328-

0 commit comments

Comments
 (0)