Skip to content

Commit

Permalink
Update loki to cortex master (#2030)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityacs authored May 13, 2020
1 parent a8b94ae commit 199e334
Show file tree
Hide file tree
Showing 257 changed files with 8,082 additions and 3,383 deletions.
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v1.0.1-0.20200424135841-64fb9ad94a38
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.0.1-0.20200430170006-3462eb63f324
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.7.3-0.20190817195342-4760db040282
Expand All @@ -20,11 +20,11 @@ require (
github.com/fatih/color v1.7.0
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/frankban/quicktest v1.7.2 // indirect
github.com/go-kit/kit v0.9.0
github.com/go-logfmt/logfmt v0.4.0
github.com/go-kit/kit v0.10.0
github.com/go-logfmt/logfmt v0.5.0
github.com/gogo/protobuf v1.3.1 // remember to update loki-build-image/Dockerfile too
github.com/golang/snappy v0.0.1
github.com/gorilla/mux v1.7.1
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect
Expand All @@ -51,14 +51,14 @@ require (
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200310113808-2708ba4e60a4
github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 // indirect
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
google.golang.org/grpc v1.25.1
google.golang.org/grpc v1.26.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.7
gopkg.in/yaml.v2 v2.2.8
k8s.io/klog v1.0.0
)

Expand Down
128 changes: 122 additions & 6 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (r
return result, nil
}

func (r mockRing) GetAll() (ring.ReplicationSet, error) {
func (r mockRing) GetAll(op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
Expand Down
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier "github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (t *Loki) initDistributor() (services.Service, error) {

func (t *Loki) initQuerier() (services.Service, error) {
level.Debug(util.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))
worker, err := frontend.NewWorker(t.cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
worker, err := frontend.NewWorker(t.cfg.Worker, cortex_querier.Config{MaxConcurrent: t.cfg.Querier.MaxConcurrent}, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
IngesterMaxQueryLookback time.Duration `yaml:"query_ingesters_within,omitempty"`
Engine logql.EngineOpts `yaml:"engine,omitempty"`
MaxConcurrent int `yaml:"max_concurrent"`
}

// RegisterFlags register flags.
Expand All @@ -51,6 +52,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryTimeout, "querier.query_timeout", 1*time.Minute, "Timeout when querying backends (ingesters or storage) during the execution of a query request")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.IngesterMaxQueryLookback, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
}

// Querier handlers queries.
Expand Down Expand Up @@ -99,7 +101,7 @@ type responseFromIngesters struct {
// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forAllIngesters(ctx context.Context, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
replicationSet, err := q.ring.GetAll()
replicationSet, err := q.ring.GetAll(ring.Read)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -334,7 +336,7 @@ func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.T
}

// Get the current replication set from the ring
replicationSet, err := q.ring.GetAll()
replicationSet, err := q.ring.GetAll(ring.Read)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -527,7 +529,7 @@ func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
return err
}

replicationSet, err := q.ring.GetAll()
replicationSet, err := q.ring.GetAll(ring.Read)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (r *readRingMock) BatchGet(keys []uint32, op ring.Operation) ([]ring.Replic
return []ring.ReplicationSet{r.replicationSet}, nil
}

func (r *readRingMock) GetAll() (ring.ReplicationSet, error) {
func (r *readRingMock) GetAll(op ring.Operation) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

Expand Down
27 changes: 20 additions & 7 deletions pkg/querier/queryrange/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
)

var jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary
var (
jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary
extractor = queryrange.PrometheusResponseExtractor{}
)

// PrometheusExtractor implements Extractor interface
type PrometheusExtractor struct{}

// prometheusResponseExtractor wraps the original prometheus cache extractor.
// Statistics are discarded when using cache entries.
var prometheusResponseExtractor = queryrange.ExtractorFunc(func(start, end int64, from queryrange.Response) queryrange.Response {
// Extract wraps the original prometheus cache extractor
func (PrometheusExtractor) Extract(start, end int64, from queryrange.Response) queryrange.Response {
response := extractor.Extract(start, end, from.(*LokiPromResponse).Response)
return &LokiPromResponse{
Response: queryrange.PrometheusResponseExtractor.
Extract(start, end, from.(*LokiPromResponse).Response).(*queryrange.PrometheusResponse),
Response: response.(*queryrange.PrometheusResponse),
}
})
}

// ResponseWithoutHeaders wraps the original prometheus caching without headers
func (PrometheusExtractor) ResponseWithoutHeaders(resp queryrange.Response) queryrange.Response {
response := extractor.ResponseWithoutHeaders(resp.(*LokiPromResponse).Response)
return &LokiPromResponse{
Response: response.(*queryrange.PrometheusResponse),
}
}

// encode encodes a Prometheus response and injects Loki stats.
func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet
instrumentMetrics := queryrange.NewInstrumentMiddlewareMetrics(registerer)
retryMetrics := queryrange.NewRetryMiddlewareMetrics(registerer)

metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, lokiCodec, prometheusResponseExtractor, instrumentMetrics, retryMetrics)
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, lokiCodec, PrometheusExtractor{}, instrumentMetrics, retryMetrics)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -205,6 +205,7 @@ func NewMetricTripperware(
limits,
codec,
extractor,
nil,
)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type store struct {

// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) {
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer)
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer, nil)
if err != nil {
return nil, err
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/storage/stores/local/boltdb_shipper_table_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ func TestBoltDBShipperTableClient(t *testing.T) {
err = tableClient.DeleteTable(context.Background(), "table1")
require.NoError(t, err)

// cortex does not omit empty directories from the list
// ToDo: change the code in cortex to remove empty directories from the list
ensureEmptyAndRemoveDirectory(t, path.Join(tempDir, storageKeyPrefix, "table1"))

delete(foldersWithFiles, "table1")
checkExpectedTables(t, tableClient, foldersWithFiles)
}
Expand All @@ -73,11 +69,3 @@ func checkExpectedTables(t *testing.T, tableClient chunk.TableClient, expectedTa
require.True(t, ok)
}
}

func ensureEmptyAndRemoveDirectory(t *testing.T, directory string) {
filesInfo, err := ioutil.ReadDir(directory)
require.NoError(t, err)
require.Len(t, filesInfo, 0)

require.NoError(t, os.Remove(directory))
}
7 changes: 4 additions & 3 deletions vendor/github.com/aws/aws-sdk-go/aws/arn/arn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 199e334

Please sign in to comment.