Skip to content

Commit a3b2e3e

Browse files
dmq: NtC applications
1 parent 825a497 commit a3b2e3e

File tree

7 files changed

+351
-27
lines changed

7 files changed

+351
-27
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions)
1717
import DMQ.Configuration.Topology (readTopologyFileOrError)
1818
import DMQ.Diffusion.Applications (diffusionApplications)
1919
import DMQ.Diffusion.Arguments
20-
import DMQ.Diffusion.NodeKernel (withNodeKernel)
20+
import DMQ.Diffusion.NodeKernel (mempool, withNodeKernel)
21+
import DMQ.NodeToClient qualified as NtC
2122
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, ntnApps)
23+
import DMQ.Protocol.LocalMsgSubmission.Codec
24+
import DMQ.Protocol.SigSubmission.Codec
25+
import DMQ.Protocol.SigSubmission.Type (Sig (..))
2226
import DMQ.Tracer
2327

2428
import DMQ.Diffusion.PeerSelection (policy)
2529
import Ouroboros.Network.Diffusion qualified as Diffusion
2630
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
2731
encodeRemoteAddress)
32+
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
2833

2934
main :: IO ()
3035
main = void . runDMQ =<< execParser opts
@@ -78,6 +83,13 @@ runDMQ commandLineConfig = do
7883
(decodeRemoteAddress maxBound))
7984
dmqLimitsAndTimeouts
8085
defaultSigDecisionPolicy
86+
dmqNtCApps =
87+
let sigSize _ = 0 -- TODO
88+
maxMsgs = 1000 -- TODO: make this dynamic?
89+
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
90+
mempoolWriter = Mempool.getWriter sigId (const True) (mempool nodeKernel)
91+
in NtC.ntcApps () mempoolReader mempoolWriter maxMsgs
92+
(NtC.dmqCodecs encodeSig decodeSig encodeReject decodeReject)
8193
dmqDiffusionArguments =
8294
diffusionArguments (if handshakeTracer
8395
then WithEventType "Handshake" >$< tracer
@@ -91,6 +103,7 @@ runDMQ commandLineConfig = do
91103
dmqDiffusionConfiguration
92104
dmqLimitsAndTimeouts
93105
dmqNtNApps
106+
dmqNtCApps
94107
(policy policyRng)
95108

96109
Diffusion.run dmqDiffusionArguments

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,11 @@ library
5656
DMQ.NodeToClient.Version
5757
DMQ.NodeToNode
5858
DMQ.NodeToNode.Version
59+
DMQ.NodeToClient.LocalMsgSubmission
60+
DMQ.NodeToClient.LocalMsgNotification
5961
DMQ.Protocol.LocalMsgNotification.Client
6062
DMQ.Protocol.LocalMsgNotification.Codec
63+
DMQ.Protocol.LocalMsgNotification.Examples
6164
DMQ.Protocol.LocalMsgNotification.Server
6265
DMQ.Protocol.LocalMsgNotification.Type
6366
DMQ.Protocol.LocalMsgSubmission.Client
@@ -143,8 +146,10 @@ test-suite dmq-test
143146
QuickCheck,
144147
base >=4.14 && <4.22,
145148
bytestring,
149+
contra-tracer,
146150
decentralized-message-queue,
147151
ouroboros-network-api,
152+
ouroboros-network-framework,
148153
ouroboros-network-protocols:testlib,
149154
ouroboros-network-testing,
150155
quickcheck-instances,
@@ -154,7 +159,8 @@ test-suite dmq-test
154159
time,
155160
typed-protocols:{typed-protocols, codec-properties},
156161
with-utf8,
157-
io-classes
162+
io-classes,
163+
io-sim
158164

159165
ghc-options:
160166
-fno-ignore-asserts

decentralized-message-queue/src/DMQ/Diffusion/Applications.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import DMQ.Configuration
88
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
99
import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
1010
stdVersionDataNTC)
11+
import DMQ.NodeToClient qualified as NTC
1112
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
1213
stdVersionDataNTN)
1314
import DMQ.NodeToNode qualified as NTN
1415

1516
import Ouroboros.Network.Diffusion.Types qualified as Diffusion
1617
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
17-
import Ouroboros.Network.Mux (OuroborosApplication (..))
1818
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
1919
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
2020
simpleSingletonVersions)
@@ -27,6 +27,7 @@ diffusionApplications
2727
-> Diffusion.Configuration NoExtraFlags m ntnFd ntnAddr ntcFd ntcAddr
2828
-> NTN.LimitsAndTimeouts ntnAddr
2929
-> NTN.Apps ntnAddr m a ()
30+
-> NTC.Apps ntcAddr m ()
3031
-> PeerSelectionPolicy ntnAddr m
3132
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
3233
ntcAddr NodeToClientVersion NodeToClientVersionData
@@ -44,6 +45,7 @@ diffusionApplications
4445
}
4546
ntnLimitsAndTimeouts
4647
ntnApps
48+
ntcApps
4749
peerSelectionPolicy =
4850
Diffusion.Applications {
4951
daApplicationInitiatorMode =
@@ -67,11 +69,7 @@ diffusionApplications
6769
[ simpleSingletonVersions
6870
version
6971
(stdVersionDataNTC networkMagic)
70-
(\_versionData ->
71-
OuroborosApplication
72-
[
73-
]
74-
)
72+
(NTC.responders ntcApps version)
7573
| version <- [minBound..maxBound]
7674
]
7775
, daRethrowPolicy = muxErrorRethrowPolicy

decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
3939
data NodeKernel ntnAddr m =
4040
NodeKernel {
4141
-- | The fetch client registry, used for the keep alive clients.
42-
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
42+
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)
4343

4444
-- | Read the current peer sharing registry, used for interacting with
4545
-- the PeerSharing protocol
46-
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
47-
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
48-
, mempool :: Mempool m Sig
49-
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
50-
, sigMempoolSem :: TxMempoolSem m
51-
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
46+
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
47+
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
48+
, mempool :: !(Mempool m Sig)
49+
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
50+
, sigMempoolSem :: !(TxMempoolSem m)
51+
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
5252
}
5353

5454
newNodeKernel :: ( MonadLabelledSTM m

decentralized-message-queue/src/DMQ/NodeToClient.hs

Lines changed: 186 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,58 @@
1-
{-# LANGUAGE NamedFieldPuns #-}
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE RankNTypes #-}
23

34
module DMQ.NodeToClient
4-
( LocalAddress (..)
5-
, module DMQ.NodeToClient.Version
5+
( module DMQ.NodeToClient.Version
66
, Protocols (..)
77
, HandshakeTr
8+
, Apps
9+
, dmqCodecs
10+
, ntcApps
811
, ntcHandshakeArguments
12+
, responders
913
) where
1014

15+
import Data.ByteString.Lazy (ByteString)
16+
import Data.Void
17+
import Data.Word
18+
19+
import Control.Concurrent.Class.MonadSTM
20+
import Control.Monad.Class.MonadFork
1121
import Control.Monad.Class.MonadST (MonadST)
22+
import Control.Monad.Class.MonadThrow
1223
import Control.Tracer (Tracer, nullTracer)
1324

25+
import Codec.CBOR.Decoding qualified as CBOR
26+
import Codec.CBOR.Encoding qualified as CBOR
1427
import Codec.CBOR.Term qualified as CBOR
1528

1629
import Network.Mux qualified as Mx
30+
import Network.TypedProtocol.Codec hiding (decode, encode)
31+
import Network.TypedProtocol.Codec.CBOR qualified as CBOR
32+
33+
import DMQ.NodeToClient.LocalMsgNotification
34+
import DMQ.NodeToClient.LocalMsgSubmission
35+
import DMQ.NodeToClient.Version
36+
import DMQ.Protocol.LocalMsgNotification.Codec
37+
import DMQ.Protocol.LocalMsgNotification.Server
38+
import DMQ.Protocol.LocalMsgNotification.Type
39+
import DMQ.Protocol.LocalMsgSubmission.Codec
40+
import DMQ.Protocol.LocalMsgSubmission.Server
41+
import DMQ.Protocol.LocalMsgSubmission.Type
1742

18-
import Ouroboros.Network.ConnectionId (ConnectionId)
19-
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
43+
import Ouroboros.Network.Context
44+
import Ouroboros.Network.Driver.Simple
2045
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
2146
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
47+
import Ouroboros.Network.Mux
2248
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
2349
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
2450
codecHandshake, noTimeLimitsHandshake)
25-
import Ouroboros.Network.Snocket (LocalAddress (..))
26-
27-
import DMQ.NodeToClient.Version
28-
51+
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
52+
(TxSubmissionMempoolWriter)
53+
import Ouroboros.Network.TxSubmission.Mempool.Reader
54+
import Ouroboros.Network.Util.ShowProxy
2955

30-
data Protocols =
31-
Protocols {
32-
}
3356

3457
type HandshakeTr ntcAddr = Mx.WithBearer (ConnectionId ntcAddr) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))
3558

@@ -53,3 +76,154 @@ ntcHandshakeArguments tracer =
5376
, haQueryVersion = queryVersion
5477
, haTimeLimits = noTimeLimitsHandshake
5578
}
79+
80+
81+
data Codecs m sig reject =
82+
Codecs {
83+
msgSubmissionCodec
84+
:: !(Codec (LocalMsgSubmission sig reject)
85+
CBOR.DeserialiseFailure m ByteString)
86+
, msgNotificationCodec
87+
:: !(Codec (LocalMsgNotification sig)
88+
CBOR.DeserialiseFailure m ByteString)
89+
}
90+
91+
dmqCodecs :: MonadST m
92+
=> (sig -> CBOR.Encoding)
93+
-> (forall s. CBOR.Decoder s sig)
94+
-> (reject -> CBOR.Encoding)
95+
-> (forall s. CBOR.Decoder s reject)
96+
-> Codecs m sig reject
97+
dmqCodecs encodeSig decodeSig encodeReject' decodeReject' =
98+
Codecs {
99+
msgSubmissionCodec = codecLocalMsgSubmission encodeSig decodeSig encodeReject' decodeReject'
100+
, msgNotificationCodec = codecLocalMsgNotification encodeSig decodeSig
101+
}
102+
103+
104+
-- | A node-to-client application
105+
--
106+
type App ntcAddr m a =
107+
NodeToClientVersion
108+
-> ResponderContext ntcAddr
109+
-> Mx.Channel m ByteString
110+
-> m (a, Maybe ByteString)
111+
112+
113+
data Apps ntcAddr m a =
114+
Apps {
115+
-- | Start a sig-submission client
116+
aLocalMsgSubmission :: !(App ntcAddr m a)
117+
118+
-- | Start a sig-submission server
119+
, aLocalMsgNotification :: !(App ntcAddr m a)
120+
}
121+
122+
123+
-- | Construct applications for the node-to-client protocols
124+
--
125+
ntcApps
126+
:: (MonadThrow m, MonadThread m, MonadSTM m, ShowProxy reject, ShowProxy sig)
127+
=> reject
128+
-> TxSubmissionMempoolReader msgid sig idx m
129+
-> TxSubmissionMempoolWriter msgid sig idx m
130+
-> Word16
131+
-> Codecs m sig reject
132+
-> Apps ntcAddr m ()
133+
ntcApps reject mempoolReader mempoolWriter maxMsgs
134+
Codecs { msgSubmissionCodec, msgNotificationCodec } =
135+
Apps {
136+
aLocalMsgSubmission
137+
, aLocalMsgNotification
138+
}
139+
where
140+
aLocalMsgSubmission _version _ctx channel = do
141+
labelThisThread "LocalMsgSubmissionServer"
142+
runPeer
143+
nullTracer
144+
msgSubmissionCodec
145+
channel
146+
(localMsgSubmissionServerPeer $
147+
localMsgSubmissionServer nullTracer reject mempoolWriter)
148+
149+
aLocalMsgNotification _version _ctx channel = do
150+
labelThisThread "LocalMsgNotificationServer"
151+
runPeer
152+
nullTracer
153+
msgNotificationCodec
154+
channel
155+
(localMsgNotificationServerPeer $
156+
localMsgNotificationServer nullTracer (pure ()) maxMsgs mempoolReader)
157+
158+
159+
data Protocols appType ntcAddr bytes m a b =
160+
Protocols {
161+
msgSubmissionProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
162+
, msgNotificationProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
163+
}
164+
165+
responders
166+
:: Apps ntcAddr m a
167+
-> NodeToClientVersion
168+
-> NodeToClientVersionData
169+
-> OuroborosApplicationWithMinimalCtx Mx.ResponderMode ntcAddr ByteString m Void a
170+
responders Apps {
171+
aLocalMsgSubmission
172+
, aLocalMsgNotification
173+
}
174+
version =
175+
nodeToClientProtocols
176+
Protocols {
177+
msgSubmissionProtocol =
178+
ResponderProtocolOnly $
179+
MiniProtocolCb $ aLocalMsgSubmission version
180+
, msgNotificationProtocol =
181+
ResponderProtocolOnly $
182+
MiniProtocolCb $ aLocalMsgNotification version
183+
}
184+
version
185+
186+
187+
-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
188+
-- make up the overall node-to-client protocol.
189+
--
190+
-- This function specifies the wire format protocol numbers as well as the
191+
-- protocols that run for each 'NodeToClientVersion'.
192+
--
193+
-- They are chosen to not overlap with the node to node protocol numbers.
194+
-- This is not essential for correctness, but is helpful to allow a single
195+
-- shared implementation of tools that can analyse both protocols, e.g.
196+
-- wireshark plugins.
197+
--
198+
nodeToClientProtocols
199+
:: Protocols appType ntcAddr bytes m a b
200+
-> NodeToClientVersion
201+
-> NodeToClientVersionData
202+
-> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
203+
nodeToClientProtocols protocols _version _versionData =
204+
OuroborosApplication $
205+
case protocols of
206+
Protocols {
207+
msgSubmissionProtocol
208+
, msgNotificationProtocol
209+
} ->
210+
[ localMsgSubmission msgSubmissionProtocol
211+
, localMsgNotification msgNotificationProtocol
212+
]
213+
where
214+
localMsgSubmission protocol = MiniProtocol {
215+
miniProtocolNum = MiniProtocolNum 14,
216+
miniProtocolStart = StartOnDemand,
217+
miniProtocolLimits = maximumMiniProtocolLimits,
218+
miniProtocolRun = protocol
219+
}
220+
localMsgNotification protocol = MiniProtocol {
221+
miniProtocolNum = MiniProtocolNum 15,
222+
miniProtocolStart = StartOnDemand,
223+
miniProtocolLimits = maximumMiniProtocolLimits,
224+
miniProtocolRun = protocol
225+
}
226+
maximumMiniProtocolLimits =
227+
MiniProtocolLimits {
228+
maximumIngressQueue = 0xffffffff
229+
}

0 commit comments

Comments
 (0)