Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Querier: Split gRPC client into two. #12726

Merged
merged 18 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -543,19 +543,19 @@ The `alibabacloud_storage_config` block configures the connection to Alibaba Clo

```yaml
# Name of OSS bucket.
# CLI flag: -common.storage.oss.bucketname
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these flag docs were bugged this whole time

# CLI flag: -<prefix>.storage.oss.bucketname
[bucket: <string> | default = ""]

# oss Endpoint to connect to.
# CLI flag: -common.storage.oss.endpoint
# CLI flag: -<prefix>.storage.oss.endpoint
[endpoint: <string> | default = ""]

# alibabacloud Access Key ID
# CLI flag: -common.storage.oss.access-key-id
# CLI flag: -<prefix>.storage.oss.access-key-id
[access_key_id: <string> | default = ""]

# alibabacloud Secret Access Key
# CLI flag: -common.storage.oss.secret-access-key
# CLI flag: -<prefix>.storage.oss.secret-access-key
[secret_access_key: <string> | default = ""]
```

Expand Down Expand Up @@ -2236,10 +2236,23 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# CLI flag: -querier.id
[id: <string> | default = ""]

# The grpc_client block configures the gRPC client used to communicate between a
# client and server component in Loki.
# Configures the querier gRPC client used to communicate with the
# query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'.
# The CLI flags prefix for this block configuration is:
# querier.frontend-grpc-client
[query_frontend_grpc_client: <grpc_client>]

# Configures the querier gRPC client used to communicate with the query-frontend
# and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined.
# This shouldn't be used if 'query_frontend_grpc_client' is defined.
# The CLI flags prefix for this block configuration is: querier.frontend-client
[grpc_client_config: <grpc_client>]

# Configures the querier gRPC client used to communicate with the
# query-scheduler. If not defined, 'grpc_client_config' is used instead.
# The CLI flags prefix for this block configuration is:
# querier.scheduler-grpc-client
[query_scheduler_grpc_client: <grpc_client>]
```

### gcs_storage_config
Expand Down Expand Up @@ -2297,6 +2310,8 @@ The `grpc_client` block configures the gRPC client used to communicate between a
- `ingester.client`
- `pattern-ingester.client`
- `querier.frontend-client`
- `querier.frontend-grpc-client`
- `querier.scheduler-grpc-client`
- `query-scheduler.grpc-client-config`
- `ruler.client`
- `tsdb.shipper.index-gateway-client.grpc`
Expand Down
23 changes: 23 additions & 0 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
applyIngesterFinalSleep(r)
applyIngesterReplicationFactor(r)
applyChunkRetain(r, &defaults)
if err := applyCommonQuerierWorkerGRPCConfig(r, &defaults); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -684,3 +687,23 @@ func applyChunkRetain(cfg, defaults *ConfigWrapper) {
}
}
}

// applyCommonQuerierWorkerGRPCConfig migrates the deprecated querier-worker
// `grpc_client_config` block into the new split client configs. If only the
// deprecated block is customized, its settings are copied to the new
// query-frontend client (and to the query-scheduler client when that one was
// left at its defaults). Customizing both the deprecated and the new frontend
// block at once is an error.
func applyCommonQuerierWorkerGRPCConfig(cfg, defaults *ConfigWrapper) error {
	oldFrontendUntouched := reflect.DeepEqual(cfg.Worker.OldQueryFrontendGRPCClientConfig, defaults.Worker.OldQueryFrontendGRPCClientConfig)
	if oldFrontendUntouched {
		// The deprecated block was not customized; nothing to migrate.
		return nil
	}

	newFrontendUntouched := reflect.DeepEqual(cfg.Worker.NewQueryFrontendGRPCClientConfig, defaults.Worker.NewQueryFrontendGRPCClientConfig)
	if !newFrontendUntouched {
		// Both the old and the new frontend client blocks are customized — ambiguous, so reject.
		return fmt.Errorf("both `grpc_client_config` and `query_frontend_grpc_client` are set at the same time. Please use only one of them")
	}

	// Only the deprecated block is in use: carry it over to the new frontend client.
	cfg.Worker.NewQueryFrontendGRPCClientConfig = cfg.Worker.OldQueryFrontendGRPCClientConfig

	if reflect.DeepEqual(cfg.Worker.QuerySchedulerGRPCClientConfig, defaults.Worker.QuerySchedulerGRPCClientConfig) {
		// The scheduler client was not explicitly configured, so it inherits the deprecated settings too.
		cfg.Worker.QuerySchedulerGRPCClientConfig = cfg.Worker.OldQueryFrontendGRPCClientConfig
	}
	return nil
}
103 changes: 103 additions & 0 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,109 @@ query_range:
config, _ := testContext(configFileString, nil)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})

	// Verifies the migration behavior between the deprecated shared
	// `grpc_client_config` and the new split `query_frontend_grpc_client` /
	// `query_scheduler_grpc_client` blocks, via both YAML and CLI flags.
	t.Run("querier worker grpc client behavior", func(t *testing.T) {
		// Only the new, split client blocks are set.
		newConfigBothClientsSet := `---
frontend_worker:
  query_frontend_grpc_client:
    tls_server_name: query-frontend
  query_scheduler_grpc_client:
    tls_server_name: query-scheduler
`

		// Only the deprecated shared block is set.
		oldConfig := `---
frontend_worker:
  grpc_client_config:
    tls_server_name: query-frontend
`

		// Both deprecated and new blocks are set — this combination must be rejected.
		mixedConfig := `---
frontend_worker:
  grpc_client_config:
    tls_server_name: query-frontend-old
  query_frontend_grpc_client:
    tls_server_name: query-frontend-new
  query_scheduler_grpc_client:
    tls_server_name: query-scheduler
`
		t.Run("new configs are used", func(t *testing.T) {
			asserts := func(config ConfigWrapper) {
				require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName)
				require.EqualValues(t, "query-scheduler", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName)
				// we never want to use zero values by default.
				require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize)
				require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
			}

			yamlConfig, _, err := configWrapperFromYAML(t, newConfigBothClientsSet, nil)
			require.NoError(t, err)
			asserts(yamlConfig)

			// repeat the test using only cli flags.
			cliFlags := []string{
				"-querier.frontend-grpc-client.tls-server-name=query-frontend",
				"-querier.scheduler-grpc-client.tls-server-name=query-scheduler",
			}
			cliConfig, _, err := configWrapperFromYAML(t, emptyConfigString, cliFlags)
			require.NoError(t, err)
			asserts(cliConfig)
		})

		t.Run("old config works the same way", func(t *testing.T) {
			// The deprecated block must be copied to BOTH new clients.
			asserts := func(config ConfigWrapper) {
				require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName)
				require.EqualValues(t, "query-frontend", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName)

				// we never want to use zero values by default.
				require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize)
				require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
			}

			yamlConfig, _, err := configWrapperFromYAML(t, oldConfig, nil)
			require.NoError(t, err)
			asserts(yamlConfig)

			// repeat the test using only cli flags.
			cliFlags := []string{
				"-querier.frontend-client.tls-server-name=query-frontend",
			}
			cliConfig, _, err := configWrapperFromYAML(t, emptyConfigString, cliFlags)
			require.NoError(t, err)
			asserts(cliConfig)
		})

		t.Run("mixed frontend clients throws an error", func(t *testing.T) {
			// Via YAML only.
			_, _, err := configWrapperFromYAML(t, mixedConfig, nil)
			require.Error(t, err)

			// repeat the test using only cli flags.
			_, _, err = configWrapperFromYAML(t, emptyConfigString, []string{
				"-querier.frontend-client.tls-server-name=query-frontend",
				"-querier.frontend-grpc-client.tls-server-name=query-frontend",
			})
			require.Error(t, err)

			// repeat the test mixing the YAML with cli flags.
			_, _, err = configWrapperFromYAML(t, newConfigBothClientsSet, []string{
				"-querier.frontend-client.tls-server-name=query-frontend",
			})
			require.Error(t, err)
		})

		t.Run("mix correct cli flags with YAML configs", func(t *testing.T) {
			// CLI flags and YAML for the new blocks should merge, not conflict.
			config, _, err := configWrapperFromYAML(t, newConfigBothClientsSet, []string{
				"-querier.scheduler-grpc-client.tls-enabled=true",
			})
			require.NoError(t, err)

			require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName)
			require.EqualValues(t, "query-scheduler", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName)
			// we never want to use zero values by default.
			require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize)
			require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
			require.True(t, config.Worker.QuerySchedulerGRPCClientConfig.TLSEnabled)
		})
	})
}

const defaultResulsCacheString = `---
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger, co
log: log,
handler: handler,
codec: codec,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
maxMessageSize: cfg.NewQueryFrontendGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, m
log: log,
handler: handler,
codec: codec,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
maxMessageSize: cfg.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.GRPCClientConfig,
grpcConfig: cfg.NewQueryFrontendGRPCClientConfig,
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerpb.NewSchedulerForQuerierClient(conn)
},
Expand Down
55 changes: 38 additions & 17 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ type Config struct {

QuerierID string `yaml:"id"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
NewQueryFrontendGRPCClientConfig grpcclient.Config `yaml:"query_frontend_grpc_client" doc:"description=Configures the querier gRPC client used to communicate with the query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'."`
OldQueryFrontendGRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the querier gRPC client used to communicate with the query-frontend and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined. This shouldn't be used if 'query_frontend_grpc_client' is defined."`

QuerySchedulerGRPCClientConfig grpcclient.Config `yaml:"query_scheduler_grpc_client" doc:"description=Configures the querier gRPC client used to communicate with the query-scheduler. If not defined, 'grpc_client_config' is used instead."`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -39,14 +42,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
// Register old client as the frontend-client flag for retro-compatibility.
cfg.OldQueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)

cfg.NewQueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-grpc-client", f)
cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-grpc-client", f)
}

// Validate checks the worker configuration: the frontend and scheduler
// addresses are mutually exclusive, and every gRPC client config must be
// valid on its own.
func (cfg *Config) Validate() error {
	if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" {
		return errors.New("frontend address and scheduler address are mutually exclusive, please use only one")
	}

	// Validate each client config in turn, returning the first failure.
	clients := []*grpcclient.Config{
		&cfg.NewQueryFrontendGRPCClientConfig,
		&cfg.OldQueryFrontendGRPCClientConfig,
		&cfg.QuerySchedulerGRPCClientConfig,
	}
	for _, c := range clients {
		if err := c.Validate(); err != nil {
			return err
		}
	}
	return nil
}

// Handler for HTTP requests wrapped in protobuf messages.
Expand Down Expand Up @@ -80,7 +94,6 @@ type processor interface {
type querierWorker struct {
*services.BasicService

cfg Config
logger log.Logger

processor processor
Expand All @@ -92,6 +105,9 @@ type querierWorker struct {
managers map[string]*processorManager

metrics *Metrics

grpcClientConfig grpcclient.Config
maxConcurrentRequests int
}

func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, logger log.Logger, reg prometheus.Registerer, codec RequestCodec) (services.Service, error) {
Expand All @@ -105,43 +121,48 @@ func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, log

metrics := NewMetrics(cfg, reg)
var processor processor
var grpcCfg grpcclient.Config
var servs []services.Service
var address string

switch {
case rng != nil:
level.Info(logger).Log("msg", "Starting querier worker using query-scheduler and scheduler ring for addresses")
grpcCfg = cfg.QuerySchedulerGRPCClientConfig
processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec)
case cfg.SchedulerAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)

grpcCfg = cfg.QuerySchedulerGRPCClientConfig
address = cfg.SchedulerAddress
processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec)

case cfg.FrontendAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)

address = cfg.FrontendAddress
grpcCfg = cfg.NewQueryFrontendGRPCClientConfig
processor = newFrontendProcessor(cfg, handler, logger, codec)
default:
return nil, errors.New("unable to start the querier worker, need to configure one of frontend_address, scheduler_address, or a ring config in the query_scheduler config block")
}

return newQuerierWorkerWithProcessor(cfg, metrics, logger, processor, address, rng, servs)
return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrent, cfg.DNSLookupPeriod, metrics, logger, processor, address, rng, servs)
}

func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, dnsLookupPeriod time.Duration, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
f := &querierWorker{
cfg: cfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
metrics: metrics,
maxConcurrentRequests: maxConcReq,
grpcClientConfig: grpcCfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
metrics: metrics,
}

// Empty address is only used in tests, where individual targets are added manually.
if address != "" {
w, err := util.NewDNSWatcher(address, cfg.DNSLookupPeriod, f)
w, err := util.NewDNSWatcher(address, dnsLookupPeriod, f)
if err != nil {
return nil, err
}
Expand All @@ -150,7 +171,7 @@ func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logg
}

if ring != nil {
w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f)
w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, dnsLookupPeriod, f)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -245,17 +266,17 @@ func (w *querierWorker) resetConcurrency() {
}()

for _, m := range w.managers {
concurrency := w.cfg.MaxConcurrent / len(w.managers)
concurrency := w.maxConcurrentRequests / len(w.managers)

// If max concurrency does not evenly divide into our frontends a subset will be chosen
// to receive an extra connection. Frontend addresses were shuffled above so this will be a
// random selection of frontends.
if index < w.cfg.MaxConcurrent%len(w.managers) {
if index < w.maxConcurrentRequests%len(w.managers) {
level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
concurrency++
}

// If concurrency is 0 then MaxConcurrentRequests is less than the total number of
// If concurrency is 0 then maxConcurrentRequests is less than the total number of
// frontends/schedulers. In order to prevent accidentally starving a frontend or scheduler we are just going to
// always connect once to every target. This is dangerous b/c we may start exceeding LogQL
// max concurrency.
Expand All @@ -271,7 +292,7 @@ func (w *querierWorker) resetConcurrency() {

func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := w.cfg.GRPCClientConfig.DialOption(nil, nil)
opts, err := w.grpcClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading