Skip to content

Commit

Permalink
Loki: fix handling of tail requests when using target all or read (
Browse files Browse the repository at this point in the history
…#4642)

* better handling of tail requests when the frontend and querier are running in the same process

* adding function to determine if a module is enable based on the target supplied to Loki

* fix tests and incorrect registering of tail routes

* renamed function and made check for active recursive
  • Loading branch information
slim-bean authored Nov 4, 2021
1 parent 2d24e2e commit 89ee022
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 31 deletions.
28 changes: 28 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Loki struct {
// set during initialization
ModuleManager *modules.Manager
serviceMap map[string]services.Service
deps map[string][]string

Server *server.Server
ring *ring.Ring
Expand Down Expand Up @@ -481,7 +482,34 @@ func (t *Loki) setupModuleManager() error {
}
}

t.deps = deps
t.ModuleManager = mm

return nil
}

func (t *Loki) isModuleActive(m string) bool {
for _, target := range t.Cfg.Target {
if target == m {
return true
}
if t.recursiveIsModuleActive(target, m) {
return true
}
}
return false
}

func (t *Loki) recursiveIsModuleActive(target, m string) bool {
if targetDeps, ok := t.deps[target]; ok {
for _, dep := range targetDeps {
if dep == m {
return true
}
if t.recursiveIsModuleActive(dep, m) {
return true
}
}
}
return false
}
34 changes: 34 additions & 0 deletions pkg/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -52,3 +54,35 @@ func TestFlagDefaults(t *testing.T) {
require.Equal(t, c.Server.GRPCServerPingWithoutStreamAllowed, true)
require.Contains(t, gotFlags[flagToCheck], "(default true)")
}

func TestLoki_isModuleEnabled(t1 *testing.T) {
tests := []struct {
name string
target flagext.StringSliceCSV
module string
want bool
}{
{name: "Target All includes Querier", target: flagext.StringSliceCSV{"all"}, module: Querier, want: true},
{name: "Target Querier does not include Distributor", target: flagext.StringSliceCSV{"querier"}, module: Distributor, want: false},
{name: "Target Read includes Query Frontend", target: flagext.StringSliceCSV{"read"}, module: QueryFrontend, want: true},
{name: "Target Querier does not include Query Frontend", target: flagext.StringSliceCSV{"querier"}, module: QueryFrontend, want: false},
{name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false},
{name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true},
{name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false},
{name: "Test recursive dep, Ingester -> TenantConfigs -> RuntimeConfig", target: flagext.StringSliceCSV{"ingester"}, module: RuntimeConfig, want: true},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &Loki{
Cfg: Config{
Target: tt.target,
},
}
err := t.setupModuleManager()
assert.NoError(t1, err)
if got := t.isModuleActive(tt.module); got != tt.want {
t1.Errorf("isModuleActive() = %v, want %v", got, tt.want)
}
})
}
}
32 changes: 25 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,30 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),

"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}

// We always want to register tail routes externally, tail requests are different from normal queries, they
// are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers.
// The frontend has code to proxy these requests, however when running in the same processes
// (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers
// to directly register these routes.
// In practice this means we always want the queriers to register the tail routes externally, when a querier
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered
// on the external router.
var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
}

return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
}

Expand Down Expand Up @@ -479,7 +492,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
).Wrap(frontendHandler)

var defaultHandler http.Handler
if t.Cfg.Frontend.TailProxyURL != "" {
// If this process also acts as a Querier we don't do any proxying of tail requests
if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) {
httpMiddleware := middleware.Merge(
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
Expand Down Expand Up @@ -511,9 +525,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler)

// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
// Only register tailing requests if this process does not act as a Querier
// If this process is also a Querier the Querier will register the tail endpoints.
if !t.isModuleActive(Querier) {
// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
}

if t.frontend == nil {
return services.NewIdleService(nil, func(_ error) error {
Expand Down
48 changes: 24 additions & 24 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,35 @@ type WorkerServiceConfig struct {
func InitWorkerService(
cfg WorkerServiceConfig,
queryRoutesToHandlers map[string]http.Handler,
alwaysExternalRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
) (serve services.Service, err error) {

// Create a couple Middlewares used to handle panics, perform auth, and parse Form's in http request
internalMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)
// External middleware also needs to set JSON content type headers
externalMiddleware := middleware.Merge(
internalMiddleware,
serverutil.ResponseJSONMiddleware(),
)

internalRouter := mux.NewRouter()
for route, handler := range queryRoutesToHandlers {
internalRouter.Path(route).Methods("GET", "POST").Handler(handler)
}

// There are some routes which are always registered on the external router, add them now and
// wrap them with the externalMiddleware
for route, handler := range alwaysExternalRoutesToHandlers {
externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(handler))
}

// If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal
// HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the
// external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default
Expand All @@ -70,7 +89,10 @@ func InitWorkerService(
idx++
}

registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware)
// Register routes externally
for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(internalRouter))
}

//If no frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
Expand Down Expand Up @@ -107,16 +129,7 @@ func InitWorkerService(
return "internalQuerier"
}))

// If queries are processed using the external HTTP Server, we need wrap the internal querier with
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
// request context, as well as make sure any x-www-url-formencoded params are correctly parsed
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)

internalHandler = httpMiddleware.Wrap(internalHandler)
internalHandler = internalMiddleware.Wrap(internalHandler)

//Querier worker's max concurrent requests must be the same as the querier setting
(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent
Expand All @@ -131,19 +144,6 @@ func InitWorkerService(
prometheus.DefaultRegisterer)
}

func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
)

for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(internalHandler))
}
}

func querierRunningStandalone(cfg WorkerServiceConfig) bool {
runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.ReadEnabled && !cfg.AllEnabled
level.Debug(util_log.Logger).Log(
Expand Down
22 changes: 22 additions & 0 deletions pkg/querier/worker_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ func Test_InitQuerierService(t *testing.T) {
}),
}

var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
_, err := res.Write([]byte("test tail handler"))
require.NoError(t, err)
}),
}

testContext := func(config WorkerServiceConfig, authMiddleware middleware.Interface) (*mux.Router, services.Service) {
externalRouter := mux.NewRouter()

Expand All @@ -32,6 +39,7 @@ func Test_InitQuerierService(t *testing.T) {
querierWorkerService, err := InitWorkerService(
config,
mockQueryHandlers,
alwaysExternalHandlers,
externalRouter,
http.HandlerFunc(externalRouter.ServeHTTP),
authMiddleware,
Expand All @@ -57,6 +65,13 @@ func Test_InitQuerierService(t *testing.T) {
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test handler", recorder.Body.String())

// Tail endpoints always external
recorder = httptest.NewRecorder()
request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test tail handler", recorder.Body.String())
})

t.Run("wrap external handler with auth middleware", func(t *testing.T) {
Expand Down Expand Up @@ -187,6 +202,13 @@ func Test_InitQuerierService(t *testing.T) {
request := httptest.NewRequest("GET", "/loki/api/v1/query", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 404, recorder.Code)

// Tail endpoints always external
recorder = httptest.NewRecorder()
request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test tail handler", recorder.Body.String())
}
})

Expand Down

0 comments on commit 89ee022

Please sign in to comment.