Skip to content

Commit

Permalink
Change semantic (and rename) 'MsgReAcquire' to be blocking for updates
Browse files Browse the repository at this point in the history
  'MsgAwaitRequire' now replaces 'MsgReAcquire' so that it only replies when there has been a change in the content of the mempool. This allows for clients to efficiently consume a snapshot and then, wait for the next one without having to poll for updates based on arbitrary times.

  Also, I took the opportunity to update the module's header and fix the state-machine diagram rendering in Haddock.
  • Loading branch information
KtorZ committed Oct 10, 2021
1 parent 6d26a89 commit bde8158
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ localTxMonitorServer mempool =
serverStIdle
:: ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle =
ServerStIdle { recvMsgDone, recvMsgAcquire }
ServerStIdle
{ recvMsgDone = do
pure ()
, recvMsgAcquire = do
s <- atomically $ (,) <$> getCapacity mempool <*> getSnapshot mempool
pure $ serverStAcquiring s
}

serverStAcquiring
:: (MempoolCapacityBytes, MempoolSnapshot blk idx)
Expand Down Expand Up @@ -58,19 +64,11 @@ localTxMonitorServer mempool =
, numberOfTxs = msNumTxs
}
pure $ SendMsgReplyGetSizes sizes (serverStAcquired s txs)
, recvMsgReAcquire =
recvMsgAcquire
, recvMsgAwaitAcquire = do
s' <- atomically $ do
s' <- (,) <$> getCapacity mempool <*> getSnapshot mempool
s' <$ check (s /= s')
pure $ serverStAcquiring s
, recvMsgRelease =
pure serverStIdle
}

recvMsgAcquire
:: m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgAcquire = do
s <- atomically $ (,) <$> getCapacity mempool <*> getSnapshot mempool
pure $ serverStAcquiring s

recvMsgDone
:: m ()
recvMsgDone =
pure ()
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ direct (LocalTxMonitorClient mClient) (LocalTxMonitorServer mServer) = do
-> m (a, b)
directAcquired ServerStAcquired
{ recvMsgRelease
, recvMsgReAcquire
, recvMsgAwaitAcquire
, recvMsgNextTx
, recvMsgHasTx
, recvMsgGetSizes
Expand All @@ -54,8 +54,8 @@ direct (LocalTxMonitorClient mClient) (LocalTxMonitorServer mServer) = do
serverStIdle <- recvMsgRelease
clientStIdle <- mClientStIdle
directIdle serverStIdle clientStIdle
SendMsgReAcquire mClientStAcquired -> do
SendMsgAcquired slot serverStAcquired <- recvMsgReAcquire
SendMsgAwaitAcquire mClientStAcquired -> do
SendMsgAcquired slot serverStAcquired <- recvMsgAwaitAcquire
clientStAcquired <- mClientStAcquired slot
directAcquired serverStAcquired clientStAcquired
SendMsgNextTx mClientStAcquired -> do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ localTxMonitorClient txId =
ClientStIdle txid tx slot m ([(tx, Bool)], MempoolSizeAndCapacity)
clientStIdle =
SendMsgAcquire $ \_slot ->
pure $ SendMsgReAcquire $ \_slot' ->
pure $ clientStAcquired []
pure $ clientStAcquired []

clientStAcquired ::
[(tx, Bool)]
Expand Down Expand Up @@ -82,7 +81,7 @@ localTxMonitorServer txId (slot, allTxs) =
-> ServerStAcquired txid tx slot m ()
serverStAcquired txs =
ServerStAcquired
{ recvMsgReAcquire =
{ recvMsgAwaitAcquire =
pure serverStAcquiring
, recvMsgRelease =
pure serverStIdle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ instance (Arbitrary txid, Arbitrary tx, Arbitrary slot)
arbitrary = oneof
[ pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgAcquire
, AnyMessageAndAgency (ServerAgency TokAcquiring) . MsgAcquired <$> arbitrary
, pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgReAcquire
, pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgAwaitAcquire
, pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgNextTx
, AnyMessageAndAgency (ServerAgency (TokBusy TokNextTx)) . MsgReplyNextTx <$> arbitrary
, AnyMessageAndAgency (ClientAgency TokAcquired) . MsgHasTx <$> arbitrary
Expand All @@ -229,7 +229,7 @@ instance (Eq txid, Eq tx, Eq slot)
where
AnyMessage MsgAcquire == AnyMessage MsgAcquire = True
AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b
AnyMessage MsgReAcquire == AnyMessage MsgReAcquire = True
AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True
AnyMessage MsgNextTx == AnyMessage MsgNextTx = True
AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b
AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ data ClientStAcquired txid tx slot m a where
:: (MempoolSizeAndCapacity -> m (ClientStAcquired txid tx slot m a))
-> ClientStAcquired txid tx slot m a

-- | Re-acquire a more recent snapshot, or the same one if nothing has changed.
-- | Await for a new snapshot and acquire it.
--
SendMsgReAcquire
SendMsgAwaitAcquire
:: (slot -> m (ClientStAcquired txid tx slot m a))
-> ClientStAcquired txid tx slot m a

Expand Down Expand Up @@ -145,8 +145,8 @@ localTxMonitorClientPeer (LocalTxMonitorClient mClient) =
Await (ServerAgency (TokBusy TokGetSizes)) $ \case
MsgReplyGetSizes sizes ->
Effect $ handleStAcquired <$> stAcquired sizes
SendMsgReAcquire stAcquired ->
Yield (ClientAgency TokAcquired) MsgReAcquire $
SendMsgAwaitAcquire stAcquired ->
Yield (ClientAgency TokAcquired) MsgAwaitAcquire $
Await (ServerAgency TokAcquiring) $ \case
MsgAcquired slot ->
Effect $ handleStAcquired <$> stAcquired slot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ codecLocalTxMonitor encodeTxId decodeTxId
CBOR.encodeListLen 1 <> CBOR.encodeWord 1

encode (ClientAgency TokAcquired) = \case
MsgReAcquire ->
MsgAwaitAcquire ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 1
MsgRelease ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 3
Expand Down Expand Up @@ -100,7 +100,7 @@ codecLocalTxMonitor encodeTxId decodeTxId
return (SomeMessage MsgAcquire)

(ClientAgency TokAcquired, 1, 1) ->
return (SomeMessage MsgReAcquire)
return (SomeMessage MsgAwaitAcquire)
(ClientAgency TokAcquired, 1, 3) ->
return (SomeMessage MsgRelease)
(ClientAgency TokAcquired, 1, 5) ->
Expand Down Expand Up @@ -170,14 +170,14 @@ codecLocalTxMonitorId =
let res :: Message ptcl st st' -> m (DecodeStep bytes failure m (SomeMessage st))
res msg = return (DecodeDone (SomeMessage msg) Nothing)
in return $ DecodePartial $ \bytes -> case (stok, bytes) of
(ClientAgency TokIdle, Just (AnyMessage msg@MsgAcquire{})) -> res msg
(ClientAgency TokIdle, Just (AnyMessage msg@MsgDone{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgReAcquire{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgNextTx{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgHasTx{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgRelease{})) -> res msg

(ServerAgency TokAcquiring, Just (AnyMessage msg@MsgAcquired{})) -> res msg
(ClientAgency TokIdle, Just (AnyMessage msg@MsgAcquire{})) -> res msg
(ClientAgency TokIdle, Just (AnyMessage msg@MsgDone{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgAwaitAcquire{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgNextTx{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgHasTx{})) -> res msg
(ClientAgency TokAcquired, Just (AnyMessage msg@MsgRelease{})) -> res msg

(ServerAgency TokAcquiring, Just (AnyMessage msg@MsgAcquired{})) -> res msg
(ServerAgency (TokBusy TokNextTx), Just (AnyMessage msg@MsgReplyNextTx{})) -> res msg
(ServerAgency (TokBusy TokHasTx), Just (AnyMessage msg@MsgReplyHasTx{})) -> res msg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ data ServerStAcquiring txid tx slot m a where
--
-- * request the next transaction from the snapshot;
-- * check the presence of a given transaction, by its id;
-- * re-acquire the latest snapshot;
-- * await a change in the snapshot and acquire it;
-- * release and go back to the 'StIdle' state;
--
data ServerStAcquired txid tx slot m a = ServerStAcquired
{ recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a)
, recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a)
, recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a)
, recvMsgReAcquire :: m (ServerStAcquiring txid tx slot m a)
, recvMsgRelease :: m (ServerStIdle txid tx slot m a)
{ recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a)
, recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a)
, recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a)
, recvMsgAwaitAcquire :: m (ServerStAcquiring txid tx slot m a)
, recvMsgRelease :: m (ServerStIdle txid tx slot m a)
}

-- In the 'StBusy' protocol state, the server has agency and is responding to
Expand Down Expand Up @@ -134,7 +134,7 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
{ recvMsgNextTx
, recvMsgHasTx
, recvMsgGetSizes
, recvMsgReAcquire
, recvMsgAwaitAcquire
, recvMsgRelease
} -> Await (ClientAgency TokAcquired) $ \case
MsgNextTx ->
Expand All @@ -143,8 +143,8 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
Effect $ handleHasTx <$> recvMsgHasTx txid
MsgGetSizes ->
Effect $ handleGetSizes <$> recvMsgGetSizes
MsgReAcquire ->
Effect $ handleStAcquiring <$> recvMsgReAcquire
MsgAwaitAcquire ->
Effect $ handleStAcquiring <$> recvMsgAwaitAcquire
MsgRelease ->
Effect $ handleStIdle <$> recvMsgRelease

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,31 @@
-- The protocol is stateful such that the server keeps track of the transactions
-- already sent to the client.
--
--
-- START
--
-- ┌───────────────┐
-- ┌──────▶│ Idle │⇒ DONE
-- │ └───┬───────────┘
-- │ │
-- │ Acquire │
-- │ ▼
-- │ ┌───────────────┐
-- Release │ │ Acquiring │
-- │ └───┬───────────┘
-- │ │ ▲
-- │ Acquired │ │ ReAcquire
-- │ ▼ │
-- │ ┌───────────┴───┐
-- └───────┤ Acquired │
-- └───┬───────────┘
-- │ ▲
-- HasTx/NextTx/GetSizes │ │ Reply (HasTx/NextTx/GetSizes)
-- ▼ │
-- ┌───────────┴───┐
-- │ Busy │
-- └───────────────┘
--
-- @
-- START
--
-- ┌───────────────┐
-- ┌──────▶│ Idle │⇒ DONE
-- │ └───┬───────────┘
-- │ │
-- │ Acquire │
-- │ ▼
-- │ ┌───────────────┐
-- Release │ │ Acquiring │
-- │ └───┬───────────┘
-- │ │ ▲
-- │ Acquired │ │ AwaitAcquire
-- │ ▼ │
-- │ ┌───────────┴───┐
-- └───────┤ Acquired │
-- └───┬───────────┘
-- │ ▲
-- HasTx|NextTx|GetSizes │ │ Reply (HasTx|NextTx|GetSizes)
-- ▼ │
-- ┌───────────┴───┐
-- │ Busy │
-- └───────────────┘
-- @
module Ouroboros.Network.Protocol.LocalTxMonitor.Type where


Expand Down Expand Up @@ -151,10 +151,13 @@ instance Protocol (LocalTxMonitor txid tx slot) where
:: slot
-> Message (LocalTxMonitor txid tx slot) StAcquiring StAcquired

-- | Like 'MsgAcquire', but when one is already acquired. Allows to renew
-- the snapshot's state.
-- | Like 'MsgAcquire' but await for a new snapshot different from the one
-- currently acquired.
--
-- There is no timeout; the caller will block until a new snapshot is
-- available.
--
MsgReAcquire
MsgAwaitAcquire
:: Message (LocalTxMonitor txid tx slot) StAcquired StAcquiring

-- | The client requests a single transaction and waits a reply.
Expand Down
2 changes: 1 addition & 1 deletion ouroboros-network/test-cddl/specs/local-tx-monitor.cddl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ msgDone = [0]
msgAcquire = [1]
msgAcquired = [2, slotNo]

msgReAcquire = msgAcquire
msgAwaitAcquire = msgAcquire
msgRelease = [3]
msgNextTx = [5]
msgReplyNextTx = [6] / [6, transaction]
Expand Down

0 comments on commit bde8158

Please sign in to comment.