Skip to content
This repository has been archived by the owner on Sep 1, 2022. It is now read-only.

Add support for flushing the pool #1

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hasql-pool.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name:
hasql-pool
version:
0.7.2
0.7.2.1
category:
Hasql, Database, PostgreSQL
synopsis:
Expand Down Expand Up @@ -49,7 +49,7 @@ library
Hasql.Pool.Prelude
build-depends:
base >=4.11 && <5,
hasql >=1.3 && <1.6,
hasql >=1.6 && <1.7,
stm >=2.5 && <3,
time >=1.5 && <2,
transformers >=0.5 && <0.7
Expand Down
60 changes: 45 additions & 15 deletions library/Hasql/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Hasql.Pool
Pool,
acquire,
acquireDynamically,
flush,
release,
use,

Expand All @@ -23,10 +24,16 @@ data Pool = Pool
poolFetchConnectionSettings :: IO Connection.Settings,
-- | Avail connections.
poolConnectionQueue :: TQueue Connection,
-- | Capacity.
-- | Remaining capacity.
-- The pool size limits the sum of poolCapacity, the length
-- of length poolConnectionQueue and the number of in-flight
-- connections.
poolCapacity :: TVar Int,
-- | Alive.
poolAlive :: TVar Bool
-- | Liveness state of the current generation.
-- The pool as a whole is alive if the current generation is alive,
-- while a connection is returned to the pool if the generation it
-- was acquired in is still alive.
poolAlive :: TVar (TVar Bool)
}

-- | Given the pool-size and connection settings create a connection-pool.
Expand All @@ -50,16 +57,36 @@ acquireDynamically poolSize fetchConnectionSettings = do
Pool fetchConnectionSettings
<$> newTQueueIO
<*> newTVarIO poolSize
<*> newTVarIO True
<*> (newTVarIO =<< newTVarIO True)

-- | Release all the connections in the pool.
-- | Release all the idle connections in the pool and mark the pool as dead.
-- In-use connections will survive this and be closed once they would be returned
-- to the pool.
release :: Pool -> IO ()
release Pool {..} = do
connections <- atomically $ do
writeTVar poolAlive False
alive <- readTVar poolAlive
writeTVar alive False
flushTQueue poolConnectionQueue
forM_ connections Connection.release

-- | Flush the pool, so that using the pool doesn't reuse any connection from
-- before the call. Release all the idle connections in the pool, and mark
-- in-use connections to be closed once they would be returned.
flush :: Pool -> IO ()
flush Pool {..} =
join . atomically $ do
prevAlive <- readTVar poolAlive
alive <- readTVar prevAlive
if alive
then do
writeTVar prevAlive False
writeTVar poolAlive =<< newTVar True
conns <- flushTQueue poolConnectionQueue
modifyTVar' poolCapacity (+ (length conns))
return $ forM_ conns Connection.release
else return (return ())

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
--
Expand All @@ -70,30 +97,31 @@ release Pool {..} = do
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess =
join . atomically $ do
alive <- readTVar poolAlive
aliveVar <- readTVar poolAlive
alive <- readTVar aliveVar
if alive
then
then do
asum
[ readTQueue poolConnectionQueue <&> onConn,
[ readTQueue poolConnectionQueue <&> onConn aliveVar,
do
capVal <- readTVar poolCapacity
if capVal > 0
then do
writeTVar poolCapacity $! pred capVal
return onNewConn
return $ onNewConn aliveVar
else retry
]
else return . return . Left $ PoolIsReleasedUsageError
where
onNewConn = do
onNewConn aliveVar = do
settings <- poolFetchConnectionSettings
connRes <- Connection.acquire settings
case connRes of
Left connErr -> do
atomically $ modifyTVar' poolCapacity succ
return $ Left $ ConnectionUsageError connErr
Right conn -> onConn conn
onConn conn = do
Right conn -> onConn aliveVar conn
onConn aliveVar conn = do
sessRes <- Session.run sess conn
case sessRes of
Left err -> case err of
Expand All @@ -109,10 +137,12 @@ use Pool {..} sess =
where
returnConn =
join . atomically $ do
alive <- readTVar poolAlive
alive <- readTVar aliveVar
if alive
then writeTQueue poolConnectionQueue conn $> return ()
else return $ Connection.release conn
else do
modifyTVar' poolCapacity succ
return $ Connection.release conn

-- | Union over all errors that 'use' can result in.
data UsageError
Expand Down
2 changes: 1 addition & 1 deletion stack.yaml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
resolver: nightly-2022-05-31
resolver: https://raw.githubusercontent.com/nikita-volkov/stack-snapshot/f55bfe4e3f2bd73ffa6d9255ea20ab31a993661c/snapshot.yaml
121 changes: 116 additions & 5 deletions stack.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,121 @@
# For more information, please see the documentation at:
# https://docs.haskellstack.org/en/stable/lock_files

packages: []
packages:
- completed:
name: coalmine
pantry-tree:
sha256: 8896e428a656c392d25d499f9b024abd0c30459db09e8a78e1645bad43d4fb89
size: 9564
commit: a2857feca4d8a25bd868e0a4ff985b7ef4db6ec3
git: https://github.com/nikita-volkov/coalmine
version: 0.1.0.4
original:
commit: a2857feca4d8a25bd868e0a4ff985b7ef4db6ec3
git: https://github.com/nikita-volkov/coalmine
- completed:
name: moore-machines
pantry-tree:
sha256: 3d1505067d5c8599e9843ed45ec2bc587fa5100c448f558ca5db2c97fb55706f
size: 887
commit: 7e95c33b203a1cfe42dbc0e0de3cec8e6fe94290
git: https://github.com/nikita-volkov/moore-machines
version: '0.1'
original:
commit: 7e95c33b203a1cfe42dbc0e0de3cec8e6fe94290
git: https://github.com/nikita-volkov/moore-machines
- completed:
name: structure-kit
pantry-tree:
sha256: b7779ebe935d65c0b4f6e3ea8b5f5011abdabf362b1673826545c93625e050ad
size: 3500
commit: 46ffb6527c48f8a718adecd21a313f0b1ba5e81c
git: https://github.com/nikita-volkov/structure-kit
version: 0.1.0.1
original:
commit: 46ffb6527c48f8a718adecd21a313f0b1ba5e81c
git: https://github.com/nikita-volkov/structure-kit
- completed:
name: canapi
pantry-tree:
sha256: 54c44986c3d7f8daa4464e4e097f312dc3ac0c02f7eb5fbb8bba3a6e6d19487a
size: 1319
commit: 02f1d9ffea90522a3a0968305793a7f507c7da59
git: https://github.com/nikita-volkov/canapi
version: '0.1'
original:
commit: 02f1d9ffea90522a3a0968305793a7f507c7da59
git: https://github.com/nikita-volkov/canapi
- completed:
name: lean-http-client
pantry-tree:
sha256: 64da0f3390166969e2fe6a140d847c315258f9bcb8fcaf3169fd78b4b25b6f6f
size: 544
commit: 52343895df1e5ae49159b8c5f76a50ac438216b1
git: https://github.com/nikita-volkov/lean-http-client
version: '0.1'
original:
commit: 52343895df1e5ae49159b8c5f76a50ac438216b1
git: https://github.com/nikita-volkov/lean-http-client
- completed:
name: distillery
pantry-tree:
sha256: 03d32965a35914b68bdaa149db48ce66e774783360f5b7d8b18db72de61b7baf
size: 524
commit: 2c2905f2addb0d1925c43d427fcf65340ee64b2a
git: https://github.com/nikita-volkov/distillery
version: '0.1'
original:
commit: 2c2905f2addb0d1925c43d427fcf65340ee64b2a
git: https://github.com/nikita-volkov/distillery
- completed:
name: acquire
pantry-tree:
sha256: e194150d070a0e4f07e5f2285e82f177312f328c7490f3a659f361b4f6324d00
size: 438
commit: 75194757308ae9340b0e2eda46d8db4d25af7516
git: https://github.com/metrix-ai/acquire
version: 0.3.1
original:
commit: 75194757308ae9340b0e2eda46d8db4d25af7516
git: https://github.com/metrix-ai/acquire
- completed:
pantry-tree:
sha256: 520aa5f59b55c14d27115a801f499f940a563849b0c54876bcd6be41f6450713
size: 456
hackage: cereal-data-dword-0.1.1@sha256:a3fa6dfafaeeb3774c5888dfa601d561ce05b0af66f52859970247e58135d47e,1381
original:
hackage: cereal-data-dword-0.1.1
- completed:
pantry-tree:
sha256: f2fd5281b8fcea2ffa1f323feca1c84d030914dfec18f61e52be8a60d6939037
size: 2622
hackage: hasql-1.6@sha256:78f5fd9e732862cbffb06123015866312bf6f3889c18508018820536490a8460,6624
original:
hackage: hasql-1.6
- completed:
pantry-tree:
sha256: 946dbffdad0a92dbdeeaaabb1c7ac2451f3002561407068543fcd860cd860f15
size: 473
hackage: punycode-2.0@sha256:dce7f481b5eedc750cf49d21a80b839cb366afd2d29bc506acca5ad9d0c06a8f,1503
original:
hackage: punycode-2.0
- completed:
pantry-tree:
sha256: 51dc0ab22269973eec829ae476fb3102cfe155ffde2fd1d039dedf65c7842031
size: 509
hackage: timestamp-0.2@sha256:103fe49ce206922d3b14d57d8aff25eaf03c70fef5fcdaeaeb8416aaee163e51,1598
original:
hackage: timestamp-0.2
snapshots:
- completed:
sha256: c92a0359aa608c8528e0a6a3f952e7b8501c7fac946b8b0e037125e1ab271423
size: 590824
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/nightly/2022/5/31.yaml
original: nightly-2022-05-31
sha256: a4a05670f8e2a2bc108aca4e093c57eb82f562afd6949f299b0116861444f6f6
size: 887
url: https://raw.githubusercontent.com/nikita-volkov/stack-snapshot/f55bfe4e3f2bd73ffa6d9255ea20ab31a993661c/snapshot.yaml
original:
url: https://raw.githubusercontent.com/nikita-volkov/stack-snapshot/f55bfe4e3f2bd73ffa6d9255ea20ab31a993661c/snapshot.yaml
- completed:
sha256: 6a7d84094bf948100b933f83f8cff586a41633d59cb1adbb89eefc25405fabe8
size: 616794
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/nightly/2022/7/16.yaml
original: nightly-2022-07-16
46 changes: 45 additions & 1 deletion test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ main = hspec $ do
res <- use pool $ badQuerySession
res <- use pool $ selectOneSession
shouldSatisfy res $ isRight
it "Getting and setting session variables works" $ do
pool <- acquire 1 connectionSettings
res <- use pool $ getSettingSession "testing.foo"
res `shouldBe` Right Nothing
res <- use pool $ do
setSettingSession "testing.foo" "hello world"
getSettingSession "testing.foo"
res `shouldBe` Right (Just "hello world")
it "Session variables stay set when a connection gets reused" $ do
pool <- acquire 1 connectionSettings
res <- use pool $ setSettingSession "testing.foo" "hello world"
res `shouldBe` Right ()
res2 <- use pool $ getSettingSession "testing.foo"
res2 `shouldBe` Right (Just "hello world")
it "Flushing the pool resets session variables" $ do
pool <- acquire 1 connectionSettings
res <- use pool $ setSettingSession "testing.foo" "hello world"
res `shouldBe` Right ()
flush pool
res <- use pool $ getSettingSession "testing.foo"
res `shouldBe` Right Nothing
it "Flushing a released pool leaves it dead" $ do
pool <- acquire 1 connectionSettings
release pool
flush pool
res <- use pool $ selectOneSession
res `shouldBe` Left PoolIsReleasedUsageError

connectionSettings :: Connection.Settings
connectionSettings =
Expand All @@ -60,9 +87,26 @@ badQuerySession :: Session.Session ()
badQuerySession =
Session.statement () statement
where
statement = Statement.Statement "" Encoders.noParams Decoders.noResult True
statement = Statement.Statement "zzz" Encoders.noParams Decoders.noResult True

closeConnSession :: Session.Session ()
closeConnSession = do
conn <- ask
liftIO $ Connection.release conn

setSettingSession :: Text -> Text -> Session.Session ()
setSettingSession name value = do
Session.statement (name, value) statement
where
statement = Statement.Statement "SELECT set_config($1, $2, false)" encoder Decoders.noResult True
encoder =
contramap fst (Encoders.param (Encoders.nonNullable Encoders.text))
<> contramap snd (Encoders.param (Encoders.nonNullable Encoders.text))

getSettingSession :: Text -> Session.Session (Maybe Text)
getSettingSession name = do
Session.statement name statement
where
statement = Statement.Statement "SELECT current_setting($1, true)" encoder decoder True
encoder = Encoders.param (Encoders.nonNullable Encoders.text)
decoder = Decoders.singleRow (Decoders.column (Decoders.nullable Decoders.text))