Skip to content

Commit

Permalink
Add cancel-db-query-timeout flag
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Mar 11, 2024
1 parent 259f224 commit 6036a07
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 38 deletions.
4 changes: 2 additions & 2 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (a *App) init() error {
a.UpdateStellarCoreInfo(a.ctx)

// horizon-db and core-db
dbServerSideTimeout := mustInitHorizonDB(a)
mustInitHorizonDB(a)

if a.config.Ingest {
// ingester
Expand Down Expand Up @@ -532,7 +532,7 @@ func (a *App) init() error {
SSEUpdateFrequency: a.config.SSEUpdateFrequency,
StaleThreshold: a.config.StaleThreshold,
ConnectionTimeout: a.config.ConnectionTimeout,
DBServerSideTimeout: dbServerSideTimeout,
CancelDBQueryTimeout: a.config.CancelDBQueryTimeout,
MaxHTTPRequestSize: a.config.MaxHTTPRequestSize,
NetworkPassphrase: a.config.NetworkPassphrase,
MaxPathLength: a.config.MaxPathLength,
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ type Config struct {
HorizonDBMaxOpenConnections int
HorizonDBMaxIdleConnections int

SSEUpdateFrequency time.Duration
ConnectionTimeout time.Duration
SSEUpdateFrequency time.Duration
ConnectionTimeout time.Duration
CancelDBQueryTimeout time.Duration
// MaxHTTPRequestSize is the maximum allowed request payload size
MaxHTTPRequestSize uint
RateQuota *throttled.RateQuota
Expand Down
17 changes: 17 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"strings"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -437,6 +438,17 @@ func Flags() (*Config, support.ConfigOptions) {
Usage: "defines the timeout of connection after which 504 response will be sent or stream will be closed, if Horizon is behind a load balancer with idle connection timeout, this should be set to a few seconds less that idle timeout, does not apply to POST /transactions",
UsedInCommands: ApiServerCommands,
},
&support.ConfigOption{
Name: "cancel-db-query-timeout",
ConfigKey: &config.CancelDBQueryTimeout,
OptType: types.Int,
CustomSetValue: support.SetDuration,
Usage: "defines the timeout for when horizon will cancel all postgres queries connected to an HTTP request. The timeout is measured in seconds since the start of the HTTP request. Note, this timeout does not apply to POST /transactions. " +
"The difference between cancel-db-query-timeout and connection-timeout is that connection-timeout applies a postgres statement timeout whereas cancel-db-query-timeout will send an additional request to postgres to cancel the ongoing query. " +
"Generally, cancel-db-query-timeout should be configured to be higher than connection-timeout to allow the postgres statement timeout to kill long running queries without having to send the additional cancel request to postgres. " +
"By default, cancel-db-query-timeout will be set to 2 seconds more than connection-timeout.",
UsedInCommands: ApiServerCommands,
},
&support.ConfigOption{
Name: "max-http-request-size",
ConfigKey: &config.MaxHTTPRequestSize,
Expand Down Expand Up @@ -983,5 +995,10 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption
" If Horizon is behind both, use --behind-cloudflare only")
}

if config.CancelDBQueryTimeout == 0 {
// the default value for cancel-db-query-timeout is 2 seconds more than connection-timeout
config.CancelDBQueryTimeout = config.ConnectionTimeout + time.Second*2
}

return nil
}
8 changes: 4 additions & 4 deletions services/horizon/internal/httpx/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ func NewHistoryMiddleware(ledgerState *ledger.State, staleThreshold int32, sessi
// has been verified and is correct (Otherwise returns `500 Internal Server Error` to prevent
// returning invalid data to the user)
type StateMiddleware struct {
HorizonSession db.SessionInterface
ContextDBTimeout time.Duration
NoStateVerification bool
HorizonSession db.SessionInterface
CancelDBQueryTimeout time.Duration
NoStateVerification bool
}

func ingestionStatus(ctx context.Context, q *history.Q) (uint32, bool, error) {
Expand Down Expand Up @@ -278,7 +278,7 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
if routePattern := supportHttp.GetChiRoutePattern(r); routePattern != "" {
ctx = context.WithValue(ctx, &db.RouteContextKey, routePattern)
}
ctx = setContextDBTimeout(m.ContextDBTimeout, ctx)
ctx = setContextDBTimeout(m.CancelDBQueryTimeout, ctx)
session := m.HorizonSession.Clone()
q := &history.Q{session}
sseRequest := render.Negotiate(r) == render.MimeEventStream
Expand Down
12 changes: 4 additions & 8 deletions services/horizon/internal/httpx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type RouterConfig struct {
SSEUpdateFrequency time.Duration
StaleThreshold uint
ConnectionTimeout time.Duration
DBServerSideTimeout bool
CancelDBQueryTimeout time.Duration
MaxHTTPRequestSize uint
NetworkPassphrase string
MaxPathLength uint
Expand Down Expand Up @@ -139,13 +139,9 @@ func (r *Router) addMiddleware(config *RouterConfig,
}

func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRateLimiter, ledgerState *ledger.State) {
var contextDBTimeout time.Duration
if config.DBServerSideTimeout {
contextDBTimeout = config.ConnectionTimeout * 15
}
stateMiddleware := StateMiddleware{
HorizonSession: config.DBSession,
ContextDBTimeout: contextDBTimeout,
HorizonSession: config.DBSession,
CancelDBQueryTimeout: config.CancelDBQueryTimeout,
}

r.Method(http.MethodGet, "/health", config.HealthCheck)
Expand All @@ -163,7 +159,7 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
LedgerSourceFactory: historyLedgerSourceFactory{ledgerState: ledgerState, updateFrequency: config.SSEUpdateFrequency},
}

historyMiddleware := NewHistoryMiddleware(ledgerState, int32(config.StaleThreshold), config.DBSession, contextDBTimeout)
historyMiddleware := NewHistoryMiddleware(ledgerState, int32(config.StaleThreshold), config.DBSession, config.CancelDBQueryTimeout)
// State endpoints behind stateMiddleware
r.Group(func(r chi.Router) {
r.Route("/accounts", func(r chi.Router) {
Expand Down
29 changes: 8 additions & 21 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ func mustNewDBSession(subservice db.Subservice, databaseURL string, maxIdle, max
return db.RegisterMetrics(session, "horizon", subservice, registry)
}

func mustInitHorizonDB(app *App) bool {
func mustInitHorizonDB(app *App) {
log.Infof("Initializing database...")
var dbServerSideTimeout bool

maxIdle := app.config.HorizonDBMaxIdleConnections
maxOpen := app.config.HorizonDBMaxOpenConnections
Expand All @@ -46,40 +45,29 @@ func mustInitHorizonDB(app *App) bool {
log.Fatalf("max open connections to horizon db must be greater than %d", ingest.MaxDBConnections)
}
}
serverSidePGTimeoutConfigs := []db.ClientConfig{
db.StatementTimeout(app.config.ConnectionTimeout),
db.IdleTransactionTimeout(app.config.ConnectionTimeout),
}

if app.config.RoDatabaseURL == "" {
var clientConfigs []db.ClientConfig
if !app.config.Ingest {
// if we are not ingesting then we don't expect to have long db queries / transactions
clientConfigs = append(
clientConfigs,
db.StatementTimeout(app.config.ConnectionTimeout),
db.IdleTransactionTimeout(app.config.ConnectionTimeout),
)
dbServerSideTimeout = true
}
app.historyQ = &history.Q{mustNewDBSession(
db.HistorySubservice,
app.config.DatabaseURL,
maxIdle,
maxOpen,
app.prometheusRegistry,
clientConfigs...,
serverSidePGTimeoutConfigs...,
)}
} else {
// If RO set, use it for all DB queries
roClientConfigs := []db.ClientConfig{
db.StatementTimeout(app.config.ConnectionTimeout),
db.IdleTransactionTimeout(app.config.ConnectionTimeout),
}
dbServerSideTimeout = true
app.historyQ = &history.Q{mustNewDBSession(
db.HistorySubservice,
app.config.RoDatabaseURL,
maxIdle,
maxOpen,
app.prometheusRegistry,
roClientConfigs...,
serverSidePGTimeoutConfigs...,
)}

app.primaryHistoryQ = &history.Q{mustNewDBSession(
Expand All @@ -88,10 +76,9 @@ func mustInitHorizonDB(app *App) bool {
maxIdle,
maxOpen,
app.prometheusRegistry,
serverSidePGTimeoutConfigs...,
)}
}

return dbServerSideTimeout
}

func initIngester(app *App) {
Expand Down
2 changes: 1 addition & 1 deletion support/db/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestDeadlineOverride(t *testing.T) {

cancel()
_, _, err = sess.context(requestCtx)
assert.EqualError(t, err, "canceling statement due to user request")
assert.EqualError(t, err, "context canceled")
}

func TestSession(t *testing.T) {
Expand Down

0 comments on commit 6036a07

Please sign in to comment.