Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulated changes #43

wants to merge 1 commit into
base: master
Choose a base branch
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
160 changes: 107 additions & 53 deletions Data/Pool.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-}

#if MIN_VERSION_monad_control(0,3,0)
{-# LANGUAGE FlexibleContexts #-}

#if !MIN_VERSION_base(4,3,0)
{-# LANGUAGE RankNTypes #-}
Expand Down Expand Up @@ -31,6 +27,8 @@ module Data.Pool
Pool(idleTime, maxResources, numStripes)
, LocalPool
, Stats(..)
, PoolStats(..)
, createPool
, withResource
, takeResource
Expand All @@ -39,39 +37,23 @@ module Data.Pool
, destroyResource
, putResource
, destroyAllResources
, stats
) where

import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Control.Monad (forM_, forever, join, liftM5, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
import qualified Control.Exception as E
import qualified Data.Vector as V

#if MIN_VERSION_monad_control(0,3,0)
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Control.Monad.Base (liftBase)
import Control.Monad.IO.Control (MonadControlIO, controlIO)
import Control.Monad.IO.Class (liftIO)
#define control controlIO
#define liftBase liftIO

#if MIN_VERSION_base(4,3,0)
import Control.Exception (mask)
-- Don't do any async exception protection for older GHCs.
mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b
mask f = f id
import UnliftIO (MonadUnliftIO, mask, withRunInIO)

-- | A single resource pool entry.
data Entry a = Entry {
Expand All @@ -80,12 +62,45 @@ data Entry a = Entry {
-- ^ Time of last return.

-- | Stats for a single 'LocalPool'.
data PoolStats = PoolStats {
highwaterUsage :: Int
-- ^ Highest usage since last reset.
, currentUsage :: Int
-- ^ Current number of items.
, takes :: Int
-- ^ Number of takes since last reset.
, creates :: Int
-- ^ Number of creates since last reset.
, createFailures :: Int
-- ^ Number of creation failures since last reset.
} deriving (Show)

-- | Pool-wide stats.
data Stats = Stats {
perStripe :: V.Vector PoolStats
-- ^ Stats per 'LocalPool' (stripe).
, poolStats :: PoolStats
-- ^ Aggregate stats across pool.
} deriving (Show)

-- | A single striped pool.
data LocalPool a = LocalPool {
inUse :: TVar Int
-- ^ Count of open entries (both idle and in use).
, entries :: TVar [Entry a]
-- ^ Idle entries.
, highwaterVar :: TVar Int
-- ^ Highest value of 'inUse' since last reset.
, takeVar :: TVar Int
-- ^ Number of takes since last reset.
, createVar :: TVar Int
-- ^ Number of creates since last reset.
, createFailureVar :: TVar Int
-- ^ Number of create failures since last reset.
, waiters :: WaiterQueue (TMVar (Maybe (Entry a)))
-- ^ threads waiting for a resource
, lfin :: IORef ()
-- ^ empty value used to attach a finalizer to (internal)
} deriving (Typeable)
Expand Down Expand Up @@ -159,7 +174,7 @@ createPool create destroy numStripes idleTime maxResources = do
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- V.replicateM numStripes $
liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ())
LocalPool <$> newTVarIO 0 <*> newTVarIO [] <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newQueueIO <*> newIORef ()
reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
unmask $ reaper destroy idleTime localPools
fin <- newIORef ()
Expand Down Expand Up @@ -247,15 +262,9 @@ purgeLocalPool destroy LocalPool{..} = do
-- destroy a pooled resource, as doing so will almost certainly cause
-- a subsequent user (who expects the resource to be valid) to throw
-- an exception.
withResource ::
#if MIN_VERSION_monad_control(0,3,0)
(MonadBaseControl IO m)
(MonadControlIO m)
=> Pool a -> (a -> m b) -> m b
withResource :: MonadUnliftIO m => Pool a -> (a -> m b) -> m b
{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-}
withResource pool act = control $ \runInIO -> mask $ \restore -> do
withResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do
(resource, local) <- takeResource pool
ret <- restore (runInIO (act resource)) `onException`
destroyResource pool local resource
Expand All @@ -275,16 +284,35 @@ withResource pool act = control $ \runInIO -> mask $ \restore -> do
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool@Pool{..} = do
local@LocalPool{..} <- getLocalPool pool
resource <- liftBase . join . atomically $ do
resource <- join . atomically $ do
modifyTVar_ takeVar (+ 1)
ents <- readTVar entries
case ents of
(Entry{..}:es) -> writeTVar entries es >> return (return entry)
[] -> do
used <- readTVar inUse
when (used == maxResources) retry
writeTVar inUse $! used + 1
return $
create `onException` atomically (modifyTVar_ inUse (subtract 1))
case used == maxResources of
False -> do
writeTVar inUse $! used + 1
modifyTVar_ highwaterVar (`max` (used + 1))
modifyTVar_ createVar (+ 1)
return $
create `onException` atomically (modifyTVar_ createFailureVar (+ 1) >> destroyResourceSTM local)
True -> do
var <- newEmptyTMVar
removeSelf <- push waiters var
let getResource x = case x of
Just y -> pure (entry y)
Nothing -> create `onException` atomically (destroyResourceSTM local)
let dequeue = do
maybeEntry <- atomically $ do
tryTakeTMVar var
atomically $ case maybeEntry of
Nothing -> pure ()
Just Nothing -> destroyResourceSTM local
Just (Just v) -> putResourceSTM local v
return (getResource =<< atomically (takeTMVar var) `onException` dequeue)
return (resource, local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
Expand All @@ -295,14 +323,8 @@ takeResource pool@Pool{..} = do
-- returns immediately with 'Nothing' (ie. the action function is /not/ called).
-- Conversely, if a resource can be borrowed from the pool without blocking, the
-- action is performed and it's result is returned, wrapped in a 'Just'.
tryWithResource :: forall m a b.
#if MIN_VERSION_monad_control(0,3,0)
(MonadBaseControl IO m)
(MonadControlIO m)
=> Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do
tryWithResource :: forall m a b. MonadUnliftIO m => Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do
res <- tryTakeResource pool
case res of
Just (resource, local) -> do
Expand All @@ -321,7 +343,7 @@ tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource pool@Pool{..} = do
local@LocalPool{..} <- getLocalPool pool
resource <- liftBase . join . atomically $ do
resource <- join . atomically $ do
ents <- readTVar entries
case ents of
(Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry)
Expand All @@ -332,7 +354,7 @@ tryTakeResource pool@Pool{..} = do
else do
writeTVar inUse $! used + 1
return $ Just <$>
create `onException` atomically (modifyTVar_ inUse (subtract 1))
create `onException` atomically (destroyResourceSTM local)
return $ (flip (,) local) <$> resource
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryTakeResource #-}
Expand All @@ -343,7 +365,7 @@ tryTakeResource pool@Pool{..} = do
-- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource'
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool Pool{..} = do
i <- liftBase $ ((`mod` numStripes) . hash) <$> myThreadId
i <- ((`mod` numStripes) . hash) <$> myThreadId
return $ localPools V.! i
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE getLocalPool #-}
Expand All @@ -352,22 +374,38 @@ getLocalPool Pool{..} = do
-- | Destroy a resource. Note that this will ignore any exceptions in the
-- destroy function.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool{..} LocalPool{..} resource = do
destroyResource Pool{..} local resource = do
destroy resource `E.catch` \(_::SomeException) -> return ()
atomically (modifyTVar_ inUse (subtract 1))
atomically (destroyResourceSTM local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE destroyResource #-}

-- | Return a resource to the given 'LocalPool'.
putResource :: LocalPool a -> a -> IO ()
putResource LocalPool{..} resource = do
putResource lp resource = do
now <- getCurrentTime
atomically $ modifyTVar_ entries (Entry resource now:)
atomically $ putResourceSTM lp (Entry resource now)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}

putResourceSTM :: LocalPool a -> Entry a -> STM ()
putResourceSTM LocalPool{..} resourceEntry = do
mWaiters <- pop waiters
case mWaiters of
Nothing -> modifyTVar_ entries (resourceEntry:)
Just w -> putTMVar w (Just resourceEntry)
{-# INLINE putResourceSTM #-}

destroyResourceSTM :: LocalPool a -> STM ()
destroyResourceSTM LocalPool{..} = do
mwaiter <- pop waiters
case mwaiter of
Nothing -> modifyTVar_ inUse (subtract 1)
Just w -> putTMVar w Nothing
{-# INLINE destroyResourceSTM #-}

-- | Destroy all resources in all stripes in the pool. Note that this
-- will ignore any exceptions in the destroy function.
Expand All @@ -385,6 +423,22 @@ putResource LocalPool{..} resource = do
destroyAllResources :: Pool a -> IO ()
destroyAllResources Pool{..} = V.forM_ localPools $ purgeLocalPool destroy

-- | @stats pool reset@ returns statistics on each 'LocalPool' as well as a summary across the entire Pool.
-- When @reset@ is true, the stats are reset.
stats :: Pool a -> Bool -> IO Stats
stats Pool{..} reset = do
let stripeStats LocalPool{..} = atomically $ do
s <- liftM5 PoolStats (readTVar highwaterVar) (readTVar inUse) (readTVar takeVar) (readTVar createVar) (readTVar createFailureVar)
when reset $ do
mapM_ (\v -> writeTVar v 0) [takeVar, createVar, createFailureVar]
writeTVar highwaterVar $! currentUsage s
return s

per <- V.mapM stripeStats localPools
let poolWide = V.foldr merge (PoolStats 0 0 0 0 0) per
merge (PoolStats hw1 cu1 t1 c1 f1) (PoolStats hw2 cu2 t2 c2 f2) = PoolStats (hw1 + hw2) (cu1 + cu2) (t1 + t2) (c1 + c2) (f1 + f2)
return $ Stats per poolWide

modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a

Expand Down
87 changes: 87 additions & 0 deletions Data/Pool/WaiterQueue.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
module Data.Pool.WaiterQueue
( WaiterQueue,

import Control.Concurrent.STM

-- | A FIFO queue that supports removing any element from the queue.
-- We have a pointer to the head of the list, and a pointer to the
-- final forward pointer in the list.
data WaiterQueue a
= WaiterQueue
(TVar (TDList a))
(TVar (TVar (TDList a)))

-- | Each element has a pointer to the previous element's forward
-- pointer where "previous element" can be a 'TDList' cons cell or the
-- 'WaiterQueue' head pointer.
data TDList a
= TCons
(TVar (TVar (TDList a)))
(TVar (TDList a))
| TNil

newQueueIO :: IO (WaiterQueue a)
newQueueIO = do
emptyVarL <- newTVarIO TNil
emptyVarR <- newTVarIO emptyVarL
pure (WaiterQueue emptyVarL emptyVarR)

removeSelf ::
-- | 'WaiterQueue's final forward pointer pointer
TVar (TVar (TDList a)) ->
-- | Our back pointer
TVar (TVar (TDList a)) ->
-- | Our forward pointer
TVar (TDList a) ->
STM ()
removeSelf tv prevPP nextP = do
prevP <- readTVar prevPP
-- If our back pointer points to our forward pointer then we have
-- already been removed from the queue
case prevP == nextP of
True -> pure ()
False -> do
next <- readTVar nextP
writeTVar prevP next
case next of
TNil -> writeTVar tv prevP
TCons bp _ _ -> writeTVar bp prevP
writeTVar prevPP nextP
{-# INLINE removeSelf #-}

-- | Returns an STM action that removes the pushed element from the
-- queue
push :: WaiterQueue a -> a -> STM (STM ())
push (WaiterQueue _ tv) a = do
fwdPointer <- readTVar tv
backPointer <- newTVar fwdPointer
emptyVar <- newTVar TNil
let cell = TCons backPointer a emptyVar
writeTVar fwdPointer cell
writeTVar tv emptyVar
pure (removeSelf tv backPointer emptyVar)
{-# INLINE push #-}

pop :: WaiterQueue a -> STM (Maybe a)
pop (WaiterQueue hv tv) = do
firstElem <- readTVar hv
case firstElem of
TNil -> pure Nothing
TCons bp a fp -> do
f <- readTVar fp
writeTVar hv f
case f of
TNil -> writeTVar tv hv
TCons fbp _ _ -> writeTVar fbp hv
-- point the back pointer to the forward pointer as a sign that
-- the cell has been popped (referenced in removeSelf)
writeTVar bp fp
pure (Just a)
{-# INLINE pop #-}