Skip to content

Commit 7858cd3

Browse files
dmq: NtC applications
1 parent d510935 commit 7858cd3

File tree

5 files changed

+151
-1
lines changed

5 files changed

+151
-1
lines changed

decentralized-message-queue/app/Main.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ runDMQ cliopts@CLIOptions {
5353
(decodeRemoteAddress (mapNtNDMQtoOuroboros maxBound)))
5454
dmqLimitsAndTimeouts
5555
defaultSigDecisionPolicy
56+
dmqNtCApps =
57+
ntcApps nodeKernel
58+
undefined -- codecs
59+
5660
dmqDiffusionArguments =
5761
diffusionArguments debugTracer
5862
debugTracer

decentralized-message-queue/decentralized-message-queue.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ library
5454
DMQ.Diffusion.PeerSelection
5555
DMQ.NodeToClient
5656
DMQ.NodeToNode
57+
DMQ.NtC_Applications.LocalMsgSubmission
58+
DMQ.NtC_Applications.LocalMsgNotification
5759
DMQ.Protocol.LocalMsgNotification.Client
5860
DMQ.Protocol.LocalMsgNotification.Codec
5961
DMQ.Protocol.LocalMsgNotification.Server

decentralized-message-queue/src/DMQ/NodeToClient.hs

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE OverloadedRecordDot #-}
12
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE DeriveGeneric #-}
34
{-# LANGUAGE LambdaCase #-}
@@ -9,25 +10,41 @@ import Codec.CBOR.Term qualified as CBOR
910
import Control.DeepSeq (NFData)
1011
import Control.Monad ((>=>))
1112
import Data.Bits (Bits (..))
13+
import Data.ByteString.Lazy (ByteString)
1214
import Data.Text (Text)
1315
import Data.Text qualified as T
1416
import GHC.Generics (Generic)
1517

18+
import Control.Monad.Class.MonadFork
1619
import Control.Monad.Class.MonadST (MonadST)
20+
import Control.Monad.Class.MonadThrow
1721
import Control.Tracer (Tracer, nullTracer)
1822

1923
import Network.Mux qualified as Mx
2024

2125
import Ouroboros.Network.CodecCBORTerm (CodecCBORTerm (..))
2226
import Ouroboros.Network.ConnectionId (ConnectionId)
23-
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
27+
import Ouroboros.Network.Driver.Simple
2428
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
2529
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
2630
import Ouroboros.Network.Magic (NetworkMagic (..))
2731
import Ouroboros.Network.Protocol.Handshake (Accept (..), Handshake,
2832
HandshakeArguments (..))
2933
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
3034
codecHandshake, noTimeLimitsHandshake)
35+
import DMQ.Diffusion.NodeKernel
36+
import Network.TypedProtocol.Codec
37+
import DMQ.Protocol.LocalMsgSubmission.Type
38+
import qualified Network.TypedProtocol.Codec.CBOR as CBOR
39+
import DMQ.Protocol.LocalMsgNotification.Type
40+
import DMQ.Protocol.LocalMsgNotification.Server
41+
import DMQ.Protocol.LocalMsgSubmission.Server
42+
import Ouroboros.Network.Context
43+
import DMQ.Protocol.SigSubmission.Type
44+
import qualified Ouroboros.Network.TxSubmission.Mempool.Simple as Mempool
45+
import Control.Concurrent.Class.MonadSTM
46+
import DMQ.NtC_Applications.LocalMsgSubmission
47+
import DMQ.NtC_Applications.LocalMsgNotification
3148

3249
data NodeToClientVersion =
3350
NodeToClientV_1
@@ -106,6 +123,7 @@ nodeToClientCodecCBORTerm _v = CodecCBORTerm {encodeTerm, decodeTerm}
106123
decoder x query | x >= 0 && x <= 0xffffffff = Right (NodeToClientVersionData (NetworkMagic $ fromIntegral x) query)
107124
| otherwise = Left $ T.pack $ "networkMagic out of bound: " <> show x
108125

126+
109127
data Protocols =
110128
Protocols {
111129
}
@@ -139,3 +157,74 @@ stdVersionDataNTC networkMagic =
139157
{ networkMagic
140158
, query = False
141159
}
160+
161+
type LocalMsgSubmission' = LocalMsgSubmission Sig Int
162+
type LocalMsgNotification' = LocalMsgNotification Sig
163+
164+
data Codecs m =
165+
Codecs {
166+
msgSubmissionCodec
167+
:: Codec LocalMsgSubmission'
168+
CBOR.DeserialiseFailure m ByteString
169+
, msgNotificationCodec
170+
:: Codec LocalMsgNotification'
171+
CBOR.DeserialiseFailure m ByteString
172+
}
173+
174+
175+
-- | A node-to-client application
176+
--
177+
type App ctx m a =
178+
ctx
179+
-> Mx.Channel m ByteString
180+
-> m (a, Maybe ByteString)
181+
182+
183+
data Apps addr m a =
184+
Apps {
185+
-- | Start a sig-submission client
186+
aLocalMsgSubmission :: App addr m a
187+
188+
-- | Start a sig-submission server
189+
, aLocalMsgNotification :: App addr m a
190+
}
191+
192+
193+
-- | Construct applications for the node-to-client protocols
194+
--
195+
ntcApps
196+
:: (MonadThrow m, MonadThread m, MonadSTM m)
197+
=> NodeKernel addr m
198+
-> Codecs m
199+
-> Apps addr m ()
200+
ntcApps NodeKernel { mempool } codecs =
201+
Apps {
202+
aLocalMsgSubmission
203+
, aLocalMsgNotification
204+
}
205+
where
206+
sigSize :: Sig -> SizeInBytes
207+
sigSize _ = 0 -- TODO
208+
209+
mempoolReader = Mempool.getReader sigId sigSize mempool
210+
mempoolWriter = Mempool.getWriter sigId (const True) mempool
211+
212+
-- aLocalMsgSubmission :: _
213+
aLocalMsgSubmission _ctx channel = do
214+
labelThisThread "LocalMsgSubmissionServer"
215+
runPeer
216+
undefined
217+
codecs.msgSubmissionCodec
218+
channel
219+
(localMsgSubmissionServerPeer $
220+
localMsgSubmissionServer undefined mempoolWriter)
221+
222+
-- aLocalMsgNotification :: _
223+
aLocalMsgNotification _ctx channel = do
224+
labelThisThread "LocalMsgNotificationServer"
225+
runPeer
226+
undefined
227+
codecs.msgNotificationCodec
228+
channel
229+
(localMsgNotificationServerPeer $
230+
localMsgNotificationServer undefined)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
module DMQ.NtC_Applications.LocalMsgNotification where
2+
3+
import DMQ.Protocol.LocalMsgNotification.Server
4+
import Control.Concurrent.Class.MonadSTM
5+
6+
localMsgNotificationServer
7+
:: (MonadSTM m)
8+
=> a
9+
-> LocalMsgNotificationServer m msg b
10+
localMsgNotificationServer _a =
11+
LocalMsgNotificationServer . pure $ serverIdle undefined
12+
where
13+
serverIdle :: b -> ServerIdle m msg b
14+
serverIdle _b = ServerIdle { msgRequestHandler, msgDoneHandler }
15+
16+
msgRequestHandler = undefined
17+
msgDoneHandler = undefined
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module DMQ.NtC_Applications.LocalMsgSubmission where
2+
3+
import Control.Tracer
4+
import Control.Concurrent.Class.MonadSTM
5+
import Ouroboros.Network.TxSubmission.Mempool.Simple
6+
import DMQ.Protocol.LocalMsgSubmission.Server
7+
import Ouroboros.Network.TxSubmission.Inbound.V2
8+
import Data.Maybe
9+
import DMQ.Protocol.LocalMsgSubmission.Type
10+
11+
-- | Local transaction submission server, for adding txs to the 'Mempool'
12+
--
13+
localMsgSubmissionServer ::
14+
MonadSTM m
15+
=> Tracer m TraceLocalMsgSubmission
16+
-> TxSubmissionMempoolWriter msgid msg idx m
17+
-> m (LocalMsgSubmissionServer msg reject m ())
18+
localMsgSubmissionServer tracer TxSubmissionMempoolWriter { mempoolAddTxs } =
19+
pure server
20+
where
21+
failure = (SubmitFail undefined, server)
22+
success = const (SubmitSuccess, server)
23+
24+
server = LocalTxSubmissionServer {
25+
recvMsgSubmitTx = \msg -> do
26+
traceWith tracer $ TraceReceivedMsg {- TODO: add msg argument -}
27+
maybe failure success . listToMaybe <$> mempoolAddTxs [msg]
28+
-- case addTxRes of
29+
-- MempoolTxAdded _tx -> return (SubmitSuccess, server)
30+
-- MempoolTxRejected _tx addTxErr -> return (SubmitFail addTxErr, server)
31+
32+
, recvMsgDone = ()
33+
}
34+
35+
36+
data TraceLocalMsgSubmission
37+
= TraceReceivedMsg
38+
-- ^ A transaction was received.

0 commit comments

Comments
 (0)