Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion decentralized-message-queue/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions)
import DMQ.Configuration.Topology (readTopologyFileOrError)
import DMQ.Diffusion.Applications (diffusionApplications)
import DMQ.Diffusion.Arguments
import DMQ.Diffusion.NodeKernel (withNodeKernel)
import DMQ.Diffusion.NodeKernel (mempool, withNodeKernel)
import DMQ.NodeToClient qualified as NtC
import DMQ.NodeToNode (dmqCodecs, dmqLimitsAndTimeouts, ntnApps)
import DMQ.Protocol.LocalMsgSubmission.Codec
import DMQ.Protocol.SigSubmission.Codec
import DMQ.Protocol.SigSubmission.Type (Sig (..))
import DMQ.Tracer

import DMQ.Diffusion.PeerSelection (policy)
import Ouroboros.Network.Diffusion qualified as Diffusion
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
encodeRemoteAddress)
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool

main :: IO ()
main = void . runDMQ =<< execParser opts
Expand Down Expand Up @@ -78,6 +83,13 @@ runDMQ commandLineConfig = do
(decodeRemoteAddress maxBound))
dmqLimitsAndTimeouts
defaultSigDecisionPolicy
dmqNtCApps =
let sigSize _ = 0 -- TODO
maxMsgs = 1000 -- TODO: make this dynamic?
mempoolReader = Mempool.getReader sigId sigSize (mempool nodeKernel)
mempoolWriter = Mempool.getWriter sigId (const True) (mempool nodeKernel)
in NtC.ntcApps mempoolReader mempoolWriter maxMsgs
(NtC.dmqCodecs encodeSig decodeSig encodeReject decodeReject)
dmqDiffusionArguments =
diffusionArguments (if handshakeTracer
then WithEventType "Handshake" >$< tracer
Expand All @@ -91,6 +103,7 @@ runDMQ commandLineConfig = do
dmqDiffusionConfiguration
dmqLimitsAndTimeouts
dmqNtNApps
dmqNtCApps
(policy policyRng)

Diffusion.run dmqDiffusionArguments
Expand Down
49 changes: 43 additions & 6 deletions decentralized-message-queue/decentralized-message-queue.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ category: Network
build-type: Simple
extra-doc-files: CHANGELOG.md

common extensions
default-extensions:
BangPatterns
BlockArguments
GADTs
ImportQualifiedPost
LambdaCase
NamedFieldPuns
ScopedTypeVariables

common warnings
ghc-options:
-Wall
Expand All @@ -31,7 +41,10 @@ common warnings
-Wunused-packages

library
import: warnings
import:
warnings,
extensions

exposed-modules:
DMQ.Configuration
DMQ.Configuration.CLIOptions
Expand All @@ -41,9 +54,20 @@ library
DMQ.Diffusion.NodeKernel
DMQ.Diffusion.PeerSelection
DMQ.NodeToClient
DMQ.NodeToClient.LocalMsgNotification
DMQ.NodeToClient.LocalMsgSubmission
DMQ.NodeToClient.Version
DMQ.NodeToNode
DMQ.NodeToNode.Version
DMQ.Protocol.LocalMsgNotification.Client
DMQ.Protocol.LocalMsgNotification.Codec
DMQ.Protocol.LocalMsgNotification.Examples
DMQ.Protocol.LocalMsgNotification.Server
DMQ.Protocol.LocalMsgNotification.Type
DMQ.Protocol.LocalMsgSubmission.Client
DMQ.Protocol.LocalMsgSubmission.Codec
DMQ.Protocol.LocalMsgSubmission.Server
DMQ.Protocol.LocalMsgSubmission.Type
DMQ.Protocol.SigSubmission.Codec
DMQ.Protocol.SigSubmission.Type
DMQ.Tracer
Expand Down Expand Up @@ -74,16 +98,19 @@ library
ouroboros-network-framework ^>=0.19,
ouroboros-network-protocols ^>=0.15,
random ^>=1.2,
singletons,
text >=1.2.4 && <2.2,
time ^>=1.12,
typed-protocols:{typed-protocols, cborg} ^>=1.1,

hs-source-dirs: src
default-language: Haskell2010
default-extensions: ImportQualifiedPost

executable dmq-exe
import: warnings
import:
warnings,
extensions

main-is: Main.hs
ghc-options:
-threaded
Expand All @@ -102,15 +129,18 @@ executable dmq-exe

hs-source-dirs: app
default-language: Haskell2010
default-extensions: ImportQualifiedPost

test-suite dmq-test
import: warnings
import:
warnings,
extensions

default-language: Haskell2010
default-extensions: ImportQualifiedPost
other-modules:
Test.DMQ.NodeToClient
Test.DMQ.NodeToNode
Test.DMQ.Protocol.LocalMsgNotification
Test.DMQ.Protocol.LocalMsgSubmission
Test.DMQ.Protocol.SigSubmission

type: exitcode-stdio-1.0
Expand All @@ -119,10 +149,17 @@ test-suite dmq-test
build-depends:
QuickCheck,
base >=4.14 && <4.22,
bytestring,
contra-tracer,
decentralized-message-queue,
io-classes,
io-sim,
ouroboros-network-api,
ouroboros-network-framework,
ouroboros-network-protocols:testlib,
ouroboros-network-testing,
quickcheck-instances,
serialise,
tasty,
tasty-quickcheck,
time,
Expand Down
10 changes: 4 additions & 6 deletions decentralized-message-queue/src/DMQ/Diffusion/Applications.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import DMQ.Configuration
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
import DMQ.NodeToClient (NodeToClientVersion, NodeToClientVersionData,
stdVersionDataNTC)
import DMQ.NodeToClient qualified as NTC
import DMQ.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData,
stdVersionDataNTN)
import DMQ.NodeToNode qualified as NTN

import Ouroboros.Network.Diffusion.Types qualified as Diffusion
import Ouroboros.Network.ExitPolicy (RepromoteDelay (..))
import Ouroboros.Network.Mux (OuroborosApplication (..))
import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionPolicy)
import Ouroboros.Network.Protocol.Handshake.Version (combineVersions,
simpleSingletonVersions)
Expand All @@ -27,6 +27,7 @@ diffusionApplications
-> Diffusion.Configuration NoExtraFlags m ntnFd ntnAddr ntcFd ntcAddr
-> NTN.LimitsAndTimeouts ntnAddr
-> NTN.Apps ntnAddr m a ()
-> NTC.Apps ntcAddr m ()
-> PeerSelectionPolicy ntnAddr m
-> Diffusion.Applications ntnAddr NodeToNodeVersion NodeToNodeVersionData
ntcAddr NodeToClientVersion NodeToClientVersionData
Expand All @@ -44,6 +45,7 @@ diffusionApplications
}
ntnLimitsAndTimeouts
ntnApps
ntcApps
peerSelectionPolicy =
Diffusion.Applications {
daApplicationInitiatorMode =
Expand All @@ -67,11 +69,7 @@ diffusionApplications
[ simpleSingletonVersions
version
(stdVersionDataNTC networkMagic)
(\_versionData ->
OuroborosApplication
[
]
)
(NTC.responders ntcApps version)
| version <- [minBound..maxBound]
]
, daRethrowPolicy = muxErrorRethrowPolicy
Expand Down
14 changes: 7 additions & 7 deletions decentralized-message-queue/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId)
data NodeKernel ntnAddr m =
NodeKernel {
-- | The fetch client registry, used for the keep alive clients.
fetchClientRegistry :: FetchClientRegistry (ConnectionId ntnAddr) () () m
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)

-- | Read the current peer sharing registry, used for interacting with
-- the PeerSharing protocol
, peerSharingRegistry :: PeerSharingRegistry ntnAddr m
, peerSharingAPI :: PeerSharingAPI ntnAddr StdGen m
, mempool :: Mempool m Sig
, sigChannelVar :: TxChannelsVar m ntnAddr SigId Sig
, sigMempoolSem :: TxMempoolSem m
, sigSharedTxStateVar :: SharedTxStateVar m ntnAddr SigId Sig
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
, mempool :: !(Mempool m Sig)
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId Sig)
, sigMempoolSem :: !(TxMempoolSem m)
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId Sig)
}

newNodeKernel :: ( MonadLabelledSTM m
Expand Down
Loading
Loading