Skip to content

Commit b35f1eb

Browse files
dmq: NtC applications
1 parent 01a6e39 commit b35f1eb

File tree

7 files changed

+352
-27
lines changed

7 files changed

+352
-27
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions)
1717
import DMQ.Configuration.Topology (readTopologyFileOrError)
1818
import DMQ.Diffusion.Applications (diffusionApplications)
1919
import DMQ.Diffusion.Arguments
20-
import DMQ.Diffusion.NodeKernel (withNodeKernel)
20+
import DMQ.Diffusion.NodeKernel (mempool, withNodeKernel)
21+
import DMQ.NodeToClient qualified as NtC
2122
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, ntnApps)
23+
import DMQ.Protocol.LocalMsgSubmission.Codec
24+
import DMQ.Protocol.SigSubmission.Codec
25+
import DMQ.Protocol.SigSubmission.Type (Sig (..))
2226
import DMQ.Tracer
2327

2428
import DMQ.Diffusion.PeerSelection (policy)
2529
import Ouroboros.Network.Diffusion qualified as Diffusion
2630
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
2731
encodeRemoteAddress)
32+
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
2833

2934
main :: IO ()
3035
main = void . runDMQ =<< execParser opts
@@ -78,6 +83,13 @@ runDMQ commandLineConfig = do
7883
(decodeRemoteAddress maxBound))
7984
dmqLimitsAndTimeouts
8085
defaultSigDecisionPolicy
86+
dmqNtCApps =
87+
let sigSize _ = 0 -- TODO
88+
maxMsgs = 1000 -- TODO: make this dynamic?
89+
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
90+
mempoolWriter = Mempool.getWriter sigId (const True) (mempool nodeKernel)
91+
in NtC.ntcApps mempoolReader mempoolWriter maxMsgs
92+
(NtC.dmqCodecs encodeSig decodeSig encodeReject decodeReject)
8193
dmqDiffusionArguments =
8294
diffusionArguments (if handshakeTracer
8395
then WithEventType "Handshake" >$< tracer
@@ -91,6 +103,7 @@ runDMQ commandLineConfig = do
91103
dmqDiffusionConfiguration
92104
dmqLimitsAndTimeouts
93105
dmqNtNApps
106+
dmqNtCApps
94107
(policy policyRng)
95108

96109
Diffusion.run dmqDiffusionArguments

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,11 @@ library
5656
DMQ.NodeToClient.Version
5757
DMQ.NodeToNode
5858
DMQ.NodeToNode.Version
59+
DMQ.NodeToClient.LocalMsgSubmission
60+
DMQ.NodeToClient.LocalMsgNotification
5961
DMQ.Protocol.LocalMsgNotification.Client
6062
DMQ.Protocol.LocalMsgNotification.Codec
63+
DMQ.Protocol.LocalMsgNotification.Examples
6164
DMQ.Protocol.LocalMsgNotification.Server
6265
DMQ.Protocol.LocalMsgNotification.Type
6366
DMQ.Protocol.LocalMsgSubmission.Client
@@ -143,8 +146,10 @@ test-suite dmq-test
143146
QuickCheck,
144147
base >=4.14 && <4.22,
145148
bytestring,
149+
contra-tracer,
146150
decentralized-message-queue,
147151
ouroboros-network-api,
152+
ouroboros-network-framework,
148153
ouroboros-network-protocols:testlib,
149154
ouroboros-network-testing,
150155
quickcheck-instances,
@@ -154,7 +159,8 @@ test-suite dmq-test
154159
time,
155160
typed-protocols:{typed-protocols, codec-properties},
156161
with-utf8,
157-
io-classes
162+
io-classes,
163+
io-sim
158164

159165
ghc-options:
160166
-fno-ignore-asserts

decentralized-message-queue/src/DMQ/Diffusion/Applications.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import DMQ.Configuration
88
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
99
import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
1010
stdVersionDataNTC)
11+
import DMQ.NodeToClient qualified as NTC
1112
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
1213
stdVersionDataNTN)
1314
import DMQ.NodeToNode qualified as NTN
1415

1516
import Ouroboros.Network.Diffusion.Types qualified as Diffusion
1617
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
17-
import Ouroboros.Network.Mux (OuroborosApplication (..))
1818
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
1919
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
2020
simpleSingletonVersions)
@@ -27,6 +27,7 @@ diffusionApplications
2727
-> Diffusion.Configuration NoExtraFlags m ntnFd ntnAddr ntcFd ntcAddr
2828
-> NTN.LimitsAndTimeouts ntnAddr
2929
-> NTN.Apps ntnAddr m a ()
30+
-> NTC.Apps ntcAddr m ()
3031
-> PeerSelectionPolicy ntnAddr m
3132
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
3233
ntcAddr NodeToClientVersion NodeToClientVersionData
@@ -44,6 +45,7 @@ diffusionApplications
4445
}
4546
ntnLimitsAndTimeouts
4647
ntnApps
48+
ntcApps
4749
peerSelectionPolicy =
4850
Diffusion.Applications {
4951
daApplicationInitiatorMode =
@@ -67,11 +69,7 @@ diffusionApplications
6769
[ simpleSingletonVersions
6870
version
6971
(stdVersionDataNTC networkMagic)
70-
(\_versionData ->
71-
OuroborosApplication
72-
[
73-
]
74-
)
72+
(NTC.responders ntcApps version)
7573
| version <- [minBound..maxBound]
7674
]
7775
, daRethrowPolicy = muxErrorRethrowPolicy

decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
3939
data NodeKernel ntnAddr m =
4040
NodeKernel {
4141
-- | The fetch client registry, used for the keep alive clients.
42-
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
42+
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)
4343

4444
-- | Read the current peer sharing registry, used for interacting with
4545
-- the PeerSharing protocol
46-
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
47-
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
48-
, mempool :: Mempool m Sig
49-
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
50-
, sigMempoolSem :: TxMempoolSem m
51-
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
46+
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
47+
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
48+
, mempool :: !(Mempool m Sig)
49+
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
50+
, sigMempoolSem :: !(TxMempoolSem m)
51+
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
5252
}
5353

5454
newNodeKernel :: ( MonadLabelledSTM m

decentralized-message-queue/src/DMQ/NodeToClient.hs

Lines changed: 186 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,59 @@
1-
{-# LANGUAGE NamedFieldPuns #-}
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE FlexibleContexts #-}
3+
{-# LANGUAGE RankNTypes #-}
24

35
module DMQ.NodeToClient
4-
( LocalAddress (..)
5-
, module DMQ.NodeToClient.Version
6+
( module DMQ.NodeToClient.Version
67
, Protocols (..)
78
, HandshakeTr
9+
, Apps
10+
, dmqCodecs
11+
, ntcApps
812
, ntcHandshakeArguments
13+
, responders
914
) where
1015

16+
import Data.ByteString.Lazy (ByteString)
17+
import Data.Void
18+
import Data.Word
19+
20+
import Control.Concurrent.Class.MonadSTM
21+
import Control.Monad.Class.MonadFork
1122
import Control.Monad.Class.MonadST (MonadST)
23+
import Control.Monad.Class.MonadThrow
1224
import Control.Tracer (Tracer, nullTracer)
1325

26+
import Codec.CBOR.Decoding qualified as CBOR
27+
import Codec.CBOR.Encoding qualified as CBOR
1428
import Codec.CBOR.Term qualified as CBOR
1529

1630
import Network.Mux qualified as Mx
31+
import Network.TypedProtocol.Codec hiding (decode, encode)
32+
import Network.TypedProtocol.Codec.CBOR qualified as CBOR
33+
34+
import DMQ.NodeToClient.LocalMsgNotification
35+
import DMQ.NodeToClient.LocalMsgSubmission
36+
import DMQ.NodeToClient.Version
37+
import DMQ.Protocol.LocalMsgNotification.Codec
38+
import DMQ.Protocol.LocalMsgNotification.Server
39+
import DMQ.Protocol.LocalMsgNotification.Type
40+
import DMQ.Protocol.LocalMsgSubmission.Codec
41+
import DMQ.Protocol.LocalMsgSubmission.Server
42+
import DMQ.Protocol.LocalMsgSubmission.Type
1743

18-
import Ouroboros.Network.ConnectionId (ConnectionId)
19-
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
44+
import Ouroboros.Network.Context
45+
import Ouroboros.Network.Driver.Simple
2046
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
2147
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
48+
import Ouroboros.Network.Mux
2249
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
2350
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
2451
codecHandshake, noTimeLimitsHandshake)
25-
import Ouroboros.Network.Snocket (LocalAddress (..))
26-
27-
import DMQ.NodeToClient.Version
28-
52+
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
53+
(TxSubmissionMempoolWriter)
54+
import Ouroboros.Network.TxSubmission.Mempool.Reader
55+
import Ouroboros.Network.Util.ShowProxy
2956

30-
data Protocols =
31-
Protocols {
32-
}
3357

3458
type HandshakeTr ntcAddr = Mx.WithBearer (ConnectionId ntcAddr) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))
3559

@@ -53,3 +77,153 @@ ntcHandshakeArguments tracer =
5377
, haQueryVersion = queryVersion
5478
, haTimeLimits = noTimeLimitsHandshake
5579
}
80+
81+
82+
data Codecs m sig reject =
83+
Codecs {
84+
msgSubmissionCodec
85+
:: !(Codec (LocalMsgSubmission sig reject)
86+
CBOR.DeserialiseFailure m ByteString)
87+
, msgNotificationCodec
88+
:: !(Codec (LocalMsgNotification sig)
89+
CBOR.DeserialiseFailure m ByteString)
90+
}
91+
92+
dmqCodecs :: MonadST m
93+
=> (sig -> CBOR.Encoding)
94+
-> (forall s. CBOR.Decoder s sig)
95+
-> (SigMempoolFail reason -> CBOR.Encoding)
96+
-> (forall s. CBOR.Decoder s (SigMempoolFail reason))
97+
-> Codecs m sig reason
98+
dmqCodecs encodeSig decodeSig encodeReject' decodeReject' =
99+
Codecs {
100+
msgSubmissionCodec = codecLocalMsgSubmission encodeSig decodeSig encodeReject' decodeReject'
101+
, msgNotificationCodec = codecLocalMsgNotification encodeSig decodeSig
102+
}
103+
104+
105+
-- | A node-to-client application
106+
--
107+
type App ntcAddr m a =
108+
NodeToClientVersion
109+
-> ResponderContext ntcAddr
110+
-> Mx.Channel m ByteString
111+
-> m (a, Maybe ByteString)
112+
113+
114+
data Apps ntcAddr m a =
115+
Apps {
116+
-- | Start a sig-submission client
117+
aLocalMsgSubmission :: !(App ntcAddr m a)
118+
119+
-- | Start a sig-submission server
120+
, aLocalMsgNotification :: !(App ntcAddr m a)
121+
}
122+
123+
124+
-- | Construct applications for the node-to-client protocols
125+
--
126+
ntcApps
127+
:: (MonadThrow m, MonadThread m, MonadSTM m, ShowProxy (SigMempoolFail reason), ShowProxy sig)
128+
=> TxSubmissionMempoolReader msgid sig idx m
129+
-> TxSubmissionMempoolWriter msgid sig idx m
130+
-> Word16
131+
-> Codecs m sig reason
132+
-> Apps ntcAddr m ()
133+
ntcApps mempoolReader mempoolWriter maxMsgs
134+
Codecs { msgSubmissionCodec, msgNotificationCodec } =
135+
Apps {
136+
aLocalMsgSubmission
137+
, aLocalMsgNotification
138+
}
139+
where
140+
aLocalMsgSubmission _version _ctx channel = do
141+
labelThisThread "LocalMsgSubmissionServer"
142+
runPeer
143+
nullTracer
144+
msgSubmissionCodec
145+
channel
146+
(localMsgSubmissionServerPeer $
147+
localMsgSubmissionServer nullTracer mempoolWriter)
148+
149+
aLocalMsgNotification _version _ctx channel = do
150+
labelThisThread "LocalMsgNotificationServer"
151+
runPeer
152+
nullTracer
153+
msgNotificationCodec
154+
channel
155+
(localMsgNotificationServerPeer $
156+
localMsgNotificationServer nullTracer (pure ()) maxMsgs mempoolReader)
157+
158+
159+
data Protocols appType ntcAddr bytes m a b =
160+
Protocols {
161+
msgSubmissionProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
162+
, msgNotificationProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
163+
}
164+
165+
responders
166+
:: Apps ntcAddr m a
167+
-> NodeToClientVersion
168+
-> NodeToClientVersionData
169+
-> OuroborosApplicationWithMinimalCtx Mx.ResponderMode ntcAddr ByteString m Void a
170+
responders Apps {
171+
aLocalMsgSubmission
172+
, aLocalMsgNotification
173+
}
174+
version =
175+
nodeToClientProtocols
176+
Protocols {
177+
msgSubmissionProtocol =
178+
ResponderProtocolOnly $
179+
MiniProtocolCb $ aLocalMsgSubmission version
180+
, msgNotificationProtocol =
181+
ResponderProtocolOnly $
182+
MiniProtocolCb $ aLocalMsgNotification version
183+
}
184+
version
185+
186+
187+
-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
188+
-- make up the overall node-to-client protocol.
189+
--
190+
-- This function specifies the wire format protocol numbers as well as the
191+
-- protocols that run for each 'NodeToClientVersion'.
192+
--
193+
-- They are chosen to not overlap with the node to node protocol numbers.
194+
-- This is not essential for correctness, but is helpful to allow a single
195+
-- shared implementation of tools that can analyse both protocols, e.g.
196+
-- wireshark plugins.
197+
--
198+
nodeToClientProtocols
199+
:: Protocols appType ntcAddr bytes m a b
200+
-> NodeToClientVersion
201+
-> NodeToClientVersionData
202+
-> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
203+
nodeToClientProtocols protocols _version _versionData =
204+
OuroborosApplication $
205+
case protocols of
206+
Protocols {
207+
msgSubmissionProtocol
208+
, msgNotificationProtocol
209+
} ->
210+
[ localMsgSubmission msgSubmissionProtocol
211+
, localMsgNotification msgNotificationProtocol
212+
]
213+
where
214+
localMsgSubmission protocol = MiniProtocol {
215+
miniProtocolNum = MiniProtocolNum 14,
216+
miniProtocolStart = StartOnDemand,
217+
miniProtocolLimits = maximumMiniProtocolLimits,
218+
miniProtocolRun = protocol
219+
}
220+
localMsgNotification protocol = MiniProtocol {
221+
miniProtocolNum = MiniProtocolNum 15,
222+
miniProtocolStart = StartOnDemand,
223+
miniProtocolLimits = maximumMiniProtocolLimits,
224+
miniProtocolRun = protocol
225+
}
226+
maximumMiniProtocolLimits =
227+
MiniProtocolLimits {
228+
maximumIngressQueue = 0xffffffff
229+
}

0 commit comments

Comments
 (0)