Skip to content

Commit

Permalink
Merge pull request #95 from kazu-yamamoto/run-with-context
Browse files Browse the repository at this point in the history
Run with context
  • Loading branch information
kazu-yamamoto authored Nov 2, 2023
2 parents d966397 + 9ac9d3e commit 486f754
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 43 deletions.
7 changes: 7 additions & 0 deletions Network/HTTP2/Client/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
module Network.HTTP2.Client.Internal (
Request (..),
Response (..),

-- * Low level
Stream (..),
ClientContext (..),
runWithContext,
) where

import Network.HTTP2.Arch
import Network.HTTP2.Client.Run
import Network.HTTP2.Client.Types
65 changes: 49 additions & 16 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ module Network.HTTP2.Client.Run where

import Control.Concurrent.STM (check)
import Control.Exception
import Data.ByteString.Builder (Builder)
import Network.Socket (SockAddr)
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.STM

import Data.ByteString.Builder (Builder)
import Imports
import Network.HTTP2.Arch
import Network.HTTP2.Client.Types
Expand All @@ -28,22 +29,44 @@ data ClientConfig = ClientConfig

-- | Running HTTP/2 client.
run :: ClientConfig -> Config -> Client a -> IO a
run ClientConfig{..} conf@Config{..} client = do
clientInfo <- newClientInfo scheme authority cacheLimit
ctx <- newContext clientInfo confBufferSize confMySockAddr confPeerSockAddr
mgr <- start confTimeoutManager
let runReceiver = frameReceiver ctx conf
runSender = frameSender ctx conf mgr
runBackgroundThreads = concurrently_ runReceiver runSender
exchangeSettings conf ctx
run cconf@ClientConfig{..} conf client = do
(ctx, mgr) <- setup cconf conf
let runClient = do
x <- client $ sendRequest ctx mgr scheme authority
x <- client $ \req processRequest -> do
strm <- sendRequest ctx mgr scheme authority req
rsp <- Response <$> takeMVar (streamInput strm)
processRequest rsp
waitCounter0 mgr
let frame = goawayFrame 0 NoError "graceful closing"
mvar <- newMVar ()
enqueueControl (controlQ ctx) $ CGoaway frame mvar
takeMVar mvar
return x
runArch conf ctx mgr runClient

runWithContext :: ClientConfig -> Config -> (ClientContext -> IO (IO a)) -> IO a
runWithContext cconf@ClientConfig{..} conf@Config{..} action = do
(ctx@Context{..}, mgr) <- setup cconf conf
let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
putR = sendRequest ctx mgr scheme authority
get strm = Response <$> takeMVar (streamInput strm)
create = do
sid <- getMyNewStreamId ctx
openStream ctx sid FrameHeaders
runClient <-
action $ ClientContext confMySockAddr confPeerSockAddr putB putR get create
runArch conf ctx mgr runClient

setup :: ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig{..} conf@Config{..} = do
clientInfo <- newClientInfo scheme authority cacheLimit
ctx <- newContext clientInfo confBufferSize confMySockAddr confPeerSockAddr
mgr <- start confTimeoutManager
exchangeSettings conf ctx
return (ctx, mgr)

runArch :: Config -> Context -> Manager -> IO a -> IO a
runArch conf ctx mgr runClient =
stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
case res of
Expand All @@ -53,22 +76,25 @@ run ClientConfig{..} conf@Config{..} client = do
undefined -- never reach
Right (Right x) ->
return x
where
runReceiver = frameReceiver ctx conf
runSender = frameSender ctx conf mgr
runBackgroundThreads = concurrently_ runReceiver runSender

sendRequest
:: Context
-> Manager
-> Scheme
-> Authority
-> Request
-> (Response -> IO a)
-> IO a
sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
-> IO Stream
sendRequest ctx@Context{..} mgr scheme auth (Request req) = do
-- Checking push promises
let hdr0 = outObjHeaders req
method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
path = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
mstrm0 <- lookupCache method path roleInfo
strm <- case mstrm0 of
case mstrm0 of
Nothing -> do
-- Arch/Sender is originally implemented for servers where
-- the ordering of responses can be out-of-order.
Expand Down Expand Up @@ -101,8 +127,6 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
writeTQueue outputQ $ Output newstrm req' OObj Nothing (return ())
return newstrm
Just strm0 -> return strm0
rsp <- takeMVar $ streamInput strm
processResponse $ Response rsp

sendStreaming
:: Context
Expand Down Expand Up @@ -135,3 +159,12 @@ exchangeSettings conf ctx@Context{..} = do
frames <- updateMySettings conf ctx
let setframe = CFrames Nothing (connectionPreface : frames)
enqueueControl controlQ setframe

data ClientContext = ClientContext
{ cctxMySockAddr :: SockAddr
, cctxPeerSockAddr :: SockAddr
, cctxWriteBytes :: ByteString -> IO ()
, cctxWriteRequest :: Request -> IO Stream
, cctxReadResponse :: Stream -> IO Response
, cctxCreateStream :: IO Stream
}
7 changes: 7 additions & 0 deletions Network/HTTP2/Server/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ module Network.HTTP2.Server.Internal (
Request (..),
Response (..),
Aux (..),

-- * Low level
Stream,
ServerContext (..),
runWithContext,
) where

import Network.HTTP2.Arch
import Network.HTTP2.Server.Run
import Network.HTTP2.Server.Types
90 changes: 64 additions & 26 deletions Network/HTTP2/Server/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,24 @@

module Network.HTTP2.Server.Run where

import UnliftIO.Async (concurrently_)

import Control.Concurrent.STM
import Control.Exception
import Imports
import Network.HTTP2.Arch
import Network.HTTP2.Frame
import Network.HTTP2.Server.Types
import Network.HTTP2.Server.Worker
import Network.Socket (SockAddr)
import UnliftIO.Async (concurrently_)

----------------------------------------------------------------

-- | Running HTTP/2 server.
run :: Config -> Server -> IO ()
run conf@Config{..} server = do
ok <- checkPreface
run conf server = do
ok <- checkPreface conf
when ok $ do
serverInfo <- newServerInfo
ctx <- newContext serverInfo confBufferSize confMySockAddr confPeerSockAddr
-- Workers, worker manager and timer manager
mgr <- start confTimeoutManager
(ctx, mgr) <- setup conf
let wc = fromContext ctx
setAction mgr $ worker wc mgr server
-- The number of workers is 3.
Expand All @@ -31,24 +29,64 @@ run conf@Config{..} server = do
-- If it is large, huge memory is consumed and many
-- context switches happen.
replicateM_ 3 $ spawnAction mgr
let runReceiver = frameReceiver ctx conf
runSender = frameSender ctx conf mgr
runBackgroundThreads = concurrently_ runReceiver runSender
stopAfter mgr runBackgroundThreads $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
case res of
Left err ->
throwIO err
Right x ->
return x
where
checkPreface = do
preface <- confReadN connectionPrefaceLength
if connectionPreface /= preface
then do
goaway conf ProtocolError "Preface mismatch"
return False
else return True
runArch conf ctx mgr

data ServerContext = ServerContext
{ sctxMySockAddr :: SockAddr
, sctxPeerSockAddr :: SockAddr
, sctxReadRequest :: IO (Stream, Request)
, sctxWriteResponse :: Stream -> Response -> IO ()
, sctxWriteBytes :: ByteString -> IO ()
}

runWithContext
:: Config
-> (ServerContext -> IO (IO ()))
-> IO ()
runWithContext conf@Config{..} action = do
ok <- checkPreface conf
when ok $ do
(ctx@Context{..}, mgr) <- setup conf
let ServerInfo{..} = toServerInfo roleInfo
get = do
Input strm inObj <- atomically $ readTQueue inputQ
return (strm, Request inObj)
putR strm (Response outObj) = do
let out = Output strm outObj OObj Nothing (return ())
enqueueOutput outputQ out
putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
io <- action $ ServerContext confMySockAddr confPeerSockAddr get putR putB
concurrently_ io $ runArch conf ctx mgr

checkPreface :: Config -> IO Bool
checkPreface conf@Config{..} = do
preface <- confReadN connectionPrefaceLength
if connectionPreface /= preface
then do
goaway conf ProtocolError "Preface mismatch"
return False
else return True

setup :: Config -> IO (Context, Manager)
setup Config{..} = do
serverInfo <- newServerInfo
ctx <- newContext serverInfo confBufferSize confMySockAddr confPeerSockAddr
-- Workers, worker manager and timer manager
mgr <- start confTimeoutManager
return (ctx, mgr)

runArch :: Config -> Context -> Manager -> IO ()
runArch conf ctx mgr = do
let runReceiver = frameReceiver ctx conf
runSender = frameSender ctx conf mgr
runBackgroundThreads = concurrently_ runReceiver runSender
stopAfter mgr runBackgroundThreads $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
case res of
Left err ->
throwIO err
Right x ->
return x

-- connClose must not be called here since Run:fork calls it
goaway :: Config -> ErrorCode -> ByteString -> IO ()
Expand Down
Loading

0 comments on commit 486f754

Please sign in to comment.