Skip to content

Commit

Permalink
Schema cache reload with zero downtime (PostgREST#1559)
Browse files Browse the repository at this point in the history
* Improve error messages and comments

* Reorder Main.hs functions
  • Loading branch information
steve-chavez authored Jul 16, 2020
1 parent f071180 commit f3776c2
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 146 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- #1512, Allow schema cache reloading with NOTIFY - @steve-chavez
- #1119, Allow config file reloading with SIGUSR2 - @steve-chavez
- #1558, Allow 'Bearer' with and without capitalization as authentication schema - @wolfgangwalther
- #1559, No downtime when reloading the schema cache with SIGUSR1 - @steve-chavez

### Fixed

Expand Down
275 changes: 135 additions & 140 deletions main/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,112 @@ import System.Posix.Signals
import UnixSocket
#endif

-- | This is where everything starts.
main :: IO ()
main = do
--
-- LineBuffering: the entire output buffer is flushed whenever a newline is
-- output, the buffer overflows, a hFlush is issued or the handle is closed
--
-- NoBuffering: output is written immediately and never stored in the buffer
hSetBuffering stdout LineBuffering
hSetBuffering stdin LineBuffering
hSetBuffering stderr NoBuffering

-- read path from commad line
path <- readPathShowHelp

-- build the 'AppConfig' from the config file path
conf <- readValidateConfig path

-- These are config values that can't be reloaded at runtime. Reloading some of them would imply restarting the web server.
let
host = configHost conf
port = configPort conf
maybeSocketAddr = configSocket conf
socketFileMode = configSocketMode conf
dbUri = toS (configDbUri conf)
(dbChannelEnabled, dbChannel) = (configDbChannelEnabled conf, toS $ configDbChannel conf)
serverSettings =
setHost ((fromString . toS) host) -- Warp settings
. setPort port
. setServerName (toS $ "postgrest/" <> prettyVersion) $
defaultSettings
poolSize = configPoolSize conf
poolTimeout = configPoolTimeout' conf

-- create connection pool with the provided settings, returns either a 'Connection' or a 'ConnectionError'. Does not throw.
pool <- P.acquire (poolSize, poolTimeout, dbUri)

-- Used to sync the listener(NOTIFY reload) with the connectionWorker. No connection for the listener at first. Only used if dbChannelEnabled=true.
mvarConnectionStatus <- newEmptyMVar

-- No schema cache at the start. Will be filled in by the connectionWorker
refDbStructure <- newIORef Nothing

-- Helper ref to make sure just one connectionWorker can run at a time
refIsWorkerOn <- newIORef False

-- Config that can change at runtime
refConf <- newIORef conf

-- This is passed to the connectionWorker method so it can kill the main thread if the PostgreSQL's version is not supported.
mainTid <- myThreadId

let connWorker = connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEnabled, mvarConnectionStatus)

-- Sets the initial refDbStructure
connWorker

#ifndef mingw32_HOST_OS
-- Only for systems with signals:
--
-- releases the connection pool whenever the program is terminated,
-- see https://github.com/PostgREST/postgrest/issues/268
forM_ [sigINT, sigTERM] $ \sig ->
void $ installHandler sig (Catch $ do
P.release pool
throwTo mainTid UserInterrupt
) Nothing

-- The SIGUSR1 signal updates the internal 'DbStructure' by running 'connectionWorker' exactly as before.
void $ installHandler sigUSR1 (
Catch connWorker
) Nothing

-- Re-read the config on SIGUSR2
void $ installHandler sigUSR2 (
Catch $ reReadConfig path refConf
) Nothing
#endif

-- reload schema cache on NOTIFY
when dbChannelEnabled $
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker

-- ask for the OS time at most once per second
getTime <- mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}

let postgrestApplication =
postgrest
LogStdout
refConf
refDbStructure
pool
getTime
connWorker

#ifndef mingw32_HOST_OS
-- run the postgrest application with user defined socket. Only for UNIX systems.
whenJust maybeSocketAddr $
runAppInSocket serverSettings postgrestApplication socketFileMode
#endif

-- run the postgrest application
whenNothing maybeSocketAddr $ do
putStrLn $ ("Listening on port " :: Text) <> show port
runSettings serverSettings postgrestApplication

-- Time constants
_32s :: Int
_32s = 32000000 :: Int -- 32 seconds
Expand All @@ -53,9 +159,8 @@ _1s :: Int
_1s = 1000000 :: Int -- 1 second

{-|
The purpose of this worker is to fill the refDbStructure created in 'main'
with the 'DbStructure' returned from calling 'getDbStructure'. This method
is meant to be called by multiple times by the same thread, but does nothing if
The purpose of this worker is to obtain a healthy connection to pg and an up-to-date schema cache(DbStructure).
This method is meant to be called by multiple times by the same thread, but does nothing if
the previous invocation has not terminated. In all cases this method does not
halt the calling thread, the work is preformed in a separate thread.
Expand All @@ -69,11 +174,11 @@ _1s = 1000000 :: Int -- 1 second
3. Obtains the dbStructure.
-}
connectionWorker
:: ThreadId -- ^ Main thread id. Killed if pg version is unsupported
-> P.Pool -- ^ The PostgreSQL connection pool
-> IORef AppConfig
-> IORef (Maybe DbStructure) -- ^ mutable reference to 'DbStructure'
-> IORef Bool -- ^ Used as a binary Semaphore
:: ThreadId -- ^ Main thread id. Killed if pg version is unsupported
-> P.Pool -- ^ The pg connection pool
-> IORef AppConfig -- ^ mutable reference to AppConfig
-> IORef (Maybe DbStructure) -- ^ mutable reference to 'DbStructure'
-> IORef Bool -- ^ Used as a binary Semaphore
-> (Bool, MVar ConnectionStatus) -- ^ For interacting with the LISTEN channel
-> IO ()
connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEnabled, mvarConnectionStatus) = do
Expand All @@ -83,7 +188,6 @@ connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEna
void $ forkIO work
where
work = do
atomicWriteIORef refDbStructure Nothing
putStrLn ("Attempting to connect to the database..." :: Text)
connected <- connectionStatus pool
when dbChannelEnabled $
Expand All @@ -96,29 +200,13 @@ connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEna
fillSchemaCache pool actualPgVersion refConf refDbStructure
liftIO $ atomicWriteIORef refIsWorkerOn False

fillSchemaCache :: P.Pool -> PgVersion -> IORef AppConfig -> IORef (Maybe DbStructure) -> IO ()
fillSchemaCache pool actualPgVersion refConf refDbStructure = do
conf <- readIORef refConf
result <- P.use pool $ HT.transaction HT.ReadCommitted HT.Read $ getDbStructure (toList $ configSchemas conf) actualPgVersion
case result of
Left e -> do
-- If this error happens it would mean the connection is down again. Improbable because connectionStatus ensured the connection.
-- It's not a problem though, because App.postgrest would retry the connectionWorker or the user can do a SIGSUR1 again.
hPutStrLn stderr . toS . errorPayload $ PgError False e
putStrLn ("Failed to load the schema cache" :: Text)

Right dbStructure -> do
atomicWriteIORef refDbStructure $ Just dbStructure
putStrLn ("Schema cache loaded" :: Text)

{-|
Used by 'connectionWorker' to check if the provided db-uri lets
the application access the PostgreSQL database. This method is used
the first time the connection is tested, but only to test before
calling 'getDbStructure' inside the 'connectionWorker' method.
Check if a connection from the pool allows access to the PostgreSQL database.
If not, the pool connections are released and a new connection is tried.
Releasing the pool is key for rapid recovery. Otherwise, the pool timeout would have to be reached for new healthy connections to be acquired.
Which might not happen if the server is busy with requests. No idle connection, no pool timeout.
The connection tries are capped, but if the connection times out no error is
thrown, just 'False' is returned.
The connection tries are capped, but if the connection times out no error is thrown, just 'False' is returned.
-}
connectionStatus :: P.Pool -> IO ConnectionStatus
connectionStatus pool =
Expand Down Expand Up @@ -150,9 +238,25 @@ connectionStatus pool =
putStrLn $ "Attempting to reconnect to the database in " <> (show delay::Text) <> " seconds..."
return itShould

-- | Fill the DbStructure by using a connection from the pool
fillSchemaCache :: P.Pool -> PgVersion -> IORef AppConfig -> IORef (Maybe DbStructure) -> IO ()
fillSchemaCache pool actualPgVersion refConf refDbStructure = do
conf <- readIORef refConf
result <- P.use pool $ HT.transaction HT.ReadCommitted HT.Read $ getDbStructure (toList $ configSchemas conf) actualPgVersion
case result of
Left e -> do
-- If this error happens it would mean the connection is down again. Improbable because connectionStatus ensured the connection.
-- It's not a problem though, because App.postgrest would retry the connectionWorker or the user can do a SIGSUR1 again.
hPutStrLn stderr . toS . errorPayload $ PgError False e
putStrLn ("Failed to load the schema cache" :: Text)

Right dbStructure -> do
atomicWriteIORef refDbStructure $ Just dbStructure
putStrLn ("Schema cache loaded" :: Text)

{-|
Starts a dedicated pg connection to LISTEN for notifications.
When a NOTIFY channel(with an empty payload) is done, it refills the schema cache.
When a NOTIFY <db-channel> - with an empty payload - is done, it refills the schema cache.
It uses the connectionWorker in case the LISTEN connection dies.
-}
listener :: ByteString -> Text -> P.Pool -> IORef AppConfig -> IORef (Maybe DbStructure) -> MVar ConnectionStatus -> IO () -> IO ()
Expand Down Expand Up @@ -195,115 +299,6 @@ reReadConfig path refConf = do
atomicWriteIORef refConf conf
putStrLn ("Config file reloaded" :: Text)

-- | This is where everything starts.
main :: IO ()
main = do
--
-- LineBuffering: the entire output buffer is flushed whenever a newline is
-- output, the buffer overflows, a hFlush is issued or the handle is closed
--
-- NoBuffering: output is written immediately and never stored in the buffer
hSetBuffering stdout LineBuffering
hSetBuffering stdin LineBuffering
hSetBuffering stderr NoBuffering

-- read path from commad line
path <- readPathShowHelp

-- build the 'AppConfig' from the config file path
conf <- readValidateConfig path

-- These are config values that can't be reloaded at runtime. Reloading some of them would imply restarting the web server.
let
host = configHost conf
port = configPort conf
maybeSocketAddr = configSocket conf
socketFileMode = configSocketMode conf
dbUri = toS (configDbUri conf)
(dbChannelEnabled, dbChannel) = (configDbChannelEnabled conf, toS $ configDbChannel conf)
serverSettings =
setHost ((fromString . toS) host) -- Warp settings
. setPort port
. setServerName (toS $ "postgrest/" <> prettyVersion) $
defaultSettings
poolSize = configPoolSize conf
poolTimeout = configPoolTimeout' conf

-- create connection pool with the provided settings, returns either
-- a 'Connection' or a 'ConnectionError'. Does not throw.
pool <- P.acquire (poolSize, poolTimeout, dbUri)

-- Used to sync the listener with the connectionWorker. No connection for the listener at first. Only used if dbChannelEnabled=true.
mvarConnectionStatus <- newEmptyMVar

-- To be filled in by connectionWorker
refDbStructure <- newIORef Nothing

-- Helper ref to make sure just one connectionWorker can run at a time
refIsWorkerOn <- newIORef False

-- Config that can change at runtime
refConf <- newIORef conf

-- This is passed to the connectionWorker method so it can kill the main
-- thread if the PostgreSQL's version is not supported.
mainTid <- myThreadId

let connWorker = connectionWorker mainTid pool refConf refDbStructure refIsWorkerOn (dbChannelEnabled, mvarConnectionStatus)

-- Sets the initial refDbStructure
connWorker

#ifndef mingw32_HOST_OS
-- Only for systems with signals:
--
-- releases the connection pool whenever the program is terminated,
-- see https://github.com/PostgREST/postgrest/issues/268
forM_ [sigINT, sigTERM] $ \sig ->
void $ installHandler sig (Catch $ do
P.release pool
throwTo mainTid UserInterrupt
) Nothing

-- Plus the SIGUSR1 signal updates the internal 'DbStructure' by running
-- 'connectionWorker' exactly as before.
void $ installHandler sigUSR1 (
Catch connWorker
) Nothing

-- Re-read the config on SIGUSR2
void $ installHandler sigUSR2 (
Catch $ reReadConfig path refConf
) Nothing
#endif

-- reload schema cache on NOTIFY
when dbChannelEnabled $
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker

-- ask for the OS time at most once per second
getTime <- mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}

let postgrestApplication =
postgrest
LogStdout
refConf
refDbStructure
pool
getTime
connWorker

#ifndef mingw32_HOST_OS
-- run the postgrest application with user defined socket. Only for UNIX systems.
whenJust maybeSocketAddr $
runAppInSocket serverSettings postgrestApplication socketFileMode
#endif

-- run the postgrest application
whenNothing maybeSocketAddr $ do
putStrLn $ ("Listening on port " :: Text) <> show port
runSettings serverSettings postgrestApplication

-- Utilitarian functions.
whenJust :: Applicative f => Maybe a -> (a -> f ()) -> f ()
whenJust (Just x) f = f x
Expand Down
5 changes: 3 additions & 2 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import Protolude hiding (Proxy, intercalate, toS)
import Protolude.Conv (toS)

postgrest :: LogSetup -> IORef AppConfig -> IORef (Maybe DbStructure) -> P.Pool -> IO UTCTime -> IO () -> Application
postgrest logS refConf refDbStructure pool getTime worker =
postgrest logS refConf refDbStructure pool getTime connWorker =
pgrstMiddleware logS $ \ req respond -> do
time <- getTime
body <- strictRequestBody req
Expand Down Expand Up @@ -100,7 +100,8 @@ postgrest logS refConf refDbStructure pool getTime worker =
txMode = transactionMode proc (iAction apiRequest)
dbResp <- P.use pool $ HT.transaction HT.ReadCommitted txMode handleReq
return $ either (errorResponseFor . PgError authed) identity dbResp
when (responseStatus response == status503) worker
-- Launch the connWorker when the connection is down. The postgrest function can respond successfully(with a stale schema cache) before the connWorker is done.
when (responseStatus response == status503) connWorker
respond response

transactionMode :: Maybe ProcDescription -> Action -> HT.Mode
Expand Down
6 changes: 3 additions & 3 deletions src/PostgREST/Error.hs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ instance JSON.ToJSON PgError where
instance JSON.ToJSON P.UsageError where
toJSON (P.ConnectionError e) = JSON.object [
"code" .= ("" :: Text),
"message" .= ("Database connection error" :: Text),
"message" .= ("Database connection error. Retrying the connection." :: Text),
"details" .= (toS $ fromMaybe "" e :: Text)]
toJSON (P.SessionError e) = JSON.toJSON e -- H.Error

Expand Down Expand Up @@ -169,7 +169,7 @@ instance JSON.ToJSON H.CommandError where
"message" .= ("Unexpected amount of rows" :: Text),
"details" .= i]
toJSON (H.ClientError d) = JSON.object [
"message" .= ("Database client error" :: Text),
"message" .= ("Database client error. Retrying the connection." :: Text),
"details" .= (fmap toS d :: Maybe Text)]

pgErrorStatus :: Bool -> P.UsageError -> HT.Status
Expand Down Expand Up @@ -256,7 +256,7 @@ instance JSON.ToJSON SimpleError where
toJSON (BinaryFieldError ct) = JSON.object [
"message" .= ((toS (toMime ct) <> " requested but more than one column was selected") :: Text)]
toJSON ConnectionLostError = JSON.object [
"message" .= ("Database connection lost, retrying the connection." :: Text)]
"message" .= ("Database connection lost. Retrying the connection." :: Text)]

toJSON PutRangeNotAllowedError = JSON.object [
"message" .= ("Range header and limit/offset querystring parameters are not allowed for PUT" :: Text)]
Expand Down
2 changes: 1 addition & 1 deletion test/io-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ checkDbSchemaReload(){
sleep 0.1 \
|| sleep 1 # fallback: subsecond sleep is not standard and may fail
done
secret="reallyreallyreallyreallyverysafe"
# add v1 schema to db-schema
replaceConfigValue "db-schema" "test, v1" ./configs/sigusr2-settings.config
# reload
kill -s SIGUSR2 $pgrPID
kill -s SIGUSR1 $pgrPID
httpStatus="$(v1SchemaParentsStatus)"
if test "$httpStatus" -eq 200
then
Expand Down

0 comments on commit f3776c2

Please sign in to comment.