Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: fix handling of tail requests when using target all or read #4642

Merged
merged 4 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Loki struct {
// set during initialization
ModuleManager *modules.Manager
serviceMap map[string]services.Service
deps map[string][]string

Server *server.Server
ring *ring.Ring
Expand Down Expand Up @@ -481,7 +482,34 @@ func (t *Loki) setupModuleManager() error {
}
}

t.deps = deps
t.ModuleManager = mm

return nil
}

func (t *Loki) isModuleActive(m string) bool {
for _, target := range t.Cfg.Target {
if target == m {
return true
}
if t.recursiveIsModuleActive(target, m) {
return true
}
}
return false
}

func (t *Loki) recursiveIsModuleActive(target, m string) bool {
if targetDeps, ok := t.deps[target]; ok {
for _, dep := range targetDeps {
if dep == m {
return true
}
if t.recursiveIsModuleActive(dep, m) {
return true
}
}
}
return false
}
34 changes: 34 additions & 0 deletions pkg/loki/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -52,3 +54,35 @@ func TestFlagDefaults(t *testing.T) {
require.Equal(t, c.Server.GRPCServerPingWithoutStreamAllowed, true)
require.Contains(t, gotFlags[flagToCheck], "(default true)")
}

func TestLoki_isModuleEnabled(t1 *testing.T) {
tests := []struct {
name string
target flagext.StringSliceCSV
module string
want bool
}{
{name: "Target All includes Querier", target: flagext.StringSliceCSV{"all"}, module: Querier, want: true},
{name: "Target Querier does not include Distributor", target: flagext.StringSliceCSV{"querier"}, module: Distributor, want: false},
{name: "Target Read includes Query Frontend", target: flagext.StringSliceCSV{"read"}, module: QueryFrontend, want: true},
{name: "Target Querier does not include Query Frontend", target: flagext.StringSliceCSV{"querier"}, module: QueryFrontend, want: false},
{name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false},
{name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true},
{name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false},
{name: "Test recursive dep, Ingester -> TenantConfigs -> RuntimeConfig", target: flagext.StringSliceCSV{"ingester"}, module: RuntimeConfig, want: true},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &Loki{
Cfg: Config{
Target: tt.target,
},
}
err := t.setupModuleManager()
assert.NoError(t1, err)
if got := t.isModuleActive(tt.module); got != tt.want {
t1.Errorf("isModuleActive() = %v, want %v", got, tt.want)
}
})
}
}
32 changes: 25 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,30 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),

"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}

// We always want to register tail routes externally, tail requests are different from normal queries, they
// are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers.
// The frontend has code to proxy these requests, however when running in the same processes
// (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers
// to directly register these routes.
// In practice this means we always want the queriers to register the tail routes externally, when a querier
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered
// on the external router.
var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
}

return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
}

Expand Down Expand Up @@ -480,7 +493,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
).Wrap(frontendHandler)

var defaultHandler http.Handler
if t.Cfg.Frontend.TailProxyURL != "" {
// If this process also acts as a Querier we don't do any proxying of tail requests
if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) {
httpMiddleware := middleware.Merge(
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
Expand Down Expand Up @@ -512,9 +526,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler)

// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
// Only register tailing requests if this process does not act as a Querier
// If this process is also a Querier the Querier will register the tail endpoints.
if !t.isModuleActive(Querier) {
// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
}

if t.frontend == nil {
return services.NewIdleService(nil, func(_ error) error {
Expand Down
48 changes: 24 additions & 24 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,35 @@ type WorkerServiceConfig struct {
func InitWorkerService(
cfg WorkerServiceConfig,
queryRoutesToHandlers map[string]http.Handler,
alwaysExternalRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
) (serve services.Service, err error) {

// Create a couple Middlewares used to handle panics, perform auth, and parse Form's in http request
internalMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)
// External middleware also needs to set JSON content type headers
externalMiddleware := middleware.Merge(
internalMiddleware,
serverutil.ResponseJSONMiddleware(),
)

internalRouter := mux.NewRouter()
for route, handler := range queryRoutesToHandlers {
internalRouter.Path(route).Methods("GET", "POST").Handler(handler)
}

// There are some routes which are always registered on the external router, add them now and
// wrap them with the externalMiddleware
for route, handler := range alwaysExternalRoutesToHandlers {
externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(handler))
}

// If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal
// HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the
// external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default
Expand All @@ -70,7 +89,10 @@ func InitWorkerService(
idx++
}

registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware)
// Register routes externally
for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(internalRouter))
}

//If no frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
Expand Down Expand Up @@ -107,16 +129,7 @@ func InitWorkerService(
return "internalQuerier"
}))

// If queries are processed using the external HTTP Server, we need wrap the internal querier with
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
// request context, as well as make sure any x-www-url-formencoded params are correctly parsed
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)

internalHandler = httpMiddleware.Wrap(internalHandler)
internalHandler = internalMiddleware.Wrap(internalHandler)

//Querier worker's max concurrent requests must be the same as the querier setting
(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent
Expand All @@ -131,19 +144,6 @@ func InitWorkerService(
prometheus.DefaultRegisterer)
}

func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
)

for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(internalHandler))
}
}

func querierRunningStandalone(cfg WorkerServiceConfig) bool {
runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.ReadEnabled && !cfg.AllEnabled
level.Debug(util_log.Logger).Log(
Expand Down
22 changes: 22 additions & 0 deletions pkg/querier/worker_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ func Test_InitQuerierService(t *testing.T) {
}),
}

var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
_, err := res.Write([]byte("test tail handler"))
require.NoError(t, err)
}),
}

testContext := func(config WorkerServiceConfig, authMiddleware middleware.Interface) (*mux.Router, services.Service) {
externalRouter := mux.NewRouter()

Expand All @@ -32,6 +39,7 @@ func Test_InitQuerierService(t *testing.T) {
querierWorkerService, err := InitWorkerService(
config,
mockQueryHandlers,
alwaysExternalHandlers,
externalRouter,
http.HandlerFunc(externalRouter.ServeHTTP),
authMiddleware,
Expand All @@ -57,6 +65,13 @@ func Test_InitQuerierService(t *testing.T) {
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test handler", recorder.Body.String())

// Tail endpoints always external
recorder = httptest.NewRecorder()
request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test tail handler", recorder.Body.String())
})

t.Run("wrap external handler with auth middleware", func(t *testing.T) {
Expand Down Expand Up @@ -187,6 +202,13 @@ func Test_InitQuerierService(t *testing.T) {
request := httptest.NewRequest("GET", "/loki/api/v1/query", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 404, recorder.Code)

// Tail endpoints always external
recorder = httptest.NewRecorder()
request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test tail handler", recorder.Body.String())
}
})

Expand Down