Skip to content

Commit 93d68e0

Browse files
dmq: local message notification
1 parent 4318cd7 commit 93d68e0

File tree

5 files changed

+446
-8
lines changed

5 files changed

+446
-8
lines changed

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ category: Network
1818
build-type: Simple
1919
extra-doc-files: CHANGELOG.md
2020

21+
common extensions
22+
default-extensions:
23+
BangPatterns
24+
BlockArguments
25+
GADTs
26+
ImportQualifiedPost
27+
LambdaCase
28+
NamedFieldPuns
29+
ScopedTypeVariables
30+
2131
common warnings
2232
ghc-options:
2333
-Wall
@@ -31,7 +41,9 @@ common warnings
3141
-Wunused-packages
3242

3343
library
34-
import: warnings
44+
import:
45+
warnings,
46+
extensions
3547
exposed-modules:
3648
DMQ.Configuration
3749
DMQ.Configuration.CLIOptions
@@ -42,8 +54,16 @@ library
4254
DMQ.Diffusion.PeerSelection
4355
DMQ.NodeToClient
4456
DMQ.NodeToNode
45-
DMQ.Protocol.SigSubmission.Type
57+
DMQ.Protocol.LocalMsgNotification.Client
58+
DMQ.Protocol.LocalMsgNotification.Codec
59+
DMQ.Protocol.LocalMsgNotification.Server
60+
DMQ.Protocol.LocalMsgNotification.Type
61+
DMQ.Protocol.LocalMsgSubmission.Client
62+
DMQ.Protocol.LocalMsgSubmission.Codec
63+
DMQ.Protocol.LocalMsgSubmission.Server
64+
DMQ.Protocol.LocalMsgSubmission.Type
4665
DMQ.Protocol.SigSubmission.Codec
66+
DMQ.Protocol.SigSubmission.Type
4767

4868
build-depends:
4969
aeson >=2.1.1.0 && <3,
@@ -67,14 +87,15 @@ library
6787
random ^>=1.2,
6888
text >=1.2.4 && <2.2,
6989
time ^>= 1.12,
70-
typed-protocols:{cborg, typed-protocols} ^>=1.0,
90+
typed-protocols:{typed-protocols, cborg} ^>=1.0,
7191

7292
hs-source-dirs: src
7393
default-language: Haskell2010
74-
default-extensions: ImportQualifiedPost
7594

7695
executable dmq-exe
77-
import: warnings
96+
import:
97+
warnings,
98+
extensions
7899
main-is: Main.hs
79100
ghc-options:
80101
-threaded
@@ -91,12 +112,11 @@ executable dmq-exe
91112

92113
hs-source-dirs: app
93114
default-language: Haskell2010
94-
default-extensions: ImportQualifiedPost
95115

96116
test-suite dmq-test
97-
import: warnings
117+
import: warnings,
118+
extensions
98119
default-language: Haskell2010
99-
default-extensions: ImportQualifiedPost
100120
other-modules:
101121
Test.DMQ.NodeToClient
102122
Test.DMQ.NodeToNode
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE PolyKinds #-}
3+
4+
-- | todo haddock
5+
module DMQ.Protocol.LocalMsgNotification.Client
6+
( LocalMsgNotificationClient (..)
7+
, LocalMsgNotificationClientStIdle (..)
8+
, localMsgNotificationClientPeer) where
9+
10+
import Data.List.NonEmpty (NonEmpty)
11+
12+
import DMQ.Protocol.LocalMsgNotification.Type
13+
import Network.TypedProtocol.Peer.Client
14+
15+
-- | todo haddock
16+
newtype LocalMsgNotificationClient m msg a = LocalMsgNotificationClient {
17+
runMsgNotificationClient :: m (LocalMsgNotificationClientStIdle m msg a)
18+
}
19+
20+
21+
-- | todo haddock
22+
data LocalMsgNotificationClientStIdle m msg a =
23+
SendMsgRequestBlocking
24+
(m a) -- ^ result if done
25+
( NonEmpty msg
26+
-> HasMore
27+
-> m (LocalMsgNotificationClientStIdle m msg a)) -- ^ continue
28+
| SendMsgRequestNonBlocking
29+
( [msg]
30+
-> HasMore
31+
-> m (LocalMsgNotificationClientStIdle m msg a))
32+
| SendMsgDone (m a)
33+
34+
35+
-- | todo haddock
36+
localMsgNotificationClientPeer
37+
:: forall m msg a .(Monad m)
38+
=> LocalMsgNotificationClient m msg a
39+
-> Client (LocalMsgNotification msg) NonPipelined StIdle m a
40+
localMsgNotificationClientPeer (LocalMsgNotificationClient client) =
41+
Effect $ go <$> client
42+
where
43+
go :: LocalMsgNotificationClientStIdle m msg a
44+
-> Client (LocalMsgNotification msg) NonPipelined StIdle m a
45+
go (SendMsgRequestBlocking done k) =
46+
Yield (MsgRequest SingBlocking)
47+
$ Await \case
48+
MsgServerDone -> Effect $ Done <$> done
49+
MsgReply (BlockingReply msgs) more -> Effect $ go <$> k msgs more
50+
51+
go (SendMsgRequestNonBlocking k) =
52+
Yield (MsgRequest SingNonBlocking)
53+
$ Await \case
54+
MsgReply (NonBlockingReply msgs) more -> Effect $ go <$> k msgs more
55+
56+
go (SendMsgDone done) =
57+
Yield MsgClientDone . Effect $ Done <$> done
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE PartialTypeSignatures #-}
3+
{-# LANGUAGE PolyKinds #-}
4+
{-# LANGUAGE RankNTypes #-}
5+
6+
-- | todo haddock
7+
module DMQ.Protocol.LocalMsgNotification.Codec
8+
( codecLocalMsgNotification
9+
, decodeLocalMsgNotification) where
10+
11+
import Codec.CBOR.Decoding qualified as CBOR
12+
import Codec.CBOR.Encoding qualified as CBOR
13+
import Codec.CBOR.Read qualified as CBOR
14+
import Control.Monad.Class.MonadST
15+
import Data.ByteString.Lazy (ByteString)
16+
import Data.Kind
17+
import Data.List.NonEmpty qualified as NonEmpty
18+
import Text.Printf
19+
20+
import Network.TypedProtocol.Codec.CBOR
21+
22+
import DMQ.Protocol.LocalMsgNotification.Type
23+
24+
-- | todo haddock
25+
--
26+
codecLocalMsgNotification
27+
:: forall msg m.
28+
(MonadST m)
29+
=> (msg -> CBOR.Encoding)
30+
-> (forall s. CBOR.Decoder s msg)
31+
-> Codec (LocalMsgNotification msg) CBOR.DeserialiseFailure m ByteString
32+
codecLocalMsgNotification encodeMsg decodeMsg =
33+
mkCodecCborLazyBS
34+
encode
35+
decode
36+
where
37+
encode :: forall st st'.
38+
Message (LocalMsgNotification msg) st st'
39+
-> CBOR.Encoding
40+
encode (MsgRequest blocking) =
41+
CBOR.encodeListLen 2
42+
<> CBOR.encodeWord 0
43+
<> CBOR.encodeBool (case blocking of
44+
SingBlocking -> True
45+
SingNonBlocking -> False)
46+
47+
encode (MsgReply msgs HasMore { hasMore }) =
48+
CBOR.encodeListLen 3
49+
<> CBOR.encodeWord 1
50+
<> CBOR.encodeListLenIndef
51+
<> foldr (\msg r ->
52+
encodeMsg msg <> r)
53+
CBOR.encodeBreak
54+
msgs'
55+
<> CBOR.encodeBool hasMore
56+
where
57+
msgs' :: [msg]
58+
msgs' = case msgs of
59+
BlockingReply xs -> NonEmpty.toList xs
60+
NonBlockingReply xs -> xs
61+
62+
encode MsgServerDone =
63+
CBOR.encodeListLen 1
64+
<> CBOR.encodeWord 2
65+
66+
encode MsgClientDone =
67+
CBOR.encodeListLen 1
68+
<> CBOR.encodeWord 3
69+
70+
decode :: forall (st :: LocalMsgNotification msg).
71+
ActiveState st
72+
=> StateToken st
73+
-> forall s. CBOR.Decoder s (SomeMessage st)
74+
decode stok = do
75+
len <- CBOR.decodeListLen
76+
key <- CBOR.decodeWord
77+
decodeLocalMsgNotification decodeMsg stok len key
78+
79+
-- | todo haddock
80+
decodeLocalMsgNotification :: forall (msg :: Type) (st' :: LocalMsgNotification msg) s.
81+
ActiveState st'
82+
=> (forall s'. CBOR.Decoder s' msg)
83+
-> StateToken st'
84+
-> Int
85+
-> Word
86+
-> CBOR.Decoder s (SomeMessage st')
87+
decodeLocalMsgNotification decodeMsg = decode
88+
where
89+
decode :: StateToken st'
90+
-> Int
91+
-> Word
92+
-> CBOR.Decoder s (SomeMessage st')
93+
decode stok len key = case (stok, len, key) of
94+
(SingIdle, 1, 3) -> return $ SomeMessage MsgClientDone
95+
(SingIdle, 2, 0) -> do
96+
blocking <- CBOR.decodeBool
97+
return $! if blocking
98+
then SomeMessage (MsgRequest SingBlocking)
99+
else SomeMessage (MsgRequest SingNonBlocking)
100+
(SingBusy SingBlocking, 1, 2) ->
101+
return $ SomeMessage MsgServerDone
102+
(SingBusy blocking, 3, 1) -> do
103+
msgs <- CBOR.decodeSequenceLenIndef
104+
(flip (:)) [] reverse
105+
(do CBOR.decodeListLenOf 2
106+
decodeMsg)
107+
more <- HasMore <$> CBOR.decodeBool
108+
case (blocking, msgs) of
109+
(SingBlocking, m:msgs') ->
110+
return . SomeMessage $ MsgReply (BlockingReply (m NonEmpty.:| msgs'))
111+
more
112+
(SingNonBlocking, _) ->
113+
return . SomeMessage $ MsgReply (NonBlockingReply msgs)
114+
more
115+
(SingBlocking, []) -> fail "todo error message"
116+
(SingDone, _, _) -> notActiveState stok
117+
118+
--
119+
-- failure
120+
--
121+
(_, _, _) ->
122+
fail (printf "codecLocalMsgNotification (%s, %s) unexpected key (%d, %d)"
123+
(show (activeAgency :: ActiveAgency st')) (show stok) key len)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE PolyKinds #-}
3+
{-# LANGUAGE RankNTypes #-}
4+
5+
-- | todo haddock
6+
module DMQ.Protocol.LocalMsgNotification.Server
7+
( LocalMsgNotificationServer (..)
8+
, ServerIdle (..)
9+
, ServerResponse (..)
10+
, localMsgNotificationServerPeer) where
11+
12+
import DMQ.Protocol.LocalMsgNotification.Type
13+
import Network.TypedProtocol.Peer.Server
14+
15+
-- | todo haddock
16+
newtype LocalMsgNotificationServer m msg a =
17+
LocalMsgNotificationServer (m (ServerIdle m msg a))
18+
19+
20+
-- | todo haddock
21+
data ServerIdle m msg a = ServerIdle {
22+
msgRequestHandler :: forall blocking.
23+
SingBlockingStyle blocking
24+
-> m (ServerResponse m blocking msg a),
25+
msgDoneHandler :: m a
26+
}
27+
28+
29+
-- | todo haddock
30+
data ServerResponse m blocking msg a where
31+
ServerDone :: a -> ServerResponse m StBlocking msg a
32+
ServerReply :: BlockingReplyList blocking msg -- ^ received messages
33+
-> HasMore
34+
-> m (ServerIdle m msg a) -- ^ continue
35+
-> ServerResponse m blocking msg a
36+
37+
38+
localMsgNotificationServerPeer
39+
:: forall m msg a. Monad m
40+
=> LocalMsgNotificationServer m msg a
41+
-> Server (LocalMsgNotification msg) NonPipelined StIdle m a
42+
localMsgNotificationServerPeer (LocalMsgNotificationServer handler) =
43+
Effect $ go <$> handler
44+
where
45+
go :: ServerIdle m msg a
46+
-> Server (LocalMsgNotification msg) NonPipelined StIdle m a
47+
go ServerIdle { msgRequestHandler, msgDoneHandler } =
48+
Await \case
49+
MsgRequest blocking -> Effect do
50+
reply <- msgRequestHandler blocking
51+
case reply of
52+
ServerDone result ->
53+
pure $ Yield MsgServerDone (Done result)
54+
ServerReply msgs more k ->
55+
pure $ Yield (MsgReply msgs more) (Effect $ go <$> k)
56+
57+
MsgClientDone -> Effect $ Done <$> msgDoneHandler

0 commit comments

Comments
 (0)