Skip to content

Commit

Permalink
Updated cortex to latest master. (#1088)
Browse files Browse the repository at this point in the history
* Updated cortex to latest master.

* Updated cortex to latest master.

* Fix tests after updating cortex.

* go mod tidy and vendor
  • Loading branch information
pstibrany authored and cyriltovena committed Oct 9, 2019
1 parent d4c5b09 commit 3df5561
Show file tree
Hide file tree
Showing 115 changed files with 7,085 additions and 2,040 deletions.
10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ require (
github.com/Microsoft/go-winio v0.4.12 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bmatcuk/doublestar v1.1.1
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v0.2.0
github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.0.0-20190607191414-238f8eaa31aa
Expand Down Expand Up @@ -40,11 +39,11 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f
github.com/prometheus/common v0.7.0
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.4.0
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4
Expand All @@ -60,7 +59,6 @@ require (
google.golang.org/genproto v0.0.0-20190916214212-f660b8655731 // indirect
google.golang.org/grpc v1.23.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.2
gotest.tools v2.2.0+incompatible // indirect
Expand Down
184 changes: 19 additions & 165 deletions go.sum

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,17 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), lineCount)
}

replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
}
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.IngesterDesc

samplesByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.IngesterDesc{}
for i, replicationSet := range replicationSets {
for i, key := range keys {
replicationSet, err := d.ring.Get(key, ring.Write, descs[:0])
if err != nil {
return nil, err
}

streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
streams[i].maxFailures = replicationSet.MaxErrors
for _, ingester := range replicationSet.Ingesters {
Expand Down
19 changes: 6 additions & 13 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ type mockRing struct {
replicationFactor uint32
}

func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
result := ring.ReplicationSet{
MaxErrors: 1,
Ingesters: buf[:0],
}
for i := uint32(0); i < r.replicationFactor; i++ {
n := (key + i) % uint32(len(r.ingesters))
Expand All @@ -144,18 +145,6 @@ func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error
return result, nil
}

func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
result := []ring.ReplicationSet{}
for i := 0; i < len(keys); i++ {
rs, err := r.Get(keys[i], op)
if err != nil {
return nil, err
}
result = append(result, rs)
}
return result, nil
}

func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
Expand All @@ -166,3 +155,7 @@ func (r mockRing) GetAll() (ring.ReplicationSet, error) {
func (r mockRing) ReplicationFactor() int {
return int(r.replicationFactor)
}

func (r mockRing) IngesterCount() int {
return len(r.ingesters)
}
9 changes: 7 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,20 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
return nil, err
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
var storeValues []string
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, from, through, "logs", req.Name)
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
if err != nil {
return nil, err
}
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, from, through, "logs")
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs")
if err != nil {
return nil, err
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ func (s *storeMock) LazyQuery(ctx context.Context, req logql.SelectParams) (iter
return args.Get(0).(iter.EntryIterator), args.Error(1)
}

func (s *storeMock) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
args := s.Called(ctx, from, through, matchers)
func (s *storeMock) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
args := s.Called(ctx, userID, from, through, matchers)
return args.Get(0).([]chunk.Chunk), args.Error(1)
}

func (s *storeMock) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
args := s.Called(ctx, from, through, matchers)
func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
args := s.Called(ctx, userID, from, through, matchers)
return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*chunk.Fetcher), args.Error(2)
}

Expand All @@ -194,13 +194,13 @@ func (s *storeMock) PutOne(ctx context.Context, from, through model.Time, chunk
return errors.New("storeMock.PutOne() has not been mocked")
}

func (s *storeMock) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
args := s.Called(ctx, from, through, metricName, labelName)
func (s *storeMock) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
args := s.Called(ctx, userID, from, through, metricName, labelName)
return args.Get(0).([]string), args.Error(1)
}

func (s *storeMock) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
args := s.Called(ctx, from, through, metricName)
func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
args := s.Called(ctx, userID, from, through, metricName)
return args.Get(0).([]string), args.Error(1)
}

Expand Down Expand Up @@ -229,7 +229,7 @@ func (r *readRingMock) Describe(ch chan<- *prometheus.Desc) {
func (r *readRingMock) Collect(ch chan<- prometheus.Metric) {
}

func (r *readRingMock) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
func (r *readRingMock) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

Expand All @@ -245,6 +245,10 @@ func (r *readRingMock) ReplicationFactor() int {
return 1
}

func (r *readRingMock) IngesterCount() int {
return len(r.replicationSet.Ingesters)
}

func mockReadRingWithOneActiveIngester() *readRingMock {
return newReadRingMock([]ring.IngesterDesc{
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) {
ingesterClient.On("Label", mock.Anything, &request, mock.Anything).Return(mockLabelResponse([]string{}), nil)

store := newStoreMock()
store.On("LabelValuesForMetricName", mock.Anything, model.TimeFromUnixNano(startTime.UnixNano()), model.TimeFromUnixNano(endTime.UnixNano()), "logs", "test").Return([]string{"foo", "bar"}, nil)
store.On("LabelValuesForMetricName", mock.Anything, "test", model.TimeFromUnixNano(startTime.UnixNano()), model.TimeFromUnixNano(endTime.UnixNano()), "logs", "test").Return([]string{"foo", "bar"}, nil)

limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
Expand Down Expand Up @@ -70,9 +71,14 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent
return nil, err
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

matchers = append(matchers, nameLabelMatcher)
from, through := util.RoundToMilliseconds(req.Start, req.End)
chks, fetchers, err := s.GetChunkRefs(ctx, from, through, matchers...)
chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ func Test_store_LazyQuery(t *testing.T) {
MaxChunkBatchSize: 10,
},
}
it, err := s.LazyQuery(context.Background(), logql.SelectParams{QueryRequest: tt.req})
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.LazyQuery(ctx, logql.SelectParams{QueryRequest: tt.req})
if err != nil {
t.Errorf("store.LazyQuery() error = %v", err)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil
}
func (m *mockChunkStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
func (m *mockChunkStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
return nil, nil
}
func (m *mockChunkStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
func (m *mockChunkStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return nil, nil
}
func (m *mockChunkStore) Stop() {}
func (m *mockChunkStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
func (m *mockChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
return nil, nil
}

Expand All @@ -143,7 +143,7 @@ func (m *mockChunkStore) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([
return res, nil
}

func (m *mockChunkStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
refs := make([]chunk.Chunk, 0, len(m.chunks))
// transform real chunks into ref chunks.
for _, c := range m.chunks {
Expand Down
35 changes: 11 additions & 24 deletions vendor/github.com/aws/aws-sdk-go/aws/client/default_retryer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/aws/aws-sdk-go/aws/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3df5561

Please sign in to comment.