Skip to content

Commit ff6004b

Browse files
dmq: local message notification
1 parent 27ca5d6 commit ff6004b

File tree

5 files changed

+483
-8
lines changed

5 files changed

+483
-8
lines changed

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ category: Network
1818
build-type: Simple
1919
extra-doc-files: CHANGELOG.md
2020

21+
common extensions
22+
default-extensions:
23+
BangPatterns
24+
BlockArguments
25+
GADTs
26+
ImportQualifiedPost
27+
LambdaCase
28+
NamedFieldPuns
29+
ScopedTypeVariables
30+
2131
common warnings
2232
ghc-options:
2333
-Wall
@@ -31,7 +41,9 @@ common warnings
3141
-Wunused-packages
3242

3343
library
34-
import: warnings
44+
import:
45+
warnings,
46+
extensions
3547
exposed-modules:
3648
DMQ.Configuration
3749
DMQ.Configuration.CLIOptions
@@ -42,8 +54,16 @@ library
4254
DMQ.Diffusion.PeerSelection
4355
DMQ.NodeToClient
4456
DMQ.NodeToNode
45-
DMQ.Protocol.SigSubmission.Type
57+
DMQ.Protocol.LocalMsgNotification.Client
58+
DMQ.Protocol.LocalMsgNotification.Codec
59+
DMQ.Protocol.LocalMsgNotification.Server
60+
DMQ.Protocol.LocalMsgNotification.Type
61+
DMQ.Protocol.LocalMsgSubmission.Client
62+
DMQ.Protocol.LocalMsgSubmission.Codec
63+
DMQ.Protocol.LocalMsgSubmission.Server
64+
DMQ.Protocol.LocalMsgSubmission.Type
4665
DMQ.Protocol.SigSubmission.Codec
66+
DMQ.Protocol.SigSubmission.Type
4767

4868
build-depends:
4969
aeson >=2.1.1.0 && <3,
@@ -67,14 +87,15 @@ library
6787
random ^>=1.2,
6888
text >=1.2.4 && <2.2,
6989
time ^>= 1.12,
70-
typed-protocols:{cborg, typed-protocols} ^>=1.0,
90+
typed-protocols:{typed-protocols, cborg} ^>=1.0,
7191

7292
hs-source-dirs: src
7393
default-language: Haskell2010
74-
default-extensions: ImportQualifiedPost
7594

7695
executable dmq-exe
77-
import: warnings
96+
import:
97+
warnings,
98+
extensions
7899
main-is: Main.hs
79100
ghc-options:
80101
-threaded
@@ -91,12 +112,11 @@ executable dmq-exe
91112

92113
hs-source-dirs: app
93114
default-language: Haskell2010
94-
default-extensions: ImportQualifiedPost
95115

96116
test-suite dmq-test
97-
import: warnings
117+
import: warnings,
118+
extensions
98119
default-language: Haskell2010
99-
default-extensions: ImportQualifiedPost
100120
other-modules:
101121
Test.DMQ.NodeToClient
102122
Test.DMQ.NodeToNode
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE PolyKinds #-}
3+
4+
-- | A higher level API to implement clients for local message notification
5+
-- miniprotocol.
6+
--
7+
-- For execution, 'localMsgNotificationClientPeer' reinterprets this high level
8+
-- description into the underlying typed protocol representation.
9+
--
10+
module DMQ.Protocol.LocalMsgNotification.Client
11+
( -- * Client API types
12+
LocalMsgNotificationClient (..)
13+
, LocalMsgNotificationClientStIdle (..)
14+
-- * Translates the client into a typed protocol
15+
, localMsgNotificationClientPeer
16+
) where
17+
18+
import Data.List.NonEmpty (NonEmpty)
19+
20+
import DMQ.Protocol.LocalMsgNotification.Type
21+
import Network.TypedProtocol.Peer.Client
22+
23+
-- | The high level client wrapper
24+
--
25+
newtype LocalMsgNotificationClient m msg a = LocalMsgNotificationClient {
26+
runMsgNotificationClient :: m (LocalMsgNotificationClientStIdle m msg a)
27+
}
28+
29+
30+
-- | The client API message types
31+
--
32+
data LocalMsgNotificationClientStIdle m msg a =
33+
SendMsgRequestBlocking
34+
(m a) -- ^ result if done
35+
( NonEmpty msg
36+
-> HasMore
37+
-> m (LocalMsgNotificationClientStIdle m msg a)) -- ^ continue
38+
| SendMsgRequestNonBlocking
39+
( [msg]
40+
-> HasMore
41+
-> m (LocalMsgNotificationClientStIdle m msg a))
42+
| SendMsgDone (m a)
43+
44+
45+
-- | A non-pipelined 'Peer' representing the 'LocalMsgNotificationClient'.
46+
--
47+
-- Translates the client into the typed protocol representation.
48+
--
49+
localMsgNotificationClientPeer
50+
:: forall m msg a .(Monad m)
51+
=> LocalMsgNotificationClient m msg a
52+
-> Client (LocalMsgNotification msg) NonPipelined StIdle m a
53+
localMsgNotificationClientPeer (LocalMsgNotificationClient client) =
54+
Effect $ go <$> client
55+
where
56+
go :: LocalMsgNotificationClientStIdle m msg a
57+
-> Client (LocalMsgNotification msg) NonPipelined StIdle m a
58+
go (SendMsgRequestBlocking done k) =
59+
Yield (MsgRequest SingBlocking)
60+
$ Await \case
61+
MsgServerDone -> Effect $ Done <$> done
62+
MsgReply (BlockingReply msgs) more -> Effect $ go <$> k msgs more
63+
64+
go (SendMsgRequestNonBlocking k) =
65+
Yield (MsgRequest SingNonBlocking)
66+
$ Await \case
67+
MsgReply (NonBlockingReply msgs) more -> Effect $ go <$> k msgs more
68+
69+
go (SendMsgDone done) =
70+
Yield MsgClientDone . Effect $ Done <$> done
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE PartialTypeSignatures #-}
3+
{-# LANGUAGE PolyKinds #-}
4+
{-# LANGUAGE RankNTypes #-}
5+
6+
-- | The codec for the local message notification miniprotocol
7+
--
8+
module DMQ.Protocol.LocalMsgNotification.Codec
9+
( codecLocalMsgNotification
10+
, decodeLocalMsgNotification
11+
) where
12+
13+
import Codec.CBOR.Decoding qualified as CBOR
14+
import Codec.CBOR.Encoding qualified as CBOR
15+
import Codec.CBOR.Read qualified as CBOR
16+
import Control.Monad.Class.MonadST
17+
import Data.ByteString.Lazy (ByteString)
18+
import Data.Functor ((<&>))
19+
import Data.Kind
20+
import Data.List.NonEmpty qualified as NonEmpty
21+
import Text.Printf
22+
23+
import Network.TypedProtocol.Codec.CBOR
24+
25+
import DMQ.Protocol.LocalMsgNotification.Type
26+
27+
codecLocalMsgNotification
28+
:: forall msg m.
29+
MonadST m
30+
=> (msg -> CBOR.Encoding)
31+
-> (forall s. CBOR.Decoder s msg)
32+
-> Codec (LocalMsgNotification msg) CBOR.DeserialiseFailure m ByteString
33+
codecLocalMsgNotification encodeMsg decodeMsg =
34+
mkCodecCborLazyBS
35+
encode
36+
decode
37+
where
38+
encode :: forall st st'.
39+
Message (LocalMsgNotification msg) st st'
40+
-> CBOR.Encoding
41+
encode (MsgRequest blocking) =
42+
CBOR.encodeListLen 2
43+
<> CBOR.encodeWord 0
44+
<> CBOR.encodeBool (case blocking of
45+
SingBlocking -> True
46+
SingNonBlocking -> False)
47+
48+
encode (MsgReply msgs hasMore) =
49+
CBOR.encodeListLen 3
50+
<> CBOR.encodeWord 1
51+
<> CBOR.encodeListLenIndef
52+
<> foldr (\msg r ->
53+
encodeMsg msg <> r)
54+
CBOR.encodeBreak
55+
msgs'
56+
<> CBOR.encodeBool hasMore'
57+
where
58+
hasMore' = case hasMore of
59+
HasMore -> True
60+
DoesNotHaveMore -> False
61+
62+
msgs' :: [msg]
63+
msgs' = case msgs of
64+
BlockingReply xs -> NonEmpty.toList xs
65+
NonBlockingReply xs -> xs
66+
67+
encode MsgServerDone =
68+
CBOR.encodeListLen 1
69+
<> CBOR.encodeWord 2
70+
71+
encode MsgClientDone =
72+
CBOR.encodeListLen 1
73+
<> CBOR.encodeWord 3
74+
75+
decode :: forall (st :: LocalMsgNotification msg).
76+
ActiveState st
77+
=> StateToken st
78+
-> forall s. CBOR.Decoder s (SomeMessage st)
79+
decode stok = do
80+
len <- CBOR.decodeListLen
81+
key <- CBOR.decodeWord
82+
decodeLocalMsgNotification decodeMsg stok len key
83+
84+
85+
decodeLocalMsgNotification :: forall (msg :: Type) (st' :: LocalMsgNotification msg) s.
86+
ActiveState st'
87+
=> (forall s'. CBOR.Decoder s' msg)
88+
-> StateToken st'
89+
-> Int
90+
-> Word
91+
-> CBOR.Decoder s (SomeMessage st')
92+
decodeLocalMsgNotification decodeMsg = decode
93+
where
94+
decode :: StateToken st'
95+
-> Int
96+
-> Word
97+
-> CBOR.Decoder s (SomeMessage st')
98+
decode stok len key = case (stok, len, key) of
99+
(SingIdle, 1, 3) -> return $ SomeMessage MsgClientDone
100+
(SingIdle, 2, 0) -> do
101+
blocking <- CBOR.decodeBool
102+
return $! if blocking
103+
then SomeMessage (MsgRequest SingBlocking)
104+
else SomeMessage (MsgRequest SingNonBlocking)
105+
(SingBusy SingBlocking, 1, 2) ->
106+
return $ SomeMessage MsgServerDone
107+
(SingBusy blocking, 3, 1) -> do
108+
msgs <- CBOR.decodeSequenceLenIndef
109+
(flip (:)) [] reverse
110+
(do CBOR.decodeListLenOf 2
111+
decodeMsg)
112+
more <- CBOR.decodeBool <&> \case
113+
True -> HasMore
114+
False -> DoesNotHaveMore
115+
case (blocking, msgs) of
116+
(SingBlocking, m:msgs') ->
117+
return . SomeMessage $ MsgReply (BlockingReply (m NonEmpty.:| msgs'))
118+
more
119+
(SingNonBlocking, _) ->
120+
return . SomeMessage $ MsgReply (NonBlockingReply msgs)
121+
more
122+
(SingBlocking, []) -> fail "codecLocalMsgNotification: MsgReply: empty list not permitted"
123+
(SingDone, _, _) -> notActiveState stok
124+
125+
--
126+
-- failure
127+
--
128+
(_, _, _) ->
129+
fail (printf "codecLocalMsgNotification (%s, %s) unexpected key (%d, %d)"
130+
(show (activeAgency :: ActiveAgency st')) (show stok) key len)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE PolyKinds #-}
3+
{-# LANGUAGE RankNTypes #-}
4+
5+
-- | A higher level API to implement server for local message notification
6+
-- miniprotocol.
7+
--
8+
-- For execution, 'localMsgNotificationServerPeer' reinterprets this high level
9+
-- description into the underlying typed protocol representation.
10+
--
11+
module DMQ.Protocol.LocalMsgNotification.Server
12+
( -- * Server API types
13+
LocalMsgNotificationServer (..)
14+
, ServerIdle (..)
15+
, ServerResponse (..)
16+
-- * Translates the server into a typed protocol
17+
, localMsgNotificationServerPeer
18+
) where
19+
20+
import DMQ.Protocol.LocalMsgNotification.Type
21+
import Network.TypedProtocol.Peer.Server
22+
23+
-- | The high level server wrapper
24+
--
25+
newtype LocalMsgNotificationServer m msg a =
26+
LocalMsgNotificationServer (m (ServerIdle m msg a))
27+
28+
29+
-- | The server high level message handlers
30+
--
31+
data ServerIdle m msg a = ServerIdle {
32+
msgRequestHandler :: forall blocking.
33+
SingBlockingStyle blocking
34+
-> m (ServerResponse m blocking msg a),
35+
msgDoneHandler :: m a
36+
}
37+
38+
39+
-- | The server high level response type
40+
--
41+
data ServerResponse m blocking msg a where
42+
-- | The server terminates the connection
43+
--
44+
ServerDone :: a -> ServerResponse m StBlocking msg a
45+
46+
-- | The server provides a response to the client's query
47+
--
48+
ServerReply :: BlockingReplyList blocking msg -- ^ received messages
49+
-> HasMore
50+
-> m (ServerIdle m msg a) -- ^ a continuation
51+
-> ServerResponse m blocking msg a
52+
53+
54+
-- | tranlates the server into the typed protocol representation
55+
--
56+
localMsgNotificationServerPeer
57+
:: forall m msg a. Monad m
58+
=> LocalMsgNotificationServer m msg a
59+
-> Server (LocalMsgNotification msg) NonPipelined StIdle m a
60+
localMsgNotificationServerPeer (LocalMsgNotificationServer handler) =
61+
Effect $ go <$> handler
62+
where
63+
go :: ServerIdle m msg a
64+
-> Server (LocalMsgNotification msg) NonPipelined StIdle m a
65+
go ServerIdle { msgRequestHandler, msgDoneHandler } =
66+
Await \case
67+
MsgRequest blocking -> Effect do
68+
reply <- msgRequestHandler blocking
69+
case reply of
70+
ServerDone result ->
71+
pure $ Yield MsgServerDone (Done result)
72+
ServerReply msgs more k ->
73+
pure $ Yield (MsgReply msgs more) (Effect $ go <$> k)
74+
75+
MsgClientDone -> Effect $ Done <$> msgDoneHandler

0 commit comments

Comments
 (0)