diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index 873699e575..1e65443411 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -494,7 +494,7 @@ func (a *App) init() error { a.UpdateStellarCoreInfo(a.ctx) // horizon-db and core-db - dbServerSideTimeout := mustInitHorizonDB(a) + mustInitHorizonDB(a) if a.config.Ingest { // ingester @@ -532,7 +532,7 @@ func (a *App) init() error { SSEUpdateFrequency: a.config.SSEUpdateFrequency, StaleThreshold: a.config.StaleThreshold, ConnectionTimeout: a.config.ConnectionTimeout, - DBServerSideTimeout: dbServerSideTimeout, + CancelDBQueryTimeout: a.config.CancelDBQueryTimeout, MaxHTTPRequestSize: a.config.MaxHTTPRequestSize, NetworkPassphrase: a.config.NetworkPassphrase, MaxPathLength: a.config.MaxPathLength, diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index f1f8ac078d..b5c2434eab 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -36,8 +36,9 @@ type Config struct { HorizonDBMaxOpenConnections int HorizonDBMaxIdleConnections int - SSEUpdateFrequency time.Duration - ConnectionTimeout time.Duration + SSEUpdateFrequency time.Duration + ConnectionTimeout time.Duration + CancelDBQueryTimeout time.Duration // MaxHTTPRequestSize is the maximum allowed request payload size MaxHTTPRequestSize uint RateQuota *throttled.RateQuota diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 4c8e4dc2f5..0ae1248082 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "strings" + "time" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -437,6 +438,17 @@ func Flags() (*Config, support.ConfigOptions) { Usage: "defines the timeout of connection after which 504 response will be sent or stream will be closed, if Horizon is behind a load balancer with idle connection timeout, this should be set to a few seconds less that idle timeout, does not apply to POST /transactions", UsedInCommands: ApiServerCommands, }, + &support.ConfigOption{ + Name: "cancel-db-query-timeout", + ConfigKey: &config.CancelDBQueryTimeout, + OptType: types.Int, + CustomSetValue: support.SetDuration, + Usage: "defines the timeout for when horizon will cancel all postgres queries connected to an HTTP request. The timeout is measured in seconds since the start of the HTTP request. Note, this timeout does not apply to POST /transactions. " + + "The difference between cancel-db-query-timeout and connection-timeout is that connection-timeout applies a postgres statement timeout whereas cancel-db-query-timeout will send an additional request to postgres to cancel the ongoing query. " + + "Generally, cancel-db-query-timeout should be configured to be higher than connection-timeout to allow the postgres statement timeout to kill long running queries without having to send the additional cancel request to postgres. " + + "By default, cancel-db-query-timeout will be set to 2 seconds more than connection-timeout.", + UsedInCommands: ApiServerCommands, + }, &support.ConfigOption{ Name: "max-http-request-size", ConfigKey: &config.MaxHTTPRequestSize, @@ -983,5 +995,10 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption " If Horizon is behind both, use --behind-cloudflare only") } + if config.CancelDBQueryTimeout == 0 { + // the default value for cancel-db-query-timeout is 2 seconds more than connection-timeout + config.CancelDBQueryTimeout = config.ConnectionTimeout + time.Second*2 + } + return nil } diff --git a/services/horizon/internal/httpx/middleware.go b/services/horizon/internal/httpx/middleware.go index 17ec794c86..0f05c4e3fc 100644 --- a/services/horizon/internal/httpx/middleware.go +++ b/services/horizon/internal/httpx/middleware.go @@ -238,9 +238,9 @@ func NewHistoryMiddleware(ledgerState *ledger.State, staleThreshold int32, sessi // has been verified and is correct (Otherwise returns `500 Internal Server Error` to prevent // returning invalid data to the user) type StateMiddleware struct { - HorizonSession db.SessionInterface - ContextDBTimeout time.Duration - NoStateVerification bool + HorizonSession db.SessionInterface + CancelDBQueryTimeout time.Duration + NoStateVerification bool } func ingestionStatus(ctx context.Context, q *history.Q) (uint32, bool, error) { @@ -278,7 +278,7 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc { if routePattern := supportHttp.GetChiRoutePattern(r); routePattern != "" { ctx = context.WithValue(ctx, &db.RouteContextKey, routePattern) } - ctx = setContextDBTimeout(m.ContextDBTimeout, ctx) + ctx = setContextDBTimeout(m.CancelDBQueryTimeout, ctx) session := m.HorizonSession.Clone() q := &history.Q{session} sseRequest := render.Negotiate(r) == render.MimeEventStream diff --git a/services/horizon/internal/httpx/router.go b/services/horizon/internal/httpx/router.go index c90ba33bf0..ecdcff0864 100644 --- a/services/horizon/internal/httpx/router.go +++ b/services/horizon/internal/httpx/router.go @@ -38,7 +38,7 @@ type RouterConfig struct { SSEUpdateFrequency time.Duration StaleThreshold uint ConnectionTimeout time.Duration - DBServerSideTimeout bool + CancelDBQueryTimeout time.Duration MaxHTTPRequestSize uint NetworkPassphrase string MaxPathLength uint @@ -139,13 +139,9 @@ func (r *Router) addMiddleware(config *RouterConfig, } func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRateLimiter, ledgerState *ledger.State) { - var contextDBTimeout time.Duration - if config.DBServerSideTimeout { - contextDBTimeout = config.ConnectionTimeout * 15 - } stateMiddleware := StateMiddleware{ - HorizonSession: config.DBSession, - ContextDBTimeout: contextDBTimeout, + HorizonSession: config.DBSession, + CancelDBQueryTimeout: config.CancelDBQueryTimeout, } r.Method(http.MethodGet, "/health", config.HealthCheck) @@ -163,7 +159,7 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate LedgerSourceFactory: historyLedgerSourceFactory{ledgerState: ledgerState, updateFrequency: config.SSEUpdateFrequency}, } - historyMiddleware := NewHistoryMiddleware(ledgerState, int32(config.StaleThreshold), config.DBSession, contextDBTimeout) + historyMiddleware := NewHistoryMiddleware(ledgerState, int32(config.StaleThreshold), config.DBSession, config.CancelDBQueryTimeout) // State endpoints behind stateMiddleware r.Group(func(r chi.Router) { r.Route("/accounts", func(r chi.Router) { diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index d7fedaf32c..2311833508 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -30,9 +30,8 @@ func mustNewDBSession(subservice db.Subservice, databaseURL string, maxIdle, max return db.RegisterMetrics(session, "horizon", subservice, registry) } -func mustInitHorizonDB(app *App) bool { +func mustInitHorizonDB(app *App) { log.Infof("Initializing database...") - var dbServerSideTimeout bool maxIdle := app.config.HorizonDBMaxIdleConnections maxOpen := app.config.HorizonDBMaxOpenConnections @@ -46,40 +45,29 @@ func mustInitHorizonDB(app *App) bool { log.Fatalf("max open connections to horizon db must be greater than %d", ingest.MaxDBConnections) } } + serverSidePGTimeoutConfigs := []db.ClientConfig{ + db.StatementTimeout(app.config.ConnectionTimeout), + db.IdleTransactionTimeout(app.config.ConnectionTimeout), + } if app.config.RoDatabaseURL == "" { - var clientConfigs []db.ClientConfig - if !app.config.Ingest { - // if we are not ingesting then we don't expect to have long db queries / transactions - clientConfigs = append( - clientConfigs, - db.StatementTimeout(app.config.ConnectionTimeout), - db.IdleTransactionTimeout(app.config.ConnectionTimeout), - ) - dbServerSideTimeout = true - } app.historyQ = &history.Q{mustNewDBSession( db.HistorySubservice, app.config.DatabaseURL, maxIdle, maxOpen, app.prometheusRegistry, - clientConfigs..., + serverSidePGTimeoutConfigs..., )} } else { // If RO set, use it for all DB queries - roClientConfigs := []db.ClientConfig{ - db.StatementTimeout(app.config.ConnectionTimeout), - db.IdleTransactionTimeout(app.config.ConnectionTimeout), - } - dbServerSideTimeout = true app.historyQ = &history.Q{mustNewDBSession( db.HistorySubservice, app.config.RoDatabaseURL, maxIdle, maxOpen, app.prometheusRegistry, - roClientConfigs..., + serverSidePGTimeoutConfigs..., )} app.primaryHistoryQ = &history.Q{mustNewDBSession( @@ -88,10 +76,9 @@ func mustInitHorizonDB(app *App) bool { maxIdle, maxOpen, app.prometheusRegistry, + serverSidePGTimeoutConfigs..., )} } - - return dbServerSideTimeout } func initIngester(app *App) { diff --git a/support/db/session_test.go b/support/db/session_test.go index 00718c9b2e..fd816e571c 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -169,7 +169,7 @@ func TestDeadlineOverride(t *testing.T) { cancel() _, _, err = sess.context(requestCtx) - assert.EqualError(t, err, "canceling statement due to user request") + assert.EqualError(t, err, "context canceled") } func TestSession(t *testing.T) {