diff --git a/Gopkg.lock b/Gopkg.lock
index c31275b068b2f..28e587fe62e0a 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -186,8 +186,8 @@
   version = "v2.0.0"
 
 [[projects]]
-  branch = "lazy-load-chunks"
-  digest = "1:a999c29b3a215dfc12d374a9aac09c94c1b72ef530f4e39d9ab3ae1468cfe8e8"
+  branch = "master"
+  digest = "1:d9488f98e486896b56f406cfa36bf8f0b9f049bdfa6c67b28cc2b20328010adc"
   name = "github.com/cortexproject/cortex"
   packages = [
     "pkg/chunk",
@@ -213,8 +213,7 @@
     "pkg/util/validation",
   ]
   pruneopts = "UT"
-  revision = "61b92520b0c1afdef6e42b7a27cca6c715e9f386"
-  source = "https://github.com/grafana/cortex"
+  revision = "be63a81445db6e9481a577a70ca0623ef6f97873"
 
 [[projects]]
   digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"
@@ -1400,11 +1399,14 @@
     "github.com/gorilla/websocket",
     "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc",
     "github.com/hpcloud/tail",
+    "github.com/jmespath/go-jmespath",
+    "github.com/mitchellh/mapstructure",
     "github.com/mwitkow/go-grpc-middleware",
     "github.com/opentracing/opentracing-go",
     "github.com/pkg/errors",
     "github.com/prometheus/client_golang/prometheus",
     "github.com/prometheus/client_golang/prometheus/promauto",
+    "github.com/prometheus/common/config",
     "github.com/prometheus/common/model",
     "github.com/prometheus/common/version",
     "github.com/prometheus/prometheus/discovery",
diff --git a/Gopkg.toml b/Gopkg.toml
index 35b9221f209b4..ba38388f4b670 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -26,8 +26,7 @@
 
 [[constraint]]
   name = "github.com/cortexproject/cortex"
-  source = "https://github.com/grafana/cortex"
-  branch = "lazy-load-chunks"
+  branch = "master"
 
 [[constraint]]
   name = "github.com/weaveworks/common"
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go
index 3356eaa9ccbcf..7a4f9984c8e33 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go
@@ -616,8 +616,27 @@ func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map
 	return result, nil
 }
 
+// PutChunkAndIndex implements chunk.ObjectAndIndexClient
+// Combine both sets of writes before sending to DynamoDB, for performance
+func (a dynamoDBStorageClient) PutChunkAndIndex(ctx context.Context, c chunk.Chunk, index chunk.WriteBatch) error {
+	dynamoDBWrites, err := a.writesForChunks([]chunk.Chunk{c})
+	if err != nil {
+		return err
+	}
+	dynamoDBWrites.TakeReqs(index.(dynamoDBWriteBatch), 0)
+	return a.BatchWrite(ctx, dynamoDBWrites)
+}
+
 // PutChunks implements chunk.ObjectClient.
 func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
+	dynamoDBWrites, err := a.writesForChunks(chunks)
+	if err != nil {
+		return err
+	}
+	return a.BatchWrite(ctx, dynamoDBWrites)
+}
+
+func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) {
 	var (
 		dynamoDBWrites = dynamoDBWriteBatch{}
 	)
@@ -625,19 +644,19 @@ func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chu
 	for i := range chunks {
 		buf, err := chunks[i].Encoded()
 		if err != nil {
-			return err
+			return nil, err
 		}
 		key := chunks[i].ExternalKey()
 
 		table, err := a.schemaCfg.ChunkTableFor(chunks[i].From)
 		if err != nil {
-			return err
+			return nil, err
 		}
 
 		dynamoDBWrites.Add(table, key, placeholder, buf)
 	}
 
-	return a.BatchWrite(ctx, dynamoDBWrites)
+	return dynamoDBWrites, nil
 }
 
 // Slice of values returned; map key is attribute name
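A note on the DynamoDB change above: `PutChunkAndIndex` folds the index entries into the same `BatchWrite` round trip as the chunk itself, via `writesForChunks` plus `TakeReqs`. The following standalone sketch only illustrates that batching idea; `memBatch` and `memStore` are invented names for the example, not Cortex types.

```go
package main

import "fmt"

// memBatch stands in for a write batch: table name -> queued keys.
type memBatch map[string][]string

func (b memBatch) add(table, key string) { b[table] = append(b[table], key) }

// takeReqs drains another batch into this one, playing the role that
// dynamoDBWriteBatch.TakeReqs plays in the patch above.
func (b memBatch) takeReqs(other memBatch) {
	for table, keys := range other {
		b[table] = append(b[table], keys...)
	}
}

// memStore counts round trips so the saving is visible.
type memStore struct{ batchesSent int }

func (s *memStore) batchWrite(batch memBatch) {
	s.batchesSent++
	fmt.Println("batch write:", batch)
}

// putChunkAndIndex merges the chunk write with the index writes so the
// backend sees a single batch instead of two.
func (s *memStore) putChunkAndIndex(chunkKey string, index memBatch) {
	writes := memBatch{}
	writes.add("chunks", chunkKey)
	writes.takeReqs(index)
	s.batchWrite(writes)
}

func main() {
	s := &memStore{}
	index := memBatch{"index_2019_10": {"series-hash-1"}}
	s.putChunkAndIndex("userID/fp:from:through", index)
	fmt.Println("round trips:", s.batchesSent) // 1 instead of 2
}
```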
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/fixtures.go
index 9c6898d23fae2..abc073279758b 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/fixtures.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/fixtures.go
@@ -33,7 +33,7 @@ func DefaultSchemaConfig(store, schema string, from model.Time) SchemaConfig {
 		Configs: []PeriodConfig{{
 			IndexType: store,
 			Schema:    schema,
-			From:      from,
+			From:      DayTime{from},
 			ChunkTables: PeriodicTableConfig{
 				Prefix: "cortex",
 				Period: 7 * 24 * time.Hour,
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fixtures.go
index baf338d70c4eb..323a51ae8c130 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fixtures.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fixtures.go
@@ -51,7 +51,7 @@ func (f *fixture) Clients() (
 	schemaConfig = chunk.SchemaConfig{
 		Configs: []chunk.PeriodConfig{{
 			IndexType: "boltdb",
-			From:      model.Now(),
+			From:      chunk.DayTime{Time: model.Now()},
 			ChunkTables: chunk.PeriodicTableConfig{
 				Prefix: "chunks",
 				Period: 10 * time.Minute,
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go
index aa522284852c0..40b499a9fa42d 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go
@@ -25,16 +25,40 @@ const (
 
 // PeriodConfig defines the schema and tables to use for a period of time
 type PeriodConfig struct {
-	From      model.Time `yaml:"-"`              // used when working with config
-	FromStr   string     `yaml:"from,omitempty"` // used when loading from yaml
-	IndexType string     `yaml:"store"`          // type of index client to use.
-	ObjectType string    `yaml:"object_store"`   // type of object client to use; if omitted, defaults to store.
+	From       DayTime `yaml:"from"`         // used when working with config
+	IndexType  string  `yaml:"store"`        // type of index client to use.
+	ObjectType string  `yaml:"object_store"` // type of object client to use; if omitted, defaults to store.
 	Schema      string              `yaml:"schema"`
 	IndexTables PeriodicTableConfig `yaml:"index"`
 	ChunkTables PeriodicTableConfig `yaml:"chunks,omitempty"`
 	RowShards uint32 `yaml:"row_shards"`
 }
 
+// DayTime is a model.Time what holds day-aligned values, and marshals to/from
+// YAML in YYYY-MM-DD format.
+type DayTime struct {
+	model.Time
+}
+
+// MarshalYAML implements yaml.Marshaller.
+func (d DayTime) MarshalYAML() (interface{}, error) {
+	return d.Time.Time().Format("2006-01-02"), nil
+}
+
+// UnmarshalYAML implements yaml.Unmarshaller.
+func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	var from string
+	if err := unmarshal(&from); err != nil {
+		return err
+	}
+	t, err := time.Parse("2006-01-02", from)
+	if err != nil {
+		return err
+	}
+	d.Time = model.TimeFromUnix(t.Unix())
+	return nil
+}
+
 // SchemaConfig contains the config for our chunk index schemas
 type SchemaConfig struct {
 	Configs []PeriodConfig `yaml:"configs"`
@@ -98,8 +122,7 @@
 
 	add := func(t string, f model.Time) {
 		cfg.Configs = append(cfg.Configs, PeriodConfig{
-			From:      f,
-			FromStr:   f.Time().Format("2006-01-02"),
+			From:      DayTime{f},
 			Schema:    t,
 			IndexType: cfg.legacy.StorageClient,
 			IndexTables: PeriodicTableConfig{
@@ -153,13 +176,13 @@
 // entries if necessary so there is an entry starting at t
 func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
 	for i := 0; i < len(cfg.Configs); i++ {
-		if t > cfg.Configs[i].From &&
-			(i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) {
+		if t > cfg.Configs[i].From.Time &&
+			(i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) {
 			// Split the i'th entry by duplicating then overwriting the From time
 			cfg.Configs = append(cfg.Configs[:i+1], cfg.Configs[i:]...)
-			cfg.Configs[i+1].From = t
+			cfg.Configs[i+1].From = DayTime{t}
 		}
-		if cfg.Configs[i].From >= t {
+		if cfg.Configs[i].From.Time >= t {
 			f(&cfg.Configs[i])
 		}
 	}
@@ -211,25 +234,11 @@
 
 	decoder := yaml.NewDecoder(f)
 	decoder.SetStrict(true)
-	if err := decoder.Decode(&cfg); err != nil {
-		return err
-	}
-	for i := range cfg.Configs {
-		t, err := time.Parse("2006-01-02", cfg.Configs[i].FromStr)
-		if err != nil {
-			return err
-		}
-		cfg.Configs[i].From = model.TimeFromUnix(t.Unix())
-	}
-
-	return nil
+	return decoder.Decode(&cfg)
 }
 
 // PrintYaml dumps the yaml to stdout, to aid in migration
 func (cfg SchemaConfig) PrintYaml() {
-	for i := range cfg.Configs {
-		cfg.Configs[i].FromStr = cfg.Configs[i].From.Time().Format("2006-01-02")
-	}
 	encoder := yaml.NewEncoder(os.Stdout)
 	encoder.Encode(cfg)
 }
@@ -425,7 +434,7 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr
 // ChunkTableFor calculates the chunk table shard for a given point in time.
 func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) {
 	for i := range cfg.Configs {
-		if t >= cfg.Configs[i].From && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) {
+		if t >= cfg.Configs[i].From.Time && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) {
 			return cfg.Configs[i].ChunkTables.TableFor(t), nil
 		}
 	}
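The `DayTime` type above replaces the old `From`/`FromStr` pair, so the YAML layer now parses and prints `from` directly as a `YYYY-MM-DD` date. Below is a small, self-contained round-trip sketch: it re-declares a `DayTime` mirroring the vendored one rather than importing it, adds `.UTC()` purely so the output is timezone-independent, and assumes `gopkg.in/yaml.v2` and `github.com/prometheus/common/model` are available.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
	yaml "gopkg.in/yaml.v2"
)

// DayTime mirrors the vendored chunk.DayTime: a model.Time that is written
// to and read from YAML as a YYYY-MM-DD date.
type DayTime struct {
	model.Time
}

func (d DayTime) MarshalYAML() (interface{}, error) {
	return d.Time.Time().UTC().Format("2006-01-02"), nil
}

func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var from string
	if err := unmarshal(&from); err != nil {
		return err
	}
	t, err := time.Parse("2006-01-02", from)
	if err != nil {
		return err
	}
	d.Time = model.TimeFromUnix(t.Unix())
	return nil
}

// periodConfig is a cut-down, hypothetical stand-in for chunk.PeriodConfig
// that reuses the same struct tags.
type periodConfig struct {
	From   DayTime `yaml:"from"`
	Store  string  `yaml:"store"`
	Schema string  `yaml:"schema"`
}

func main() {
	in := "from: 2019-03-01\nstore: boltdb\nschema: v9\n"

	var cfg periodConfig
	if err := yaml.Unmarshal([]byte(in), &cfg); err != nil {
		panic(err)
	}

	out, err := yaml.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // the from field comes back as a day-granularity date
}
```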
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
index a7d618c3854bd..fc12849f9223c 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
@@ -353,21 +353,25 @@ func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error {
 func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error {
 	chunks := []Chunk{chunk}
 
-	err := c.storage.PutChunks(ctx, chunks)
-	if err != nil {
-		return err
-	}
-
-	c.writeBackCache(ctx, chunks)
-
 	writeReqs, keysToCache, err := c.calculateIndexEntries(from, through, chunk)
 	if err != nil {
 		return err
 	}
 
-	if err := c.index.BatchWrite(ctx, writeReqs); err != nil {
-		return err
+	if oic, ok := c.storage.(ObjectAndIndexClient); ok {
+		if err = oic.PutChunkAndIndex(ctx, chunk, writeReqs); err != nil {
+			return err
+		}
+	} else {
+		err := c.storage.PutChunks(ctx, chunks)
+		if err != nil {
+			return err
+		}
+		if err := c.index.BatchWrite(ctx, writeReqs); err != nil {
+			return err
+		}
 	}
+	c.writeBackCache(ctx, chunks)
 
 	bufs := make([][]byte, len(keysToCache))
 	c.writeDedupeCache.Store(ctx, keysToCache, bufs)
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go
index 0448239db3c2e..802173f88a751 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go
@@ -22,6 +22,11 @@
 	GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
 }
 
+// ObjectAndIndexClient allows optimisations where the same client handles both
+type ObjectAndIndexClient interface {
+	PutChunkAndIndex(ctx context.Context, c Chunk, index WriteBatch) error
+}
+
 // WriteBatch represents a batch of writes.
 type WriteBatch interface {
 	Add(tableName, hashValue string, rangeValue []byte, value []byte)
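`seriesStore.PutOne` above now upgrades the storage client to `ObjectAndIndexClient` via a type assertion and only falls back to separate chunk-write/index-write calls when the optional interface is not implemented. Here is a minimal, self-contained sketch of that optional-interface dispatch; the `plainStore`/`combinedStore` types and lower-case method names are illustrative only, not the Cortex ones.

```go
package main

import "fmt"

// objectClient is the baseline capability every chunk store provides.
type objectClient interface {
	putChunks(keys []string) error
}

// objectAndIndexClient is the optional upgrade, analogous to chunk.ObjectAndIndexClient.
type objectAndIndexClient interface {
	putChunkAndIndex(key string, index []string) error
}

type plainStore struct{}

func (plainStore) putChunks(keys []string) error {
	fmt.Println("chunk write:", keys)
	return nil
}

// combinedStore also implements the optional interface.
type combinedStore struct{ plainStore }

func (combinedStore) putChunkAndIndex(key string, index []string) error {
	fmt.Println("single combined batch:", key, index)
	return nil
}

// putOne mirrors the dispatch in seriesStore.PutOne: use the combined path
// when available, otherwise write the chunk and the index separately.
func putOne(storage objectClient, key string, index []string) error {
	if oic, ok := storage.(objectAndIndexClient); ok {
		return oic.putChunkAndIndex(key, index)
	}
	if err := storage.putChunks([]string{key}); err != nil {
		return err
	}
	fmt.Println("separate index write:", index)
	return nil
}

func main() {
	_ = putOne(plainStore{}, "chunk-1", []string{"entry-a"})    // two writes
	_ = putOne(combinedStore{}, "chunk-2", []string{"entry-b"}) // one batch
}
```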
diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go
index b32034001ffee..9bbe325da24d7 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go
@@ -195,7 +195,7 @@ func (m *TableManager) calculateExpectedTables() []TableDesc {
 	result := []TableDesc{}
 
 	for i, config := range m.schemaCfg.Configs {
-		if config.From.Time().After(mtime.Now()) {
+		if config.From.Time.Time().After(mtime.Now()) {
 			continue
 		}
 		if config.IndexTables.Period == 0 { // non-periodic table
@@ -240,18 +240,18 @@
 		} else {
 			endTime := mtime.Now().Add(m.cfg.CreationGracePeriod)
 			if i+1 < len(m.schemaCfg.Configs) {
-				nextFrom := m.schemaCfg.Configs[i+1].From.Time()
+				nextFrom := m.schemaCfg.Configs[i+1].From.Time.Time()
 				if endTime.After(nextFrom) {
 					endTime = nextFrom
 				}
 			}
 			endModelTime := model.TimeFromUnix(endTime.Unix())
 			result = append(result, config.IndexTables.periodicTables(
-				config.From, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod,
+				config.From.Time, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod,
 			)...)
 			if config.ChunkTables.Prefix != "" {
 				result = append(result, config.ChunkTables.periodicTables(
-					config.From, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod,
+					config.From.Time, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod,
 				)...)
 			}
 		}
diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/pool.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/pool.go
index 56e9b7d42e2f6..303c78f78c289 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/pool.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/pool.go
@@ -30,8 +30,8 @@ type Factory func(addr string) (grpc_health_v1.HealthClient, error)
 
 // PoolConfig is config for creating a Pool.
 type PoolConfig struct {
-	ClientCleanupPeriod  time.Duration
-	HealthCheckIngesters bool
+	ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period,omitempty"`
+	HealthCheckIngesters bool          `yaml:"health_check_ingesters,omitempty"`
 	RemoteTimeout        time.Duration
 }
 
diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go
index 0837b86ab1c3a..733a65cd56c7f 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go
@@ -32,8 +32,8 @@ type Limits struct {
 	CardinalityLimit int `yaml:"cardinality_limit"`
 
 	// Config for overrides, convenient if it goes here.
-	PerTenantOverrideConfig string
-	PerTenantOverridePeriod time.Duration
+	PerTenantOverrideConfig string        `yaml:"per_tenant_override_config"`
+	PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
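The `pool.go` and `limits.go` changes only add `yaml` struct tags, which lets these previously flag-only fields be set from a YAML config file. A hedged sketch of what that enables, using stand-in structs that copy the same tags (not the actual Cortex `PoolConfig`/`Limits` types) and assuming `gopkg.in/yaml.v2`:

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

// poolConfig copies one of the newly tagged PoolConfig fields for illustration.
type poolConfig struct {
	HealthCheckIngesters bool `yaml:"health_check_ingesters,omitempty"`
}

// limits copies one of the newly tagged Limits fields.
type limits struct {
	PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
}

func main() {
	var p poolConfig
	if err := yaml.Unmarshal([]byte("health_check_ingesters: true\n"), &p); err != nil {
		panic(err)
	}
	fmt.Println("health_check_ingesters:", p.HealthCheckIngesters)

	var l limits
	if err := yaml.Unmarshal([]byte("per_tenant_override_config: /etc/overrides.yaml\n"), &l); err != nil {
		panic(err)
	}
	fmt.Println("per_tenant_override_config:", l.PerTenantOverrideConfig)
}
```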