Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Discard postgres connections periodically to free memory
Browse files Browse the repository at this point in the history
  • Loading branch information
jberryman committed Jun 18, 2020
1 parent 70a849d commit b526847
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 11 deletions.
2 changes: 1 addition & 1 deletion bench/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ getPoolC = do
, Q.connUser = "admin"
, Q.connDatabase = "chinook"
}
connParams = Q.ConnParams 1 1 180
connParams = Q.ConnParams 1 1 180 Nothing
Q.initPGPool connInfo connParams

q1 :: T.Text
Expand Down
12 changes: 8 additions & 4 deletions src/Database/PG/Query/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import Data.Bool
import Data.Hashable
import Data.IORef
import Data.Maybe
import Data.Time
import Data.Word
import GHC.Exts
import GHC.Generics
Expand Down Expand Up @@ -279,7 +280,7 @@ retryOnConnErr pgConn action =
Left (Right pgConnErr) -> return $ Left pgConnErr
where
resetFn = resetPGConn pgConn
PGConn _ _ retryP logger _ _ = pgConn
PGConn _ _ retryP logger _ _ _ _ = pgConn

checkResult
:: PQ.Connection
Expand Down Expand Up @@ -401,10 +402,13 @@ data PGConn
, pgLogger :: !PGLogger
, pgCounter :: !(IORef Word16)
, pgTable :: !RKLookupTable
, pgCreatedAt :: !UTCTime
, pgMbLifetime :: !(Maybe NominalDiffTime)
-- ^ If passed, 'withExpiringPGconn' will destroy the connection when it is older than lifetime.
}

resetPGConn :: PGConn -> IO ()
resetPGConn (PGConn conn _ _ _ ctr ht) = do
resetPGConn (PGConn conn _ _ _ ctr ht _ _) = do
-- Reset LibPQ connection
PQ.reset conn
-- Set counter to 0
Expand Down Expand Up @@ -449,7 +453,7 @@ prepare
-> Template
-> [PQ.Oid]
-> PGExec RemoteKey
prepare (PGConn conn _ _ _ counter table) t tl = do
prepare (PGConn conn _ _ _ counter table _ _) t tl = do
let lk = localKey t tl
rkm <- lift $ HI.lookup table lk
case rkm of
Expand Down Expand Up @@ -498,7 +502,7 @@ execQuery pgConn pgQuery = do
bool withoutPrepare withPrepare $ allowPrepare && preparable
withExceptT PGIUnexpected $ convF resOk
where
PGConn conn allowPrepare _ _ _ _ = pgConn
PGConn conn allowPrepare _ _ _ _ _ _ = pgConn
PGQuery t params preparable convF = pgQuery
withoutPrepare = execParams conn t params
withPrepare = do
Expand Down
3 changes: 1 addition & 2 deletions src/Database/PG/Query/Listen.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import Database.PG.Query.Transaction
import Control.Exception (displayException, try)
import Control.Monad.Except
import Control.Monad.Trans.Control
import Data.Pool (withResource)
import Data.String
import GHC.Conc.IO (threadWaitRead)

Expand Down Expand Up @@ -48,7 +47,7 @@ listen
)
=> PGPool -> PGChannel -> NotifyHandler -> m ()
listen pool channel handler = catchConnErr $
withResource pool $ \pgConn -> do
withExpiringPGconn pool $ \pgConn -> do
let conn = pgPQConn pgConn

-- Issue listen command
Expand Down
55 changes: 51 additions & 4 deletions src/Database/PG/Query/Pool.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# OPTIONS_GHC -fno-warn-missing-fields #-}

module Database.PG.Query.Pool
( ConnParams (..)
, PGPool
, withExpiringPGconn
, defaultConnParams
, initPGPool
, destroyPGPool
Expand All @@ -23,6 +25,8 @@ module Database.PG.Query.Pool
, PGExecErr(..)
, FromPGConnErr(..)
, FromPGTxErr(..)
-- * Forcing destroying of connections
, PGConnectionStale(..)
) where

import Database.PG.Query.Connection
Expand All @@ -33,6 +37,7 @@ import Control.Monad.Except
import Control.Monad.Trans.Control
import Data.Aeson
import Data.IORef
import Data.Time
import GHC.Exts (fromString)
import Language.Haskell.TH.Quote
import Language.Haskell.TH.Syntax
Expand All @@ -49,12 +54,15 @@ data ConnParams
{ cpStripes :: !Int
, cpConns :: !Int
, cpIdleTime :: !Int
-- ^ Connections that sit idle for longer than cpIdleTime may be destroyed.
, cpAllowPrepare :: !Bool
, cpMbLifetime :: !(Maybe NominalDiffTime)
-- ^ If passed, 'withExpiringPGconn' will destroy the connection when it is older than lifetime.
}
deriving (Show, Eq)

defaultConnParams :: ConnParams
defaultConnParams = ConnParams 1 20 60 True
defaultConnParams = ConnParams 1 20 60 True Nothing

initPGPool :: ConnInfo
-> ConnParams
Expand All @@ -67,10 +75,11 @@ initPGPool ci cp logger =
nConns = cpConns cp
retryP = mkPGRetryPolicy $ ciRetries ci
creator = do
createdAt <- getCurrentTime
pqConn <- initPQConn ci logger
ctr <- newIORef 0
table <- HI.new
return $ PGConn pqConn (cpAllowPrepare cp) retryP logger ctr table
return $ PGConn pqConn (cpAllowPrepare cp) retryP logger ctr table createdAt (cpMbLifetime cp)
destroyer = PQ.finish . pgPQConn
diffTime = fromIntegral $ cpIdleTime cp

Expand Down Expand Up @@ -145,7 +154,7 @@ withConn :: (FromPGTxErr e, FromPGConnErr e)
withConn pool txm f =
catchConnErr action
where
action = RP.withResource pool $
action = withExpiringPGconn pool $
\connRsrc -> runTxOnConn connRsrc txm f

catchConnErr :: (FromPGConnErr e, MonadError e m, MonadBaseControl IO m)
Expand Down Expand Up @@ -179,7 +188,7 @@ runTx' :: (FromPGTxErr e, FromPGConnErr e)
-> ExceptT e IO a
runTx' pool tx = do
res <- liftIO $ runExceptT $ catchConnErr $
RP.withResource pool $ \connRsrc -> execTx connRsrc tx
withExpiringPGconn pool $ \connRsrc -> execTx connRsrc tx
either throwError return res

runTxOnConn' :: PGConn
Expand All @@ -194,3 +203,41 @@ sqlFromFile :: FilePath -> Q Exp
sqlFromFile fp = do
contents <- qAddDependentFile fp >> runIO (readFile fp)
[| fromString contents |]


-- | 'RP.withResource' for PGPool but implementing a workaround for #5087,
-- optionally expiring the connection after a configurable amount of time so
-- that memory at least can't accumulate unbounded in long-lived connections.
--
-- See ticket for discussion of more long-term solutions.
--
-- Note that idle connections that aren't actively expired here will be
-- destroyed per the timeout policy in Data.Pool.
withExpiringPGconn
:: (MonadBaseControl IO m, MonadIO m)=> PGPool -> (PGConn -> m a) -> m a
withExpiringPGconn pool f = do
-- If the connection was stale, we'll discard it and retry, possibly forcing
-- creation of new connection:
handleLifted (\PGConnectionStale -> withExpiringPGconn pool f) $ do
RP.withResource pool $ \connRsrc@PGConn{pgCreatedAt, pgMbLifetime} -> do
now <- liftIO $ getCurrentTime
let connectionStale =
maybe False (\lifetime-> now `diffUTCTime` pgCreatedAt > lifetime) pgMbLifetime
when connectionStale $ do
-- Throwing is the only way to signal to resource pool to discard the
-- connection at this time, so we need to use it for control flow:
throw PGConnectionStale
-- else proceed with callback:
f connRsrc

-- | Used internally (see 'withExpiringPGconn'), but exported in case we need
-- to allow callback to signal that the connection should be destroyed and we
-- should retry.
data PGConnectionStale = PGConnectionStale
deriving Show

instance Exception PGConnectionStale

-- cribbed from lifted-base
handleLifted :: (MonadBaseControl IO m, Exception e) => (e -> m a) -> m a -> m a
handleLifted handler ma = control $ \runInIO -> handle (runInIO . handler) (runInIO ma)

0 comments on commit b526847

Please sign in to comment.