Skip to content

Commit cf95d94

Browse files
dmq: NtC applications
1 parent 1d2af32 commit cf95d94

File tree

7 files changed

+311
-18
lines changed

7 files changed

+311
-18
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import DMQ.Configuration.Topology (readTopologyFileOrError)
1616
import DMQ.Diffusion.Applications (diffusionApplications)
1717
import DMQ.Diffusion.Arguments (diffusionArguments)
1818
import DMQ.Diffusion.NodeKernel (withNodeKernel)
19+
import DMQ.NodeToClient qualified as NtC
1920
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, mapNtNDMQtoOuroboros,
2021
ntnApps)
2122

@@ -53,6 +54,9 @@ runDMQ cliopts@CLIOptions {
5354
(decodeRemoteAddress (mapNtNDMQtoOuroboros maxBound)))
5455
dmqLimitsAndTimeouts
5556
defaultSigDecisionPolicy
57+
dmqNtCApps =
58+
NtC.ntcApps nodeKernel
59+
NtC.dmqCodecs
5660
dmqDiffusionArguments =
5761
diffusionArguments debugTracer
5862
debugTracer
@@ -62,6 +66,7 @@ runDMQ cliopts@CLIOptions {
6266
dmqDiffusionConfiguration
6367
dmqLimitsAndTimeouts
6468
dmqNtNApps
69+
dmqNtCApps
6570
(policy policyRng)
6671

6772
Diffusion.run dmqDiffusionArguments

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ library
5454
DMQ.Diffusion.PeerSelection
5555
DMQ.NodeToClient
5656
DMQ.NodeToNode
57+
DMQ.NtC_Applications.LocalMsgSubmission
58+
DMQ.NtC_Applications.LocalMsgNotification
5759
DMQ.Protocol.LocalMsgNotification.Client
5860
DMQ.Protocol.LocalMsgNotification.Codec
5961
DMQ.Protocol.LocalMsgNotification.Server
@@ -85,6 +87,7 @@ library
8587
ouroboros-network-framework ^>=0.19,
8688
ouroboros-network-protocols ^>=0.15,
8789
random ^>=1.2,
90+
serialise,
8891
text >=1.2.4 && <2.2,
8992
time ^>= 1.12,
9093
typed-protocols:{typed-protocols, cborg} ^>=1.0,

decentralized-message-queue/src/DMQ/Diffusion/Applications.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
1010
stdVersionDataNTC)
1111
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
1212
stdVersionDataNTN)
13+
import DMQ.NodeToClient qualified as NTC
1314
import DMQ.NodeToNode qualified as NTN
1415

1516
import Ouroboros.Network.Diffusion.Types qualified as Diffusion
1617
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
17-
import Ouroboros.Network.Mux (OuroborosApplication (..))
1818
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
1919
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
2020
simpleSingletonVersions)
@@ -27,6 +27,7 @@ diffusionApplications
2727
-> Diffusion.Configuration extraFlags m ntnFd ntnAddr ntcFd ntcAddr
2828
-> NTN.LimitsAndTimeouts ntnAddr
2929
-> NTN.Apps ntnAddr m a ()
30+
-> NTC.Apps ntcAddr m ()
3031
-> PeerSelectionPolicy ntnAddr m
3132
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
3233
ntcAddr NodeToClientVersion NodeToClientVersionData
@@ -44,6 +45,7 @@ diffusionApplications
4445
}
4546
ntnLimitsAndTimeouts
4647
ntnApps
48+
ntcApps
4749
peerSelectionPolicy =
4850
Diffusion.Applications {
4951
daApplicationInitiatorMode =
@@ -67,11 +69,7 @@ diffusionApplications
6769
[ simpleSingletonVersions
6870
version
6971
(stdVersionDataNTC dmqcNetworkMagic)
70-
(\_versionData ->
71-
OuroborosApplication
72-
[
73-
]
74-
)
72+
(NTC.responders ntcApps version)
7573
| version <- [minBound..maxBound]
7674
]
7775
, daRethrowPolicy = muxErrorRethrowPolicy

decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
3939
data NodeKernel ntnAddr m =
4040
NodeKernel {
4141
-- | The fetch client registry, used for the keep alive clients.
42-
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
42+
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)
4343

4444
-- | Read the current peer sharing registry, used for interacting with
4545
-- the PeerSharing protocol
46-
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
47-
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
48-
, mempool :: Mempool m Sig
49-
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
50-
, sigMempoolSem :: TxMempoolSem m
51-
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
46+
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
47+
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
48+
, mempool :: !(Mempool m Sig)
49+
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
50+
, sigMempoolSem :: !(TxMempoolSem m)
51+
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
5252
}
5353

5454
newNodeKernel :: ( MonadLabelledSTM m

decentralized-message-queue/src/DMQ/NodeToClient.hs

Lines changed: 182 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DataKinds #-}
12
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE DeriveGeneric #-}
34
{-# LANGUAGE LambdaCase #-}
@@ -6,28 +7,51 @@
67
module DMQ.NodeToClient where
78

89
import Codec.CBOR.Term qualified as CBOR
10+
import Codec.Serialise (Serialise (decode, encode))
911
import Control.DeepSeq (NFData)
1012
import Control.Monad ((>=>))
1113
import Data.Bits (Bits (..))
14+
import Data.ByteString.Lazy (ByteString)
1215
import Data.Text (Text)
1316
import Data.Text qualified as T
17+
import Data.Void
1418
import GHC.Generics (Generic)
1519

20+
import Control.Monad.Class.MonadFork
1621
import Control.Monad.Class.MonadST (MonadST)
22+
import Control.Concurrent.Class.MonadSTM
23+
import Control.Monad.Class.MonadThrow
1724
import Control.Tracer (Tracer, nullTracer)
1825

1926
import Network.Mux qualified as Mx
27+
import Network.TypedProtocol.Codec hiding (encode, decode)
28+
import Network.TypedProtocol.Codec.CBOR qualified as CBOR
2029

30+
import DMQ.Diffusion.NodeKernel
31+
import DMQ.NtC_Applications.LocalMsgSubmission
32+
import DMQ.NtC_Applications.LocalMsgNotification
33+
import DMQ.Protocol.LocalMsgNotification.Codec
34+
import DMQ.Protocol.LocalMsgNotification.Server
35+
import DMQ.Protocol.LocalMsgNotification.Type
36+
import DMQ.Protocol.LocalMsgSubmission.Codec
37+
import DMQ.Protocol.LocalMsgSubmission.Server
38+
import DMQ.Protocol.LocalMsgSubmission.Type
39+
import DMQ.Protocol.SigSubmission.Codec
40+
import DMQ.Protocol.SigSubmission.Type
41+
42+
import Ouroboros.Network.Context
2143
import Ouroboros.Network.CodecCBORTerm (CodecCBORTerm (..))
22-
import Ouroboros.Network.ConnectionId (ConnectionId)
23-
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
44+
import Ouroboros.Network.Driver.Simple
2445
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
2546
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
2647
import Ouroboros.Network.Magic (NetworkMagic (..))
48+
import Ouroboros.Network.Mux
2749
import Ouroboros.Network.Protocol.Handshake (Accept (..), Handshake,
2850
HandshakeArguments (..))
2951
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
3052
codecHandshake, noTimeLimitsHandshake)
53+
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
54+
3155

3256
data NodeToClientVersion =
3357
NodeToClientV_1
@@ -106,9 +130,6 @@ nodeToClientCodecCBORTerm _v = CodecCBORTerm {encodeTerm, decodeTerm}
106130
decoder x query | x >= 0 && x <= 0xffffffff = Right (NodeToClientVersionData (NetworkMagic $ fromIntegral x) query)
107131
| otherwise = Left $ T.pack $ "networkMagic out of bound: " <> show x
108132

109-
data Protocols =
110-
Protocols {
111-
}
112133

113134
type HandshakeTr ntcAddr = Mx.WithBearer (ConnectionId ntcAddr) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))
114135

@@ -139,3 +160,159 @@ stdVersionDataNTC networkMagic =
139160
{ networkMagic
140161
, query = False
141162
}
163+
164+
165+
-- TODO: delete these aliases
166+
type LocalMsgSubmission' = LocalMsgSubmission Sig Int
167+
type LocalMsgNotification' = LocalMsgNotification Sig
168+
169+
data Codecs m =
170+
Codecs {
171+
msgSubmissionCodec
172+
:: Codec LocalMsgSubmission'
173+
CBOR.DeserialiseFailure m ByteString
174+
, msgNotificationCodec
175+
:: Codec LocalMsgNotification'
176+
CBOR.DeserialiseFailure m ByteString
177+
}
178+
179+
dmqCodecs :: MonadST m
180+
=> Codecs m
181+
dmqCodecs =
182+
Codecs {
183+
msgSubmissionCodec = codecLocalMsgSubmission encodeSig decodeSig encode decode
184+
, msgNotificationCodec = codecLocalMsgNotification encodeSig decodeSig
185+
}
186+
187+
188+
-- | A node-to-client application
189+
--
190+
type App ntcAddr m a =
191+
NodeToClientVersion
192+
-> ResponderContext ntcAddr
193+
-> Mx.Channel m ByteString
194+
-> m (a, Maybe ByteString)
195+
196+
197+
data Apps ntcAddr m a =
198+
Apps {
199+
-- | Start a sig-submission client
200+
aLocalMsgSubmission :: !(App ntcAddr m a)
201+
202+
-- | Start a sig-submission server
203+
, aLocalMsgNotification :: !(App ntcAddr m a)
204+
}
205+
206+
207+
-- | Construct applications for the node-to-client protocols
208+
--
209+
ntcApps
210+
:: (MonadThrow m, MonadThread m, MonadSTM m)
211+
=> NodeKernel ntnAddr m
212+
-> Codecs m
213+
-> Apps ntcAddr m ()
214+
ntcApps NodeKernel { mempool }
215+
Codecs { msgSubmissionCodec, msgNotificationCodec }=
216+
Apps {
217+
aLocalMsgSubmission
218+
, aLocalMsgNotification
219+
}
220+
where
221+
sigSize :: Sig -> SizeInBytes
222+
sigSize _ = 0 -- TODO
223+
224+
mempoolReader = Mempool.getReader sigId sigSize mempool
225+
mempoolWriter = Mempool.getWriter sigId (const True) mempool
226+
227+
aLocalMsgSubmission _version _ctx channel = do
228+
labelThisThread "LocalMsgSubmissionServer"
229+
runPeer
230+
nullTracer
231+
msgSubmissionCodec
232+
channel
233+
(localMsgSubmissionServerPeer $
234+
localMsgSubmissionServer undefined mempoolWriter)
235+
236+
aLocalMsgNotification _version _ctx channel = do
237+
labelThisThread "LocalMsgNotificationServer"
238+
runPeer
239+
nullTracer
240+
msgNotificationCodec
241+
channel
242+
(localMsgNotificationServerPeer $
243+
localMsgNotificationServer undefined nullTracer undefined mempoolReader)
244+
245+
246+
data Protocols appType ntcAddr bytes m a b =
247+
Protocols {
248+
msgSubmissionProtocol :: RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
249+
, msgNotificationProtocol :: RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
250+
}
251+
252+
responders
253+
:: Apps ntcAddr m a
254+
-> NodeToClientVersion
255+
-> NodeToClientVersionData
256+
-> OuroborosApplicationWithMinimalCtx 'Mx.ResponderMode ntcAddr ByteString m Void a
257+
responders Apps {
258+
aLocalMsgSubmission
259+
, aLocalMsgNotification
260+
}
261+
version
262+
=
263+
nodeToClientProtocols
264+
Protocols {
265+
msgSubmissionProtocol =
266+
ResponderProtocolOnly $
267+
MiniProtocolCb $ aLocalMsgSubmission version
268+
, msgNotificationProtocol =
269+
ResponderProtocolOnly $
270+
MiniProtocolCb $ aLocalMsgNotification version
271+
}
272+
version
273+
274+
275+
-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
276+
-- make up the overall node-to-client protocol.
277+
--
278+
-- This function specifies the wire format protocol numbers as well as the
279+
-- protocols that run for each 'NodeToClientVersion'.
280+
--
281+
-- They are chosen to not overlap with the node to node protocol numbers.
282+
-- This is not essential for correctness, but is helpful to allow a single
283+
-- shared implementation of tools that can analyse both protocols, e.g.
284+
-- wireshark plugins.
285+
--
286+
nodeToClientProtocols
287+
:: Protocols appType ntcAddr bytes m a b
288+
-> NodeToClientVersion
289+
-> NodeToClientVersionData
290+
-> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
291+
nodeToClientProtocols protocols _version _versionData =
292+
OuroborosApplication $
293+
case protocols of
294+
Protocols {
295+
msgSubmissionProtocol
296+
, msgNotificationProtocol
297+
} ->
298+
[ localMsgSubmission msgSubmissionProtocol
299+
, localMsgNotification msgNotificationProtocol
300+
]
301+
where
302+
-- TODO: verify protocol numbers
303+
localMsgSubmission protocol = MiniProtocol {
304+
miniProtocolNum = MiniProtocolNum 10,
305+
miniProtocolStart = StartOnDemand,
306+
miniProtocolLimits = maximumMiniProtocolLimits,
307+
miniProtocolRun = protocol
308+
}
309+
localMsgNotification protocol = MiniProtocol {
310+
miniProtocolNum = MiniProtocolNum 11,
311+
miniProtocolStart = StartOnDemand,
312+
miniProtocolLimits = maximumMiniProtocolLimits,
313+
miniProtocolRun = protocol
314+
}
315+
maximumMiniProtocolLimits =
316+
MiniProtocolLimits {
317+
maximumIngressQueue = 0xffffffff
318+
}

0 commit comments

Comments
 (0)