Skip to content

Commit

Permalink
ReplicaSyncMiddleware
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn committed Apr 23, 2021
1 parent 7d7e580 commit 2041179
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 1 deletion.
5 changes: 5 additions & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type App struct {
config Config
webServer *httpx.Server
historyQ *history.Q
roHistoryQ *history.Q
ctx context.Context
cancel func()
horizonVersion string
Expand Down Expand Up @@ -502,6 +503,10 @@ func (a *App) init() error {
},
}

if a.roHistoryQ != nil {
routerConfig.RoDBSession = a.roHistoryQ.Session
}

var err error
config := httpx.ServerConfig{
Port: uint16(a.config.Port),
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
// app's main function and is provided to NewApp.
type Config struct {
DatabaseURL string
RoDatabaseURL string
HistoryArchiveURLs []string
Port uint
AdminPort uint
Expand Down
7 changes: 7 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func Flags() (*Config, support.ConfigOptions) {
Required: true,
Usage: "horizon postgres database to connect with",
},
&support.ConfigOption{
Name: "ro-database-url",
ConfigKey: &config.RoDatabaseURL,
OptType: types.String,
Required: false,
Usage: "horizon postgres read-replica to connect with, when set it will return stale history error when replica is behind primary",
},
&support.ConfigOption{
Name: StellarCoreBinaryPathName,
OptType: types.String,
Expand Down
34 changes: 34 additions & 0 deletions services/horizon/internal/httpx/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,37 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
func (m *StateMiddleware) Wrap(h http.Handler) http.Handler {
return m.WrapFunc(h.ServeHTTP)
}

type ReplicaSyncMiddleware struct {
PrimaryHistoryQ *history.Q
ReplicaHistoryQ *history.Q
}

// WrapFunc executes the middleware on a given HTTP handler function
func (m *ReplicaSyncMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
replicaIngestLedger, err := m.ReplicaHistoryQ.GetLastLedgerIngestNonBlocking()
if err != nil {
problem.Render(r.Context(), w, err)
return
}

primaryIngestLedger, err := m.PrimaryHistoryQ.GetLastLedgerIngestNonBlocking()
if err != nil {
problem.Render(r.Context(), w, err)
return
}

if primaryIngestLedger > replicaIngestLedger {
// If this is the final problem.P is TBD.
problem.Render(r.Context(), w, hProblem.StaleHistory)
return
}

h.ServeHTTP(w, r)
}
}

func (m *ReplicaSyncMiddleware) Wrap(h http.Handler) http.Handler {
return m.WrapFunc(h.ServeHTTP)
}
10 changes: 10 additions & 0 deletions services/horizon/internal/httpx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stellar/throttled"

"github.com/stellar/go/services/horizon/internal/actions"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/paths"
"github.com/stellar/go/services/horizon/internal/render/sse"
Expand All @@ -29,6 +30,7 @@ const maxAssetsForPathFinding = 15

type RouterConfig struct {
DBSession *db.Session
RoDBSession *db.Session
TxSubmitter *txsub.System
RateQuota *throttled.RateQuota

Expand Down Expand Up @@ -99,6 +101,14 @@ func (r *Router) addMiddleware(config *RouterConfig,
r.Use(rateLimitter.RateLimit)
}

if config.RoDBSession != nil {
replicaSyncMiddleware := ReplicaSyncMiddleware{
PrimaryHistoryQ: &history.Q{config.DBSession},
ReplicaHistoryQ: &history.Q{config.RoDBSession},
}
r.Use(replicaSyncMiddleware.Wrap)
}

// Internal middlewares
r.Internal.Use(chimiddleware.StripSlashes)
r.Internal.Use(chimiddleware.RequestID)
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ func mustInitHorizonDB(app *App) {
maxIdle,
maxOpen,
)}

if app.config.RoDatabaseURL != "" {
app.roHistoryQ = &history.Q{mustNewDBSession(
app.config.RoDatabaseURL,
maxIdle,
maxOpen,
)}
}
}

func initIngester(app *App) {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/render/problem/problem.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var (
StaleHistory = problem.P{
Type: "stale_history",
Title: "Historical DB Is Too Stale",
Status: http.StatusServiceUnavailable,
Status: http.StatusLoopDetected, // Temp change to find stale resps in metrics.
Detail: "This horizon instance is configured to reject client requests " +
"when it can determine that the history database is lagging too far " +
"behind the connected instance of stellar-core. If you operate this " +
Expand Down

0 comments on commit 2041179

Please sign in to comment.