Skip to content

Commit

Permalink
better handling of tail requests when the frontend and querier are ru…
Browse files Browse the repository at this point in the history
…nning in the same process
  • Loading branch information
slim-bean committed Nov 4, 2021
1 parent 2ac409c commit 5122df0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
32 changes: 25 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,30 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),

"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}

// We always want to register tail routes externally, tail requests are different from normal queries, they
// are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers.
// The frontend has code to proxy these requests, however when running in the same processes
// (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers
// to directly register these routes.
// In practice this means we always want the queriers to register the tail routes externally, when a querier
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered
// on the external router.
var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
}

return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
}

Expand Down Expand Up @@ -480,7 +493,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
).Wrap(frontendHandler)

var defaultHandler http.Handler
if t.Cfg.Frontend.TailProxyURL != "" {
// If this process also acts as a Querier we don't do any proxying of tail requests
if t.Cfg.Frontend.TailProxyURL != "" && !t.ModuleManager.IsModuleRegistered(Querier) {
httpMiddleware := middleware.Merge(
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
Expand Down Expand Up @@ -512,9 +526,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler)

// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
// Only register tailing requests if this process does not act as a Querier
// If this process is also a Querier the Querier will register the tail endpoints.
if !t.ModuleManager.IsModuleRegistered(Querier) {
// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
}

if t.frontend == nil {
return services.NewIdleService(nil, func(_ error) error {
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type WorkerServiceConfig struct {
func InitWorkerService(
cfg WorkerServiceConfig,
queryRoutesToHandlers map[string]http.Handler,
alwaysExternalRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
Expand Down Expand Up @@ -121,6 +122,12 @@ func InitWorkerService(
//Querier worker's max concurrent requests must be the same as the querier setting
(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent

// There are some routes which are always registered on the external router, add them now and
// wrap them with the httpMiddleware
for route, handler := range alwaysExternalRoutesToHandlers {
externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(handler))
}

//Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier
//and the query frontend
return querier_worker.NewQuerierWorker(
Expand Down

0 comments on commit 5122df0

Please sign in to comment.