Skip to content

Commit dcd0e7a

Browse files
dmq: NtC applications
1 parent 1d2af32 commit dcd0e7a

File tree

7 files changed

+310
-18
lines changed

7 files changed

+310
-18
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import DMQ.Configuration.Topology (readTopologyFileOrError)
1616
import DMQ.Diffusion.Applications (diffusionApplications)
1717
import DMQ.Diffusion.Arguments (diffusionArguments)
1818
import DMQ.Diffusion.NodeKernel (withNodeKernel)
19+
import DMQ.NodeToClient qualified as NtC
1920
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, mapNtNDMQtoOuroboros,
2021
ntnApps)
2122

@@ -53,6 +54,9 @@ runDMQ cliopts@CLIOptions {
5354
(decodeRemoteAddress (mapNtNDMQtoOuroboros maxBound)))
5455
dmqLimitsAndTimeouts
5556
defaultSigDecisionPolicy
57+
dmqNtCApps =
58+
NtC.ntcApps nodeKernel
59+
NtC.dmqCodecs
5660
dmqDiffusionArguments =
5761
diffusionArguments debugTracer
5862
debugTracer
@@ -62,6 +66,7 @@ runDMQ cliopts@CLIOptions {
6266
dmqDiffusionConfiguration
6367
dmqLimitsAndTimeouts
6468
dmqNtNApps
69+
dmqNtCApps
6570
(policy policyRng)
6671

6772
Diffusion.run dmqDiffusionArguments

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ library
5454
DMQ.Diffusion.PeerSelection
5555
DMQ.NodeToClient
5656
DMQ.NodeToNode
57+
DMQ.NtC_Applications.LocalMsgSubmission
58+
DMQ.NtC_Applications.LocalMsgNotification
5759
DMQ.Protocol.LocalMsgNotification.Client
5860
DMQ.Protocol.LocalMsgNotification.Codec
5961
DMQ.Protocol.LocalMsgNotification.Server
@@ -85,6 +87,7 @@ library
8587
ouroboros-network-framework ^>=0.19,
8688
ouroboros-network-protocols ^>=0.15,
8789
random ^>=1.2,
90+
serialise,
8891
text >=1.2.4 && <2.2,
8992
time ^>= 1.12,
9093
typed-protocols:{typed-protocols, cborg} ^>=1.0,

decentralized-message-queue/src/DMQ/Diffusion/Applications.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
1010
stdVersionDataNTC)
1111
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
1212
stdVersionDataNTN)
13+
import DMQ.NodeToClient qualified as NTC
1314
import DMQ.NodeToNode qualified as NTN
1415

1516
import Ouroboros.Network.Diffusion.Types qualified as Diffusion
1617
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
17-
import Ouroboros.Network.Mux (OuroborosApplication (..))
1818
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
1919
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
2020
simpleSingletonVersions)
@@ -27,6 +27,7 @@ diffusionApplications
2727
-> Diffusion.Configuration extraFlags m ntnFd ntnAddr ntcFd ntcAddr
2828
-> NTN.LimitsAndTimeouts ntnAddr
2929
-> NTN.Apps ntnAddr m a ()
30+
-> NTC.Apps ntcAddr m ()
3031
-> PeerSelectionPolicy ntnAddr m
3132
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
3233
ntcAddr NodeToClientVersion NodeToClientVersionData
@@ -44,6 +45,7 @@ diffusionApplications
4445
}
4546
ntnLimitsAndTimeouts
4647
ntnApps
48+
ntcApps
4749
peerSelectionPolicy =
4850
Diffusion.Applications {
4951
daApplicationInitiatorMode =
@@ -67,11 +69,7 @@ diffusionApplications
6769
[ simpleSingletonVersions
6870
version
6971
(stdVersionDataNTC dmqcNetworkMagic)
70-
(\_versionData ->
71-
OuroborosApplication
72-
[
73-
]
74-
)
72+
(NTC.responders ntcApps version)
7573
| version <- [minBound..maxBound]
7674
]
7775
, daRethrowPolicy = muxErrorRethrowPolicy

decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
3939
data NodeKernel ntnAddr m =
4040
NodeKernel {
4141
-- | The fetch client registry, used for the keep alive clients.
42-
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
42+
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)
4343

4444
-- | Read the current peer sharing registry, used for interacting with
4545
-- the PeerSharing protocol
46-
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
47-
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
48-
, mempool :: Mempool m Sig
49-
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
50-
, sigMempoolSem :: TxMempoolSem m
51-
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
46+
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
47+
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
48+
, mempool :: !(Mempool m Sig)
49+
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
50+
, sigMempoolSem :: !(TxMempoolSem m)
51+
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
5252
}
5353

5454
newNodeKernel :: ( MonadLabelledSTM m

decentralized-message-queue/src/DMQ/NodeToClient.hs

Lines changed: 181 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DataKinds #-}
12
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE DeriveGeneric #-}
34
{-# LANGUAGE LambdaCase #-}
@@ -6,28 +7,51 @@
67
module DMQ.NodeToClient where
78

89
import Codec.CBOR.Term qualified as CBOR
10+
import Codec.Serialise (Serialise (decode, encode))
911
import Control.DeepSeq (NFData)
1012
import Control.Monad ((>=>))
1113
import Data.Bits (Bits (..))
14+
import Data.ByteString.Lazy (ByteString)
1215
import Data.Text (Text)
1316
import Data.Text qualified as T
17+
import Data.Void
1418
import GHC.Generics (Generic)
1519

20+
import Control.Concurrent.Class.MonadSTM
21+
import Control.Monad.Class.MonadFork
1622
import Control.Monad.Class.MonadST (MonadST)
23+
import Control.Monad.Class.MonadThrow
1724
import Control.Tracer (Tracer, nullTracer)
1825

1926
import Network.Mux qualified as Mx
27+
import Network.TypedProtocol.Codec hiding (encode, decode)
28+
import Network.TypedProtocol.Codec.CBOR qualified as CBOR
29+
30+
import DMQ.Diffusion.NodeKernel
31+
import DMQ.NtC_Applications.LocalMsgSubmission
32+
import DMQ.NtC_Applications.LocalMsgNotification
33+
import DMQ.Protocol.LocalMsgNotification.Codec
34+
import DMQ.Protocol.LocalMsgNotification.Server
35+
import DMQ.Protocol.LocalMsgNotification.Type
36+
import DMQ.Protocol.LocalMsgSubmission.Codec
37+
import DMQ.Protocol.LocalMsgSubmission.Server
38+
import DMQ.Protocol.LocalMsgSubmission.Type
39+
import DMQ.Protocol.SigSubmission.Codec
40+
import DMQ.Protocol.SigSubmission.Type
2041

2142
import Ouroboros.Network.CodecCBORTerm (CodecCBORTerm (..))
22-
import Ouroboros.Network.ConnectionId (ConnectionId)
23-
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
43+
import Ouroboros.Network.Context
44+
import Ouroboros.Network.Driver.Simple
2445
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
2546
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
2647
import Ouroboros.Network.Magic (NetworkMagic (..))
48+
import Ouroboros.Network.Mux
2749
import Ouroboros.Network.Protocol.Handshake (Accept (..), Handshake,
2850
HandshakeArguments (..))
2951
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
3052
codecHandshake, noTimeLimitsHandshake)
53+
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
54+
3155

3256
data NodeToClientVersion =
3357
NodeToClientV_1
@@ -106,9 +130,6 @@ nodeToClientCodecCBORTerm _v = CodecCBORTerm {encodeTerm, decodeTerm}
106130
decoder x query | x >= 0 && x <= 0xffffffff = Right (NodeToClientVersionData (NetworkMagic $ fromIntegral x) query)
107131
| otherwise = Left $ T.pack $ "networkMagic out of bound: " <> show x
108132

109-
data Protocols =
110-
Protocols {
111-
}
112133

113134
type HandshakeTr ntcAddr = Mx.WithBearer (ConnectionId ntcAddr) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))
114135

@@ -139,3 +160,158 @@ stdVersionDataNTC networkMagic =
139160
{ networkMagic
140161
, query = False
141162
}
163+
164+
165+
-- TODO: delete these aliases
166+
type LocalMsgSubmission' = LocalMsgSubmission Sig Int
167+
type LocalMsgNotification' = LocalMsgNotification Sig
168+
169+
data Codecs m =
170+
Codecs {
171+
msgSubmissionCodec
172+
:: !(Codec LocalMsgSubmission'
173+
CBOR.DeserialiseFailure m ByteString)
174+
, msgNotificationCodec
175+
:: !(Codec LocalMsgNotification'
176+
CBOR.DeserialiseFailure m ByteString)
177+
}
178+
179+
dmqCodecs :: MonadST m
180+
=> Codecs m
181+
dmqCodecs =
182+
Codecs {
183+
msgSubmissionCodec = codecLocalMsgSubmission encodeSig decodeSig encode decode
184+
, msgNotificationCodec = codecLocalMsgNotification encodeSig decodeSig
185+
}
186+
187+
188+
-- | A node-to-client application
189+
--
190+
type App ntcAddr m a =
191+
NodeToClientVersion
192+
-> ResponderContext ntcAddr
193+
-> Mx.Channel m ByteString
194+
-> m (a, Maybe ByteString)
195+
196+
197+
data Apps ntcAddr m a =
198+
Apps {
199+
-- | Start a sig-submission client
200+
aLocalMsgSubmission :: !(App ntcAddr m a)
201+
202+
-- | Start a sig-submission server
203+
, aLocalMsgNotification :: !(App ntcAddr m a)
204+
}
205+
206+
207+
-- | Construct applications for the node-to-client protocols
208+
--
209+
ntcApps
210+
:: (MonadThrow m, MonadThread m, MonadSTM m)
211+
=> NodeKernel ntnAddr m
212+
-> Codecs m
213+
-> Apps ntcAddr m ()
214+
ntcApps NodeKernel { mempool }
215+
Codecs { msgSubmissionCodec, msgNotificationCodec } =
216+
Apps {
217+
aLocalMsgSubmission
218+
, aLocalMsgNotification
219+
}
220+
where
221+
sigSize :: Sig -> SizeInBytes
222+
sigSize _ = 0 -- TODO
223+
224+
mempoolReader = Mempool.getReader sigId sigSize mempool
225+
mempoolWriter = Mempool.getWriter sigId (const True) mempool
226+
227+
aLocalMsgSubmission _version _ctx channel = do
228+
labelThisThread "LocalMsgSubmissionServer"
229+
runPeer
230+
nullTracer
231+
msgSubmissionCodec
232+
channel
233+
(localMsgSubmissionServerPeer $
234+
localMsgSubmissionServer undefined mempoolWriter)
235+
236+
aLocalMsgNotification _version _ctx channel = do
237+
labelThisThread "LocalMsgNotificationServer"
238+
runPeer
239+
nullTracer
240+
msgNotificationCodec
241+
channel
242+
(localMsgNotificationServerPeer $
243+
localMsgNotificationServer undefined nullTracer undefined mempoolReader)
244+
245+
246+
data Protocols appType ntcAddr bytes m a b =
247+
Protocols {
248+
msgSubmissionProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
249+
, msgNotificationProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
250+
}
251+
252+
responders
253+
:: Apps ntcAddr m a
254+
-> NodeToClientVersion
255+
-> NodeToClientVersionData
256+
-> OuroborosApplicationWithMinimalCtx Mx.ResponderMode ntcAddr ByteString m Void a
257+
responders Apps {
258+
aLocalMsgSubmission
259+
, aLocalMsgNotification
260+
}
261+
version =
262+
nodeToClientProtocols
263+
Protocols {
264+
msgSubmissionProtocol =
265+
ResponderProtocolOnly $
266+
MiniProtocolCb $ aLocalMsgSubmission version
267+
, msgNotificationProtocol =
268+
ResponderProtocolOnly $
269+
MiniProtocolCb $ aLocalMsgNotification version
270+
}
271+
version
272+
273+
274+
-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
275+
-- make up the overall node-to-client protocol.
276+
--
277+
-- This function specifies the wire format protocol numbers as well as the
278+
-- protocols that run for each 'NodeToClientVersion'.
279+
--
280+
-- They are chosen to not overlap with the node to node protocol numbers.
281+
-- This is not essential for correctness, but is helpful to allow a single
282+
-- shared implementation of tools that can analyse both protocols, e.g.
283+
-- wireshark plugins.
284+
--
285+
nodeToClientProtocols
286+
:: Protocols appType ntcAddr bytes m a b
287+
-> NodeToClientVersion
288+
-> NodeToClientVersionData
289+
-> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
290+
nodeToClientProtocols protocols _version _versionData =
291+
OuroborosApplication $
292+
case protocols of
293+
Protocols {
294+
msgSubmissionProtocol
295+
, msgNotificationProtocol
296+
} ->
297+
[ localMsgSubmission msgSubmissionProtocol
298+
, localMsgNotification msgNotificationProtocol
299+
]
300+
where
301+
-- TODO: verify protocol numbers
302+
localMsgSubmission protocol = MiniProtocol {
303+
miniProtocolNum = MiniProtocolNum 10,
304+
miniProtocolStart = StartOnDemand,
305+
miniProtocolLimits = maximumMiniProtocolLimits,
306+
miniProtocolRun = protocol
307+
}
308+
localMsgNotification protocol = MiniProtocol {
309+
miniProtocolNum = MiniProtocolNum 11,
310+
miniProtocolStart = StartOnDemand,
311+
miniProtocolLimits = maximumMiniProtocolLimits,
312+
miniProtocolRun = protocol
313+
}
314+
maximumMiniProtocolLimits =
315+
MiniProtocolLimits {
316+
maximumIngressQueue = 0xffffffff
317+
}

0 commit comments

Comments
 (0)