Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create smaller unique files from boltdb shipper and other code refactorings #2261

Merged
merged 19 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
2d6e0cc
rename package local to shipper
sandeepsukhani Jun 26, 2020
71dad85
reorganized files to separate out uploads and downloads
sandeepsukhani Jun 26, 2020
c4b2a5a
create smaller unique files from boltdb shipper and other improvements
sandeepsukhani Jun 26, 2020
4899ea6
cancellation of downloading of initial files when service is being st…
sandeepsukhani Jul 1, 2020
206e02b
migration of existing files to dedicated folders and cleanup of index…
sandeepsukhani Jul 1, 2020
32a8398
some more improvements to uploads and downloads code
sandeepsukhani Jul 8, 2020
88ffcd8
tests for uploads and downloads package in shipper
sandeepsukhani Jul 8, 2020
1002189
test cleanup fix and some other minor changes
sandeepsukhani Jul 9, 2020
6dd6ba7
refactor getting uploader from object key
sandeepsukhani Jul 10, 2020
88235a4
better naming
sandeepsukhani Jul 13, 2020
8c4e4fe
changes suggested from PR review
sandeepsukhani Jul 21, 2020
8de4d7f
Fixes goroutine loop.
cyriltovena Jul 21, 2020
c0b2179
Simplify getoradd table.
cyriltovena Jul 21, 2020
88ae78c
Fixes loop in upload manager.
cyriltovena Jul 21, 2020
6d9037f
some more tests, logs and changes suggested from PR review
sandeepsukhani Jul 22, 2020
ecc174f
move ready state check for downloaded table inside query function
sandeepsukhani Jul 22, 2020
03dce89
add info log for successfully finishing uploading of a table
sandeepsukhani Jul 23, 2020
7e14254
fix merge conflict
sandeepsukhani Jul 28, 2020
863038a
fix ingester name and mode config
sandeepsukhani Jul 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func New(cfg Config) (*Loki, error) {
if err := loki.setupModuleManager(); err != nil {
return nil, err
}
storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)
storage.RegisterCustomIndexClients(&loki.cfg.StorageConfig, prometheus.DefaultRegisterer)

return loki, nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/storage/stores/shipper"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -182,7 +182,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {

// We want ingester to also query the store when using boltdb-shipper
pc := t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)]
if pc.IndexType == local.BoltDBShipperType {
if pc.IndexType == shipper.BoltDBShipperType {
t.cfg.Ingester.QueryStore = true
mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
if err != nil {
Expand Down Expand Up @@ -243,17 +243,17 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == local.BoltDBShipperType {
if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == shipper.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
// We do not want ingester to unnecessarily keep downloading files
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
case Querier:
// We do not want query to do any updates to index
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
}
}

Expand Down Expand Up @@ -370,20 +370,20 @@ func activePeriodConfig(cfg chunk.SchemaConfig) int {
// usingBoltdbShipper check whether current or the next index type is boltdb-shipper, returns true if yes.
func usingBoltdbShipper(cfg chunk.SchemaConfig) bool {
activePCIndex := activePeriodConfig(cfg)
if cfg.Configs[activePCIndex].IndexType == local.BoltDBShipperType ||
(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == local.BoltDBShipperType) {
if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType ||
(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType) {
return true
}

return false
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
if pc.ObjectType != local.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
}
// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
defaultMaxLookBack := maxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)
defaultMaxLookBack := maxChunkAge + shipper.UploadInterval + (15 * time.Minute)

if maxLookBackConfig == 0 {
// If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the default calculated value.
Expand Down
18 changes: 8 additions & 10 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/util"
)

// Config is the loki storage configuration
type Config struct {
storage.Config `yaml:",inline"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"`
}

// RegisterFlags adds the flags required to configure this flag set.
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
// NewTableClient creates a TableClient for managing tables for index/chunk store.
// ToDo: Add support in Cortex for registering custom table client like index client.
func NewTableClient(name string, cfg Config) (chunk.TableClient, error) {
if name == local.BoltDBShipperType {
if name == shipper.BoltDBShipperType {
name = "boltdb"
cfg.FSConfig = cortex_local.FSConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}
}
Expand Down Expand Up @@ -287,13 +287,13 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
return filtered
}

func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) {
// BoltDB Shipper is supposed to be run as a singleton.
// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
// in tests for creating multiple instances of it at a time.
var boltDBIndexClientWithShipper chunk.IndexClient

storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) {
storage.RegisterIndexStore(shipper.BoltDBShipperType, func() (chunk.IndexClient, error) {
if boltDBIndexClientWithShipper != nil {
return boltDBIndexClientWithShipper, nil
}
Expand All @@ -303,9 +303,7 @@ func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
return nil, err
}

boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(
cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory},
objectClient, cfg.BoltDBShipperConfig, registerer)
boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, registerer)

return boltDBIndexClientWithShipper, err
}, func() (client chunk.TableClient, e error) {
Expand All @@ -314,6 +312,6 @@ func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
return nil, err
}

return local.NewBoltDBShipperTableClient(objectClient), nil
return shipper.NewBoltDBShipperTableClient(objectClient), nil
})
}
6 changes: 3 additions & 3 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -731,7 +731,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
require.NoError(t, err)

// config for BoltDB Shipper
boltdbShipperConfig := local.ShipperConfig{}
boltdbShipperConfig := shipper.Config{}
flagext.DefaultValues(&boltdbShipperConfig)
boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
boltdbShipperConfig.SharedStoreType = "filesystem"
Expand All @@ -748,7 +748,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
BoltDBShipperConfig: boltdbShipperConfig,
}

RegisterCustomIndexClients(config, nil)
RegisterCustomIndexClients(&config, nil)

store, err := NewStore(config, chunk.StoreConfig{}, chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
Expand Down
65 changes: 0 additions & 65 deletions pkg/storage/stores/local/boltdb_index_client.go

This file was deleted.

Loading