diff --git a/go.mod b/go.mod index e5eb85da5f..cde7daf471 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/NYTimes/gziphandler v1.1.1 github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/alicebob/miniredis/v2 v2.14.3 + github.com/armon/go-metrics v0.3.9 github.com/aws/aws-sdk-go v1.42.8 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 @@ -28,9 +29,14 @@ require ( github.com/golang-migrate/migrate/v4 v4.7.0 github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 + github.com/google/btree v1.0.1 // indirect github.com/gorilla/mux v1.8.0 github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/hashicorp/consul/api v1.11.0 + github.com/hashicorp/go-cleanhttp v0.5.2 + github.com/hashicorp/go-sockaddr v1.0.2 + github.com/hashicorp/memberlist v0.2.4 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.3.0 github.com/minio/minio-go/v7 v7.0.10 @@ -54,6 +60,9 @@ require ( github.com/uber/jaeger-client-go v2.29.1+incompatible github.com/weaveworks/common v0.0.0-20210913144402-035033b78a78 go.etcd.io/bbolt v1.3.6 + go.etcd.io/etcd v3.3.25+incompatible + go.etcd.io/etcd/api/v3 v3.5.0 + go.etcd.io/etcd/client/v3 v3.5.0 go.uber.org/atomic v1.9.0 golang.org/x/net v0.0.0-20211020060615-d418f374d309 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/go.sum b/go.sum index b54d4510ed..b211f71149 100644 --- a/go.sum +++ b/go.sum @@ -778,8 +778,9 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= +github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/integration/kv_test.go b/integration/kv_test.go index e1d4e5c9e1..e55cb5a0cb 100644 --- a/integration/kv_test.go +++ b/integration/kv_test.go @@ -12,9 +12,6 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" - "github.com/grafana/dskit/kv/etcd" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -22,6 +19,9 @@ import ( "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/etcd" ) func TestKVList(t *testing.T) { diff --git a/pkg/alertmanager/alertmanager_ring.go b/pkg/alertmanager/alertmanager_ring.go index 93daf36850..7e033d7dba 100644 --- a/pkg/alertmanager/alertmanager_ring.go +++ b/pkg/alertmanager/alertmanager_ring.go @@ -9,9 +9,9 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/alertmanager/distributor_test.go b/pkg/alertmanager/distributor_test.go index 304ceebea6..421d4fc9d0 100644 --- a/pkg/alertmanager/distributor_test.go +++ b/pkg/alertmanager/distributor_test.go @@ -15,8 +15,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -28,6 +26,8 @@ import ( "github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/test" ) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 947efcf565..8fcd0acc39 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -16,7 +16,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/alertmanager/cluster" @@ -35,6 +34,7 @@ import ( "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index d036f3313f..0ff63f3556 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -21,7 +21,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/prometheus/alertmanager/notify" @@ -44,6 +43,7 @@ import ( "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go index 28b20e6734..7b14b1a5b6 100644 --- a/pkg/compactor/compactor_ring.go +++ b/pkg/compactor/compactor_ring.go @@ -7,9 +7,9 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 66d4dc4e0c..ad74f8f1ec 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -19,7 +19,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -36,6 +35,7 @@ import ( "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/concurrency" diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 0508658005..06256f3319 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -48,6 +47,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tenantfederation" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/scheduler" diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 7c7c2c615c..e19e9e91e3 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -27,6 +26,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/storage/bucket" diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 342f3a299c..20d97a14c1 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -9,8 +9,6 @@ import ( "time" "github.com/go-kit/log/level" - "github.com/grafana/dskit/kv/codec" - "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/services" "github.com/opentracing-contrib/go-stdlib/nethttp" "github.com/opentracing/opentracing-go" @@ -42,6 +40,8 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tenantfederation" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storegateway" diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 4acbbd5160..150c25727b 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -5,10 +5,10 @@ import ( "io" "net/http" - "github.com/grafana/dskit/kv" "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/validation" diff --git a/pkg/distributor/distributor_ring.go b/pkg/distributor/distributor_ring.go index e1e6135d2c..1e7ff840c8 100644 --- a/pkg/distributor/distributor_ring.go +++ b/pkg/distributor/distributor_ring.go @@ -7,9 +7,9 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 17288af26a..d97caa42ed 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -16,8 +16,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -39,6 +37,8 @@ import ( "github.com/cortexproject/cortex/pkg/prom1/storage/metric" "github.com/cortexproject/cortex/pkg/ring" ring_client "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go index 5e7d6d421c..5502b3ddf0 100644 --- a/pkg/distributor/ha_tracker.go +++ b/pkg/distributor/ha_tracker.go @@ -13,14 +13,14 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/codec" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/timestamp" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/util" ) diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index cd7bd5dfe9..cc60d1cf29 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -9,8 +9,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +21,8 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/test" diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 16d873f7aa..d7e53b5005 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -21,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" ) diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 78b38236fc..b34cd6a03b 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -12,8 +12,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -27,6 +25,8 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/util/test" "github.com/cortexproject/cortex/pkg/util/validation" ) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 6854595595..3845c78983 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -12,7 +12,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/types" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -33,6 +32,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index c614946ac4..9d78ff6873 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -9,7 +9,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" @@ -18,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/test" diff --git a/pkg/ring/basic_lifecycler.go b/pkg/ring/basic_lifecycler.go index c5a4d2df2f..b3a6702f3c 100644 --- a/pkg/ring/basic_lifecycler.go +++ b/pkg/ring/basic_lifecycler.go @@ -9,10 +9,11 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/ring/kv" ) type BasicLifecyclerDelegate interface { diff --git a/pkg/ring/basic_lifecycler_test.go b/pkg/ring/basic_lifecycler_test.go index d123932991..a8381f5e86 100644 --- a/pkg/ring/basic_lifecycler_test.go +++ b/pkg/ring/basic_lifecycler_test.go @@ -6,13 +6,14 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" ) const ( diff --git a/pkg/ring/bench/ring_memberlist_test.go b/pkg/ring/bench/ring_memberlist_test.go index 0eb1f5ecbe..69bf436348 100644 --- a/pkg/ring/bench/ring_memberlist_test.go +++ b/pkg/ring/bench/ring_memberlist_test.go @@ -8,13 +8,13 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/codec" - "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" ) type dnsProviderMock struct { diff --git a/vendor/github.com/grafana/dskit/kv/client.go b/pkg/ring/kv/client.go similarity index 97% rename from vendor/github.com/grafana/dskit/kv/client.go rename to pkg/ring/kv/client.go index b73620dfd6..b251a64b8b 100644 --- a/vendor/github.com/grafana/dskit/kv/client.go +++ b/pkg/ring/kv/client.go @@ -9,10 +9,10 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/dskit/kv/codec" - "github.com/grafana/dskit/kv/consul" - "github.com/grafana/dskit/kv/etcd" - "github.com/grafana/dskit/kv/memberlist" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/etcd" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" ) const ( diff --git a/pkg/ring/kv/client_test.go b/pkg/ring/kv/client_test.go new file mode 100644 index 0000000000..b31f904d08 --- /dev/null +++ b/pkg/ring/kv/client_test.go @@ -0,0 +1,159 @@ +package kv + +import ( + "context" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/ring/kv/codec" +) + +func TestParseConfig(t *testing.T) { + conf := ` +store: consul +consul: + host: "consul:8500" + consistentreads: true +prefix: "test/" +multi: + primary: consul + secondary: etcd +` + + cfg := Config{} + + err := yaml.Unmarshal([]byte(conf), &cfg) + require.NoError(t, err) + require.Equal(t, "consul", cfg.Store) + require.Equal(t, "test/", cfg.Prefix) + require.Equal(t, "consul:8500", cfg.Consul.Host) + require.Equal(t, "consul", cfg.Multi.Primary) + require.Equal(t, "etcd", cfg.Multi.Secondary) +} + +func Test_createClient_multiBackend_withSingleRing(t *testing.T) { + storeCfg, testCodec := newConfigsForTest() + require.NotPanics(t, func() { + _, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewPedanticRegistry(), testLogger{}) + require.NoError(t, err) + }) +} + +func Test_createClient_multiBackend_withMultiRing(t *testing.T) { + storeCfg1, testCodec := newConfigsForTest() + storeCfg2 := StoreConfig{} + reg := prometheus.NewPedanticRegistry() + + require.NotPanics(t, func() { + _, err := createClient("multi", "/test", storeCfg1, testCodec, Primary, reg, testLogger{}) + require.NoError(t, err) + }, "First client for KV store must not panic") + require.NotPanics(t, func() { + _, err := createClient("mock", "/test", storeCfg2, testCodec, Primary, reg, testLogger{}) + require.NoError(t, err) + }, "Second client for KV store must not panic") +} + +func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) { + storeCfg, testCodec := newConfigsForTest() + reg := prometheus.NewPedanticRegistry() + client, err := createClient("mock", "/test1", storeCfg, testCodec, Primary, reg, testLogger{}) + require.NoError(t, err) + require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) { + out = &mockMessage{id: "inCAS"} + retry = false + return + })) + + actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds") + require.Len(t, actual, 1) + require.Equal(t, "primary", actual["mock"]) +} + +func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { + storeCfg, testCodec := newConfigsForTest() + storeCfg.Multi.MirrorEnabled = true + storeCfg.Multi.MirrorTimeout = 10 * time.Second + reg := prometheus.NewPedanticRegistry() + client, err := createClient("multi", "/test1", storeCfg, testCodec, Primary, reg, testLogger{}) + require.NoError(t, err) + require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) { + out = &mockMessage{id: "inCAS"} + retry = false + return + })) + + actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds") + // expected multi-primary, inmemory-primary and mock-secondary + require.Len(t, actual, 3) + require.Equal(t, "primary", actual["multi"]) + require.Equal(t, "primary", actual["inmemory"]) + require.Equal(t, "secondary", actual["mock"]) + +} + +func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string { + mfs, err := reg.Gather() + require.NoError(t, err) + result := map[string]string{} + for _, mf := range mfs { + if mf.GetName() != histogramWithRoleLabels { + continue + } + for _, m := range mf.GetMetric() { + backendType := "" + role := "" + for _, l := range m.GetLabel() { + if l.GetName() == "role" { + role = l.GetValue() + } else if l.GetName() == "type" { + backendType = l.GetValue() + } + } + require.NotEmpty(t, backendType) + require.NotEmpty(t, role) + result[backendType] = role + } + } + return result +} +func newConfigsForTest() (cfg StoreConfig, c codec.Codec) { + cfg = StoreConfig{ + Multi: MultiConfig{ + Primary: "inmemory", + Secondary: "mock", + }, + } + c = codec.NewProtoCodec("test", func() proto.Message { + return &mockMessage{id: "inCodec"} + }) + return +} + +type mockMessage struct { + id string +} + +func (m *mockMessage) Reset() { + panic("do not use") +} + +func (m *mockMessage) String() string { + panic("do not use") +} + +func (m *mockMessage) ProtoMessage() { + panic("do not use") +} + +type testLogger struct { +} + +func (l testLogger) Log(keyvals ...interface{}) error { + return nil +} diff --git a/vendor/github.com/grafana/dskit/kv/codec/codec.go b/pkg/ring/kv/codec/codec.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/codec/codec.go rename to pkg/ring/kv/codec/codec.go diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/pkg/ring/kv/consul/client.go similarity index 99% rename from vendor/github.com/grafana/dskit/kv/consul/client.go rename to pkg/ring/kv/consul/client.go index 69219cf748..97cf4771b8 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -11,14 +11,14 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-cleanhttp" "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" - "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" ) const ( diff --git a/pkg/ring/kv/consul/client_test.go b/pkg/ring/kv/consul/client_test.go new file mode 100644 index 0000000000..ed599b5ab4 --- /dev/null +++ b/pkg/ring/kv/consul/client_test.go @@ -0,0 +1,187 @@ +package consul + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ring/kv/codec" +) + +func writeValuesToKV(t *testing.T, client *Client, key string, start, end int, sleep time.Duration) <-chan struct{} { + t.Helper() + + ch := make(chan struct{}) + go func() { + defer close(ch) + for i := start; i <= end; i++ { + t.Log("ts", time.Now(), "msg", "writing value", "val", i) + _, _ = client.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil) + time.Sleep(sleep) + } + }() + return ch +} + +func TestWatchKeyWithRateLimit(t *testing.T) { + c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ + WatchKeyRateLimit: 5.0, + WatchKeyBurstSize: 1, + }, testLogger{}, prometheus.NewPedanticRegistry()) + t.Cleanup(func() { + assert.NoError(t, closer.Close()) + }) + + const key = "test" + const max = 100 + + ch := writeValuesToKV(t, c, key, 0, max, 10*time.Millisecond) + + observed := observeValueForSomeTime(t, c, key, 1200*time.Millisecond) // little over 1 second + + // wait until updater finishes + <-ch + + if testing.Verbose() { + t.Log(observed) + } + // Let's see how many updates we have observed. Given the rate limit and our observing time, it should be 6 + // We should also have seen one of the later values, as we're observing for longer than a second, so rate limit should allow + // us to see it. + if len(observed) < 5 || len(observed) > 10 { + t.Error("Expected ~6 observed values, got", observed) + } + last := observed[len(observed)-1] + n, _ := strconv.Atoi(last) + if n < max/2 { + t.Error("Expected to see high last observed value, got", observed) + } +} + +func TestWatchKeyNoRateLimit(t *testing.T) { + c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{ + WatchKeyRateLimit: 0, + }, testLogger{}, prometheus.NewPedanticRegistry()) + t.Cleanup(func() { + assert.NoError(t, closer.Close()) + }) + + const key = "test" + const max = 100 + + ch := writeValuesToKV(t, c, key, 0, max, time.Millisecond) + observed := observeValueForSomeTime(t, c, key, 500*time.Millisecond) + + // wait until updater finishes + <-ch + + // With no limit, we should see most written values (we can lose some values if watching + // code is busy while multiple new values are written) + if len(observed) < 3*max/4 { + t.Error("Expected at least 3/4 of all values, got", observed) + } +} + +func TestReset(t *testing.T) { + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry()) + t.Cleanup(func() { + assert.NoError(t, closer.Close()) + }) + + const key = "test" + const max = 5 + + ch := make(chan error) + go func() { + defer close(ch) + for i := 0; i <= max; i++ { + t.Log("ts", time.Now(), "msg", "writing value", "val", i) + _, _ = c.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil) + if i == 1 { + c.kv.(*mockKV).ResetIndex() + } + if i == 2 { + c.kv.(*mockKV).ResetIndexForKey(key) + } + time.Sleep(10 * time.Millisecond) + } + }() + + observed := observeValueForSomeTime(t, c, key, 25*max*time.Millisecond) + + // wait until updater finishes + <-ch + + // Let's see how many updates we have observed. Given the rate limit and our observing time, we should see all numeric values + if testing.Verbose() { + t.Log(observed) + } + if len(observed) < max { + t.Error("Expected all values, got", observed) + } else if observed[len(observed)-1] != fmt.Sprintf("%d", max) { + t.Error("Expected to see last written value, got", observed) + } +} + +func observeValueForSomeTime(t *testing.T, client *Client, key string, timeout time.Duration) []string { + t.Helper() + + observed := []string(nil) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + client.WatchKey(ctx, key, func(i interface{}) bool { + s, ok := i.(string) + if !ok { + return false + } + t.Log("ts", time.Now(), "msg", "observed value", "val", s) + observed = append(observed, s) + return true + }) + return observed +} + +func TestWatchKeyWithNoStartValue(t *testing.T) { + c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry()) + t.Cleanup(func() { + assert.NoError(t, closer.Close()) + }) + + const key = "test" + + go func() { + time.Sleep(100 * time.Millisecond) + _, err := c.kv.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + _, err = c.kv.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil) + require.NoError(t, err) + }() + + ctx, fn := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer fn() + + reported := 0 + c.WatchKey(ctx, key, func(i interface{}) bool { + reported++ + return reported != 2 + }) + + // we should see both start and end values. + require.Equal(t, 2, reported) +} + +type testLogger struct { +} + +func (l testLogger) Log(keyvals ...interface{}) error { + return nil +} diff --git a/vendor/github.com/grafana/dskit/kv/consul/metrics.go b/pkg/ring/kv/consul/metrics.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/consul/metrics.go rename to pkg/ring/kv/consul/metrics.go diff --git a/vendor/github.com/grafana/dskit/kv/consul/mock.go b/pkg/ring/kv/consul/mock.go similarity index 99% rename from vendor/github.com/grafana/dskit/kv/consul/mock.go rename to pkg/ring/kv/consul/mock.go index f1f6937f08..6de12058aa 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/mock.go +++ b/pkg/ring/kv/consul/mock.go @@ -12,7 +12,7 @@ import ( consul "github.com/hashicorp/consul/api" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/dskit/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" ) type mockKV struct { diff --git a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go b/pkg/ring/kv/etcd/etcd.go similarity index 95% rename from vendor/github.com/grafana/dskit/kv/etcd/etcd.go rename to pkg/ring/kv/etcd/etcd.go index fa6944d4f5..0d2bfb0a3f 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go +++ b/pkg/ring/kv/etcd/etcd.go @@ -9,23 +9,23 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/flagext" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/transport" - "github.com/grafana/dskit/backoff" - dstls "github.com/grafana/dskit/crypto/tls" - "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" + cortextls "github.com/cortexproject/cortex/pkg/util/tls" ) // Config for a new etcd.Client. type Config struct { - Endpoints []string `yaml:"endpoints"` - DialTimeout time.Duration `yaml:"dial_timeout"` - MaxRetries int `yaml:"max_retries"` - EnableTLS bool `yaml:"tls_enabled"` - TLS dstls.ClientConfig `yaml:",inline"` + Endpoints []string `yaml:"endpoints"` + DialTimeout time.Duration `yaml:"dial_timeout"` + MaxRetries int `yaml:"max_retries"` + EnableTLS bool `yaml:"tls_enabled"` + TLS cortextls.ClientConfig `yaml:",inline"` UserName string `yaml:"username"` Password string `yaml:"password"` diff --git a/vendor/github.com/grafana/dskit/kv/etcd/mock.go b/pkg/ring/kv/etcd/mock.go similarity index 99% rename from vendor/github.com/grafana/dskit/kv/etcd/mock.go rename to pkg/ring/kv/etcd/mock.go index b7ee276455..53142e1c19 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/mock.go +++ b/pkg/ring/kv/etcd/mock.go @@ -8,12 +8,12 @@ import ( "sync" "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" - "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" ) // channelBufferSize is the size of the channels used to send events from Put, Delete, diff --git a/pkg/ring/kv/etcd/mock_test.go b/pkg/ring/kv/etcd/mock_test.go new file mode 100644 index 0000000000..58bf917528 --- /dev/null +++ b/pkg/ring/kv/etcd/mock_test.go @@ -0,0 +1,394 @@ +package etcd + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Tests to make sure the mock Etcd client works which we need for kv.Client tests. +// Quis custodiet ipsos custodes? + +func TestMockKv_Get(t *testing.T) { + t.Run("exact match", func(t *testing.T) { + pair := mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + + kv := newMockKV() + kv.values = map[string]mvccpb.KeyValue{string(pair.Key): pair} + res, err := kv.Get(context.Background(), "/foo") + + require.NoError(t, err) + require.Len(t, res.Kvs, 1) + assert.Equal(t, []byte("/foo"), res.Kvs[0].Key) + }) + + t.Run("not exact match", func(t *testing.T) { + pair := mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + + kv := newMockKV() + kv.values = map[string]mvccpb.KeyValue{string(pair.Key): pair} + res, err := kv.Get(context.Background(), "/bar") + + require.NoError(t, err) + assert.Empty(t, res.Kvs) + }) + + t.Run("prefix match", func(t *testing.T) { + fooPair := mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + bazPair := mvccpb.KeyValue{ + Key: []byte("/baz"), + Value: []byte("2"), + } + firstPair := mvccpb.KeyValue{ + Key: []byte("/first"), + Value: []byte("3"), + } + + kv := newMockKV() + kv.values = map[string]mvccpb.KeyValue{ + string(fooPair.Key): fooPair, + string(bazPair.Key): bazPair, + string(firstPair.Key): firstPair, + } + res, err := kv.Get(context.Background(), "/f", clientv3.WithPrefix()) + + require.NoError(t, err) + assert.ElementsMatch(t, []*mvccpb.KeyValue{&fooPair, &firstPair}, res.Kvs) + }) + + t.Run("empty prefix", func(t *testing.T) { + fooPair := mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + bazPair := mvccpb.KeyValue{ + Key: []byte("/baz"), + Value: []byte("2"), + } + firstPair := mvccpb.KeyValue{ + Key: []byte("/first"), + Value: []byte("3"), + } + + kv := newMockKV() + kv.values = map[string]mvccpb.KeyValue{ + string(fooPair.Key): fooPair, + string(bazPair.Key): bazPair, + string(firstPair.Key): firstPair, + } + res, err := kv.Get(context.Background(), "", clientv3.WithPrefix()) + + require.NoError(t, err) + assert.ElementsMatch(t, []*mvccpb.KeyValue{&fooPair, &bazPair, &firstPair}, res.Kvs) + }) +} + +func TestMockKV_Put(t *testing.T) { + t.Run("new key", func(t *testing.T) { + kv := newMockKV() + _, err := kv.Put(context.Background(), "/foo", "1") + + require.NoError(t, err) + assert.Equal(t, int64(1), kv.values["/foo"].Version) + assert.Equal(t, []byte("1"), kv.values["/foo"].Value) + }) + + t.Run("existing key", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + CreateRevision: 1, + ModRevision: 2, + Version: 2, + Value: []byte("1"), + } + + _, err := kv.Put(context.Background(), "/foo", "2") + + require.NoError(t, err) + assert.Equal(t, int64(3), kv.values["/foo"].Version) + assert.Equal(t, []byte("2"), kv.values["/foo"].Value) + }) +} + +func TestMockKV_Delete(t *testing.T) { + t.Run("exact match", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + + res, err := kv.Delete(context.Background(), "/foo") + + require.NoError(t, err) + assert.Equal(t, int64(1), res.Deleted) + assert.Empty(t, kv.values) + }) + + t.Run("prefix match", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + kv.values["/baz"] = mvccpb.KeyValue{ + Key: []byte("/baz"), + Value: []byte("2"), + } + kv.values["/first"] = mvccpb.KeyValue{ + Key: []byte("/first"), + Value: []byte("3"), + } + + res, err := kv.Delete(context.Background(), "/f", clientv3.WithPrefix()) + + require.NoError(t, err) + assert.Equal(t, int64(2), res.Deleted) + assert.NotContains(t, kv.values, "/foo") + assert.NotContains(t, kv.values, "/first") + }) + + t.Run("empty prefix", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + Value: []byte("1"), + } + kv.values["/baz"] = mvccpb.KeyValue{ + Key: []byte("/baz"), + Value: []byte("2"), + } + kv.values["/first"] = mvccpb.KeyValue{ + Key: []byte("/first"), + Value: []byte("3"), + } + + res, err := kv.Delete(context.Background(), "", clientv3.WithPrefix()) + + require.NoError(t, err) + assert.Equal(t, int64(3), res.Deleted) + assert.NotContains(t, kv.values, "/foo") + assert.NotContains(t, kv.values, "/baz") + assert.NotContains(t, kv.values, "/first") + }) +} + +func TestMockKV_Txn(t *testing.T) { + t.Run("success compare value", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + CreateRevision: 1, + ModRevision: 3, + Version: 3, + Value: []byte("1"), + } + + res, err := kv.Txn(context.Background()). + If(clientv3.Compare(clientv3.Value("/foo"), "=", "1")). + Then(clientv3.OpPut("/foo", "2")). + Commit() + + require.NoError(t, err) + assert.True(t, res.Succeeded) + assert.Equal(t, kv.values["/foo"].Value, []byte("2")) + }) + + t.Run("failure compare value", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + CreateRevision: 1, + ModRevision: 3, + Version: 3, + Value: []byte("2"), + } + + res, err := kv.Txn(context.Background()). + If(clientv3.Compare(clientv3.Value("/foo"), "=", "3")). + Then(clientv3.OpPut("/foo", "4")). + Else(clientv3.OpPut("/foo", "-1")). + Commit() + + require.NoError(t, err) + assert.False(t, res.Succeeded) + assert.Equal(t, kv.values["/foo"].Value, []byte("-1")) + }) + + t.Run("success compare version exists", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + CreateRevision: 1, + ModRevision: 3, + Version: 3, + Value: []byte("1"), + } + + res, err := kv.Txn(context.Background()). + If(clientv3.Compare(clientv3.Version("/foo"), "=", 3)). + Then(clientv3.OpPut("/foo", "2")). + Commit() + + require.NoError(t, err) + assert.True(t, res.Succeeded) + assert.Equal(t, kv.values["/foo"].Value, []byte("2")) + }) + + t.Run("failure compare version exists", func(t *testing.T) { + kv := newMockKV() + kv.values["/foo"] = mvccpb.KeyValue{ + Key: []byte("/foo"), + CreateRevision: 1, + ModRevision: 3, + Version: 3, + Value: []byte("1"), + } + + res, err := kv.Txn(context.Background()). + If(clientv3.Compare(clientv3.Version("/foo"), "=", 2)). + Then(clientv3.OpPut("/foo", "2")). + Commit() + + require.NoError(t, err) + assert.False(t, res.Succeeded) + assert.Equal(t, kv.values["/foo"].Value, []byte("1")) + }) + + t.Run("success compare version does not exist", func(t *testing.T) { + kv := newMockKV() + + res, err := kv.Txn(context.Background()). + If(clientv3.Compare(clientv3.Version("/foo"), "=", 0)). + Then(clientv3.OpPut("/foo", "1")). + Commit() + + require.NoError(t, err) + assert.True(t, res.Succeeded) + assert.Equal(t, kv.values["/foo"].Value, []byte("1")) + }) + + t.Run("failure compare version does not exist", func(t *testing.T) { + kv := newMockKV() + + res, err := kv.Txn(context.Background()). + If(clientv3.Compare(clientv3.Version("/foo"), "=", 1)). + Then(clientv3.OpPut("/foo", "2")). + Commit() + + _, ok := kv.values["/foo"] + + require.NoError(t, err) + assert.False(t, res.Succeeded) + assert.False(t, ok) + }) +} + +func TestMockKV_Watch(t *testing.T) { + // setupWatchTest spawns a goroutine to watch for events matching a particular key + // emitted by a mockKV. Any observed events are sent to callers via the returned channel. + // The goroutine can be stopped using the return cancel function and waited for using the + // returned wait group. + setupWatchTest := func(key string, prefix bool) (*mockKV, context.CancelFunc, chan *clientv3.Event, *sync.WaitGroup) { + kv := newMockKV() + // Use a condition to make sure the goroutine has started using the watch before + // we do anything to the mockKV that would emit an event the watcher is expecting + cond := sync.NewCond(&sync.Mutex{}) + wg := sync.WaitGroup{} + ch := make(chan *clientv3.Event) + ctx, cancel := context.WithCancel(context.Background()) + + wg.Add(1) + go func() { + defer wg.Done() + + var ops []clientv3.OpOption + if prefix { + ops = []clientv3.OpOption{clientv3.WithPrefix()} + } + + watch := kv.Watch(ctx, key, ops...) + cond.Broadcast() + + for e := range watch { + if len(e.Events) > 0 { + ch <- e.Events[0] + } + } + }() + + // Wait for the watcher goroutine to start actually watching + cond.L.Lock() + cond.Wait() + cond.L.Unlock() + + return kv, cancel, ch, &wg + } + + t.Run("watch stopped by context", func(t *testing.T) { + // Ensure we can use the cancel method of the context given to the watch + // to stop the watch + _, cancel, _, wg := setupWatchTest("/bar", false) + cancel() + wg.Wait() + }) + + t.Run("watch stopped by close", func(t *testing.T) { + // Ensure we can use the Close method of the mockKV given to the watch + // to stop the watch + kv, _, _, wg := setupWatchTest("/bar", false) + _ = kv.Close() + wg.Wait() + }) + + t.Run("watch exact key", func(t *testing.T) { + // watch for events with key "/bar" and send them via the channel + kv, cancel, ch, wg := setupWatchTest("/bar", false) + + _, err := kv.Put(context.Background(), "/foo", "1") + require.NoError(t, err) + + _, err = kv.Put(context.Background(), "/bar", "1") + require.NoError(t, err) + + event := <-ch + assert.Equal(t, mvccpb.PUT, event.Type) + assert.Equal(t, []byte("/bar"), event.Kv.Key) + + cancel() + wg.Wait() + }) + + t.Run("watch prefix match", func(t *testing.T) { + // watch for events with the prefix "/b" and send them via the channel + kv, cancel, ch, wg := setupWatchTest("/b", true) + + _, err := kv.Delete(context.Background(), "/foo") + require.NoError(t, err) + + _, err = kv.Put(context.Background(), "/bar", "1") + require.NoError(t, err) + + event := <-ch + assert.Equal(t, mvccpb.PUT, event.Type) + assert.Equal(t, []byte("/bar"), event.Kv.Key) + + cancel() + wg.Wait() + }) +} diff --git a/pkg/ring/kv/kv_test.go b/pkg/ring/kv/kv_test.go new file mode 100644 index 0000000000..37a51ae0da --- /dev/null +++ b/pkg/ring/kv/kv_test.go @@ -0,0 +1,283 @@ +package kv + +import ( + "context" + "fmt" + "io" + "sort" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/etcd" +) + +func withFixtures(t *testing.T, f func(*testing.T, Client)) { + t.Helper() + + for _, fixture := range []struct { + name string + factory func() (Client, io.Closer, error) + }{ + {"consul", func() (Client, io.Closer, error) { + client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, nil) + return client, closer, nil + }}, + {"etcd", func() (Client, io.Closer, error) { + client, closer := etcd.NewInMemoryClient(codec.String{}, testLogger{}) + return client, closer, nil + }}, + } { + t.Run(fixture.name, func(t *testing.T) { + client, closer, err := fixture.factory() + require.NoError(t, err) + t.Cleanup(func() { + _ = closer.Close() + }) + f(t, client) + }) + } +} + +var ( + ctx = context.Background() + key = "/key" +) + +func TestCAS(t *testing.T) { + withFixtures(t, func(t *testing.T, client Client) { + // Blindly set key to "0". + err := client.CAS(ctx, key, func(in interface{}) (interface{}, bool, error) { + return "0", true, nil + }) + require.NoError(t, err) + + // Swap key to i+1 iff its i. + for i := 0; i < 10; i++ { + err = client.CAS(ctx, key, func(in interface{}) (interface{}, bool, error) { + require.EqualValues(t, strconv.Itoa(i), in) + return strconv.Itoa(i + 1), true, nil + }) + require.NoError(t, err) + } + + // Make sure the CASes left the right value - "10". + value, err := client.Get(ctx, key) + require.NoError(t, err) + require.EqualValues(t, "10", value) + }) +} + +// TestNilCAS ensures we can return nil from the CAS callback when we don't +// want to modify the value. +func TestNilCAS(t *testing.T) { + withFixtures(t, func(t *testing.T, client Client) { + // Blindly set key to "0". + err := client.CAS(ctx, key, func(in interface{}) (interface{}, bool, error) { + return "0", true, nil + }) + require.NoError(t, err) + + // Ensure key is "0" and don't set it. + err = client.CAS(ctx, key, func(in interface{}) (interface{}, bool, error) { + require.EqualValues(t, "0", in) + return nil, false, nil + }) + require.NoError(t, err) + + // Make sure value is still 0. + value, err := client.Get(ctx, key) + require.NoError(t, err) + require.EqualValues(t, "0", value) + }) +} + +func TestWatchKey(t *testing.T) { + const key = "test" + const max = 100 + const sleep = 15 * time.Millisecond + const totalTestTimeout = 3 * max * sleep + const expectedFactor = 0.75 // we may not see every single value + + withFixtures(t, func(t *testing.T, client Client) { + observedValuesCh := make(chan string, max) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + // Start watching before we even start generating values. + // Values will be buffered in the channel. + t.Log("Watching in background", "key", key) + client.WatchKey(ctx, key, func(value interface{}) bool { + observedValuesCh <- value.(string) + return true + }) + }() + + // update value for the key + go func() { + for i := 0; i < max; i++ { + // Start with sleeping, so that watching client see empty KV store at the beginning. + time.Sleep(sleep) + + err := client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { + return fmt.Sprintf("%d", i), true, nil + }) + + if ctx.Err() != nil { + break + } + require.NoError(t, err) + } + }() + + lastObservedValue := -1 + observedCount := 0 + + totalDeadline := time.After(totalTestTimeout) + + for watching := true; watching; { + select { + case <-totalDeadline: + watching = false + case valStr := <-observedValuesCh: + val, err := strconv.Atoi(valStr) + if err != nil { + t.Fatal("Unexpected value observed:", valStr) + } + + if val <= lastObservedValue { + t.Fatal("Unexpected value observed:", val, "previous:", lastObservedValue) + } + lastObservedValue = val + observedCount++ + + if observedCount >= expectedFactor*max { + watching = false + } + } + } + + if observedCount < expectedFactor*max { + t.Errorf("expected at least %.0f%% observed values, got %.0f%% (observed count: %d)", 100*expectedFactor, 100*float64(observedCount)/max, observedCount) + } + }) +} + +func TestWatchPrefix(t *testing.T) { + withFixtures(t, func(t *testing.T, client Client) { + const prefix = "test/" + const prefix2 = "ignore/" + + // We are going to generate this number of updates, sleeping between each update. + const max = 100 + const sleep = time.Millisecond * 10 + // etcd seems to be quite slow. If we finish faster, test will end sooner. + // (We regularly see generators taking up to 5 seconds to produce all messages on some platforms!) + const totalTestTimeout = 10 * time.Second + + observedKeysCh := make(chan string, max) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + + // start watching before we even start generating values. values will be buffered + client.WatchPrefix(ctx, prefix, func(key string, val interface{}) bool { + observedKeysCh <- key + return true + }) + }() + + gen := func(p string) { + defer wg.Done() + + start := time.Now() + for i := 0; i < max && ctx.Err() == nil; i++ { + // Start with sleeping, so that watching client can see empty KV store at the beginning. + time.Sleep(sleep) + + key := fmt.Sprintf("%s%d", p, i) + err := client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { + return key, true, nil + }) + + if ctx.Err() != nil { + break + } + require.NoError(t, err) + } + t.Log("Generator finished in", time.Since(start)) + } + + wg.Add(2) + go gen(prefix) + go gen(prefix2) // we don't want to see these keys reported + + observedKeys := map[string]int{} + + totalDeadline := time.After(totalTestTimeout) + + start := time.Now() + for watching := true; watching; { + select { + case <-totalDeadline: + watching = false + case key := <-observedKeysCh: + observedKeys[key]++ + if len(observedKeys) == max { + watching = false + } + } + } + t.Log("Watching finished in", time.Since(start)) + + // Stop all goroutines and wait until terminated. + cancel() + wg.Wait() + + // verify that each key was reported once, and keys outside prefix were not reported + for i := 0; i < max; i++ { + key := fmt.Sprintf("%s%d", prefix, i) + + if observedKeys[key] != 1 { + t.Errorf("key %s has incorrect value %d", key, observedKeys[key]) + } + delete(observedKeys, key) + } + + if len(observedKeys) > 0 { + t.Errorf("unexpected keys reported: %v", observedKeys) + } + }) +} + +// TestList makes sure stored keys are listed back. +func TestList(t *testing.T) { + keysToCreate := []string{"a", "b", "c"} + + withFixtures(t, func(t *testing.T, client Client) { + for _, key := range keysToCreate { + err := client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) { + return key, false, nil + }) + require.NoError(t, err) + } + + storedKeys, err := client.List(context.Background(), "") + require.NoError(t, err) + sort.Strings(storedKeys) + + require.Equal(t, keysToCreate, storedKeys) + }) +} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go b/pkg/ring/kv/memberlist/broadcast.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go rename to pkg/ring/kv/memberlist/broadcast.go diff --git a/pkg/ring/kv/memberlist/broadcast_test.go b/pkg/ring/kv/memberlist/broadcast_test.go new file mode 100644 index 0000000000..9117035029 --- /dev/null +++ b/pkg/ring/kv/memberlist/broadcast_test.go @@ -0,0 +1,53 @@ +package memberlist + +import "testing" + +func TestInvalidates(t *testing.T) { + const key = "ring" + + logger := testLogger{} + messages := map[string]ringBroadcast{ + "b1": {key: key, content: []string{"A", "B", "C"}, version: 1, logger: logger}, + "b2": {key: key, content: []string{"A", "B", "C"}, version: 2, logger: logger}, + "b3": {key: key, content: []string{"A"}, version: 3, logger: logger}, + "b4": {key: key, content: []string{"A", "B"}, version: 4, logger: logger}, + "b5": {key: key, content: []string{"A", "B", "D"}, version: 5, logger: logger}, + "b6": {key: key, content: []string{"A", "B", "C", "D"}, version: 6, logger: logger}, + } + + checkInvalidate(t, messages, "b2", "b1", true, false) + checkInvalidate(t, messages, "b3", "b1", false, false) + checkInvalidate(t, messages, "b3", "b2", false, false) + checkInvalidate(t, messages, "b4", "b1", false, false) + checkInvalidate(t, messages, "b4", "b2", false, false) + checkInvalidate(t, messages, "b4", "b3", true, false) + checkInvalidate(t, messages, "b5", "b1", false, false) + checkInvalidate(t, messages, "b5", "b2", false, false) + checkInvalidate(t, messages, "b5", "b3", true, false) + checkInvalidate(t, messages, "b5", "b4", true, false) + checkInvalidate(t, messages, "b6", "b1", true, false) + checkInvalidate(t, messages, "b6", "b2", true, false) + checkInvalidate(t, messages, "b6", "b3", true, false) + checkInvalidate(t, messages, "b6", "b4", true, false) + checkInvalidate(t, messages, "b6", "b5", true, false) +} + +func checkInvalidate(t *testing.T, messages map[string]ringBroadcast, key1, key2 string, firstInvalidatesSecond, secondInvalidatesFirst bool) { + b1, ok := messages[key1] + if !ok { + t.Fatal("cannot find", key1) + } + + b2, ok := messages[key2] + if !ok { + t.Fatal("cannot find", key2) + } + + if b1.Invalidates(b2) != firstInvalidatesSecond { + t.Errorf("%s.Invalidates(%s) returned %t. %s={%v, %d}, %s={%v, %d}", key1, key2, !firstInvalidatesSecond, key1, b1.content, b1.version, key2, b2.content, b2.version) + } + + if b2.Invalidates(b1) != secondInvalidatesFirst { + t.Errorf("%s.Invalidates(%s) returned %t. %s={%v, %d}, %s={%v, %d}", key2, key1, !secondInvalidatesFirst, key2, b2.content, b2.version, key1, b1.content, b1.version) + } +} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/dnsprovider.go b/pkg/ring/kv/memberlist/dnsprovider.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/dnsprovider.go rename to pkg/ring/kv/memberlist/dnsprovider.go diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv.pb.go b/pkg/ring/kv/memberlist/kv.pb.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/kv.pb.go rename to pkg/ring/kv/memberlist/kv.pb.go diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv.proto b/pkg/ring/kv/memberlist/kv.proto similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/kv.proto rename to pkg/ring/kv/memberlist/kv.proto diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go b/pkg/ring/kv/memberlist/kv_init_service.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go rename to pkg/ring/kv/memberlist/kv_init_service.go diff --git a/pkg/ring/kv/memberlist/kv_init_service_test.go b/pkg/ring/kv/memberlist/kv_init_service_test.go new file mode 100644 index 0000000000..35c668433f --- /dev/null +++ b/pkg/ring/kv/memberlist/kv_init_service_test.go @@ -0,0 +1,62 @@ +package memberlist + +import ( + "bytes" + "testing" + "time" + + "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/dskit/flagext" +) + +func TestPage(t *testing.T) { + conf := memberlist.DefaultLANConfig() + ml, err := memberlist.Create(conf) + require.NoError(t, err) + + t.Cleanup(func() { + _ = ml.Shutdown() + }) + + require.NoError(t, pageTemplate.Execute(&bytes.Buffer{}, pageData{ + Now: time.Now(), + Memberlist: ml, + SortedMembers: ml.Members(), + Store: nil, + ReceivedMessages: []message{{ + ID: 10, + Time: time.Now(), + Size: 50, + Pair: KeyValuePair{ + Key: "hello", + Value: []byte("world"), + Codec: "codec", + }, + Version: 20, + Changes: []string{"A", "B", "C"}, + }}, + + SentMessages: []message{{ + ID: 10, + Time: time.Now(), + Size: 50, + Pair: KeyValuePair{ + Key: "hello", + Value: []byte("world"), + Codec: "codec", + }, + Version: 20, + Changes: []string{"A", "B", "C"}, + }}, + })) +} + +func TestStop(t *testing.T) { + var cfg KVConfig + flagext.DefaultValues(&cfg) + kvinit := NewKVInitService(&cfg, nil, &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, kvinit.stopping(nil)) +} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go similarity index 99% rename from vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go rename to pkg/ring/kv/memberlist/memberlist_client.go index d7ad176d0e..ff2ddba56d 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -15,13 +15,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/hashicorp/memberlist" - "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/codec" "github.com/grafana/dskit/services" + "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/ring/kv/codec" ) const ( diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go new file mode 100644 index 0000000000..aa33df5344 --- /dev/null +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -0,0 +1,1311 @@ +package memberlist + +import ( + "bytes" + "context" + "encoding/gob" + "errors" + "fmt" + "math" + "math/rand" + "net" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ring/kv/codec" +) + +const ACTIVE = 1 +const JOINING = 2 +const LEFT = 3 + +// Simple mergeable data structure, used for gossiping +type member struct { + Timestamp int64 + Tokens []uint32 + State int +} + +type data struct { + Members map[string]member +} + +func (d *data) Merge(mergeable Mergeable, localCAS bool) (Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + od, ok := mergeable.(*data) + if !ok || od == nil { + return nil, fmt.Errorf("invalid thing to merge: %T", od) + } + + updated := map[string]member{} + + for k, v := range od.Members { + if v.Timestamp > d.Members[k].Timestamp { + d.Members[k] = v + updated[k] = v + } + } + + if localCAS { + for k, v := range d.Members { + if _, ok := od.Members[k]; !ok && v.State != LEFT { + v.State = LEFT + v.Tokens = nil + d.Members[k] = v + updated[k] = v + } + } + } + + if len(updated) == 0 { + return nil, nil + } + return &data{Members: updated}, nil +} + +func (d *data) MergeContent() []string { + // return list of keys + out := []string(nil) + for k := range d.Members { + out = append(out, k) + } + return out +} + +// This method deliberately ignores zero limit, so that tests can observe LEFT state as well. +func (d *data) RemoveTombstones(limit time.Time) (total, removed int) { + for n, m := range d.Members { + if m.State == LEFT { + if time.Unix(m.Timestamp, 0).Before(limit) { + // remove it + delete(d.Members, n) + removed++ + } else { + total++ + } + } + } + return +} + +func (m member) clone() member { + out := member{ + Timestamp: m.Timestamp, + Tokens: make([]uint32, len(m.Tokens)), + State: m.State, + } + copy(out.Tokens, m.Tokens) + return out +} + +func (d *data) Clone() Mergeable { + out := &data{ + Members: make(map[string]member, len(d.Members)), + } + for k, v := range d.Members { + out.Members[k] = v.clone() + } + return out +} + +func (d *data) getAllTokens() []uint32 { + out := []uint32(nil) + for _, m := range d.Members { + out = append(out, m.Tokens...) + } + + sort.Sort(sortableUint32(out)) + return out +} + +type dataCodec struct{} + +func (d dataCodec) CodecID() string { + return "testDataCodec" +} + +func (d dataCodec) Decode(b []byte) (interface{}, error) { + dec := gob.NewDecoder(bytes.NewBuffer(b)) + out := &data{} + err := dec.Decode(out) + return out, err +} + +func (d dataCodec) Encode(val interface{}) ([]byte, error) { + buf := bytes.Buffer{} + enc := gob.NewEncoder(&buf) + err := enc.Encode(val) + return buf.Bytes(), err +} + +var _ codec.Codec = &dataCodec{} + +type sortableUint32 []uint32 + +func (ts sortableUint32) Len() int { return len(ts) } +func (ts sortableUint32) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } +func (ts sortableUint32) Less(i, j int) bool { return ts[i] < ts[j] } + +const key = "test" + +func updateFn(name string) func(*data) (*data, bool, error) { + return func(in *data) (out *data, retry bool, err error) { + // Modify value that was passed as a parameter. + // Client takes care of concurrent modifications. + r := in + if r == nil { + r = &data{Members: map[string]member{}} + } + + m, ok := r.Members[name] + if !ok { + r.Members[name] = member{ + Timestamp: time.Now().Unix(), + Tokens: generateTokens(128), + State: JOINING, + } + } else { + // We need to update timestamp, otherwise CAS will fail + m.Timestamp = time.Now().Unix() + m.State = ACTIVE + r.Members[name] = m + } + + return r, true, nil + } +} + +func get(t *testing.T, kv *Client, key string) interface{} { + val, err := kv.Get(context.Background(), key) + if err != nil { + t.Fatalf("Failed to get value for key %s: %v", key, err) + } + return val +} + +func getData(t *testing.T, kv *Client, key string) *data { + t.Helper() + val := get(t, kv, key) + if val == nil { + return nil + } + if r, ok := val.(*data); ok { + return r + } + t.Fatalf("Expected ring, got: %T", val) + return nil +} + +func cas(t *testing.T, kv *Client, key string, updateFn func(*data) (*data, bool, error)) { + t.Helper() + + if err := casWithErr(context.Background(), t, kv, key, updateFn); err != nil { + t.Fatal(err) + } +} + +func casWithErr(ctx context.Context, t *testing.T, kv *Client, key string, updateFn func(*data) (*data, bool, error)) error { + t.Helper() + fn := func(in interface{}) (out interface{}, retry bool, err error) { + var r *data + if in != nil { + r = in.(*data) + } + + d, rt, e := updateFn(r) + if d == nil { + // translate nil pointer to nil interface value + return nil, rt, e + } + return d, rt, e + } + + return kv.CAS(ctx, key, fn) +} + +func TestBasicGetAndCas(t *testing.T) { + c := dataCodec{} + + name := "Ing 1" + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + } + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + const key = "test" + + val := get(t, kv, key) + if val != nil { + t.Error("Expected nil, got:", val) + } + + // Create member in PENDING state, with some tokens + cas(t, kv, key, updateFn(name)) + + r := getData(t, kv, key) + if r == nil || r.Members[name].Timestamp == 0 || len(r.Members[name].Tokens) <= 0 { + t.Fatalf("Expected ring with tokens, got %v", r) + } + + val = get(t, kv, "other key") + if val != nil { + t.Errorf("Expected nil, got: %v", val) + } + + // Update member into ACTIVE state + cas(t, kv, key, updateFn(name)) + r = getData(t, kv, key) + if r.Members[name].State != ACTIVE { + t.Errorf("Expected member to be active after second update, got %v", r) + } + + // Delete member + cas(t, kv, key, func(r *data) (*data, bool, error) { + delete(r.Members, name) + return r, true, nil + }) + + r = getData(t, kv, key) + if r.Members[name].State != LEFT { + t.Errorf("Expected member to be LEFT, got %v", r) + } +} + +func withFixtures(t *testing.T, testFN func(t *testing.T, kv *Client)) { + t.Helper() + + c := dataCodec{} + + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{} + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + testFN(t, kv) +} + +func TestCASNoOutput(t *testing.T) { + withFixtures(t, func(t *testing.T, kv *Client) { + // should succeed with single call + calls := 0 + cas(t, kv, key, func(d *data) (*data, bool, error) { + calls++ + return nil, true, nil + }) + + require.Equal(t, 1, calls) + }) +} + +func TestCASErrorNoRetry(t *testing.T) { + withFixtures(t, func(t *testing.T, kv *Client) { + calls := 0 + err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) { + calls++ + return nil, false, errors.New("don't worry, be happy") + }) + require.EqualError(t, err, "failed to CAS-update key test: fn returned error: don't worry, be happy") + require.Equal(t, 1, calls) + }) +} + +func TestCASErrorWithRetries(t *testing.T) { + withFixtures(t, func(t *testing.T, kv *Client) { + calls := 0 + err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) { + calls++ + return nil, true, errors.New("don't worry, be happy") + }) + require.EqualError(t, err, "failed to CAS-update key test: fn returned error: don't worry, be happy") + require.Equal(t, 10, calls) // hard-coded in CAS function. + }) +} + +func TestCASNoChange(t *testing.T) { + withFixtures(t, func(t *testing.T, kv *Client) { + cas(t, kv, key, func(in *data) (*data, bool, error) { + if in == nil { + in = &data{Members: map[string]member{}} + } + + in.Members["hello"] = member{ + Timestamp: time.Now().Unix(), + Tokens: generateTokens(128), + State: JOINING, + } + + return in, true, nil + }) + + startTime := time.Now() + calls := 0 + err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) { + calls++ + return d, true, nil + }) + require.EqualError(t, err, "failed to CAS-update key test: no change detected") + require.Equal(t, maxCasRetries, calls) + // if there was no change, CAS sleeps before every retry + require.True(t, time.Since(startTime) >= (maxCasRetries-1)*noChangeDetectedRetrySleep) + }) +} + +func TestCASNoChangeShortTimeout(t *testing.T) { + withFixtures(t, func(t *testing.T, kv *Client) { + cas(t, kv, key, func(in *data) (*data, bool, error) { + if in == nil { + in = &data{Members: map[string]member{}} + } + + in.Members["hello"] = member{ + Timestamp: time.Now().Unix(), + Tokens: generateTokens(128), + State: JOINING, + } + + return in, true, nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + calls := 0 + err := casWithErr(ctx, t, kv, key, func(d *data) (*data, bool, error) { + calls++ + return d, true, nil + }) + require.EqualError(t, err, "failed to CAS-update key test: context deadline exceeded") + require.Equal(t, 1, calls) // hard-coded in CAS function. + }) +} + +func TestCASFailedBecauseOfVersionChanges(t *testing.T) { + withFixtures(t, func(t *testing.T, kv *Client) { + cas(t, kv, key, func(in *data) (*data, bool, error) { + return &data{Members: map[string]member{"nonempty": {Timestamp: time.Now().Unix()}}}, true, nil + }) + + calls := 0 + // outer cas + err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) { + // outer CAS logic + calls++ + + // run inner-CAS that succeeds, and that will make outer cas to fail + cas(t, kv, key, func(d *data) (*data, bool, error) { + // to avoid delays due to merging, we update different ingester each time. + d.Members[fmt.Sprintf("%d", calls)] = member{ + Timestamp: time.Now().Unix(), + } + return d, true, nil + }) + + d.Members["world"] = member{ + Timestamp: time.Now().Unix(), + } + return d, true, nil + }) + + require.EqualError(t, err, "failed to CAS-update key test: too many retries") + require.Equal(t, maxCasRetries, calls) + }) +} + +func TestMultipleCAS(t *testing.T) { + c := dataCodec{} + + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + mkv.maxCasRetries = 20 + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + start := make(chan struct{}) + + const members = 10 + const namePattern = "Member-%d" + + for i := 0; i < members; i++ { + wg.Add(1) + go func(name string) { + defer wg.Done() + <-start + up := updateFn(name) + cas(t, kv, "test", up) // JOINING state + cas(t, kv, "test", up) // ACTIVE state + }(fmt.Sprintf(namePattern, i)) + } + + close(start) // start all CAS updates + wg.Wait() // wait until all CAS updates are finished + + // Now let's test that all members are in ACTIVE state + r := getData(t, kv, "test") + require.True(t, r != nil, "nil ring") + + for i := 0; i < members; i++ { + n := fmt.Sprintf(namePattern, i) + + if r.Members[n].State != ACTIVE { + t.Errorf("Expected member %s to be ACTIVE got %v", n, r.Members[n].State) + } + } + + // Make all members leave + start = make(chan struct{}) + + for i := 0; i < members; i++ { + wg.Add(1) + go func(name string) { + defer wg.Done() + + <-start + up := func(in *data) (out *data, retry bool, err error) { + delete(in.Members, name) + return in, true, nil + } + cas(t, kv, "test", up) // PENDING state + }(fmt.Sprintf(namePattern, i)) + } + + close(start) // start all CAS updates + wg.Wait() // wait until all CAS updates are finished + + r = getData(t, kv, "test") + require.True(t, r != nil, "nil ring") + + for i := 0; i < members; i++ { + n := fmt.Sprintf(namePattern, i) + + if r.Members[n].State != LEFT { + t.Errorf("Expected member %s to be ACTIVE got %v", n, r.Members[n].State) + } + } +} + +func TestMultipleClients(t *testing.T) { + c := dataCodec{} + + const members = 10 + const key = "ring" + + var clients []*Client + + stop := make(chan struct{}) + start := make(chan struct{}) + + port := 0 + + for i := 0; i < members; i++ { + id := fmt.Sprintf("Member-%d", i) + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.NodeName = id + + cfg.GossipInterval = 100 * time.Millisecond + cfg.GossipNodes = 3 + cfg.PushPullInterval = 5 * time.Second + + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, // randomize ports + } + + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + clients = append(clients, kv) + + go runClient(t, kv, id, key, port, start, stop) + + // next KV will connect to this one + port = kv.kv.GetListeningPort() + } + + println("Waiting before start") + time.Sleep(2 * time.Second) + close(start) + + println("Observing ring ...") + + startTime := time.Now() + firstKv := clients[0] + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + updates := 0 + firstKv.WatchKey(ctx, key, func(in interface{}) bool { + updates++ + + r := in.(*data) + + minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members) + + now := time.Now() + t.Log("Update", now.Sub(startTime).String(), ": Ring has", len(r.Members), "members, and", len(r.getAllTokens()), + "tokens, oldest timestamp:", now.Sub(time.Unix(minTimestamp, 0)).String(), + "avg timestamp:", now.Sub(time.Unix(avgTimestamp, 0)).String(), + "youngest timestamp:", now.Sub(time.Unix(maxTimestamp, 0)).String()) + return true // yes, keep watching + }) + cancel() // make linter happy + + t.Logf("Ring updates observed: %d", updates) + + if updates < members { + // in general, at least one update from each node. (although that's not necessarily true... + // but typically we get more updates than that anyway) + t.Errorf("expected to see updates, got %d", updates) + } + + // Let's check all the clients to see if they have relatively up-to-date information + // All of them should at least have all the clients + // And same tokens. + allTokens := []uint32(nil) + + for i := 0; i < members; i++ { + kv := clients[i] + + r := getData(t, kv, key) + t.Logf("KV %d: number of known members: %d\n", i, len(r.Members)) + if len(r.Members) != members { + t.Errorf("Member %d has only %d members in the ring", i, len(r.Members)) + } + + minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members) + for n, ing := range r.Members { + if ing.State != ACTIVE { + t.Errorf("Member %d: invalid state of member %s in the ring: %v ", i, n, ing.State) + } + } + now := time.Now() + t.Logf("Member %d: oldest: %v, avg: %v, youngest: %v", i, + now.Sub(time.Unix(minTimestamp, 0)).String(), + now.Sub(time.Unix(avgTimestamp, 0)).String(), + now.Sub(time.Unix(maxTimestamp, 0)).String()) + + tokens := r.getAllTokens() + if allTokens == nil { + allTokens = tokens + t.Logf("Found tokens: %d", len(allTokens)) + } else { + if len(allTokens) != len(tokens) { + t.Errorf("Member %d: Expected %d tokens, got %d", i, len(allTokens), len(tokens)) + } else { + for ix, tok := range allTokens { + if tok != tokens[ix] { + t.Errorf("Member %d: Tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix]) + break + } + } + } + } + } + + // We cannot shutdown the KV until now in order for Get() to work reliably. + close(stop) +} + +func TestJoinMembersWithRetryBackoff(t *testing.T) { + c := dataCodec{} + + const members = 3 + const key = "ring" + + var clients []*Client + + stop := make(chan struct{}) + start := make(chan struct{}) + + ports, err := getFreePorts(members) + require.NoError(t, err) + + watcher := services.NewFailureWatcher() + go func() { + for { + select { + case err := <-watcher.Chan(): + t.Errorf("service reported error: %v", err) + case <-stop: + return + } + } + }() + + for i, port := range ports { + id := fmt.Sprintf("Member-%d", i) + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.NodeName = id + + cfg.GossipInterval = 100 * time.Millisecond + cfg.GossipNodes = 3 + cfg.PushPullInterval = 5 * time.Second + + cfg.MinJoinBackoff = 100 * time.Millisecond + cfg.MaxJoinBackoff = 1 * time.Minute + cfg.MaxJoinRetries = 10 + cfg.AbortIfJoinFails = true + + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: port, + } + + cfg.Codecs = []codec.Codec{c} + + if i == 0 { + // Add members to first KV config to join immediately on initialization. + // This will enforce backoff as each next members listener is not open yet. + cfg.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[1])} + } else { + // Add delay to each next member to force backoff + time.Sleep(1 * time.Second) + } + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) // Not started yet. + watcher.WatchService(mkv) + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + clients = append(clients, kv) + + startKVAndRunClient := func(kv *Client, id string, port int) { + err = services.StartAndAwaitRunning(context.Background(), mkv) + if err != nil { + t.Errorf("failed to start KV: %v", err) + } + runClient(t, kv, id, key, port, start, stop) + } + + if i == 0 { + go startKVAndRunClient(kv, id, 0) + } else { + go startKVAndRunClient(kv, id, ports[i-1]) + } + } + + t.Log("Waiting for all members to join memberlist cluster") + close(start) + time.Sleep(2 * time.Second) + + t.Log("Observing ring ...") + + startTime := time.Now() + firstKv := clients[0] + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + observedMembers := 0 + firstKv.WatchKey(ctx, key, func(in interface{}) bool { + r := in.(*data) + observedMembers = len(r.Members) + + now := time.Now() + t.Log("Update", now.Sub(startTime).String(), ": Ring has", len(r.Members), "members, and", len(r.getAllTokens()), + "tokens") + return true // yes, keep watching + }) + cancel() // make linter happy + + // Let clients exchange messages for a while + close(stop) + + if observedMembers < members { + t.Errorf("expected to see %d but saw %d", members, observedMembers) + } +} + +func TestMemberlistFailsToJoin(t *testing.T) { + c := dataCodec{} + + ports, err := getFreePorts(1) + require.NoError(t, err) + + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.MinJoinBackoff = 100 * time.Millisecond + cfg.MaxJoinBackoff = 100 * time.Millisecond + cfg.MaxJoinRetries = 2 + cfg.AbortIfJoinFails = true + + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, + } + + cfg.JoinMembers = []string{fmt.Sprintf("127.0.0.1:%d", ports[0])} + + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + + ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Service should fail soon after starting, since it cannot join the cluster. + _ = mkv.AwaitTerminated(ctxTimeout) + + // We verify service state here. + require.Equal(t, mkv.FailureCase(), errFailedToJoinCluster) +} + +func getFreePorts(count int) ([]int, error) { + var ports []int + for i := 0; i < count; i++ { + addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + defer l.Close() + ports = append(ports, l.Addr().(*net.TCPAddr).Port) + } + return ports, nil +} + +func getTimestamps(members map[string]member) (min int64, max int64, avg int64) { + min = int64(math.MaxInt64) + + for _, ing := range members { + if ing.Timestamp < min { + min = ing.Timestamp + } + + if ing.Timestamp > max { + max = ing.Timestamp + } + + avg += ing.Timestamp + } + if len(members) > 0 { + avg /= int64(len(members)) + } + return +} + +func runClient(t *testing.T, kv *Client, name string, ringKey string, portToConnect int, start <-chan struct{}, stop <-chan struct{}) { + // stop gossipping about the ring(s) + defer services.StopAndAwaitTerminated(context.Background(), kv.kv) //nolint:errcheck + + for { + select { + case <-start: + start = nil + + // let's join the first member + if portToConnect > 0 { + _, err := kv.kv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", portToConnect)}) + if err != nil { + t.Errorf("%s failed to join the cluster: %v", name, err) + return + } + } + case <-stop: + return + case <-time.After(1 * time.Second): + cas(t, kv, ringKey, updateFn(name)) + } + } +} + +// avoid dependency on ring package +func generateTokens(numTokens int) []uint32 { + var tokens []uint32 + for i := 0; i < numTokens; { + candidate := rand.Uint32() + tokens = append(tokens, candidate) + i++ + } + return tokens +} + +type distributedCounter map[string]int + +func (dc distributedCounter) Merge(mergeable Mergeable, localCAS bool) (Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + odc, ok := mergeable.(distributedCounter) + if !ok || odc == nil { + return nil, fmt.Errorf("invalid thing to merge: %T", mergeable) + } + + updated := distributedCounter{} + + for k, v := range odc { + if v > dc[k] { + dc[k] = v + updated[k] = v + } + } + + if len(updated) == 0 { + return nil, nil + } + return updated, nil +} + +func (dc distributedCounter) MergeContent() []string { + // return list of keys + out := []string(nil) + for k := range dc { + out = append(out, k) + } + return out +} + +func (dc distributedCounter) RemoveTombstones(limit time.Time) (_, _ int) { + // nothing to do + return +} + +func (dc distributedCounter) Clone() Mergeable { + out := make(distributedCounter, len(dc)) + for k, v := range dc { + out[k] = v + } + return out +} + +type distributedCounterCodec struct{} + +func (d distributedCounterCodec) CodecID() string { + return "distributedCounter" +} + +func (d distributedCounterCodec) Decode(b []byte) (interface{}, error) { + dec := gob.NewDecoder(bytes.NewBuffer(b)) + out := &distributedCounter{} + err := dec.Decode(out) + return *out, err +} + +func (d distributedCounterCodec) Encode(val interface{}) ([]byte, error) { + buf := bytes.Buffer{} + enc := gob.NewEncoder(&buf) + err := enc.Encode(val) + return buf.Bytes(), err +} + +var _ codec.Codec = &distributedCounterCodec{} + +func TestMultipleCodecs(t *testing.T) { + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, // randomize + } + + cfg.Codecs = []codec.Codec{ + dataCodec{}, + distributedCounterCodec{}, + } + + mkv1 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) + defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck + + kv1, err := NewClient(mkv1, dataCodec{}) + require.NoError(t, err) + + kv2, err := NewClient(mkv1, distributedCounterCodec{}) + require.NoError(t, err) + + err = kv1.CAS(context.Background(), "data", func(in interface{}) (out interface{}, retry bool, err error) { + var d *data + if in != nil { + d = in.(*data) + } + if d == nil { + d = &data{} + } + if d.Members == nil { + d.Members = map[string]member{} + } + d.Members["test"] = member{ + Timestamp: time.Now().Unix(), + State: ACTIVE, + } + return d, true, nil + }) + require.NoError(t, err) + + err = kv2.CAS(context.Background(), "counter", func(in interface{}) (out interface{}, retry bool, err error) { + var dc distributedCounter + if in != nil { + dc = in.(distributedCounter) + } + if dc == nil { + dc = distributedCounter{} + } + dc["test"] = 5 + return dc, true, err + }) + require.NoError(t, err) + + // We will read values from second KV, which will join the first one + mkv2 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) + defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck + + // Join second KV to first one. That will also trigger state transfer. + _, err = mkv2.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", mkv1.GetListeningPort())}) + require.NoError(t, err) + + // Now read both values from second KV. It should have both values by now. + + // fetch directly from single KV, to see that both are stored in the same one + val, err := mkv2.Get("data", dataCodec{}) + require.NoError(t, err) + require.NotNil(t, val) + require.NotZero(t, val.(*data).Members["test"].Timestamp) + require.Equal(t, ACTIVE, val.(*data).Members["test"].State) + + val, err = mkv2.Get("counter", distributedCounterCodec{}) + require.NoError(t, err) + require.NotNil(t, val) + require.Equal(t, 5, val.(distributedCounter)["test"]) +} + +func TestGenerateRandomSuffix(t *testing.T) { + h1 := generateRandomSuffix(testLogger{}) + h2 := generateRandomSuffix(testLogger{}) + h3 := generateRandomSuffix(testLogger{}) + + require.NotEqual(t, h1, h2) + require.NotEqual(t, h2, h3) +} + +func TestRejoin(t *testing.T) { + ports, err := getFreePorts(2) + require.NoError(t, err) + + var cfg1 KVConfig + flagext.DefaultValues(&cfg1) + cfg1.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: ports[0], + } + + cfg1.RandomizeNodeName = true + cfg1.Codecs = []codec.Codec{dataCodec{}} + cfg1.AbortIfJoinFails = false + + cfg2 := cfg1 + cfg2.TCPTransport.BindPort = ports[1] + cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])} + cfg2.RejoinInterval = 1 * time.Second + + mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) + defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck + + mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) + defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck + + membersFunc := func() interface{} { + return mkv2.memberlist.NumMembers() + } + + poll(t, 5*time.Second, 2, membersFunc) + + // Shutdown first KV + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), mkv1)) + + // Second KV should see single member now. + poll(t, 5*time.Second, 1, membersFunc) + + // Let's start first KV again. It is not configured to join the cluster, but KV2 is rejoining. + mkv1 = NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) + defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck + + poll(t, 5*time.Second, 2, membersFunc) +} + +func TestMessageBuffer(t *testing.T) { + buf := []message(nil) + size := 0 + + buf, size = addMessageToBuffer(buf, size, 100, message{Size: 50}) + assert.Len(t, buf, 1) + assert.Equal(t, size, 50) + + buf, size = addMessageToBuffer(buf, size, 100, message{Size: 50}) + assert.Len(t, buf, 2) + assert.Equal(t, size, 100) + + buf, size = addMessageToBuffer(buf, size, 100, message{Size: 25}) + assert.Len(t, buf, 2) + assert.Equal(t, size, 75) +} + +func TestNotifyMsgResendsOnlyChanges(t *testing.T) { + codec := dataCodec{} + + cfg := KVConfig{} + // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, codec) + + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) + defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck + + client, err := NewClient(kv, codec) + require.NoError(t, err) + + // No broadcast messages from KV at the beginning. + require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + now := time.Now() + + require.NoError(t, client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) { + d := getOrCreateData(in) + d.Members["a"] = member{Timestamp: now.Unix(), State: JOINING} + d.Members["b"] = member{Timestamp: now.Unix(), State: JOINING} + return d, true, nil + })) + + // Check that new instance is broadcasted about just once. + assert.Equal(t, 1, len(kv.GetBroadcasts(0, math.MaxInt32))) + require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + kv.NotifyMsg(marshalKeyValuePair(t, key, codec, &data{ + Members: map[string]member{ + "a": {Timestamp: now.Unix() - 5, State: ACTIVE}, + "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, + "c": {Timestamp: now.Unix(), State: ACTIVE}, + }})) + + // Check two things here: + // 1) state of value in KV store + // 2) broadcast message only has changed members + + d := getData(t, client, key) + assert.Equal(t, &data{ + Members: map[string]member{ + "a": {Timestamp: now.Unix(), State: JOINING, Tokens: []uint32{}}, // unchanged, timestamp too old + "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, + "c": {Timestamp: now.Unix(), State: ACTIVE, Tokens: []uint32{}}, + }}, d) + + bs := kv.GetBroadcasts(0, math.MaxInt32) + require.Equal(t, 1, len(bs)) + + d = decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec) + assert.Equal(t, &data{ + Members: map[string]member{ + // "a" is not here, because it wasn't changed by the message. + "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, + "c": {Timestamp: now.Unix(), State: ACTIVE}, + }}, d) +} + +func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { + codec := dataCodec{} + + cfg := KVConfig{} + // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. + cfg.RetransmitMult = 1 + cfg.LeftIngestersTimeout = 5 * time.Minute + cfg.Codecs = append(cfg.Codecs, codec) + + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) + defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck + + client, err := NewClient(kv, codec) + require.NoError(t, err) + + now := time.Now() + + // No broadcast messages from KV at the beginning. + require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + for _, tc := range []struct { + name string + valueBeforeSend *data // value in KV store before sending messsage + msgToSend *data + valueAfterSend *data // value in KV store after sending message + broadcastMessage *data // broadcasted change, if not nil + }{ + // These tests follow each other (end state of KV in state is starting point in the next state). + { + name: "old tombstone, empty KV", + valueBeforeSend: nil, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - int64(2*cfg.LeftIngestersTimeout.Seconds()), State: LEFT}}}, + valueAfterSend: nil, // no change to KV + broadcastMessage: nil, // no new message + }, + + { + name: "recent tombstone, empty KV", + valueBeforeSend: nil, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}}, + broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}}, + valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + }, + + { + name: "old tombstone, KV contains tombstone already", + valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - 10, State: LEFT}}}, + broadcastMessage: nil, + valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + }, + + { + name: "fresh tombstone, KV contains tombstone already", + valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}}, + broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}}, + valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT, Tokens: []uint32{}}}}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + d := getData(t, client, key) + if tc.valueBeforeSend == nil { + require.True(t, d == nil || len(d.Members) == 0) + } else { + require.Equal(t, tc.valueBeforeSend, d, "valueBeforeSend") + } + + kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend)) + + bs := kv.GetBroadcasts(0, math.MaxInt32) + if tc.broadcastMessage == nil { + require.Equal(t, 0, len(bs), "expected no broadcast message") + } else { + require.Equal(t, 1, len(bs), "expected broadcast message") + require.Equal(t, tc.broadcastMessage, decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec)) + } + + d = getData(t, client, key) + if tc.valueAfterSend == nil { + require.True(t, d == nil || len(d.Members) == 0) + } else { + require.Equal(t, tc.valueAfterSend, d, "valueAfterSend") + } + }) + } +} + +func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data { + kvp := KeyValuePair{} + require.NoError(t, kvp.Unmarshal(marshalledKVP)) + require.Equal(t, key, kvp.Key) + + val, err := codec.Decode(kvp.Value) + require.NoError(t, err) + d, ok := val.(*data) + require.True(t, ok) + return d +} + +func marshalKeyValuePair(t *testing.T, key string, codec codec.Codec, value interface{}) []byte { + data, err := codec.Encode(value) + require.NoError(t, err) + + kvp := KeyValuePair{Key: key, Codec: codec.CodecID(), Value: data} + data, err = kvp.Marshal() + require.NoError(t, err) + return data +} + +func getOrCreateData(in interface{}) *data { + // Modify value that was passed as a parameter. + // Client takes care of concurrent modifications. + r, ok := in.(*data) + if !ok || r == nil { + return &data{Members: map[string]member{}} + } + return r +} + +// poll repeatedly evaluates condition until we either timeout, or it succeeds. +func poll(t testing.TB, d time.Duration, want interface{}, have func() interface{}) { + t.Helper() + + deadline := time.Now().Add(d) + for { + if time.Now().After(deadline) { + break + } + if reflect.DeepEqual(want, have()) { + return + } + time.Sleep(d / 100) + } + h := have() + if !reflect.DeepEqual(want, h) { + t.Fatalf("expected %v, got %v", want, h) + } +} + +type testLogger struct { +} + +func (l testLogger) Log(keyvals ...interface{}) error { + return nil +} + +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_logger.go b/pkg/ring/kv/memberlist/memberlist_logger.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/memberlist_logger.go rename to pkg/ring/kv/memberlist/memberlist_logger.go diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go b/pkg/ring/kv/memberlist/mergeable.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go rename to pkg/ring/kv/memberlist/mergeable.go diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go b/pkg/ring/kv/memberlist/metrics.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/memberlist/metrics.go rename to pkg/ring/kv/memberlist/metrics.go diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/pkg/ring/kv/memberlist/tcp_transport.go similarity index 99% rename from vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go rename to pkg/ring/kv/memberlist/tcp_transport.go index 4265a3b223..9fb8608c6f 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/pkg/ring/kv/memberlist/tcp_transport.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/flagext" "github.com/hashicorp/go-sockaddr" "github.com/hashicorp/memberlist" "github.com/pkg/errors" @@ -21,8 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" - dstls "github.com/grafana/dskit/crypto/tls" - "github.com/grafana/dskit/flagext" + cortextls "github.com/cortexproject/cortex/pkg/util/tls" ) type messageType uint8 @@ -57,8 +57,8 @@ type TCPTransportConfig struct { MetricsRegisterer prometheus.Registerer `yaml:"-"` MetricsNamespace string `yaml:"-"` - TLSEnabled bool `yaml:"tls_enabled"` - TLS dstls.ClientConfig `yaml:",inline"` + TLSEnabled bool `yaml:"tls_enabled"` + TLS cortextls.ClientConfig `yaml:",inline"` } func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet) { diff --git a/pkg/ring/kv/memberlist/tcp_transport_test.go b/pkg/ring/kv/memberlist/tcp_transport_test.go new file mode 100644 index 0000000000..37fafc6441 --- /dev/null +++ b/pkg/ring/kv/memberlist/tcp_transport_test.go @@ -0,0 +1,60 @@ +package memberlist + +import ( + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTCPTransport_WriteTo_ShouldNotLogAsWarningExpectedFailures(t *testing.T) { + tests := map[string]struct { + setup func(t *testing.T, cfg *TCPTransportConfig) + remoteAddr string + expectedLogs string + unexpectedLogs string + }{ + "should not log 'connection refused' by default": { + remoteAddr: "localhost:12345", + unexpectedLogs: "connection refused", + }, + "should log 'connection refused' if debug log level is enabled": { + setup: func(t *testing.T, cfg *TCPTransportConfig) { + cfg.TransportDebug = true + }, + remoteAddr: "localhost:12345", + expectedLogs: "connection refused", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + logs := &concurrency.SyncBuffer{} + logger := log.NewLogfmtLogger(logs) + + cfg := TCPTransportConfig{} + flagext.DefaultValues(&cfg) + cfg.BindAddrs = []string{"localhost"} + cfg.BindPort = 0 + if testData.setup != nil { + testData.setup(t, &cfg) + } + + transport, err := NewTCPTransport(cfg, logger) + require.NoError(t, err) + + _, err = transport.WriteTo([]byte("test"), testData.remoteAddr) + require.NoError(t, err) + + if testData.expectedLogs != "" { + assert.Contains(t, logs.String(), testData.expectedLogs) + } + if testData.unexpectedLogs != "" { + assert.NotContains(t, logs.String(), testData.unexpectedLogs) + } + }) + } +} diff --git a/vendor/github.com/grafana/dskit/kv/metrics.go b/pkg/ring/kv/metrics.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/metrics.go rename to pkg/ring/kv/metrics.go diff --git a/vendor/github.com/grafana/dskit/kv/mock.go b/pkg/ring/kv/mock.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/mock.go rename to pkg/ring/kv/mock.go diff --git a/vendor/github.com/grafana/dskit/kv/multi.go b/pkg/ring/kv/multi.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/multi.go rename to pkg/ring/kv/multi.go diff --git a/pkg/ring/kv/multi_test.go b/pkg/ring/kv/multi_test.go new file mode 100644 index 0000000000..9385ed5ff8 --- /dev/null +++ b/pkg/ring/kv/multi_test.go @@ -0,0 +1,32 @@ +package kv + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func boolPtr(b bool) *bool { + return &b +} + +func TestMultiRuntimeConfigWithVariousEnabledValues(t *testing.T) { + testcases := map[string]struct { + yaml string + expected *bool + }{ + "nil": {"primary: test", nil}, + "true": {"primary: test\nmirror_enabled: true", boolPtr(true)}, + "false": {"mirror_enabled: false", boolPtr(false)}, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + c := MultiRuntimeConfig{} + err := yaml.Unmarshal([]byte(tc.yaml), &c) + assert.NoError(t, err, tc.yaml) + assert.Equal(t, tc.expected, c.Mirroring, tc.yaml) + }) + } +} diff --git a/vendor/github.com/grafana/dskit/kv/prefix.go b/pkg/ring/kv/prefix.go similarity index 100% rename from vendor/github.com/grafana/dskit/kv/prefix.go rename to pkg/ring/kv/prefix.go diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 6619a23795..3cf48dba5b 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -12,12 +12,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/pkg/errors" perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/ring/kv" ) // LifecyclerConfig is the config to build a Lifecycler. diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 66cc371911..789c3760d3 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -9,12 +9,13 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ring/kv/consul" ) const ( diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 75c5f0588f..cbfe4f962c 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -8,8 +8,9 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/grafana/dskit/kv/codec" - "github.com/grafana/dskit/kv/memberlist" + + "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" ) // ByAddr is a sortable list of InstanceDesc. diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index a0784d954f..c0e29248f2 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -14,12 +14,12 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/cortexproject/cortex/pkg/ring/kv" shardUtil "github.com/cortexproject/cortex/pkg/ring/shard" "github.com/cortexproject/cortex/pkg/ring/util" utilmath "github.com/cortexproject/cortex/pkg/util/math" diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 684c712dc9..322f44ab7b 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -14,8 +14,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +21,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ring/shard" "github.com/cortexproject/cortex/pkg/ring/util" ) diff --git a/pkg/ruler/lifecycle_test.go b/pkg/ruler/lifecycle_test.go index 83bf396a94..6898bbd519 100644 --- a/pkg/ruler/lifecycle_test.go +++ b/pkg/ruler/lifecycle_test.go @@ -7,13 +7,13 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/util/test" ) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index b6e8df9940..51e1b4d804 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -15,7 +15,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -29,6 +28,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/tenant" diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index 443dfae922..676a1c3150 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -8,9 +8,9 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" ) const ( diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 3efc15e083..a870eddc50 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -21,8 +21,6 @@ import ( "github.com/go-kit/log/level" "github.com/gorilla/mux" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" @@ -46,6 +44,8 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/ruler/rulestore/objectclient" diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 0f45e461fb..1abc3386ad 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -9,7 +9,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -20,6 +19,7 @@ import ( "github.com/weaveworks/common/logging" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index c57a2f19a3..0a8ded19ea 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -9,9 +9,9 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 1ef229c4ac..9f47af3b94 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -18,7 +18,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -38,6 +37,7 @@ import ( "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 207c213ef5..5b3e712379 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" @@ -17,6 +16,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" ) diff --git a/tools/doc-generator/main.go b/tools/doc-generator/main.go index a6fb85d3d6..946760de96 100644 --- a/tools/doc-generator/main.go +++ b/tools/doc-generator/main.go @@ -9,9 +9,6 @@ import ( "strings" "text/template" - "github.com/grafana/dskit/kv/consul" - "github.com/grafana/dskit/kv/etcd" - "github.com/grafana/dskit/kv/memberlist" "github.com/weaveworks/common/server" "github.com/cortexproject/cortex/pkg/alertmanager" @@ -32,6 +29,9 @@ import ( "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/queryrange" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/etcd" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/storage/bucket/s3" diff --git a/vendor/github.com/google/btree/btree.go b/vendor/github.com/google/btree/btree.go index 6ff062f9bb..b83acdbc6d 100644 --- a/vendor/github.com/google/btree/btree.go +++ b/vendor/github.com/google/btree/btree.go @@ -796,7 +796,7 @@ func (t *BTree) DescendLessOrEqual(pivot Item, iterator ItemIterator) { } // DescendGreaterThan calls the iterator for every value in the tree within -// the range (pivot, last], until iterator returns false. +// the range [last, pivot), until iterator returns false. func (t *BTree) DescendGreaterThan(pivot Item, iterator ItemIterator) { if t.root == nil { return diff --git a/vendor/github.com/google/btree/go.mod b/vendor/github.com/google/btree/go.mod new file mode 100644 index 0000000000..fe4d5ca17b --- /dev/null +++ b/vendor/github.com/google/btree/go.mod @@ -0,0 +1,17 @@ +// Copyright 2014 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module github.com/google/btree + +go 1.12 diff --git a/vendor/github.com/grafana/dskit/concurrency/buffer.go b/vendor/github.com/grafana/dskit/concurrency/buffer.go new file mode 100644 index 0000000000..623b9a7076 --- /dev/null +++ b/vendor/github.com/grafana/dskit/concurrency/buffer.go @@ -0,0 +1,26 @@ +package concurrency + +import ( + "bytes" + "sync" +) + +// SyncBuffer is a io.writer implementation with atomic writes. It only keeps data in memory. +type SyncBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (sb *SyncBuffer) Write(p []byte) (n int, err error) { + sb.mu.Lock() + defer sb.mu.Unlock() + + return sb.buf.Write(p) +} + +func (sb *SyncBuffer) String() string { + sb.mu.Lock() + defer sb.mu.Unlock() + + return sb.buf.String() +} diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go new file mode 100644 index 0000000000..a6740f3ac9 --- /dev/null +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -0,0 +1,106 @@ +package concurrency + +import ( + "context" + "sync" + + "golang.org/x/sync/errgroup" + + "github.com/grafana/dskit/internal/math" + "github.com/grafana/dskit/multierror" +) + +// ForEachUser runs the provided userFunc for each userIDs up to concurrency concurrent workers. +// In case userFunc returns error, it will continue to process remaining users but returns an +// error with all errors userFunc has returned. +func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error { + if len(userIDs) == 0 { + return nil + } + + // Push all jobs to a channel. + ch := make(chan string, len(userIDs)) + for _, userID := range userIDs { + ch <- userID + } + close(ch) + + // Keep track of all errors occurred. + errs := multierror.MultiError{} + errsMx := sync.Mutex{} + + wg := sync.WaitGroup{} + for ix := 0; ix < math.Min(concurrency, len(userIDs)); ix++ { + wg.Add(1) + go func() { + defer wg.Done() + + for userID := range ch { + // Ensure the context has not been canceled (ie. shutdown has been triggered). + if ctx.Err() != nil { + break + } + + if err := userFunc(ctx, userID); err != nil { + errsMx.Lock() + errs.Add(err) + errsMx.Unlock() + } + } + }() + } + + // wait for ongoing workers to finish. + wg.Wait() + + if ctx.Err() != nil { + return ctx.Err() + } + + return errs.Err() +} + +// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. +// The execution breaks on first error encountered. +func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { + if len(jobs) == 0 { + return nil + } + + // Push all jobs to a channel. + ch := make(chan interface{}, len(jobs)) + for _, job := range jobs { + ch <- job + } + close(ch) + + // Start workers to process jobs. + g, ctx := errgroup.WithContext(ctx) + for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ { + g.Go(func() error { + for job := range ch { + if err := ctx.Err(); err != nil { + return err + } + + if err := jobFunc(ctx, job); err != nil { + return err + } + } + + return nil + }) + } + + // Wait until done (or context has canceled). + return g.Wait() +} + +// CreateJobsFromStrings is an utility to create jobs from an slice of strings. +func CreateJobsFromStrings(values []string) []interface{} { + jobs := make([]interface{}, len(values)) + for i := 0; i < len(values); i++ { + jobs[i] = values[i] + } + return jobs +} diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go deleted file mode 100644 index a6fa46f073..0000000000 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ /dev/null @@ -1,87 +0,0 @@ -package tls - -import ( - "crypto/tls" - "crypto/x509" - "flag" - "os" - - "github.com/pkg/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" -) - -// ClientConfig is the config for client TLS. -type ClientConfig struct { - CertPath string `yaml:"tls_cert_path"` - KeyPath string `yaml:"tls_key_path"` - CAPath string `yaml:"tls_ca_path"` - ServerName string `yaml:"tls_server_name"` - InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` -} - -var ( - errKeyMissing = errors.New("certificate given but no key configured") - errCertMissing = errors.New("key given but no certificate configured") -) - -// RegisterFlagsWithPrefix registers flags with prefix. -func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.CertPath, prefix+".tls-cert-path", "", "Path to the client certificate file, which will be used for authenticating with the server. Also requires the key path to be configured.") - f.StringVar(&cfg.KeyPath, prefix+".tls-key-path", "", "Path to the key file for the client certificate. Also requires the client certificate to be configured.") - f.StringVar(&cfg.CAPath, prefix+".tls-ca-path", "", "Path to the CA certificates file to validate server certificate against. If not set, the host's root CA certificates are used.") - f.StringVar(&cfg.ServerName, prefix+".tls-server-name", "", "Override the expected name on the server certificate.") - f.BoolVar(&cfg.InsecureSkipVerify, prefix+".tls-insecure-skip-verify", false, "Skip validating server certificate.") -} - -// GetTLSConfig initialises tls.Config from config options -func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { - config := &tls.Config{ - InsecureSkipVerify: cfg.InsecureSkipVerify, - ServerName: cfg.ServerName, - } - - // read ca certificates - if cfg.CAPath != "" { - var caCertPool *x509.CertPool - caCert, err := os.ReadFile(cfg.CAPath) - if err != nil { - return nil, errors.Wrapf(err, "error loading ca cert: %s", cfg.CAPath) - } - caCertPool = x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - - config.RootCAs = caCertPool - } - - // read client certificate - if cfg.CertPath != "" || cfg.KeyPath != "" { - if cfg.CertPath == "" { - return nil, errCertMissing - } - if cfg.KeyPath == "" { - return nil, errKeyMissing - } - clientCert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath) - } - config.Certificates = []tls.Certificate{clientCert} - } - - return config, nil -} - -// GetGRPCDialOptions creates GRPC DialOptions for TLS -func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) { - if !enabled { - return []grpc.DialOption{grpc.WithInsecure()}, nil - } - - tlsConfig, err := cfg.GetTLSConfig() - if err != nil { - return nil, errors.Wrap(err, "error creating grpc dial options") - } - - return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}, nil -} diff --git a/vendor/github.com/grafana/dskit/internal/math/math.go b/vendor/github.com/grafana/dskit/internal/math/math.go new file mode 100644 index 0000000000..9d6422e50e --- /dev/null +++ b/vendor/github.com/grafana/dskit/internal/math/math.go @@ -0,0 +1,9 @@ +package math + +// Min returns the minimum of two ints. +func Min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 03b5e7aad7..0131cfaaec 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -63,6 +63,7 @@ github.com/alicebob/miniredis/v2 github.com/alicebob/miniredis/v2/geohash github.com/alicebob/miniredis/v2/server # github.com/armon/go-metrics v0.3.9 +## explicit github.com/armon/go-metrics github.com/armon/go-metrics/prometheus # github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef @@ -335,7 +336,8 @@ github.com/golang/protobuf/ptypes/wrappers # github.com/golang/snappy v0.0.4 ## explicit github.com/golang/snappy -# github.com/google/btree v1.0.0 +# github.com/google/btree v1.0.1 +## explicit github.com/google/btree # github.com/google/go-cmp v0.5.6 github.com/google/go-cmp/cmp @@ -358,13 +360,9 @@ github.com/gorilla/mux # github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 ## explicit github.com/grafana/dskit/backoff -github.com/grafana/dskit/crypto/tls +github.com/grafana/dskit/concurrency github.com/grafana/dskit/flagext -github.com/grafana/dskit/kv -github.com/grafana/dskit/kv/codec -github.com/grafana/dskit/kv/consul -github.com/grafana/dskit/kv/etcd -github.com/grafana/dskit/kv/memberlist +github.com/grafana/dskit/internal/math github.com/grafana/dskit/middleware github.com/grafana/dskit/multierror github.com/grafana/dskit/runutil @@ -382,10 +380,12 @@ github.com/grpc-ecosystem/go-grpc-middleware/v2/util/metautils # github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/hailocab/go-hostpool # github.com/hashicorp/consul/api v1.11.0 +## explicit github.com/hashicorp/consul/api # github.com/hashicorp/errwrap v1.0.0 github.com/hashicorp/errwrap # github.com/hashicorp/go-cleanhttp v0.5.2 +## explicit github.com/hashicorp/go-cleanhttp # github.com/hashicorp/go-hclog v0.16.2 github.com/hashicorp/go-hclog @@ -398,11 +398,13 @@ github.com/hashicorp/go-multierror # github.com/hashicorp/go-rootcerts v1.0.2 github.com/hashicorp/go-rootcerts # github.com/hashicorp/go-sockaddr v1.0.2 +## explicit github.com/hashicorp/go-sockaddr # github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru github.com/hashicorp/golang-lru/simplelru # github.com/hashicorp/memberlist v0.2.4 +## explicit github.com/hashicorp/memberlist # github.com/hashicorp/serf v0.9.5 github.com/hashicorp/serf/coordinate @@ -747,8 +749,10 @@ github.com/yuin/gopher-lua/pm ## explicit go.etcd.io/bbolt # go.etcd.io/etcd v3.3.25+incompatible +## explicit go.etcd.io/etcd/pkg/transport # go.etcd.io/etcd/api/v3 v3.5.0 +## explicit go.etcd.io/etcd/api/v3/authpb go.etcd.io/etcd/api/v3/etcdserverpb go.etcd.io/etcd/api/v3/membershippb @@ -760,6 +764,7 @@ go.etcd.io/etcd/client/pkg/v3/logutil go.etcd.io/etcd/client/pkg/v3/systemd go.etcd.io/etcd/client/pkg/v3/types # go.etcd.io/etcd/client/v3 v3.5.0 +## explicit go.etcd.io/etcd/client/v3 go.etcd.io/etcd/client/v3/credentials go.etcd.io/etcd/client/v3/internal/endpoint