Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved connection recovery on read replicas #2813

Merged
merged 5 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- #2703, Add pre-config function - @steve-chavez
+ New config option `db-pre-config`(empty by default)
+ Allows using the in-database configuration without SUPERUSER
- #2781, Start automatic connection recovery when pool connections are closed with pg_terminate_backend - @steve-chavez

### Fixed

- #2791, Fix dropping schema cache reload notifications - @steve-chavez
- #2801, Stop retrying connection when "no password supplied" - @steve-chavez

## [11.0.1] - 2023-04-27

Expand Down
22 changes: 5 additions & 17 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ Some of its functionality includes:
- Producing HTTP Headers according to RFCs.
- Content Negotiation
-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
module PostgREST.App
( SignalHandlerInstaller
Expand All @@ -20,15 +19,14 @@ module PostgREST.App


import Control.Monad.Except (liftEither)
import Data.Either.Combinators (mapLeft, whenLeft)
import Data.Either.Combinators (mapLeft)
import Data.Maybe (fromJust)
import Data.String (IsString (..))
import Network.Wai.Handler.Warp (defaultSettings, setHost, setPort,
setServerName)
import System.Posix.Types (FileMode)

import qualified Data.HashMap.Strict as HM
import qualified Hasql.Pool as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
Expand Down Expand Up @@ -154,27 +152,17 @@ postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@
Response.optionalRollback conf apiRequest $
handleRequest authResult conf appState (Just authRole /= configDbAnonRole) configDbPreparedStatements pgVer apiRequest sCache

runDbHandler :: AppState.AppState -> Maybe Text -> SQL.Mode -> Bool -> Bool -> DbHandler b -> Handler IO b
runDbHandler :: AppState.AppState -> SQL.IsolationLevel -> SQL.Mode -> Bool -> Bool -> DbHandler b -> Handler IO b
runDbHandler appState isoLvl mode authenticated prepared handler = do
dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
res <- AppState.usePool appState . transaction (toIsolationLevel isoLvl) mode $ runExceptT handler
whenLeft res (\case
SQL.AcquisitionTimeoutUsageError -> AppState.debounceLogAcquisitionTimeout appState -- this can happen rapidly for many requests, so we debounce
_ -> pure ())
return res
AppState.usePool appState . transaction isoLvl mode $ runExceptT handler

resp <-
liftEither . mapLeft Error.PgErr $
mapLeft (Error.PgError authenticated) dbResp

liftEither resp
where
toIsolationLevel = \case
Nothing -> SQL.ReadCommitted
Just "repeatable read" -> SQL.RepeatableRead
Just "serializable" -> SQL.Serializable
_ -> SQL.ReadCommitted

handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool -> PgVersion -> ApiRequest -> SchemaCache -> Handler IO Wai.Response
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache =
Expand Down Expand Up @@ -206,7 +194,7 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A

(ActionInvoke invMethod, TargetProc identifier _) -> do
cPlan <- liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
resultSet <- runQuery (roleIsoLvl <|> pdIsoLvl (Plan.crProc cPlan))(Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
resultSet <- runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan))(Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
return $ Response.invokeResponse invMethod (Plan.crProc cPlan) apiReq resultSet

(ActionInspect headersOnly, TargetDefaultSpec tSchema) -> do
Expand All @@ -229,7 +217,7 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A
throwError $ Error.ApiRequestError ApiRequestTypes.NotFound
where
roleSettings = fromMaybe mempty (HM.lookup authRole $ configRoleSettings conf)
roleIsoLvl = decodeUtf8 <$> HM.lookup "default_transaction_isolation" roleSettings
roleIsoLvl = HM.findWithDefault SQL.ReadCommitted authRole $ configRoleIsoLvl conf
runQuery isoLvl mode query =
runDbHandler appState isoLvl mode authenticated prepared $ do
Query.setPgLocals conf authClaims authRole (HM.toList roleSettings) apiReq pgVer
Expand Down
49 changes: 40 additions & 9 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ module PostgREST.AppState
, putSchemaCache
, putPgVersion
, usePool
, debounceLogAcquisitionTimeout
, loadSchemaCache
, reReadConfig
, connectionWorker
, runListener
) where

import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as LBS
import Data.Either.Combinators (whenLeft)
import qualified Data.Text.Encoding as T
import Hasql.Connection (acquire)
import qualified Hasql.Notifications as SQL
Expand Down Expand Up @@ -140,7 +140,12 @@ initPool AppConfig{..} =

-- | Run an action with a database connection.
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{..} = SQL.use statePool
usePool AppState{..} x = do
res <- SQL.use statePool x
whenLeft res (\case
SQL.AcquisitionTimeoutUsageError -> debounceLogAcquisitionTimeout -- this can happen rapidly for many requests, so we debounce
_ -> pure ())
return res

-- | Flush the connection pool so that any future use of the pool will
-- use connections freshly established after this call.
Expand Down Expand Up @@ -226,7 +231,7 @@ loadSchemaCache appState = do
querySchemaCache conf
case result of
Left e -> do
case Error.checkIsFatal e of
case checkIsFatal e of
Just hint -> do
logWithZTime appState "A fatal error ocurred when loading the schema cache"
logPgrstError appState e
Expand Down Expand Up @@ -321,7 +326,7 @@ establishConnection appState =
case pgVersion of
Left e -> do
logPgrstError appState e
case Error.checkIsFatal e of
case checkIsFatal e of
Just reason ->
return $ FatalConnectionError reason
Nothing ->
Expand Down Expand Up @@ -357,7 +362,7 @@ reReadConfig startingUp appState = do
Left e -> do
logWithZTime appState
"An error ocurred when trying to query database settings for the config parameters"
case Error.checkIsFatal e of
case checkIsFatal e of
Just hint -> do
logPgrstError appState e
logWithZTime appState hint
Expand All @@ -368,18 +373,18 @@ reReadConfig startingUp appState = do
Right x -> pure x
else
pure mempty
roleSettings <-
(roleSettings, roleIsolationLvl) <-
if configDbConfig then do
rSettings <- usePool appState $ queryRoleSettings configDbPreparedStatements
case rSettings of
Left e -> do
logWithZTime appState "An error ocurred when trying to query the role settings"
logPgrstError appState e
pure mempty
pure (mempty, mempty)
Right x -> pure x
else
pure mempty
readAppConfig dbSettings configFilePath (Just configDbUri) roleSettings >>= \case
readAppConfig dbSettings configFilePath (Just configDbUri) roleSettings roleIsolationLvl >>= \case
Left err ->
if startingUp then
panic err -- die on invalid config if the program is starting up
Expand Down Expand Up @@ -442,3 +447,29 @@ listener appState = do
-- reloads the schema cache + restarts pool connections
-- it's necessary to restart the pg connections because they cache the pg catalog(see #2620)
connectionWorker appState

checkIsFatal :: SQL.UsageError -> Maybe Text
checkIsFatal (SQL.ConnectionUsageError e)
| isAuthFailureMessage = Just $ toS failureMessage
| otherwise = Nothing
where isAuthFailureMessage =
("FATAL: password authentication failed" `isInfixOf` failureMessage) ||
("no password supplied" `isInfixOf` failureMessage)
failureMessage = BS.unpack $ fromMaybe mempty e
checkIsFatal(SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ResultError serverError)))
= case serverError of
-- Check for a syntax error (42601 is the pg code). This would mean the error is on our part somehow, so we treat it as fatal.
SQL.ServerError "42601" _ _ _ _
-> Just "Hint: This is probably a bug in PostgREST, please report it at https://github.com/PostgREST/postgrest/issues"
-- Check for a "prepared statement <name> already exists" error (Code 42P05: duplicate_prepared_statement).
-- This would mean that a connection pooler in transaction mode is being used
-- while prepared statements are enabled in the PostgREST configuration,
-- both of which are incompatible with each other.
SQL.ServerError "42P05" _ _ _ _
-> Just "Hint: If you are using connection poolers in transaction mode, try setting db-prepared-statements to false."
-- Check for a "transaction blocks not allowed in statement pooling mode" error (Code 08P01: protocol_violation).
-- This would mean that a connection pooler in statement mode is being used which is not supported in PostgREST.
SQL.ServerError "08P01" "transaction blocks not allowed in statement pooling mode" _ _ _
-> Just "Hint: Connection poolers in statement mode are not supported."
_ -> Nothing
checkIsFatal _ = Nothing
2 changes: 1 addition & 1 deletion src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import Protolude hiding (hPutStrLn)
main :: App.SignalHandlerInstaller -> Maybe App.SocketRunner -> CLI -> IO ()
main installSignalHandlers runAppWithSocket CLI{cliCommand, cliPath} = do
conf@AppConfig{..} <-
either panic identity <$> Config.readAppConfig mempty cliPath Nothing mempty
either panic identity <$> Config.readAppConfig mempty cliPath Nothing mempty mempty

-- Per https://github.com/PostgREST/postgrest/issues/268, we want to
-- explicitly close the connections to PostgreSQL on shutdown.
Expand Down
15 changes: 9 additions & 6 deletions src/PostgREST/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ import Numeric (readOct, showOct)
import System.Environment (getEnvironment)
import System.Posix.Types (FileMode)

import PostgREST.Config.Database (RoleSettings)
import PostgREST.Config.Database (RoleIsolationLvl,
RoleSettings)
import PostgREST.Config.JSPath (JSPath, JSPathExp (..),
dumpJSPath, pRoleClaimKey)
import PostgREST.Config.Proxy (Proxy (..),
Expand Down Expand Up @@ -103,6 +104,7 @@ data AppConfig = AppConfig
, configServerUnixSocketMode :: FileMode
, configAdminServerPort :: Maybe Int
, configRoleSettings :: RoleSettings
, configRoleIsoLvl :: RoleIsolationLvl
, configInternalSCSleep :: Maybe Int32
}

Expand Down Expand Up @@ -198,13 +200,13 @@ instance JustIfMaybe a (Maybe a) where

-- | Reads and parses the config and overrides its parameters from env vars,
-- files or db settings.
readAppConfig :: [(Text, Text)] -> Maybe FilePath -> Maybe Text -> RoleSettings -> IO (Either Text AppConfig)
readAppConfig dbSettings optPath prevDbUri roleSettings = do
readAppConfig :: [(Text, Text)] -> Maybe FilePath -> Maybe Text -> RoleSettings -> RoleIsolationLvl -> IO (Either Text AppConfig)
readAppConfig dbSettings optPath prevDbUri roleSettings roleIsolationLvl = do
env <- readPGRSTEnvironment
-- if no filename provided, start with an empty map to read config from environment
conf <- maybe (return $ Right M.empty) loadConfig optPath

case C.runParser (parser optPath env dbSettings roleSettings) =<< mapLeft show conf of
case C.runParser (parser optPath env dbSettings roleSettings roleIsolationLvl) =<< mapLeft show conf of
Left err ->
return . Left $ "Error in config " <> err
Right parsedConfig ->
Expand All @@ -219,8 +221,8 @@ readAppConfig dbSettings optPath prevDbUri roleSettings = do
decodeJWKS <$>
(decodeSecret =<< readSecretFile =<< readDbUriFile prevDbUri parsedConfig)

parser :: Maybe FilePath -> Environment -> [(Text, Text)] -> RoleSettings -> C.Parser C.Config AppConfig
parser optPath env dbSettings roleSettings =
parser :: Maybe FilePath -> Environment -> [(Text, Text)] -> RoleSettings -> RoleIsolationLvl -> C.Parser C.Config AppConfig
parser optPath env dbSettings roleSettings roleIsolationLvl =
AppConfig
<$> parseAppSettings "app.settings"
<*> (fmap encodeUtf8 <$> optString "db-anon-role")
Expand Down Expand Up @@ -268,6 +270,7 @@ parser optPath env dbSettings roleSettings =
<*> parseSocketFileMode "server-unix-socket-mode"
<*> optInt "admin-server-port"
<*> pure roleSettings
<*> pure roleIsolationLvl
<*> optInt "internal-schema-cache-sleep"
where
parseAppSettings :: C.Key -> C.Parser C.Config [(Text, Text)]
Expand Down
52 changes: 40 additions & 12 deletions src/PostgREST/Config/Database.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module PostgREST.Config.Database
, queryRoleSettings
, queryPgVersion
, RoleSettings
, RoleIsolationLvl
, toIsolationLevel
) where

import Control.Arrow ((***))
Expand All @@ -25,7 +27,14 @@ import Text.InterpolatedString.Perl6 (q, qc)

import Protolude

type RoleSettings = (HM.HashMap ByteString (HM.HashMap ByteString ByteString))
type RoleSettings = (HM.HashMap ByteString (HM.HashMap ByteString ByteString))
type RoleIsolationLvl = HM.HashMap ByteString SQL.IsolationLevel

toIsolationLevel :: (Eq a, IsString a) => a -> SQL.IsolationLevel
toIsolationLevel a = case a of
"repeatable read" -> SQL.RepeatableRead
"serializable" -> SQL.Serializable
_ -> SQL.ReadCommitted

prefix :: Text
prefix = "pgrst."
Expand Down Expand Up @@ -117,13 +126,10 @@ queryDbSettings preConfFunc prepared =
|]::Text
decodeSettings = HD.rowList $ (,) <$> column HD.text <*> column HD.text

queryRoleSettings :: Bool -> Session RoleSettings
queryRoleSettings :: Bool -> Session (RoleSettings, RoleIsolationLvl)
queryRoleSettings prepared =
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction in
transaction SQL.ReadCommitted SQL.Read $ SQL.statement mempty $ roleSettingsStatement prepared

roleSettingsStatement :: Bool -> SQL.Statement () RoleSettings
roleSettingsStatement = SQL.Statement sql HE.noParams decodeRoleSettings
transaction SQL.ReadCommitted SQL.Read $ SQL.statement mempty $ SQL.Statement sql HE.noParams (processRows <$> rows) prepared
where
sql = [q|
with
Expand All @@ -139,18 +145,40 @@ roleSettingsStatement = SQL.Statement sql HE.noParams decodeRoleSettings
substr(setting, 1, strpos(setting, '=') - 1) as key,
lower(substr(setting, strpos(setting, '=') + 1)) as value
FROM role_setting
),
iso_setting AS (
SELECT rolname, value
FROM kv_settings
WHERE key = 'default_transaction_isolation'
)
select rolname, array_agg(row(key, value))
from kv_settings
group by rolname;
select
kv.rolname,
i.value as iso_lvl,
array_agg(row(kv.key, kv.value)) filter (where key <> 'default_transation_isolation') as role_settings
from kv_settings kv
left join iso_setting i on i.rolname = kv.rolname
group by kv.rolname, i.value;
|]
decodeRoleSettings = HM.fromList . map (bimap encodeUtf8 (HM.fromList . ((encodeUtf8 *** encodeUtf8) <$>))) <$> HD.rowList aRow
aRow :: HD.Row (Text, [(Text, Text)])
aRow = (,) <$> column HD.text <*> compositeArrayColumn ((,) <$> compositeField HD.text <*> compositeField HD.text)

processRows :: [(Text, Maybe Text, [(Text, Text)])] -> (RoleSettings, RoleIsolationLvl)
processRows rs =
let
rowsWRoleSettings = [ (x, z) | (x, _, z) <- rs ]
rowsWIsolation = [ (x, y) | (x, Just y, _) <- rs ]
in
( HM.fromList $ bimap encodeUtf8 (HM.fromList . ((encodeUtf8 *** encodeUtf8) <$>)) <$> rowsWRoleSettings
, HM.fromList $ (encodeUtf8 *** toIsolationLevel) <$> rowsWIsolation
)

rows :: HD.Result [(Text, Maybe Text, [(Text, Text)])]
rows = HD.rowList $ (,,) <$> column HD.text <*> nullableColumn HD.text <*> compositeArrayColumn ((,) <$> compositeField HD.text <*> compositeField HD.text)

column :: HD.Value a -> HD.Row a
column = HD.column . HD.nonNullable

nullableColumn :: HD.Value a -> HD.Row (Maybe a)
nullableColumn = HD.column . HD.nullable

compositeField :: HD.Value a -> HD.Composite a
compositeField = HD.field . HD.nonNullable

Expand Down
25 changes: 1 addition & 24 deletions src/PostgREST/Error.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ module PostgREST.Error
, PgError(..)
, Error(..)
, errorPayload
, checkIsFatal
, singularityError
) where

Expand Down Expand Up @@ -429,6 +428,7 @@ pgErrorStatus authed (SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ResultError
'5':'3':_ -> HTTP.status503 -- insufficient resources
'5':'4':_ -> HTTP.status413 -- too complex
'5':'5':_ -> HTTP.status500 -- obj not on prereq state
'5':'7':'P':'0':'1':_ -> HTTP.status503 -- terminating connection due to administrator command
'5':'7':_ -> HTTP.status500 -- operator intervention
'5':'8':_ -> HTTP.status500 -- system error
'F':'0':_ -> HTTP.status500 -- conf file error
Expand All @@ -446,29 +446,6 @@ pgErrorStatus authed (SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ResultError

_ -> HTTP.status500

checkIsFatal :: SQL.UsageError -> Maybe Text
checkIsFatal (SQL.ConnectionUsageError e)
| isAuthFailureMessage = Just $ toS failureMessage
| otherwise = Nothing
where isAuthFailureMessage = "FATAL: password authentication failed" `isInfixOf` failureMessage
failureMessage = BS.unpack $ fromMaybe mempty e
checkIsFatal(SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ResultError serverError)))
= case serverError of
-- Check for a syntax error (42601 is the pg code). This would mean the error is on our part somehow, so we treat it as fatal.
SQL.ServerError "42601" _ _ _ _
-> Just "Hint: This is probably a bug in PostgREST, please report it at https://github.com/PostgREST/postgrest/issues"
-- Check for a "prepared statement <name> already exists" error (Code 42P05: duplicate_prepared_statement).
-- This would mean that a connection pooler in transaction mode is being used
-- while prepared statements are enabled in the PostgREST configuration,
-- both of which are incompatible with each other.
SQL.ServerError "42P05" _ _ _ _
-> Just "Hint: If you are using connection poolers in transaction mode, try setting db-prepared-statements to false."
-- Check for a "transaction blocks not allowed in statement pooling mode" error (Code 08P01: protocol_violation).
-- This would mean that a connection pooler in statement mode is being used which is not supported in PostgREST.
SQL.ServerError "08P01" "transaction blocks not allowed in statement pooling mode" _ _ _
-> Just "Hint: Connection poolers in statement mode are not supported."
_ -> Nothing
checkIsFatal _ = Nothing


data Error
Expand Down
Loading