Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to guarantee that final DATA frame is marked end-of-stream #2

Merged
merged 7 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Network/HTTP/Semantics/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ module Network.HTTP.Semantics.Client (
requestStreamingUnmask,
requestBuilder,

-- ** Generalized streaming interface
OutBodyIface(..),
requestStreamingIface,

-- ** Trailers maker
TrailersMaker,
NextTrailersMaker (..),
Expand Down Expand Up @@ -106,7 +110,17 @@ requestStreamingUnmask
-> RequestHeaders
-> ((forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ())
-> Request
requestStreamingUnmask m p hdr strmbdy = Request $ OutObj hdr' (OutBodyStreamingUnmask strmbdy) defaultTrailersMaker
requestStreamingUnmask m p hdr strmbdy = requestStreamingIface m p hdr $ \iface ->
strmbdy (outBodyUnmask iface) (outBodyPush iface) (outBodyFlush iface)

-- | Generalized version of 'requestStreaming',
requestStreamingIface
:: Method
-> Path
-> RequestHeaders
-> (OutBodyIface -> IO ())
-> Request
requestStreamingIface m p hdr strmbdy = Request $ OutObj hdr' (OutBodyStreamingUnmask strmbdy) defaultTrailersMaker
where
hdr' = addHeaders m p hdr

Expand Down
218 changes: 98 additions & 120 deletions Network/HTTP/Semantics/FillBuf.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.HTTP.Semantics.FillBuf (
DynaNext,
BytesFilled,
StreamingChunk (..),
CleanupStream,
fillBuilderBodyGetNext,
fillFileBodyGetNext,
fillStreamBodyGetNext,
Expand All @@ -27,7 +28,7 @@ import Network.HTTP.Semantics.Client
----------------------------------------------------------------

-- type DynaNext = Buffer -> BufferSize -> WindowSize -> IO Next
type DynaNext = Buffer -> BufferSize -> Int -> IO Next
type DynaNext = Buffer -> Int -> IO Next

type BytesFilled = Int

Expand All @@ -39,159 +40,136 @@ data Next

----------------------------------------------------------------

data Leftover
= LZero
| LOne B.BufferWriter
| LTwo ByteString B.BufferWriter

----------------------------------------------------------------

data StreamingChunk
= StreamingFinished (IO ())
| StreamingFlush
| StreamingBuilder Builder
= -- | Indicate that the stream is finished
StreamingFinished CleanupStream
| -- | Flush the stream
--
-- This will cause the write buffer to be written to the network socket,
-- without waiting for more data.
StreamingFlush
| -- | Construct a DATA frame, optionally terminating the stream
--
-- The optional 'CleanupStream' argument can be used to ensure that the
-- final DATA frame in the stream is marked as end-of-stream, as opposed
-- to using a separate, /empty/, data frame with this flag set.
StreamingBuilder Builder (Maybe CleanupStream)

-- | Action to run prior to terminating the stream
type CleanupStream = IO ()

----------------------------------------------------------------

fillBuilderBodyGetNext :: Builder -> DynaNext
fillBuilderBodyGetNext bb buf siz lim = do
let room = min siz lim
fillBuilderBodyGetNext bb buf room = do
(len, signal) <- B.runBuilder bb buf room
return $ nextForBuilder len signal

fillFileBodyGetNext
:: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillFileBodyGetNext pread start bytecount refresh buf siz lim = do
let room = min siz lim
fillFileBodyGetNext pread start bytecount refresh buf room = do
len <- pread start (mini room bytecount) buf
let len' = fromIntegral len
return $ nextForFile len' pread (start + len) (bytecount - len) refresh

fillStreamBodyGetNext :: IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext takeQ buf siz lim = do
let room = min siz lim
(cont, len, reqflush, leftover) <- runStreamBuilder buf room takeQ
return $ nextForStream cont len reqflush leftover takeQ
fillStreamBodyGetNext takeQ = loop 0
where
loop :: NextWithTotal
loop total buf room = do
mChunk <- takeQ
case mChunk of
Just chunk -> runStreamingChunk chunk loop total buf room
Nothing -> return $ Next total False (Just $ loop 0)

----------------------------------------------------------------

fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = do
let room = min siz0 lim
case leftover of
LZero -> error "fillBufBuilder: LZero"
LOne writer -> do
(len, signal) <- writer buf0 room
getNext len signal
LTwo bs writer
| BS.length bs <= room -> do
buf1 <- copy buf0 bs
let len1 = BS.length bs
(len2, signal) <- writer buf1 (room - len1)
getNext (len1 + len2) signal
| otherwise -> do
let (bs1, bs2) = BS.splitAt room bs
void $ copy buf0 bs1
getNext room (B.Chunk bs2 writer)
where
getNext l s = return $ nextForBuilder l s
fillBufBuilderOne :: B.BufferWriter -> DynaNext
fillBufBuilderOne writer buf0 room = do
(len, signal) <- writer buf0 room
return $ nextForBuilder len signal

fillBufBuilderTwo :: ByteString -> B.BufferWriter -> DynaNext
fillBufBuilderTwo bs writer buf0 room
| BS.length bs <= room = do
buf1 <- copy buf0 bs
let len1 = BS.length bs
(len2, signal) <- writer buf1 (room - len1)
return $ nextForBuilder (len1 + len2) signal
| otherwise = do
let (bs1, bs2) = BS.splitAt room bs
void $ copy buf0 bs1
return $ nextForBuilder room (B.Chunk bs2 writer)

nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len B.Done =
Next len True Nothing -- let's flush
nextForBuilder len (B.More _ writer) =
Next len False $ Just (fillBufBuilder (LOne writer))
Next len False $ Just (fillBufBuilderOne writer)
nextForBuilder len (B.Chunk bs writer) =
Next len False $ Just (fillBufBuilder (LTwo bs writer))
Next len False $ Just (fillBufBuilderTwo bs writer)

----------------------------------------------------------------

runStreamBuilder
:: Buffer
-> BufferSize
-> IO (Maybe StreamingChunk)
-> IO
( Bool -- continue
, BytesFilled
, Bool -- require flusing
, Leftover
)
runStreamBuilder buf0 room0 takeQ = loop buf0 room0 0
-- | Like 'DynaNext', but with additional argument indicating total bytes written
type NextWithTotal = Int -> DynaNext

-- | Run the chunk, then continue as specified, unless streaming is finished
runStreamingChunk :: StreamingChunk -> NextWithTotal -> NextWithTotal
runStreamingChunk chunk next =
case chunk of
StreamingFinished dec -> finished dec
StreamingFlush -> flush
StreamingBuilder builder Nothing -> runStreamingBuilder builder next
StreamingBuilder builder (Just dec) -> runStreamingBuilder builder (finished dec)
where
loop buf room total = do
mbuilder <- takeQ
case mbuilder of
Nothing -> return (True, total, False, LZero)
Just (StreamingBuilder builder) -> do
(len, signal) <- B.runBuilder builder buf room
let total' = total + len
case signal of
B.Done -> loop (buf `plusPtr` len) (room - len) total'
B.More _ writer -> return (True, total', False, LOne writer)
B.Chunk bs writer -> return (True, total', False, LTwo bs writer)
Just StreamingFlush -> return (True, total, True, LZero)
Just (StreamingFinished dec) -> do
dec
return (False, total, True, LZero)

fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = do
let room0 = min siz0 lim0
case leftover0 of
LZero -> do
(cont, len, reqflush, leftover) <- runStreamBuilder buf0 room0 takeQ
getNext cont len reqflush leftover
LOne writer -> write writer buf0 room0 0
LTwo bs writer
| BS.length bs <= room0 -> do
buf1 <- copy buf0 bs
let len = BS.length bs
write writer buf1 (room0 - len) len
| otherwise -> do
let (bs1, bs2) = BS.splitAt room0 bs
void $ copy buf0 bs1
getNext True room0 False $ LTwo bs2 writer
finished :: CleanupStream -> NextWithTotal
finished dec = \total _buf _room -> do
dec
return $ Next total True Nothing

flush :: NextWithTotal
flush = \total _buf _room -> do
return $ Next total True (Just $ next 0)

-- | Run 'Builder' until completion, then continue as specified
runStreamingBuilder :: Builder -> NextWithTotal -> NextWithTotal
runStreamingBuilder builder next = \total buf room -> do
writeResult <- B.runBuilder builder buf room
ranWriter writeResult total buf room
where
getNext :: Bool -> BytesFilled -> Bool -> Leftover -> IO Next
getNext cont len reqflush l = return $ nextForStream cont len reqflush l takeQ

write
:: (Buffer -> BufferSize -> IO (Int, B.Next))
-> Buffer
-> BufferSize
-> Int
-> IO Next
write writer1 buf room sofar = do
(len, signal) <- writer1 buf room
ranWriter :: (Int, B.Next) -> NextWithTotal
ranWriter (len, signal) = \total buf room -> do
let total' = total + len
case signal of
B.Done -> do
(cont, extra, reqflush, leftover) <-
runStreamBuilder (buf `plusPtr` len) (room - len) takeQ
let total = sofar + len + extra
getNext cont total reqflush leftover
B.More _ writer -> do
let total = sofar + len
getNext True total False $ LOne writer
B.Chunk bs writer -> do
let total = sofar + len
getNext True total False $ LTwo bs writer

nextForStream
:: Bool
-> BytesFilled
-> Bool
-> Leftover
-> IO (Maybe StreamingChunk)
-> Next
nextForStream False len reqflush _ _ = Next len reqflush Nothing
nextForStream True len reqflush leftOrZero takeQ =
Next len reqflush $ Just (fillBufStream leftOrZero takeQ)
B.Done ->
next total' (buf `plusPtr` len) (room - len)
B.More _ writer ->
return $ Next total' False (Just $ goMore writer 0)
B.Chunk bs writer ->
return $ Next total' False (Just $ goChunk bs writer 0)

goMore :: B.BufferWriter -> NextWithTotal
goMore writer = \total buf room -> do
writeResult <- writer buf room
ranWriter writeResult total buf room

goChunk :: ByteString -> B.BufferWriter -> NextWithTotal
goChunk bs writer = \total buf room ->
if BS.length bs <= room
then do
buf' <- copy buf bs
let len = BS.length bs
goMore writer (total + len) buf' (room - len)
else do
let (bs1, bs2) = BS.splitAt room bs
void $ copy buf bs1
return $ Next (total + room) False (Just $ goChunk bs2 writer 0)

----------------------------------------------------------------

fillBufFile :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillBufFile pread start bytes refresh buf siz lim = do
let room = min siz lim
fillBufFile pread start bytes refresh buf room = do
len <- pread start (mini room bytes) buf
refresh
let len' = fromIntegral len
Expand Down
19 changes: 17 additions & 2 deletions Network/HTTP/Semantics/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Network.HTTP.Semantics.Types (
-- * Request/response as output
OutObj (..),
OutBody (..),
OutBodyIface (..),

-- * Trailers maker
TrailersMaker,
Expand Down Expand Up @@ -66,11 +67,25 @@ data OutBody
--
-- TODO: The analogous change for the server-side would be to provide a similar
-- @unmask@ callback as the first argument in the 'Server' type alias.
OutBodyStreamingUnmask
((forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ())
OutBodyStreamingUnmask (OutBodyIface -> IO ())
| OutBodyBuilder Builder
| OutBodyFile FileSpec

data OutBodyIface = OutBodyIface
{ outBodyUnmask :: (forall x. IO x -> IO x)
-- ^ Unmask exceptions in the thread spawned for the request body
, outBodyPush :: Builder -> IO ()
-- ^ Push a new chunk
, outBodyPushFinal :: Builder -> IO ()
-- ^ Push the final chunk
--
-- Using this function instead of 'outBodyPush' can be used to guarantee that the final
-- HTTP2 DATA frame is marked end-of-stream; with 'outBodyPush' it may happen that
-- an additional empty DATA frame is used for this purpose.
, outBodyFlush :: IO ()
-- ^ Flush
}

-- | Input object
data InpObj = InpObj
{ inpObjHeaders :: TokenHeaderTable
Expand Down