Skip to content

Commit

Permalink
Use kv package from github.com/grafana/dskit (cortexproject#4436)
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 authored Aug 27, 2021
1 parent 6f67beb commit 32b1b40
Show file tree
Hide file tree
Showing 86 changed files with 756 additions and 2,479 deletions.
4 changes: 1 addition & 3 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3810,9 +3810,7 @@ The `memberlist_config` configures the Gossip memberlist.
[compression_enabled: <boolean> | default = true]
# Other cluster members to join. Can be specified multiple times. It can be an
# IP, hostname or an entry specified in the DNS Service Discovery format (see
# https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery
# for more details).
# IP, hostname or an entry specified in the DNS Service Discovery format.
# CLI flag: -memberlist.join
[join_members: <list of string> | default = []]
Expand Down
10 changes: 1 addition & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/alicebob/miniredis/v2 v2.14.3
github.com/armon/go-metrics v0.3.6
github.com/aws/aws-sdk-go v1.38.68
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
Expand All @@ -30,12 +29,8 @@ require (
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/gorilla/mux v1.8.0
github.com/grafana/dskit v0.0.0-20210824090727-039d9afd9208
github.com/grafana/dskit v0.0.0-20210827060659-9daca2f00327
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/hashicorp/consul/api v1.9.1
github.com/hashicorp/go-cleanhttp v0.5.1
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/memberlist v0.2.3
github.com/json-iterator/go v1.1.11
github.com/lib/pq v1.3.0
github.com/minio/minio-go/v7 v7.0.10
Expand All @@ -59,9 +54,6 @@ require (
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/weaveworks/common v0.0.0-20210722103813-e649eff5ab4a
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd v3.3.25+incompatible
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.uber.org/atomic v1.9.0
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,8 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/dskit v0.0.0-20210824090727-039d9afd9208 h1:Yc+q7s/wcyd8bvPTQygY1iQTxxr931cLISsRhYltrJM=
github.com/grafana/dskit v0.0.0-20210824090727-039d9afd9208/go.mod h1:uF46UNN1/feB1egpq8UGbBBKvJjGgZauW7pcVbeFLLM=
github.com/grafana/dskit v0.0.0-20210827060659-9daca2f00327 h1:THdW9RnugPdLwW8RmHB/xOcKf267QunSH1mDuaJkhWk=
github.com/grafana/dskit v0.0.0-20210827060659-9daca2f00327/go.mod h1:+T2iuDOzx/BSQJSvli9FUvLM5HnV8aDPmXM8KWuVj3M=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
Expand Down
6 changes: 6 additions & 0 deletions integration/getting_started_with_gossiped_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,10 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
// single ingester and so we have 1 block shipped from ingesters and loaded by both store-gateways.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_bucket_store_blocks_loaded"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(1), "cortex_bucket_store_blocks_loaded"))

// Make sure that no DNS failures occurred.
// No actual DNS lookups are necessarily performed, so we can't really assert on that.
mlMatcher := labels.MustNewMatcher(labels.MatchEqual, "name", "memberlist")
require.NoError(t, cortex1.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_dns_failures_total"}, e2e.WithLabelMatchers(mlMatcher)))
require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_dns_failures_total"}, e2e.WithLabelMatchers(mlMatcher)))
}
25 changes: 15 additions & 10 deletions integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/etcd"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
)

func TestKVList(t *testing.T) {
Expand Down Expand Up @@ -117,7 +118,9 @@ func TestKVWatchAndDelete(t *testing.T) {
})
}

func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client {
func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client {
t.Helper()

etcdSvc := e2edb.NewETCD()
require.NoError(t, scenario.StartAndWaitReady(etcdSvc))

Expand All @@ -131,13 +134,15 @@ func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer)
MaxRetries: 5,
},
},
}, stringCodec{}, reg)
}, stringCodec{}, reg, logger)
require.NoError(t, err)

return etcdKv
}

func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client {
func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client {
t.Helper()

consulSvc := e2edb.NewConsul()
require.NoError(t, scenario.StartAndWaitReady(consulSvc))

Expand All @@ -152,14 +157,14 @@ func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer
WatchKeyRateLimit: 1,
},
},
}, stringCodec{}, reg)
}, stringCodec{}, reg, logger)
require.NoError(t, err)

return consulKv
}

func testKVs(t *testing.T, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
setupFns := map[string]func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client{
setupFns := map[string]func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client{
"etcd": setupEtcd,
"consul": setupConsul,
}
Expand All @@ -171,13 +176,13 @@ func testKVs(t *testing.T, testFn func(t *testing.T, client kv.Client, reg *prom
}
}

func testKVScenario(t *testing.T, kvSetupFn func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
func testKVScenario(t *testing.T, kvSetupFn func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

reg := prometheus.NewRegistry()
client := kvSetupFn(t, s, reg)
client := kvSetupFn(t, s, prometheus.WrapRegistererWithPrefix("cortex_", reg), log.NewNopLogger())
testFn(t, client, reg)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertmanager_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
5 changes: 2 additions & 3 deletions pkg/alertmanager/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ import (
"github.com/prometheus/client_golang/prometheus"
commoncfg "github.com/prometheus/common/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
util_log "github.com/cortexproject/cortex/pkg/util/log"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
)

func TestAMConfigValidationAPI(t *testing.T) {
Expand Down
21 changes: 12 additions & 9 deletions pkg/alertmanager/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,23 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/grafana/dskit/flagext"

"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/test"

"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
"github.com/cortexproject/cortex/pkg/ring"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/test"
)

func TestDistributor_DistributeRequest(t *testing.T) {
Expand Down Expand Up @@ -336,7 +337,9 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod
amByAddr[a.myAddr] = ams[i]
}

kvStore := consul.NewInMemoryClient(ring.GetCodec())
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

err := kvStore.CAS(context.Background(), RingKey,
func(_ interface{}) (interface{}, bool, error) {
return &ring.Desc{
Expand Down
5 changes: 3 additions & 2 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/cluster"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
Expand Down Expand Up @@ -344,7 +344,8 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, store alerts
ringStore, err = kv.NewClient(
cfg.ShardingRing.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(registerer, "alertmanager"),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("cortex_", registerer), "alertmanager"),
logger,
)
if err != nil {
return nil, errors.Wrap(err, "create KV store client")
Expand Down
39 changes: 28 additions & 11 deletions pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/services"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/alertmanager/notify"
Expand All @@ -43,7 +44,6 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
Expand Down Expand Up @@ -611,7 +611,9 @@ func TestMultitenantAlertmanager_deleteUnusedLocalUserState(t *testing.T) {
func TestMultitenantAlertmanager_zoneAwareSharding(t *testing.T) {
ctx := context.Background()
alertStore := prepareInMemoryAlertStore()
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

const (
user1 = "user1"
user2 = "user2"
Expand Down Expand Up @@ -689,7 +691,8 @@ func TestMultitenantAlertmanager_deleteUnusedRemoteUserState(t *testing.T) {
)

alertStore := prepareInMemoryAlertStore()
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

createInstance := func(i int) *MultitenantAlertmanager {
reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -1005,7 +1008,8 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
ctx := context.Background()
amConfig := mockAlertmanagerConfig(t)
amConfig.ShardingEnabled = true
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Use an alert store with a mocked backend.
bkt := &bucket.ClientMock{}
Expand Down Expand Up @@ -1109,7 +1113,9 @@ func TestMultitenantAlertmanager_PerTenantSharding(t *testing.T) {
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

alertStore := prepareInMemoryAlertStore()

var instances []*MultitenantAlertmanager
Expand Down Expand Up @@ -1296,7 +1302,9 @@ func TestMultitenantAlertmanager_SyncOnRingTopologyChanges(t *testing.T) {
amConfig.ShardingRing.RingCheckPeriod = 100 * time.Millisecond
amConfig.PollInterval = time.Hour // Don't trigger the periodic check.

ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

alertStore := prepareInMemoryAlertStore()

reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -1347,7 +1355,9 @@ func TestMultitenantAlertmanager_RingLifecyclerShouldAutoForgetUnhealthyInstance
amConfig.ShardingRing.HeartbeatPeriod = 100 * time.Millisecond
amConfig.ShardingRing.HeartbeatTimeout = heartbeatTimeout

ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

alertStore := prepareInMemoryAlertStore()

am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
Expand Down Expand Up @@ -1379,7 +1389,8 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {
ctx := context.Background()
amConfig := mockAlertmanagerConfig(t)
amConfig.ShardingEnabled = true
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Mock the store to fail listing configs.
bkt := &bucket.ClientMock{}
Expand All @@ -1401,7 +1412,9 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {

func TestAlertmanager_ReplicasPosition(t *testing.T) {
ctx := context.Background()
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

mockStore := prepareInMemoryAlertStore()
require.NoError(t, mockStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
User: "user-1",
Expand Down Expand Up @@ -1500,7 +1513,9 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) {
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

mockStore := prepareInMemoryAlertStore()
clientPool := newPassthroughAlertmanagerClientPool()
externalURL := flagext.URLValue{}
Expand Down Expand Up @@ -1693,7 +1708,9 @@ func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testi
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ringStore := consul.NewInMemoryClient(ring.GetCodec())
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

mockStore := prepareInMemoryAlertStore()
clientPool := newPassthroughAlertmanagerClientPool()
externalURL := flagext.URLValue{}
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func TestNewApiWithoutSourceIPExtractor(t *testing.T) {
require.NoError(t, err)

api, err := New(cfg, serverCfg, server, &FakeLogger{})

require.NoError(t, err)
require.Nil(t, api.sourceIPs)
}
Expand All @@ -40,7 +39,6 @@ func TestNewApiWithSourceIPExtractor(t *testing.T) {
require.NoError(t, err)

api, err := New(cfg, serverCfg, server, &FakeLogger{})

require.NoError(t, err)
require.NotNil(t, api.sourceIPs)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/chunk/purger/purger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
Expand Down
Loading

0 comments on commit 32b1b40

Please sign in to comment.