diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index fe57f40daa581..f59f6501c94e5 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -326,6 +326,16 @@ pattern_ingester: # merging them as bloom blocks. [bloom_compactor: ] +bloom_build: + # Flag to enable or disable the usage of the bloom-planner and bloom-builder + # components. + # CLI flag: -bloom-build.enabled + [enabled: | default = false] + + planner: + + builder: + # Experimental: The bloom_gateway block configures the Loki bloom gateway # server, responsible for serving queries for filtering chunks based on filter # expressions. diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go new file mode 100644 index 0000000000000..098e7d6d83f00 --- /dev/null +++ b/pkg/bloombuild/builder/builder.go @@ -0,0 +1,50 @@ +package builder + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + + utillog "github.com/grafana/loki/v3/pkg/util/log" +) + +type Worker struct { + services.Service + + cfg Config + metrics *Metrics + logger log.Logger +} + +func New( + cfg Config, + logger log.Logger, + r prometheus.Registerer, +) (*Worker, error) { + utillog.WarnExperimentalUse("Bloom Builder", logger) + + w := &Worker{ + cfg: cfg, + metrics: NewMetrics(r), + logger: logger, + } + + w.Service = services.NewBasicService(w.starting, w.running, w.stopping) + return w, nil +} + +func (w *Worker) starting(_ context.Context) (err error) { + w.metrics.running.Set(1) + return err +} + +func (w *Worker) stopping(_ error) error { + w.metrics.running.Set(0) + return nil +} + +func (w *Worker) running(_ context.Context) error { + return nil +} diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go new file mode 100644 index 0000000000000..ac282ccf95ebb --- /dev/null +++ b/pkg/bloombuild/builder/config.go @@ -0,0 +1,21 @@ +package builder + +import "flag" + +// Config configures the bloom-builder component. +type Config struct { + // TODO: Add config +} + +// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. +func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) { + // TODO: Register flags with flagsPrefix +} + +func (cfg *Config) Validate() error { + return nil +} + +type Limits interface { + // TODO: Add limits +} diff --git a/pkg/bloombuild/builder/metrics.go b/pkg/bloombuild/builder/metrics.go new file mode 100644 index 0000000000000..e8f46fa025080 --- /dev/null +++ b/pkg/bloombuild/builder/metrics.go @@ -0,0 +1,26 @@ +package builder + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + metricsNamespace = "loki" + metricsSubsystem = "bloombuilder" +) + +type Metrics struct { + running prometheus.Gauge +} + +func NewMetrics(r prometheus.Registerer) *Metrics { + return &Metrics{ + running: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "running", + Help: "Value will be 1 if the bloom builder is currently running on this instance", + }), + } +} diff --git a/pkg/bloombuild/config.go b/pkg/bloombuild/config.go new file mode 100644 index 0000000000000..c69c605607f5a --- /dev/null +++ b/pkg/bloombuild/config.go @@ -0,0 +1,40 @@ +package bloombuild + +import ( + "flag" + "fmt" + + "github.com/grafana/loki/v3/pkg/bloombuild/builder" + "github.com/grafana/loki/v3/pkg/bloombuild/planner" +) + +// Config configures the bloom-planner component. +type Config struct { + Enabled bool `yaml:"enabled"` + + Planner planner.Config `yaml:"planner"` + Builder builder.Config `yaml:"builder"` +} + +// RegisterFlags registers flags for the bloom building configuration. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, "bloom-build.enabled", false, "Flag to enable or disable the usage of the bloom-planner and bloom-builder components.") + cfg.Planner.RegisterFlagsWithPrefix("bloom-build.planner", f) + cfg.Builder.RegisterFlagsWithPrefix("bloom-build.builder", f) +} + +func (cfg *Config) Validate() error { + if !cfg.Enabled { + return nil + } + + if err := cfg.Planner.Validate(); err != nil { + return fmt.Errorf("invalid bloom planner configuration: %w", err) + } + + if err := cfg.Builder.Validate(); err != nil { + return fmt.Errorf("invalid bloom builder configuration: %w", err) + } + + return nil +} diff --git a/pkg/bloombuild/planner/config.go b/pkg/bloombuild/planner/config.go new file mode 100644 index 0000000000000..dd8cb315d9345 --- /dev/null +++ b/pkg/bloombuild/planner/config.go @@ -0,0 +1,21 @@ +package planner + +import "flag" + +// Config configures the bloom-planner component. +type Config struct { + // TODO: Add config +} + +// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. +func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) { + // TODO: Register flags with flagsPrefix +} + +func (cfg *Config) Validate() error { + return nil +} + +type Limits interface { + // TODO: Add limits +} diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go new file mode 100644 index 0000000000000..e9a9035e14df0 --- /dev/null +++ b/pkg/bloombuild/planner/metrics.go @@ -0,0 +1,26 @@ +package planner + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + metricsNamespace = "loki" + metricsSubsystem = "bloomplanner" +) + +type Metrics struct { + running prometheus.Gauge +} + +func NewMetrics(r prometheus.Registerer) *Metrics { + return &Metrics{ + running: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "running", + Help: "Value will be 1 if bloom planner is currently running on this instance", + }), + } +} diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go new file mode 100644 index 0000000000000..7732d180b0bb8 --- /dev/null +++ b/pkg/bloombuild/planner/planner.go @@ -0,0 +1,50 @@ +package planner + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + + utillog "github.com/grafana/loki/v3/pkg/util/log" +) + +type Planner struct { + services.Service + + cfg Config + metrics *Metrics + logger log.Logger +} + +func New( + cfg Config, + logger log.Logger, + r prometheus.Registerer, +) (*Planner, error) { + utillog.WarnExperimentalUse("Bloom Planner", logger) + + p := &Planner{ + cfg: cfg, + metrics: NewMetrics(r), + logger: logger, + } + + p.Service = services.NewBasicService(p.starting, p.running, p.stopping) + return p, nil +} + +func (p *Planner) starting(_ context.Context) (err error) { + p.metrics.running.Set(1) + return err +} + +func (p *Planner) stopping(_ error) error { + p.metrics.running.Set(0) + return nil +} + +func (p *Planner) running(_ context.Context) error { + return nil +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index b682c4bfaa65c..9446b351aab82 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/v3/pkg/analytics" + "github.com/grafana/loki/v3/pkg/bloombuild" "github.com/grafana/loki/v3/pkg/bloomcompactor" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" @@ -90,6 +91,7 @@ type Config struct { Pattern pattern.Config `yaml:"pattern_ingester,omitempty"` IndexGateway indexgateway.Config `yaml:"index_gateway"` BloomCompactor bloomcompactor.Config `yaml:"bloom_compactor,omitempty" category:"experimental"` + BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"` BloomGateway bloomgateway.Config `yaml:"bloom_gateway,omitempty" category:"experimental"` StorageConfig storage.Config `yaml:"storage_config,omitempty"` ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` @@ -173,6 +175,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Tracing.RegisterFlags(f) c.CompactorConfig.RegisterFlags(f) c.BloomCompactor.RegisterFlags(f) + c.BloomBuild.RegisterFlags(f) c.QueryScheduler.RegisterFlags(f) c.Analytics.RegisterFlags(f) c.OperationalConfig.RegisterFlags(f) @@ -649,6 +652,8 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(BloomStore, t.initBloomStore) mm.RegisterModule(BloomCompactor, t.initBloomCompactor) mm.RegisterModule(BloomCompactorRing, t.initBloomCompactorRing, modules.UserInvisibleModule) + mm.RegisterModule(BloomPlanner, t.initBloomPlanner) + mm.RegisterModule(BloomBuilder, t.initBloomBuilder) mm.RegisterModule(IndexGateway, t.initIndexGateway) mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule) mm.RegisterModule(IndexGatewayInterceptors, t.initIndexGatewayInterceptors, modules.UserInvisibleModule) @@ -686,6 +691,8 @@ func (t *Loki) setupModuleManager() error { IndexGateway: {Server, Store, BloomStore, IndexGatewayRing, IndexGatewayInterceptors, Analytics}, BloomGateway: {Server, BloomStore, Analytics}, BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store}, + BloomPlanner: {Server, BloomStore, Analytics, Store}, + BloomBuilder: {Server, BloomStore, Analytics, Store}, PatternIngester: {Server, MemberlistKV, Analytics}, PatternRingClient: {Server, MemberlistKV, Analytics}, IngesterQuerier: {Ring}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0280bd514d3c1..a563e80f789fe 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -38,6 +38,8 @@ import ( "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/analytics" + "github.com/grafana/loki/v3/pkg/bloombuild/builder" + "github.com/grafana/loki/v3/pkg/bloombuild/planner" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" compactorclient "github.com/grafana/loki/v3/pkg/compactor/client" @@ -122,6 +124,8 @@ const ( QuerySchedulerRing string = "query-scheduler-ring" BloomCompactor string = "bloom-compactor" BloomCompactorRing string = "bloom-compactor-ring" + BloomPlanner string = "bloom-planner" + BloomBuilder string = "bloom-builder" BloomStore string = "bloom-store" All string = "all" Read string = "read" @@ -803,7 +807,7 @@ func (t *Loki) updateConfigForShipperStore() { t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval) - case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor): + case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor), t.Cfg.isTarget(BloomPlanner), t.Cfg.isTarget(BloomBuilder): // We do not want query to do any updates to index t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly @@ -1553,6 +1557,34 @@ func (t *Loki) initBloomCompactorRing() (services.Service, error) { return t.bloomCompactorRingManager, nil } +func (t *Loki) initBloomPlanner() (services.Service, error) { + if !t.Cfg.BloomBuild.Enabled { + return nil, nil + } + + logger := log.With(util_log.Logger, "component", "bloom-planner") + + return planner.New( + t.Cfg.BloomBuild.Planner, + logger, + prometheus.DefaultRegisterer, + ) +} + +func (t *Loki) initBloomBuilder() (services.Service, error) { + if !t.Cfg.BloomBuild.Enabled { + return nil, nil + } + + logger := log.With(util_log.Logger, "component", "bloom-worker") + + return builder.New( + t.Cfg.BloomBuild.Builder, + logger, + prometheus.DefaultRegisterer, + ) +} + func (t *Loki) initQueryScheduler() (services.Service, error) { s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace) if err != nil {