Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update cortex to latest master #1956

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v1.0.0
github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.7.3-0.20190817195342-4760db040282
Expand Down Expand Up @@ -47,7 +47,6 @@ require (
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
github.com/stretchr/testify v1.5.1
github.com/thanos-io/thanos v0.11.0 // indirect
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cortexproject/cortex v0.6.1-0.20200228110116-92ab6cbe0995/go.mod h1:3Xa3DjJxtpXqxcMGdk850lcIRb81M0fyY1MQ6udY134=
github.com/cortexproject/cortex v1.0.0 h1:SbvD/LBbp50bQBq+lMwYoS91I6DUMbRKaYxE6UmSEa0=
github.com/cortexproject/cortex v1.0.0/go.mod h1:KixgGK5GO7YVo48k37rvHOEQlwpDCqHSPX2Mv2IuJMY=
github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8 h1:A7nGtA5pj10j5bwbLPqf5C+WAhVzFaOt1c/uen6202o=
github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8/go.mod h1:5NXU+UpV8NW6I3teskVmxn45xcq4+IbtSOINfRf+jds=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8=
github.com/cznic/fileutil v0.0.0-20180108211300-6a051e75936f/go.mod h1:8S58EK26zhXSxzv7NQFpnliaOQsmDUxvoQO3rt154Vg=
Expand Down Expand Up @@ -701,7 +701,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0=
github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI=
github.com/prometheus/prometheus v1.8.2-0.20200110114423-1e64d757f711/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI=
github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33 h1:HBYrMJj5iosUjUkAK9L5GO+5eEQXbcrzdjkqY9HV5W4=
github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33/go.mod h1:fkIPPkuZnkXyopYHmXPxf9rgiPkVgZCN8w9o8+UgBlY=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg=
Expand Down Expand Up @@ -767,9 +766,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d/go.mod h1:usT/TxtJQ7DzinTt+G9kinDQmRS5sxwu0unVKZ9vdcw=
github.com/thanos-io/thanos v0.8.1-0.20200326105947-214ff4480e93/go.mod h1:PeLHoE5XdPZss/3eLvuxDCFXnM6Sd2Kh+saQIRJVtBE=
github.com/thanos-io/thanos v0.11.0 h1:UkWLa93sihcxCofelRH/NBGQxFyFU73eXIr2a+dwOFM=
github.com/thanos-io/thanos v0.11.0/go.mod h1:N/Yes7J68KqvmY+xM6J5CJqEvWIvKSR5sqGtmuD6wDc=
github.com/thanos-io/thanos v0.12.1-0.20200416112106-b391ca115ed8 h1:z7sOhoCEWnrQ2MIew3cJxsaxKT0AQu5pgXA8ZjdrYlk=
github.com/thanos-io/thanos v0.12.1-0.20200416112106-b391ca115ed8/go.mod h1:+nN9AzmfaIH2e2KJGyRxX0BoUGrRSyZmp+U8ToRxlDc=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
10 changes: 5 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -65,7 +65,7 @@ type Config struct {
DistributorRing cortex_distributor.RingConfig `yaml:"ring,omitempty"`

// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error) `yaml:"-"`
factory ring_client.PoolFactory `yaml:"-"`
}

// RegisterFlags registers the flags.
Expand All @@ -79,7 +79,7 @@ type Distributor struct {
clientCfg client.Config
ingestersRing ring.ReadRing
validator *Validator
pool *cortex_client.Pool
pool *ring_client.Pool

// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
Expand All @@ -93,7 +93,7 @@ type Distributor struct {
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (grpc_health_v1.HealthClient, error) {
factory = func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
validator: validator,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -219,7 +220,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri
distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
distributorConfig.DistributorRing.KVStore.Mock = kvStore
distributorConfig.DistributorRing.InstanceInterfaceNames = []string{"eth0", "en0", "lo0"}
distributorConfig.factory = func(addr string) (grpc_health_v1.HealthClient, error) {
distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) {
return ingesters[addr], nil
}

Expand Down Expand Up @@ -260,6 +261,10 @@ func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts
return nil, nil
}

func (i *mockIngester) Close() error {
return nil
}

// Copied from Cortex; TODO(twilkie) - factor this our and share it.
// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor
// ingesters.
Expand Down
36 changes: 22 additions & 14 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,49 @@ import (
"io"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/logproto"
)

type HealthAndIngesterClient interface {
logproto.IngesterClient
grpc_health_v1.HealthClient
Close() error
}

type ClosableHealthAndIngesterClient struct {
logproto.PusherClient
logproto.QuerierClient
logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}

// Config for an ingester client.
type Config struct {
PoolConfig cortex_client.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlags("ingester.client", f)
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
cfg.PoolConfig.RegisterFlags(f)

f.DurationVar(&cfg.PoolConfig.RemoteTimeout, "ingester.client.healthcheck-timeout", 1*time.Second, "Timeout for healthcheck rpcs.")
f.DurationVar(&cfg.RemoteTimeout, "ingester.client.timeout", 5*time.Second, "Timeout for ingester client RPCs.")
}

// New returns a new ingester client.
func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
Expand All @@ -43,13 +57,7 @@ func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
if err != nil {
return nil, err
}
return struct {
logproto.PusherClient
logproto.QuerierClient
logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}{
return ClosableHealthAndIngesterClient{
PusherClient: logproto.NewPusherClient(conn),
QuerierClient: logproto.NewQuerierClient(conn),
IngesterClient: logproto.NewIngesterClient(conn),
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Config struct {
MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)
}

// RegisterFlags registers the flags.
Expand Down
12 changes: 3 additions & 9 deletions pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/go-kit/kit/log/level"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -141,19 +141,13 @@ func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T)
cfg.LifecyclerConfig.JoinAfter = joinAfter
cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID

cfg.ingesterClientFactory = func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) {
cfg.ingesterClientFactory = func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error) {
ingester, ok := f.ingesters[addr]
if !ok {
return nil, fmt.Errorf("no ingester %s", addr)
}

return struct {
logproto.PusherClient
logproto.QuerierClient
logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}{
return client.ClosableHealthAndIngesterClient{
PusherClient: nil,
QuerierClient: nil,
IngesterClient: &testIngesterClient{t: f.t, i: ingester},
Expand Down
13 changes: 7 additions & 6 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ import (
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/grafana/loki/pkg/ingester/client"
Expand Down Expand Up @@ -59,15 +60,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Querier struct {
cfg Config
ring ring.ReadRing
pool *cortex_client.Pool
pool *ring_client.Pool
store storage.Store
engine logql.Engine
limits *validation.Overrides
}

// New makes a new Querier.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
factory := func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}

Expand All @@ -76,11 +77,11 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.

// newQuerier creates a new Querier and allows to pass a custom ingester client factory
// used for testing purposes
func newQuerier(cfg Config, clientCfg client.Config, clientFactory cortex_client.Factory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
querier := Querier{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
pool: distributor.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
store: store,
limits: limits,
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"fmt"
"time"

"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/cortexproject/cortex/pkg/chunk"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -70,18 +71,22 @@ func (c *querierClientMock) Context() context.Context {
return context.Background()
}

func (c *querierClientMock) Close() error {
return nil
}

// newIngesterClientMockFactory creates a factory function always returning
// the input querierClientMock
func newIngesterClientMockFactory(c *querierClientMock) cortex_client.Factory {
return func(addr string) (grpc_health_v1.HealthClient, error) {
func newIngesterClientMockFactory(c *querierClientMock) ring_client.PoolFactory {
return func(addr string) (ring_client.PoolClient, error) {
return c, nil
}
}

// mockIngesterClientConfig returns an ingester client config suitable for testing
func mockIngesterClientConfig() client.Config {
return client.Config{
PoolConfig: cortex_client.PoolConfig{
PoolConfig: distributor.PoolConfig{
ClientCleanupPeriod: 1 * time.Minute,
HealthCheckIngesters: false,
RemoteTimeout: 1 * time.Second,
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ var (
CacheConfig: cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
Size: 1024,
Validity: 24 * time.Hour,
MaxSizeItems: 1024,
Validity: 24 * time.Hour,
},
},
},
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading