Skip to content

Commit bb54b2f

Browse files
dmq: local message notification
1 parent 4e69dad commit bb54b2f

File tree

4 files changed

+449
-0
lines changed

4 files changed

+449
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE PolyKinds #-}
3+
4+
-- | A higher level API to implement clients for local message notification
5+
-- miniprotocol.
6+
--
7+
-- For execution, 'localMsgNotificationClientPeer' reinterprets this high level
8+
-- description into the underlying typed protocol representation.
9+
--
10+
module DMQ.Protocol.LocalMsgNotification.Client
11+
( -- * Client API types
12+
LocalMsgNotificationClient (..)
13+
, LocalMsgNotificationClientStIdle (..)
14+
-- * Translates the client into a typed protocol
15+
, localMsgNotificationClientPeer
16+
) where
17+
18+
import Data.List.NonEmpty (NonEmpty)
19+
20+
import DMQ.Protocol.LocalMsgNotification.Type
21+
import Network.TypedProtocol.Peer.Client
22+
23+
-- | The high level client wrapper
24+
--
25+
newtype LocalMsgNotificationClient m msg a = LocalMsgNotificationClient {
26+
runMsgNotificationClient :: m (LocalMsgNotificationClientStIdle m msg a)
27+
}
28+
29+
30+
-- | The client API message types
31+
--
32+
data LocalMsgNotificationClientStIdle m msg a =
33+
SendMsgRequestBlocking
34+
!(m a) -- ^ result if done
35+
!( NonEmpty msg
36+
-> HasMore
37+
-> m (LocalMsgNotificationClientStIdle m msg a)) -- ^ a continuation
38+
| SendMsgRequestNonBlocking
39+
!( [msg]
40+
-> HasMore
41+
-> m (LocalMsgNotificationClientStIdle m msg a))
42+
| SendMsgDone !(m a)
43+
44+
45+
-- | A non-pipelined 'Peer' representing the 'LocalMsgNotificationClient'.
46+
--
47+
-- Translates the client into the typed protocol representation.
48+
--
49+
localMsgNotificationClientPeer
50+
:: forall m msg a .(Monad m)
51+
=> LocalMsgNotificationClient m msg a
52+
-> Client (LocalMsgNotification msg) NonPipelined StIdle m a
53+
localMsgNotificationClientPeer (LocalMsgNotificationClient client) =
54+
Effect $ go <$> client
55+
where
56+
go :: LocalMsgNotificationClientStIdle m msg a
57+
-> Client (LocalMsgNotification msg) NonPipelined StIdle m a
58+
go (SendMsgRequestBlocking done k) =
59+
Yield (MsgRequest SingBlocking)
60+
$ Await \case
61+
MsgServerDone -> Effect $ Done <$> done
62+
MsgReply (BlockingReply msgs) more -> Effect $ go <$> k msgs more
63+
64+
go (SendMsgRequestNonBlocking k) =
65+
Yield (MsgRequest SingNonBlocking)
66+
$ Await \case
67+
MsgReply (NonBlockingReply msgs) more -> Effect $ go <$> k msgs more
68+
69+
go (SendMsgDone done) =
70+
Yield MsgClientDone . Effect $ Done <$> done
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE PartialTypeSignatures #-}
3+
{-# LANGUAGE PolyKinds #-}
4+
{-# LANGUAGE RankNTypes #-}
5+
6+
-- | The codec for the local message notification miniprotocol
7+
--
8+
module DMQ.Protocol.LocalMsgNotification.Codec
9+
( codecLocalMsgNotification
10+
, decodeLocalMsgNotification
11+
) where
12+
13+
import Codec.CBOR.Decoding qualified as CBOR
14+
import Codec.CBOR.Encoding qualified as CBOR
15+
import Codec.CBOR.Read qualified as CBOR
16+
import Control.Monad.Class.MonadST
17+
import Data.ByteString.Lazy (ByteString)
18+
import Data.Functor ((<&>))
19+
import Data.Kind
20+
import Data.List.NonEmpty qualified as NonEmpty
21+
import Text.Printf
22+
23+
import DMQ.Protocol.LocalMsgNotification.Type
24+
import Network.TypedProtocol.Codec.CBOR
25+
26+
codecLocalMsgNotification
27+
:: forall msg m.
28+
MonadST m
29+
=> (msg -> CBOR.Encoding)
30+
-> (forall s. CBOR.Decoder s msg)
31+
-> Codec (LocalMsgNotification msg) CBOR.DeserialiseFailure m ByteString
32+
codecLocalMsgNotification encodeMsg decodeMsg =
33+
mkCodecCborLazyBS
34+
encode
35+
decode
36+
where
37+
encode :: forall st st'.
38+
Message (LocalMsgNotification msg) st st'
39+
-> CBOR.Encoding
40+
encode (MsgRequest blocking) =
41+
CBOR.encodeListLen 2
42+
<> CBOR.encodeWord 0
43+
<> CBOR.encodeBool (case blocking of
44+
SingBlocking -> True
45+
SingNonBlocking -> False)
46+
47+
encode (MsgReply msgs hasMore) =
48+
CBOR.encodeListLen 3
49+
<> CBOR.encodeWord 1
50+
<> CBOR.encodeListLenIndef
51+
<> foldr (\msg r ->
52+
encodeMsg msg <> r)
53+
CBOR.encodeBreak
54+
msgs
55+
<> CBOR.encodeBool hasMore'
56+
where
57+
hasMore' = case hasMore of
58+
HasMore -> True
59+
DoesNotHaveMore -> False
60+
61+
encode MsgServerDone =
62+
CBOR.encodeListLen 1
63+
<> CBOR.encodeWord 2
64+
65+
encode MsgClientDone =
66+
CBOR.encodeListLen 1
67+
<> CBOR.encodeWord 3
68+
69+
decode :: forall (st :: LocalMsgNotification msg).
70+
ActiveState st
71+
=> StateToken st
72+
-> forall s. CBOR.Decoder s (SomeMessage st)
73+
decode stok = do
74+
len <- CBOR.decodeListLen
75+
key <- CBOR.decodeWord
76+
decodeLocalMsgNotification decodeMsg stok len key
77+
78+
79+
decodeLocalMsgNotification :: forall (msg :: Type) (st' :: LocalMsgNotification msg) s.
80+
ActiveState st'
81+
=> (forall s'. CBOR.Decoder s' msg)
82+
-> StateToken st'
83+
-> Int
84+
-> Word
85+
-> CBOR.Decoder s (SomeMessage st')
86+
decodeLocalMsgNotification decodeMsg = decode
87+
where
88+
decode :: StateToken st'
89+
-> Int
90+
-> Word
91+
-> CBOR.Decoder s (SomeMessage st')
92+
decode stok len key = case (stok, len, key) of
93+
(SingIdle, 1, 3) -> return $ SomeMessage MsgClientDone
94+
(SingIdle, 2, 0) -> do
95+
blocking <- CBOR.decodeBool
96+
return $! if blocking
97+
then SomeMessage (MsgRequest SingBlocking)
98+
else SomeMessage (MsgRequest SingNonBlocking)
99+
(SingBusy SingBlocking, 1, 2) ->
100+
return $ SomeMessage MsgServerDone
101+
(SingBusy blocking, 3, 1) -> do
102+
CBOR.decodeListLenIndef
103+
msgs <- CBOR.decodeSequenceLenIndef
104+
(flip (:)) [] reverse
105+
decodeMsg
106+
more <- CBOR.decodeBool <&> \case
107+
True -> HasMore
108+
False -> DoesNotHaveMore
109+
case (blocking, msgs) of
110+
(SingBlocking, m:msgs') ->
111+
return . SomeMessage $ MsgReply (BlockingReply (m NonEmpty.:| msgs'))
112+
more
113+
(SingNonBlocking, _) ->
114+
return . SomeMessage $ MsgReply (NonBlockingReply msgs)
115+
more
116+
(SingBlocking, []) -> fail "codecLocalMsgNotification: MsgReply: empty list not permitted"
117+
(SingDone, _, _) -> notActiveState stok
118+
--
119+
-- failure
120+
--
121+
(_, _, _) ->
122+
fail (printf "codecLocalMsgNotification (%s, %s) unexpected key (%d, %d)"
123+
(show (activeAgency :: ActiveAgency st')) (show stok) key len)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE PolyKinds #-}
3+
{-# LANGUAGE RankNTypes #-}
4+
5+
-- | A higher level API to implement server for local message notification
6+
-- miniprotocol.
7+
--
8+
-- For execution, 'localMsgNotificationServerPeer' reinterprets this high level
9+
-- description into the underlying typed protocol representation.
10+
--
11+
module DMQ.Protocol.LocalMsgNotification.Server
12+
( -- * Server API types
13+
LocalMsgNotificationServer (..)
14+
, ServerIdle (..)
15+
, ServerResponse (..)
16+
-- * Translates the server into a typed protocol
17+
, localMsgNotificationServerPeer
18+
) where
19+
20+
import DMQ.Protocol.LocalMsgNotification.Type
21+
import Network.TypedProtocol.Peer.Server
22+
23+
-- | The high level server wrapper
24+
--
25+
newtype LocalMsgNotificationServer m msg a =
26+
LocalMsgNotificationServer (m (ServerIdle m msg a))
27+
28+
29+
-- | The server high level message handlers
30+
--
31+
data ServerIdle m msg a = ServerIdle {
32+
msgRequestHandler :: forall blocking.
33+
SingBlockingStyle blocking
34+
-> m (ServerResponse m blocking msg a),
35+
msgDoneHandler :: m a
36+
}
37+
38+
39+
-- | The server high level response type
40+
--
41+
data ServerResponse m blocking msg a where
42+
-- | The server terminates the connection
43+
--
44+
ServerDone :: a -> ServerResponse m StBlocking msg a
45+
46+
-- | The server provides a response to the client's query
47+
--
48+
ServerReply :: BlockingReplyList blocking msg -- ^ received messages
49+
-> HasMore
50+
-> m (ServerIdle m msg a) -- ^ a continuation
51+
-> ServerResponse m blocking msg a
52+
53+
54+
-- | tranlates the server into the typed protocol representation
55+
--
56+
localMsgNotificationServerPeer
57+
:: forall m msg a. Monad m
58+
=> LocalMsgNotificationServer m msg a
59+
-> Server (LocalMsgNotification msg) NonPipelined StIdle m a
60+
localMsgNotificationServerPeer (LocalMsgNotificationServer handler) =
61+
Effect $ go <$> handler
62+
where
63+
go :: ServerIdle m msg a
64+
-> Server (LocalMsgNotification msg) NonPipelined StIdle m a
65+
go ServerIdle { msgRequestHandler, msgDoneHandler } =
66+
Await \case
67+
MsgRequest blocking -> Effect do
68+
reply <- msgRequestHandler blocking
69+
case reply of
70+
ServerDone result ->
71+
pure $ Yield MsgServerDone (Done result)
72+
ServerReply msgs more k ->
73+
pure $ Yield (MsgReply msgs more) (Effect $ go <$> k)
74+
75+
MsgClientDone -> Effect $ Done <$> msgDoneHandler

0 commit comments

Comments
 (0)