zeromq4-patterns-0.3.1.0: Haskell implementation of several ZeroMQ patterns.
Safe HaskellNone
LanguageHaskell2010

System.ZMQ4.Patterns.Clone

Description

This module implements the ZeroMQ reliable pub-sub (clone) pattern.

See <http://zguide.zeromq.org/page:all#Reliable-Pub-Sub-Clone-Pattern the ZeroMQ guide> for more information.

Example usage

import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.Async

import Control.Monad (forever)

import System.ZMQ4.Patterns.Clone (server, client)

-- | Produce an infinite stream of numbers
producer :: IO ()
producer = do
    chan <- newEmptyMVar
    withAsync (server "tcp://:5009" "tcp://:5010" chan) $ \_ ->
        let send i = putMVar chan i >> threadDelay (100*1000)
        mapM_ send [(1 :: Integer)..]

-- | Consume the stream of numbers
consumer :: IO ()
consumer = do
    chan <- newEmptyMVar
    withAsync (client "tcp://127.0.0.1:5009" "tcp://127.0.0.1:5010" chan) $ \_ ->
        forever $ do
            n <- takeMVar chan
            print n
Synopsis

Server and client

publisher Source #

Arguments

:: Binary a 
=> String

Bind address of the PUB socket

-> String

Bind address of the ROUTER socket

-> MVar a

Channel of incoming messages

-> IO () 

A server publishing messages to a client.

This function will start serving messages on the current thread.

It will send all objects that are pushed on the channel to a PUB socket.

It will listen for snapshot requests on a ROUTER socket.

The server will automatically keep the last sent object cached, and use at as a snapshot. Sequencing is automatically handled using a Word64 sequence counter.

subscriber Source #

Arguments

:: Binary a 
=> String

Address of the server's PUB socket

-> String

Address of the server's ROUTER socket

-> MVar a

CHannel where incoming messsages will be written to

-> IO () 

A client receiving messages from a server.

This function will start reading messages on the current thread.

It will connect to the server's PUB and ROUTER ports. It will backlog messages from the PUB socket, while requesting the initial state snapshot from the ROUTER socket. Afterwards, it will forever read from the PUB socket, and process all in-sequence updates.

Note that this pattern is not 100% reliable. Messages might be dropped between the initial state request and the first update.

queryLastState Source #

Arguments

:: Binary a 
=> String

Address of the server's ROUTER socket

-> IO a 

Only request the most recent state from the server.

This function will query the ROUTER port for the latest state, and return it.