-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mux Improvements #5093
base: main
Are you sure you want to change the base?
Mux Improvements #5093
Conversation
Lower level benchmark for reading and writing over Sockets. Uses local TCP sockets over ephemeral ports.
Enforce at least 1 byte payload data per SDU.
Define msHeaderLength and use it instead of 8 as hardcoded headerlength.
Permit sending of multiple SDUs through a single call to sendMany for Socket bearers Bearers without vector IO support emulate it through multiple calls to write.
c094ef5
to
aa52d92
Compare
This implements an optional read buffer for the Socket bearer. If provided with a read buffer the socket bearer will read as much data available on the socket that can fit in the read buffer. Subsequent read requests will first be served data from the buffer.
Poll the egress queue at most 1000 times per second. This gives us an oppertunity to pack multiple messages into larger SDUs, and write those SDUs with a single sendMany call.
Use a Builder for ingress. Messages shorter than 128 bytes will be copied.
Use RecvLowWater to signal that we want to read at least the length of the SDU.
aa52d92
to
a274cc2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did the first pass, the PR looks nice.
doRead :: Int64 -> ByteChannel IO -> Int64 -> IO () | ||
doRead maxData _ cnt | cnt >= maxData = return () | ||
doRead maxData chan !cnt = do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to pass the maxData
, since sndSize
is in scope:
doRead :: ByteChannel IO -> Int64 -> IO ()
doRead _ cnt | cnt >= totalPayloadLen sndSize = return ()
doRead chan !cnt = do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
network-mux/test/Test/Mux.hs
Outdated
realLen <- choose (0, 8) -- Size of mux header is 8 | ||
len <- if realLen == 8 then return 0 | ||
else arbitrary |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are generating an SDU with an invalid length. With the change in this commit an SDU with a zero size payload will throw an exception. The change updates the generator to cover this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a comment in the code?
buf <- Win32.Async.recv sd (fromIntegral l) | ||
buf <- Win32.Async.recv sd (fromIntegral len) | ||
#else | ||
buf <- Socket.recv sd l | ||
buf <- (case readBuffer_m of | ||
Nothing -> Socket.recv sd len | ||
Just readBuffer -> recvBuf readBuffer len | ||
) | ||
#endif | ||
`catch` Mx.handleIOException "recv errored" | ||
if BL.null buf | ||
then do | ||
when waitingOnNxtHeader $ | ||
{- This may not be an error, but could be an orderly shutdown. | ||
- We wait 1 seconds to give the mux protocols time to perform | ||
- a clean up and exit. | ||
-} | ||
threadDelay 1 | ||
throwIO $ Mx.BearerClosed (show sd ++ | ||
" closed when reading data, waiting on next header " ++ | ||
show waitingOnNxtHeader) | ||
else do | ||
traceWith tracer $ Mx.TraceRecvEnd (fromIntegral $ BL.length buf) | ||
return buf | ||
`catch` Mx.handleIOException "recv errored" | ||
if BL.null buf | ||
then do | ||
when waitingOnNxtHeader $ | ||
{- This may not be an error, but could be an orderly shutdown. | ||
- We wait 1 seconds to give the mux protocols time to perform | ||
- a clean up and exit. | ||
-} | ||
threadDelay 1 | ||
throwIO $ Mx.BearerClosed (show sd ++ | ||
" closed when reading data, waiting on next header " ++ | ||
show waitingOnNxtHeader) | ||
else return buf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we keep the indentation in the whole where
clause, then the diff will be much simpler.
@@ -406,10 +406,12 @@ prop_mux_snd_recv_bi (DummyRun messages) = ioProperty $ do | |||
(-1) | |||
clientTracer | |||
QueueChannel { writeQueue = client_w, readQueue = client_r } | |||
Nothing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have tests that cover the ReadBuffer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently only prop_mux_close_io
, . @crocodile-dentist is working on improving test coverage for Socket bearers and I will make sure thay they cover ReadBuffer
too.
@@ -174,7 +175,7 @@ data Status | |||
-- Mux internal types | |||
-- | |||
|
|||
type IngressQueue m = StrictTVar m BL.ByteString | |||
type IngressQueue m = StrictTVar m (Int64, Builder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to be careful when writing data, (,)
are lazy...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
let len' = len + BL.length (msBlob sdu) | ||
if len' <= fromIntegral qMax | ||
then do | ||
let buf' = if len == 0 | ||
then -- Don't copy the payload if the queue was empty | ||
lazyByteStringInsert $ msBlob sdu | ||
else -- Copy payloads smaller than 128 bytes | ||
buf <> (lazyByteStringThreshold 128 $ msBlob sdu) | ||
writeTVar q $ (len', buf') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's force len'
and buf'
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Co-authored-by: coot <coot@coot.me>
Co-authored-by: coot <coot@coot.me>
Just (TLSRDemand mpc md d) -> do | ||
sdu <- processSingleWanton egressQueue sduSize mpc md d | ||
buildBatch (sdu:sdus) (sdusLength + sduLength sdu) | ||
Nothing -> return sdus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would reverse the sdus here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You or Niel gave that feedback previously, but this is just one out of 3 cases. I could either do it where I do it now or I will have to do it in 3 different places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about reversing when we return the value:
buildBatch sdus _ | length sdus >= maxSDUsPerBatch = return $ reverse sdus
buildBatch sdus sdusLength | sdusLength >= batchSize = return $ reverse sdus
buildBatch sdus !sdusLength = do
demand_m <- atomically $ tryReadTBQueue egressQueue
case demand_m of
Just (TLSRDemand mpc md d) -> do
sdu <- processSingleWanton egressQueue sduSize mpc md d
buildBatch (sdu:sdus) (sdusLength + sduLength sdu)
Nothing -> return sdus
Description
This PR consist of a series of commits aimed to make Mux more efficient.
Mainly this is done by attempting to read and write larger chunks of data.
Checklist
Quality
Maintenance
ouroboros-network
project.