Skip to content

Commit 270aa07

Browse files
committed
Configurable limit on bytes in Received event
Since 'QDisc's have been added, it's now possible to control the rate of incoming events. However, it's not so easy to control the rate of incoming data, because one 'Received' event could include as many as 2^32 bytes. This patch gives a limit on the number of bytes in any single 'Received' event. The 'recvExact' function remains, using 'maxBound :: Word32' as the limit, which was the implicit limit anyway. Also made the maximum data payload and address lengths non-optional; the default is 'maxBound :: Word32'.
1 parent c342d0e commit 270aa07

File tree

3 files changed

+76
-38
lines changed

3 files changed

+76
-38
lines changed

src/Network/Transport/TCP.hs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import Network.Transport.TCP.Internal
5656
, decodeConnectionRequestResponse
5757
, forkServer
5858
, recvWithLength
59+
, recvWithLengthFold
5960
, recvWord32
6061
, encodeWord32
6162
, tryCloseSocket
@@ -482,16 +483,18 @@ data TCPParameters = TCPParameters {
482483
, transportConnectTimeout :: Maybe Int
483484
-- | Create a QDisc for an EndPoint.
484485
, tcpNewQDisc :: forall t . IO (QDisc t)
485-
-- | Optional maximum length (in bytes) for a peer's address.
486+
-- | Maximum length (in bytes) for a peer's address.
486487
-- If a peer attempts to send an address of length exceeding the limit,
487488
-- the connection will be refused (socket will close).
488-
, tcpMaxAddressLength :: Maybe Word32
489-
-- | Optional maximum length (in bytes) to receive from a peer.
489+
, tcpMaxAddressLength :: Word32
490+
-- | Maximum length (in bytes) to receive from a peer.
490491
-- If a peer attempts to send data on a lightweight connection exceeding
491492
-- the limit, the heavyweight connection which carries that lightweight
492493
-- connection will go down. The peer and the local node will get an
493494
-- EventConnectionLost.
494-
, tcpMaxReceiveLength :: Maybe Word32
495+
, tcpMaxReceiveLength :: Word32
496+
-- | Maximum length (in bytes) of a 'Received' event payload.
497+
, tcpMaxChunkSize :: Word32
495498
}
496499

497500
-- | Internal functionality we expose for unit testing
@@ -598,8 +601,9 @@ defaultTCPParameters = TCPParameters {
598601
, tcpUserTimeout = Nothing
599602
, tcpNewQDisc = simpleUnboundedQDisc
600603
, transportConnectTimeout = Nothing
601-
, tcpMaxAddressLength = Nothing
602-
, tcpMaxReceiveLength = Nothing
604+
, tcpMaxAddressLength = maxBound
605+
, tcpMaxReceiveLength = maxBound
606+
, tcpMaxChunkSize = maxBound
603607
}
604608

605609
--------------------------------------------------------------------------------
@@ -1189,8 +1193,8 @@ handleIncomingMessages params (ourEndPoint, theirEndPoint) = do
11891193
-- overhead
11901194
readMessage :: N.Socket -> LightweightConnectionId -> IO ()
11911195
readMessage sock lcid =
1192-
recvWithLength recvLimit sock >>=
1193-
qdiscEnqueue' ourQueue theirAddr . Received (connId lcid)
1196+
recvWithLengthFold recvLimit chunkLimit sock () $ \bs _ ->
1197+
qdiscEnqueue' ourQueue theirAddr (Received (connId lcid) bs)
11941198

11951199
-- Stop probing a connection as a result of receiving a probe ack.
11961200
stopProbing :: IO ()
@@ -1206,6 +1210,7 @@ handleIncomingMessages params (ourEndPoint, theirEndPoint) = do
12061210
theirState = remoteState theirEndPoint
12071211
theirAddr = remoteAddress theirEndPoint
12081212
recvLimit = tcpMaxReceiveLength params
1213+
chunkLimit = tcpMaxChunkSize params
12091214

12101215
-- Deal with a premature exit
12111216
prematureExit :: N.Socket -> IOException -> IO ()

src/Network/Transport/TCP/Internal.hs

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
13
-- | Utility functions for TCP sockets
24
module Network.Transport.TCP.Internal
35
( ControlHeader(..)
@@ -8,6 +10,7 @@ module Network.Transport.TCP.Internal
810
, decodeConnectionRequestResponse
911
, forkServer
1012
, recvWithLength
13+
, recvWithLengthFold
1114
, recvExact
1215
, recvWord32
1316
, encodeWord32
@@ -59,7 +62,7 @@ import qualified Network.Socket.ByteString as NBS (recv)
5962
import Control.Concurrent (ThreadId)
6063
import Data.Word (Word32)
6164

62-
import Control.Monad (forever, when)
65+
import Control.Monad (forever, when, unless)
6366
import Control.Exception (SomeException, catch, bracketOnError, throwIO, mask_)
6467
import Control.Applicative ((<$>), (<*>))
6568
import Data.Word (Word32)
@@ -180,20 +183,44 @@ forkServer host port backlog reuseAddr terminationHandler requestHandler = do
180183
(tryCloseSocket . fst)
181184
(requestHandler . fst)
182185

183-
-- | Read a length and then a payload of that length, subject to an optional
184-
-- limit on the length.
185-
recvWithLength :: Maybe Word32 -> N.Socket -> IO [ByteString]
186-
recvWithLength mlimit sock = case mlimit of
187-
Nothing -> recvWord32 sock >>= recvExact sock
188-
Just limit -> do
189-
length <- recvWord32 sock
190-
when (length > limit) $
191-
throwIO (userError "recvWithLength: limit exceeded")
192-
recvExact sock length
186+
-- | Read a length, then 1 or more payloads each less than some maximum
187+
-- length in bytes, such that the sum of their lengths is the length that was
188+
-- read.
189+
recvWithLengthFold
190+
:: Word32 -- ^ Maximum total size.
191+
-> Word32 -- ^ Maximum chunk size.
192+
-> N.Socket
193+
-> t -- ^ Start element for the fold.
194+
-> ([ByteString] -> t -> IO t) -- ^ Run this every time we get data of at
195+
-- most the maximum size.
196+
-> IO t
197+
recvWithLengthFold maxSize maxChunk sock base folder = do
198+
len <- recvWord32 sock
199+
when (len > maxSize) $
200+
throwIO (userError "recvWithLengthFold: limit exceeded")
201+
loop base len
202+
where
203+
loop !base !total = do
204+
(bs, received) <- recvExact sock (min maxChunk total)
205+
base' <- folder bs base
206+
let remaining = total - received
207+
when (received > total) $ throwIO (userError "recvWithLengthFold: got more bytes than requested")
208+
if remaining == 0
209+
then return base'
210+
else loop base' remaining
211+
212+
-- | Read a length and then a payload of that length
213+
recvWithLength
214+
:: Word32 -- ^ Maximum total size.
215+
-> N.Socket
216+
-> IO [ByteString]
217+
recvWithLength maxSize sock = fmap (concat . reverse) $
218+
recvWithLengthFold maxSize maxBound sock [] $
219+
\bs lst -> return (bs : lst)
193220

194221
-- | Receive a 32-bit unsigned integer
195222
recvWord32 :: N.Socket -> IO Word32
196-
recvWord32 = fmap (decodeWord32 . BS.concat) . flip recvExact 4
223+
recvWord32 = fmap (decodeWord32 . BS.concat . fst) . flip recvExact 4
197224

198225
-- | Close a socket, ignoring I/O exceptions.
199226
tryCloseSocket :: N.Socket -> IO ()
@@ -204,16 +231,22 @@ tryCloseSocket sock = void . tryIO $
204231
--
205232
-- Throws an I/O exception if the socket closes before the specified
206233
-- number of bytes could be read
207-
recvExact :: N.Socket -- ^ Socket to read from
208-
-> Word32 -- ^ Number of bytes to read
209-
-> IO [ByteString]
234+
recvExact :: N.Socket -- ^ Socket to read from
235+
-> Word32 -- ^ Number of bytes to read
236+
-> IO ([ByteString], Word32) -- ^ Data and number of bytes read
210237
recvExact _ len | len < 0 = throwIO (userError "recvExact: Negative length")
211-
recvExact sock len = go [] len
238+
recvExact sock len = go [] 0 len
212239
where
213-
go :: [ByteString] -> Word32 -> IO [ByteString]
214-
go acc 0 = return (reverse acc)
215-
go acc l = do
240+
go :: [ByteString] -> Word32 -> Word32 -> IO ([ByteString], Word32)
241+
go acc !n 0 = return (reverse acc, n)
242+
go acc !n l = do
216243
bs <- NBS.recv sock (fromIntegral l `min` smallChunkSize)
217244
if BS.null bs
218245
then throwIO (userError "recvExact: Socket closed")
219-
else go (bs : acc) (l - fromIntegral (BS.length bs))
246+
else do
247+
let received = fromIntegral (BS.length bs)
248+
remaining = l - received
249+
total = n + received
250+
-- Check for underflow. Shouldn't be possible but let's make sure.
251+
when (received > l) $ throwIO (userError "recvExact: got more bytes than requested")
252+
go (bs : acc) total remaining

tests/TestTCP.hs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ testEarlyDisconnect = do
165165
(clientPort, _) <- forkServer "127.0.0.1" "0" 5 True throwIO $ \sock -> do
166166
-- Initial setup
167167
0 <- recvWord32 sock
168-
_ <- recvWithLength Nothing sock
168+
_ <- recvWithLength maxBound sock
169169
sendMany sock [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestAccepted)]
170170

171171
-- Server opens a logical connection
@@ -174,7 +174,7 @@ testEarlyDisconnect = do
174174

175175
-- Server sends a message
176176
1024 <- recvWord32 sock
177-
["ping"] <- recvWithLength Nothing sock
177+
["ping"] <- recvWithLength maxBound sock
178178

179179
-- Reply
180180
sendMany sock [
@@ -277,7 +277,7 @@ testEarlyCloseSocket = do
277277
(clientPort, _) <- forkServer "127.0.0.1" "0" 5 True throwIO $ \sock -> do
278278
-- Initial setup
279279
0 <- recvWord32 sock
280-
_ <- recvWithLength Nothing sock
280+
_ <- recvWithLength maxBound sock
281281
sendMany sock [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestAccepted)]
282282

283283
-- Server opens a logical connection
@@ -286,7 +286,7 @@ testEarlyCloseSocket = do
286286

287287
-- Server sends a message
288288
1024 <- recvWord32 sock
289-
["ping"] <- recvWithLength Nothing sock
289+
["ping"] <- recvWithLength maxBound sock
290290

291291
-- Reply
292292
sendMany sock [
@@ -620,7 +620,7 @@ testReconnect = do
620620
(serverPort, _) <- forkServer "127.0.0.1" "0" 5 True throwIO $ \sock -> do
621621
-- Accept the connection
622622
Right 0 <- tryIO $ recvWord32 sock
623-
Right _ <- tryIO $ recvWithLength Nothing sock
623+
Right _ <- tryIO $ recvWithLength maxBound sock
624624

625625
-- The first time we close the socket before accepting the logical connection
626626
count <- modifyMVar counter $ \i -> return (i + 1, i)
@@ -639,7 +639,7 @@ testReconnect = do
639639
-- Client sends a message
640640
Right connId' <- tryIO $ (recvWord32 sock :: IO LightweightConnectionId)
641641
True <- return $ connId == connId'
642-
Right ["ping"] <- tryIO $ recvWithLength Nothing sock
642+
Right ["ping"] <- tryIO $ recvWithLength maxBound sock
643643
putMVar serverDone ()
644644

645645
Right () <- tryIO $ N.sClose sock
@@ -712,15 +712,15 @@ testUnidirectionalError = do
712712
-- would shutdown the socket in the other direction)
713713
void . (try :: IO () -> IO (Either SomeException ())) $ do
714714
0 <- recvWord32 sock
715-
_ <- recvWithLength Nothing sock
715+
_ <- recvWithLength maxBound sock
716716
() <- sendMany sock [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestAccepted)]
717717

718718
Just CreatedNewConnection <- decodeControlHeader <$> recvWord32 sock
719719
connId <- recvWord32 sock :: IO LightweightConnectionId
720720

721721
connId' <- recvWord32 sock :: IO LightweightConnectionId
722722
True <- return $ connId == connId'
723-
["ping"] <- recvWithLength Nothing sock
723+
["ping"] <- recvWithLength maxBound sock
724724
putMVar serverGotPing ()
725725

726726
-- Client
@@ -843,8 +843,8 @@ testMaxLength = do
843843
-- Port is at most 5 bytes (65536) and id is a base-10 Word32 so
844844
-- at most 10 bytes. We'll have one client with a 5-byte port to push it
845845
-- over the chosen limit of 16
846-
tcpMaxAddressLength = Just 16
847-
, tcpMaxReceiveLength = Just 8
846+
tcpMaxAddressLength = 16
847+
, tcpMaxReceiveLength = 8
848848
}
849849
Right goodClientTransport <- createTransport "127.0.0.1" "9999" ((,) "127.0.0.1") defaultTCPParameters
850850
Right badClientTransport <- createTransport "127.0.0.1" "10000" ((,) "127.0.0.1") defaultTCPParameters

0 commit comments

Comments
 (0)