Skip to content

Commit

Permalink
refactor(dataobj): Consolidate configuration and add querier support (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 7, 2025
1 parent ff0da88 commit 1fa952d
Show file tree
Hide file tree
Showing 12 changed files with 1,080 additions and 86 deletions.
65 changes: 35 additions & 30 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -786,38 +786,43 @@ kafka_config:
# CLI flag: -kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]

dataobj_consumer:
builderconfig:
# The size of the target page to use for the data object builder.
# CLI flag: -dataobj-consumer.target-page-size
[target_page_size: <int> | default = 2MiB]

# The size of the target object to use for the data object builder.
# CLI flag: -dataobj-consumer.target-object-size
[target_object_size: <int> | default = 1GiB]

# Configures a maximum size for sections, for sections that support it.
# CLI flag: -dataobj-consumer.target-section-size
[target_section_size: <int> | default = 128MiB]

# The size of the buffer to use for sorting logs.
# CLI flag: -dataobj-consumer.buffer-size
[buffer_size: <int> | default = 16MiB]

uploader:
# The size of the SHA prefix to use for generating object storage keys for
# data objects.
# CLI flag: -dataobj-consumer.sha-prefix-size
[shaprefixsize: <int> | default = 2]
dataobj:
consumer:
builderconfig:
# The size of the target page to use for the data object builder.
# CLI flag: -dataobj-consumer.target-page-size
[target_page_size: <int> | default = 2MiB]

# The size of the target object to use for the data object builder.
# CLI flag: -dataobj-consumer.target-object-size
[target_object_size: <int> | default = 1GiB]

# Configures a maximum size for sections, for sections that support it.
# CLI flag: -dataobj-consumer.target-section-size
[target_section_size: <int> | default = 128MiB]

# The size of the buffer to use for sorting logs.
# CLI flag: -dataobj-consumer.buffer-size
[buffer_size: <int> | default = 16MiB]

uploader:
# The size of the SHA prefix to use for generating object storage keys for
# data objects.
# CLI flag: -dataobj-consumer.sha-prefix-size
[shaprefixsize: <int> | default = 2]

querier:
# Enable the dataobj querier.
# CLI flag: -dataobj-querier-enabled
[enabled: <boolean> | default = false]

# The prefix to use for the storage bucket.
# CLI flag: -dataobj-consumer.storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]
# The date of the first day of when the dataobj querier should start
# querying from. In YYYY-MM-DD format, for example: 2018-04-15.
# CLI flag: -dataobj-querier-from
[from: <daytime> | default = 1970-01-01]

dataobj_explorer:
# Prefix to use when exploring the bucket. If set, only objects under this
# prefix will be visible.
# CLI flag: -dataobj-explorer.storage-bucket-prefix
# The prefix to use for the storage bucket.
# CLI flag: -dataobj-storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]

# Configuration for 'runtime config' module, responsible for reloading runtime
Expand Down
31 changes: 31 additions & 0 deletions pkg/dataobj/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package config

import (
"flag"

"github.com/grafana/loki/v3/pkg/dataobj/consumer"
"github.com/grafana/loki/v3/pkg/dataobj/querier"
)

// Config groups every dataobj component's configuration under a single
// `dataobj` YAML section.
type Config struct {
	// Consumer holds the dataobj consumer configuration (builder + uploader).
	Consumer consumer.Config `yaml:"consumer"`
	// Querier holds the experimental dataobj querier configuration.
	Querier querier.Config `yaml:"querier"`
	// StorageBucketPrefix is the prefix to use for the storage bucket.
	// It is shared by the consumer and the querier so both read and write
	// under the same key space.
	StorageBucketPrefix string `yaml:"storage_bucket_prefix"`
}

// RegisterFlags registers the shared storage-bucket prefix flag and then
// delegates to each sub-component's own flag registration.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.StorageBucketPrefix, "dataobj-storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.")

	// Sub-components register their flags under their own prefixes;
	// registration order has no effect on flag parsing or help output.
	cfg.Consumer.RegisterFlags(f)
	cfg.Querier.RegisterFlags(f)
}

// Validate checks every dataobj sub-configuration and returns the first
// validation error encountered, or nil when all sections are valid.
func (cfg *Config) Validate() error {
	for _, validate := range []func() error{
		cfg.Consumer.Validate,
		cfg.Querier.Validate,
	} {
		if err := validate(); err != nil {
			return err
		}
	}
	return nil
}
3 changes: 0 additions & 3 deletions pkg/dataobj/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
// Config configures the dataobj consumer. The builder settings are
// embedded directly (flattened in YAML); the uploader gets its own
// `uploader` section.
type Config struct {
	dataobj.BuilderConfig
	// UploaderConfig configures how built data objects are uploaded to
	// object storage.
	UploaderConfig uploader.Config `yaml:"uploader"`
}

func (cfg *Config) Validate() error {
Expand All @@ -29,5 +27,4 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the builder and uploader flags,
// prepending prefix to every flag name.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
	cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
}
3 changes: 0 additions & 3 deletions pkg/dataobj/consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type Service struct {
}

func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore.Bucket, instanceID string, partitionRing ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) *Service {
if cfg.StorageBucketPrefix != "" {
bucket = objstore.NewPrefixedBucket(bucket, cfg.StorageBucketPrefix)
}
s := &Service{
logger: log.With(logger, "component", groupName),
cfg: cfg,
Expand Down
14 changes: 0 additions & 14 deletions pkg/dataobj/explorer/config.go

This file was deleted.

16 changes: 7 additions & 9 deletions pkg/dataobj/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ const (
metastoreWindowSize = 12 * time.Hour
)

var (
// Define our own builder config because metastore objects are significantly smaller.
metastoreBuilderCfg = dataobj.BuilderConfig{
TargetObjectSize: 32 * 1024 * 1024,
TargetPageSize: 4 * 1024 * 1024,
BufferSize: 32 * 1024 * 1024, // 8x page size
TargetSectionSize: 4 * 1024 * 1024, // object size / 8
}
)
// metastoreBuilderCfg overrides the default builder config because
// metastore objects are significantly smaller than regular data objects.
var metastoreBuilderCfg = dataobj.BuilderConfig{
	TargetObjectSize:  32 * 1024 * 1024,
	TargetPageSize:    4 * 1024 * 1024,
	BufferSize:        32 * 1024 * 1024, // 8x page size
	TargetSectionSize: 4 * 1024 * 1024,  // object size / 8
}

type Manager struct {
metastoreBuilder *dataobj.Builder
Expand Down
96 changes: 96 additions & 0 deletions pkg/dataobj/querier/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package querier

import (
"context"
"flag"
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/storage/chunk"
storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
)

var _ querier.Store = &Store{}

// Config configures the experimental dataobj querier. The doc tags below
// are the source of the generated configuration reference.
type Config struct {
	Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."`
	From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."`
}

// RegisterFlags registers the dataobj querier CLI flags.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&c.Enabled, "dataobj-querier-enabled", false, "Enable the dataobj querier.")
	// Keep the usage text identical to the struct's doc tag so the CLI
	// help and the generated configuration docs describe this flag the
	// same way (the old text "The start time to query from." disagreed
	// with the docs).
	f.Var(&c.From, "dataobj-querier-from", "The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15.")
}

// Validate returns an error when the querier is enabled but no start
// date (`from`) has been configured; a disabled querier is always valid.
func (c *Config) Validate() error {
	if !c.Enabled {
		return nil
	}
	if c.From.ModelTime().Time().IsZero() {
		return fmt.Errorf("from is required when dataobj querier is enabled")
	}
	return nil
}

// Store implements querier.Store on top of data objects kept in an
// object-storage bucket. Every query method is currently a stub that
// returns an empty result.
type Store struct {
	// bucket is the object store the data objects are read from.
	bucket objstore.Bucket
}

// NewStore returns a Store that reads data objects from bucket.
func NewStore(bucket objstore.Bucket) *Store {
	s := &Store{bucket: bucket}
	return s
}

// SelectLogs implements querier.Store. Stub: always returns an empty
// entry iterator; dataobj log selection is not implemented yet.
func (s *Store) SelectLogs(_ context.Context, _ logql.SelectLogParams) (iter.EntryIterator, error) {
	// TODO: Implement
	return iter.NoopEntryIterator, nil
}

// SelectSamples implements querier.Store. Stub: always returns an empty
// sample iterator; dataobj sample selection is not implemented yet.
func (s *Store) SelectSamples(_ context.Context, _ logql.SelectSampleParams) (iter.SampleIterator, error) {
	// TODO: Implement
	return iter.NoopSampleIterator, nil
}

// SelectSeries implements querier.Store. Stub: always returns an empty
// (non-nil) slice of series identifiers.
func (s *Store) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
	// TODO: Implement
	return []logproto.SeriesIdentifier{}, nil
}

// LabelValuesForMetricName implements querier.Store. Stub: always
// returns an empty (non-nil) slice of label values.
func (s *Store) LabelValuesForMetricName(_ context.Context, _ string, _ model.Time, _ model.Time, _ string, _ string, _ ...*labels.Matcher) ([]string, error) {
	// TODO: Implement
	return []string{}, nil
}

// LabelNamesForMetricName implements querier.Store. Stub: always
// returns an empty (non-nil) slice of label names.
func (s *Store) LabelNamesForMetricName(_ context.Context, _ string, _ model.Time, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
	// TODO: Implement
	return []string{}, nil
}

// Stats implements querier.Store. Stub: always returns zero-valued
// index statistics.
func (s *Store) Stats(_ context.Context, _ string, _ model.Time, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) {
	// TODO: Implement
	return &stats.Stats{}, nil
}

// Volume implements querier.Store. Stub: always returns an empty volume
// response.
func (s *Store) Volume(_ context.Context, _ string, _ model.Time, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) {
	// TODO: Implement
	return &logproto.VolumeResponse{}, nil
}

// GetShards implements querier.Store. Stub: always returns an empty
// shards response.
func (s *Store) GetShards(_ context.Context, _ string, _ model.Time, _ model.Time, _ uint64, _ chunk.Predicate) (*logproto.ShardsResponse, error) {
	// TODO: Implement
	return &logproto.ShardsResponse{}, nil
}
12 changes: 5 additions & 7 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (
"github.com/grafana/loki/v3/pkg/compactor"
compactorclient "github.com/grafana/loki/v3/pkg/compactor/client"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
dataobjconfig "github.com/grafana/loki/v3/pkg/dataobj/config"
"github.com/grafana/loki/v3/pkg/dataobj/consumer"
"github.com/grafana/loki/v3/pkg/dataobj/explorer"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/ingester"
Expand Down Expand Up @@ -110,8 +110,7 @@ type Config struct {
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
KafkaConfig kafka.Config `yaml:"kafka_config,omitempty" category:"experimental"`
DataObjConsumer consumer.Config `yaml:"dataobj_consumer,omitempty" category:"experimental"`
DataObjExplorer explorer.Config `yaml:"dataobj_explorer,omitempty" category:"experimental"`
DataObj dataobjconfig.Config `yaml:"dataobj,omitempty" category:"experimental"`

RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
OperationalConfig runtime.Config `yaml:"operational_config,omitempty"`
Expand Down Expand Up @@ -193,8 +192,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.KafkaConfig.RegisterFlags(f)
c.BlockBuilder.RegisterFlags(f)
c.BlockScheduler.RegisterFlags(f)
c.DataObjExplorer.RegisterFlags(f)
c.DataObjConsumer.RegisterFlags(f)
c.DataObj.RegisterFlags(f)
}

func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
Expand Down Expand Up @@ -310,8 +308,8 @@ func (c *Config) Validate() error {
if err := c.KafkaConfig.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid kafka_config config"))
}
if err := c.DataObjConsumer.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid dataobj_consumer config"))
if err := c.DataObj.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid dataobj config"))
}
}
if err := c.Distributor.Validate(); err != nil {
Expand Down
Loading

0 comments on commit 1fa952d

Please sign in to comment.