Skip to content

Commit 41c0634

Browse files
dmq: NtC applications
1 parent 01a6e39 commit 41c0634

File tree

7 files changed

+359
-30
lines changed

7 files changed

+359
-30
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions)
1717
import DMQ.Configuration.Topology (readTopologyFileOrError)
1818
import DMQ.Diffusion.Applications (diffusionApplications)
1919
import DMQ.Diffusion.Arguments
20-
import DMQ.Diffusion.NodeKernel (withNodeKernel)
20+
import DMQ.Diffusion.NodeKernel (mempool, withNodeKernel)
21+
import DMQ.NodeToClient qualified as NtC
2122
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, ntnApps)
23+
import DMQ.Protocol.LocalMsgSubmission.Codec
24+
import DMQ.Protocol.SigSubmission.Codec
25+
import DMQ.Protocol.SigSubmission.Type (Sig (..))
2226
import DMQ.Tracer
2327

2428
import DMQ.Diffusion.PeerSelection (policy)
2529
import Ouroboros.Network.Diffusion qualified as Diffusion
2630
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
2731
encodeRemoteAddress)
32+
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
2833

2934
main :: IO ()
3035
main = void . runDMQ =<< execParser opts
@@ -78,6 +83,13 @@ runDMQ commandLineConfig = do
7883
(decodeRemoteAddress maxBound))
7984
dmqLimitsAndTimeouts
8085
defaultSigDecisionPolicy
86+
dmqNtCApps =
87+
let sigSize _ = 0 -- TODO
88+
maxMsgs = 1000 -- TODO: make this dynamic?
89+
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
90+
mempoolWriter = Mempool.getWriter sigId (const True) (mempool nodeKernel)
91+
in NtC.ntcApps mempoolReader mempoolWriter maxMsgs
92+
(NtC.dmqCodecs encodeSig decodeSig encodeReject decodeReject)
8193
dmqDiffusionArguments =
8294
diffusionArguments (if handshakeTracer
8395
then WithEventType "Handshake" >$< tracer
@@ -91,6 +103,7 @@ runDMQ commandLineConfig = do
91103
dmqDiffusionConfiguration
92104
dmqLimitsAndTimeouts
93105
dmqNtNApps
106+
dmqNtCApps
94107
(policy policyRng)
95108

96109
Diffusion.run dmqDiffusionArguments

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ library
4444
import:
4545
warnings,
4646
extensions
47+
4748
exposed-modules:
4849
DMQ.Configuration
4950
DMQ.Configuration.CLIOptions
@@ -53,11 +54,14 @@ library
5354
DMQ.Diffusion.NodeKernel
5455
DMQ.Diffusion.PeerSelection
5556
DMQ.NodeToClient
57+
DMQ.NodeToClient.LocalMsgSubmission
58+
DMQ.NodeToClient.LocalMsgNotification
5659
DMQ.NodeToClient.Version
5760
DMQ.NodeToNode
5861
DMQ.NodeToNode.Version
5962
DMQ.Protocol.LocalMsgNotification.Client
6063
DMQ.Protocol.LocalMsgNotification.Codec
64+
DMQ.Protocol.LocalMsgNotification.Examples
6165
DMQ.Protocol.LocalMsgNotification.Server
6266
DMQ.Protocol.LocalMsgNotification.Type
6367
DMQ.Protocol.LocalMsgSubmission.Client
@@ -106,6 +110,7 @@ executable dmq-exe
106110
import:
107111
warnings,
108112
extensions
113+
109114
main-is: Main.hs
110115
ghc-options:
111116
-threaded
@@ -126,15 +131,17 @@ executable dmq-exe
126131
default-language: Haskell2010
127132

128133
test-suite dmq-test
129-
import: warnings,
130-
extensions
134+
import:
135+
warnings,
136+
extensions
137+
131138
default-language: Haskell2010
132139
other-modules:
133140
Test.DMQ.NodeToClient
134141
Test.DMQ.NodeToNode
135-
Test.DMQ.Protocol.SigSubmission
136142
Test.DMQ.Protocol.LocalMsgNotification
137143
Test.DMQ.Protocol.LocalMsgSubmission
144+
Test.DMQ.Protocol.SigSubmission
138145

139146
type: exitcode-stdio-1.0
140147
hs-source-dirs: test
@@ -143,8 +150,12 @@ test-suite dmq-test
143150
QuickCheck,
144151
base >=4.14 && <4.22,
145152
bytestring,
153+
contra-tracer,
146154
decentralized-message-queue,
155+
io-classes,
156+
io-sim,
147157
ouroboros-network-api,
158+
ouroboros-network-framework,
148159
ouroboros-network-protocols:testlib,
149160
ouroboros-network-testing,
150161
quickcheck-instances,
@@ -154,7 +165,6 @@ test-suite dmq-test
154165
time,
155166
typed-protocols:{typed-protocols, codec-properties},
156167
with-utf8,
157-
io-classes
158168

159169
ghc-options:
160170
-fno-ignore-asserts

decentralized-message-queue/src/DMQ/Diffusion/Applications.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import DMQ.Configuration
88
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
99
import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
1010
stdVersionDataNTC)
11+
import DMQ.NodeToClient qualified as NTC
1112
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
1213
stdVersionDataNTN)
1314
import DMQ.NodeToNode qualified as NTN
1415

1516
import Ouroboros.Network.Diffusion.Types qualified as Diffusion
1617
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
17-
import Ouroboros.Network.Mux (OuroborosApplication (..))
1818
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
1919
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
2020
simpleSingletonVersions)
@@ -27,6 +27,7 @@ diffusionApplications
2727
-> Diffusion.Configuration NoExtraFlags m ntnFd ntnAddr ntcFd ntcAddr
2828
-> NTN.LimitsAndTimeouts ntnAddr
2929
-> NTN.Apps ntnAddr m a ()
30+
-> NTC.Apps ntcAddr m ()
3031
-> PeerSelectionPolicy ntnAddr m
3132
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
3233
ntcAddr NodeToClientVersion NodeToClientVersionData
@@ -44,6 +45,7 @@ diffusionApplications
4445
}
4546
ntnLimitsAndTimeouts
4647
ntnApps
48+
ntcApps
4749
peerSelectionPolicy =
4850
Diffusion.Applications {
4951
daApplicationInitiatorMode =
@@ -67,11 +69,7 @@ diffusionApplications
6769
[ simpleSingletonVersions
6870
version
6971
(stdVersionDataNTC networkMagic)
70-
(\_versionData ->
71-
OuroborosApplication
72-
[
73-
]
74-
)
72+
(NTC.responders ntcApps version)
7573
| version <- [minBound..maxBound]
7674
]
7775
, daRethrowPolicy = muxErrorRethrowPolicy

decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
3939
data NodeKernel ntnAddr m =
4040
NodeKernel {
4141
-- | The fetch client registry, used for the keep alive clients.
42-
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
42+
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)
4343

4444
-- | Read the current peer sharing registry, used for interacting with
4545
-- the PeerSharing protocol
46-
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
47-
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
48-
, mempool :: Mempool m Sig
49-
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
50-
, sigMempoolSem :: TxMempoolSem m
51-
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
46+
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
47+
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
48+
, mempool :: !(Mempool m Sig)
49+
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
50+
, sigMempoolSem :: !(TxMempoolSem m)
51+
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
5252
}
5353

5454
newNodeKernel :: ( MonadLabelledSTM m

0 commit comments

Comments
 (0)