Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection retrying on startup and SIGHUP, Fix #742 #869

Merged
merged 6 commits into from
May 7, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

### Added

- #742, Add connection retrying on startup and SIGHUP - @steve-chavez

### Fixed

## [0.4.1.0] - 2017-04-25
Expand Down
86 changes: 70 additions & 16 deletions main/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import PostgREST.Config (AppConfig (..),
import PostgREST.Error (encodeError)
import PostgREST.OpenAPI (isMalformedProxyUri)
import PostgREST.DbStructure
import PostgREST.Types (DbStructure, Schema)

import Control.AutoUpdate
import Control.Retry
import Data.ByteString.Base64 (decode)
import Data.String (IsString (..))
import Data.Text (stripPrefix, pack, replace)
import Data.Text.Encoding (encodeUtf8, decodeUtf8)
import Data.Text.IO (hPutStrLn, readFile)
import Data.Function (id)
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Hasql.Query as H
import qualified Hasql.Session as H
Expand All @@ -43,6 +44,65 @@ isServerVersionSupported = do
H.statement "SELECT current_setting('server_version_num')::integer"
HE.unit (HD.singleRow $ HD.value HD.int4) False

{-|
Background thread that does the following :
1. Tries to connect to pg server and will keep trying until success.
2. Checks if the pg version is supported and if it's not it kills the main program.
3. Obtains the dbStructure.
4. If 2 or 3 fail to give their result it means the connection is down so it goes back to 1,
otherwise it finishes his work successfully.
-}
connectionWorker :: ThreadId -> P.Pool -> Schema -> IORef (Maybe DbStructure) -> IORef Bool -> IO ()
connectionWorker mainTid pool schema refDbStructure refIsWorkerOn = do
isWorkerOn <- readIORef refIsWorkerOn
unless isWorkerOn $ do
atomicWriteIORef refIsWorkerOn True
void $ forkIO work
where
work = do
atomicWriteIORef refDbStructure Nothing
putStrLn ("Attempting to connect to the database..." :: Text)
connected <- connectingSucceeded pool
when connected $ do
result <- P.use pool $ do
supported <- isServerVersionSupported
unless supported $ liftIO $ do
hPutStrLn stderr
("Cannot run in this PostgreSQL version, PostgREST needs at least "
<> pgvName minimumPgVersion)
killThread mainTid
dbStructure <- getDbStructure schema
liftIO $ atomicWriteIORef refDbStructure $ Just dbStructure
case result of
Left e -> do
putStrLn ("Failed to query the database. Retrying." :: Text)
hPutStrLn stderr (toS $ encodeError e)
work
Right _ -> do
atomicWriteIORef refIsWorkerOn False
putStrLn ("Connection successful" :: Text)

-- | Connect to pg server if it fails retry with capped exponential backoff until success
connectingSucceeded :: P.Pool -> IO Bool
connectingSucceeded pool =
retrying (capDelay 32000000 $ exponentialBackoff 1000000)
shouldRetry
(const $ P.release pool >> isConnectionSuccessful)
where
isConnectionSuccessful :: IO Bool
isConnectionSuccessful = do
testConn <- P.use pool $ H.sql "SELECT 1"
case testConn of
Left e -> hPutStrLn stderr (toS $ encodeError e) >> pure False
_ -> pure True
shouldRetry :: RetryStatus -> Bool -> IO Bool
shouldRetry rs isConnSucc = do
delay <- pure $ fromMaybe 0 (rsPreviousDelay rs) `div` 1000000
itShould <- pure $ not isConnSucc
when itShould $
putStrLn $ "Attempting to reconnect to the database in " <> (show delay::Text) <> " seconds..."
return itShould

main :: IO ()
main = do
hSetBuffering stdout LineBuffering
Expand All @@ -67,31 +127,24 @@ main = do

pool <- P.acquire (configPool conf, 10, pgSettings)

result <- P.use pool $ do
supported <- isServerVersionSupported
unless supported $ panic (
"Cannot run in this PostgreSQL version, PostgREST needs at least "
<> pgvName minimumPgVersion)
getDbStructure (toS $ configSchema conf)
refDbStructure <- newIORef Nothing

-- Helper ref to make sure just one connectionWorker can run at a time
refIsWorkerOn <- newIORef False

forM_ (lefts [result]) $ \e -> do
hPutStrLn stderr (toS $ encodeError e)
exitFailure
mainTid <- myThreadId

refDbStructure <- newIORef $ either (panic . show) id result
connectionWorker mainTid pool (configSchema conf) refDbStructure refIsWorkerOn

#ifndef mingw32_HOST_OS
tid <- myThreadId
forM_ [sigINT, sigTERM] $ \sig ->
void $ installHandler sig (Catch $ do
P.release pool
throwTo tid UserInterrupt
throwTo mainTid UserInterrupt
) Nothing

void $ installHandler sigHUP (
Catch . void . P.use pool $ do
s <- getDbStructure (toS $ configSchema conf)
liftIO $ atomicWriteIORef refDbStructure s
Catch $ connectionWorker mainTid pool (configSchema conf) refDbStructure refIsWorkerOn
) Nothing
#endif

Expand All @@ -100,6 +153,7 @@ main = do
defaultUpdateSettings { updateAction = getPOSIXTime }

runSettings appSettings $ postgrest conf refDbStructure pool getTime
(connectionWorker mainTid pool (configSchema conf) refDbStructure refIsWorkerOn)

loadSecretFile :: AppConfig -> IO AppConfig
loadSecretFile conf = extractAndTransform mSecret
Expand Down
1 change: 1 addition & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ executable postgrest
, warp
, bytestring
, base64-bytestring
, retry
if !os(windows)
build-depends: unix

Expand Down
42 changes: 26 additions & 16 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import PostgREST.DbRequestBuilder( readRequest
import PostgREST.Error ( simpleError, pgError
, apiRequestError
, singularityError, binaryFieldError
, connectionLostError
)
import PostgREST.RangeQuery (allRange, rangeOffset)
import PostgREST.Middleware
Expand All @@ -62,28 +63,37 @@ import Data.Function (id)
import Protolude hiding (intercalate, Proxy)
import Safe (headMay)

postgrest :: AppConfig -> IORef DbStructure -> P.Pool -> IO POSIXTime ->
Application
postgrest conf refDbStructure pool getTime =
postgrest :: AppConfig -> IORef (Maybe DbStructure) -> P.Pool -> IO POSIXTime ->
IO () -> Application
postgrest conf refDbStructure pool getTime worker =
let middle = (if configQuiet conf then id else logStdout) . defaultMiddle in

middle $ \ req respond -> do
time <- getTime
body <- strictRequestBody req
dbStructure <- readIORef refDbStructure
maybeDbStructure <- readIORef refDbStructure
case maybeDbStructure of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where I see that having an Either ApiRequestError DbStructure type would be useful, making composition easier

Nothing -> respond connectionLostError
Just dbStructure -> do
response <- case userApiRequest (configSchema conf) req body of
Left err -> return $ apiRequestError err
Right apiRequest -> do
let jwtSecret = binarySecret <$> configJwtSecret conf
eClaims = jwtClaims jwtSecret (iJWT apiRequest) time
authed = containsRole eClaims
handleReq = runWithClaims conf eClaims (app dbStructure conf) apiRequest
txMode = transactionMode dbStructure
(iTarget apiRequest) (iAction apiRequest)
response <- P.use pool $ HT.transaction HT.ReadCommitted txMode handleReq
return $ either (pgError authed) identity response
if isResponse503 response then do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good use for the when function:

when (isResponse503 response) worker
respond respose

worker
respond response
else
respond response

response <- case userApiRequest (configSchema conf) req body of
Left err -> return $ apiRequestError err
Right apiRequest -> do
let jwtSecret = binarySecret <$> configJwtSecret conf
eClaims = jwtClaims jwtSecret (iJWT apiRequest) time
authed = containsRole eClaims
handleReq = runWithClaims conf eClaims (app dbStructure conf) apiRequest
txMode = transactionMode dbStructure
(iTarget apiRequest) (iAction apiRequest)
response <- P.use pool $ HT.transaction HT.ReadCommitted txMode handleReq
return $ either (pgError authed) identity response
respond response
isResponse503 :: Response -> Bool
isResponse503 resp = statusCode (responseStatus resp) == 503

transactionMode :: DbStructure -> Target -> Action -> H.Mode
transactionMode structure target action =
Expand Down
7 changes: 6 additions & 1 deletion src/PostgREST/Error.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module PostgREST.Error (
, simpleError
, singularityError
, binaryFieldError
, connectionLostError
, encodeError
) where

Expand Down Expand Up @@ -73,6 +74,10 @@ binaryFieldError =
simpleError HT.status406 (toS (toMime CTOctetStream) <>
" requested but a single column was not selected")

connectionLostError :: Response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the either type we could also remove this function and use only apiRequestError

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A special message for the reconnection state is needed. I tried to make your suggestion work by adding a new constructor for ApiRequestError with this message, all good, the problem was that by making the IORef (Either ApiRequestError DbStructure) I had to change two function type signatures, namely app and transactionMode, to avoid that I could have unwrapped the Left of the IORef, but that would be the same non composable way as of the Maybe. In the end I was not able to make the code simpler with your suggestion.

connectionLostError =
simpleError HT.status503 "Database connection lost, retrying the connection."

encodeError :: JSON.ToJSON a => a -> LByteString
encodeError = JSON.encode

Expand Down Expand Up @@ -128,7 +133,7 @@ instance JSON.ToJSON H.Error where
"details" .= (fmap toS d::Maybe Text)]

httpStatus :: Bool -> P.UsageError -> HT.Status
httpStatus _ (P.ConnectionError _) = HT.status500
httpStatus _ (P.ConnectionError _) = HT.status503
httpStatus authed (P.SessionError (H.ResultError (H.ServerError c _ _ _))) =
case toS c of
'0':'8':_ -> HT.status503 -- pg connection err
Expand Down
14 changes: 7 additions & 7 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ main = do


result <- P.use pool $ getDbStructure "test"
refDbStructure <- newIORef $ either (panic.show) id result
let withApp = return $ postgrest (testCfg testDbConn) refDbStructure pool getTime
ltdApp = return $ postgrest (testLtdRowsCfg testDbConn) refDbStructure pool getTime
unicodeApp = return $ postgrest (testUnicodeCfg testDbConn) refDbStructure pool getTime
proxyApp = return $ postgrest (testProxyCfg testDbConn) refDbStructure pool getTime
noJwtApp = return $ postgrest (testCfgNoJWT testDbConn) refDbStructure pool getTime
binaryJwtApp = return $ postgrest (testCfgBinaryJWT testDbConn) refDbStructure pool getTime
refDbStructure <- newIORef $ Just $ either (panic.show) id result
let withApp = return $ postgrest (testCfg testDbConn) refDbStructure pool getTime $ pure ()
ltdApp = return $ postgrest (testLtdRowsCfg testDbConn) refDbStructure pool getTime $ pure ()
unicodeApp = return $ postgrest (testUnicodeCfg testDbConn) refDbStructure pool getTime $ pure ()
proxyApp = return $ postgrest (testProxyCfg testDbConn) refDbStructure pool getTime $ pure ()
noJwtApp = return $ postgrest (testCfgNoJWT testDbConn) refDbStructure pool getTime $ pure ()
binaryJwtApp = return $ postgrest (testCfgBinaryJWT testDbConn) refDbStructure pool getTime $ pure ()

let reset = resetDb testDbConn
hspec $ do
Expand Down