Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Allow exposing multiple label-sets #1284

Merged
merged 6 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1248](https://github.com/improbable-eng/thanos/pull/1248) Add a web UI to show the state of remote storage.

### Changed

- [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised.

## [v0.5.0](https://github.com/improbable-eng/thanos/releases/tag/v0.5.0) - 2019.06.05

TL;DR: Store LRU cache is no longer leaking, Upgraded Thanos UI to Prometheus 2.9, Fixed auto-downsampling, Moved to Go 1.12.5 and more.
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ require (
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
go.uber.org/atomic v1.4.0 // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
Expand Down
103 changes: 58 additions & 45 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -32,14 +33,14 @@ type StoreSpec interface {
// If metadata call fails we assume that store is no longer accessible and we should not use it.
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
// given store connection.
Metadata(ctx context.Context, client storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error)
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, labels []storepb.Label, mint int64, maxt int64, err error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really get why not just replacing with labelSets.

We can hide the compatibility logic internally, right?

}

type StoreStatus struct {
Name string
LastCheck time.Time
LastError error
Labels []storepb.Label
LabelSets []storepb.LabelSet
StoreType component.StoreAPI
MinTime int64
MaxTime int64
Expand All @@ -62,12 +63,12 @@ func (s *grpcStoreSpec) Addr() string {

// Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after
// that time, we assume that the host is unhealthy and return error.
func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error) {
func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, labels []storepb.Label, mint int64, maxt int64, err error) {
resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.FailFast(false))
if err != nil {
return nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr)
return nil, nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr)
}
return resp.Labels, resp.MinTime, resp.MaxTime, nil
return resp.LabelSets, resp.Labels, resp.MinTime, resp.MaxTime, nil
}

// StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on
Expand All @@ -81,13 +82,13 @@ type StoreSet struct {
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration

mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelOccurrencesInStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
}

type storeSetNodeCollector struct {
Expand Down Expand Up @@ -135,12 +136,12 @@ func NewStoreSet(
}

ss := &StoreSet{
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelStores: map[string]int{},
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelOccurrencesInStores: map[string]int{},
stores: make(map[string]*storeRef),
storeStatuses: make(map[string]*StoreStatus),
unhealthyStoreTimeout: unhealthyStoreTimeout,
Expand All @@ -162,27 +163,31 @@ type storeRef struct {
addr string

// Meta (can change during runtime).
labels []storepb.Label
labelSets []storepb.LabelSet
storeType component.StoreAPI
minTime int64
maxTime int64

logger log.Logger
}

func (s *storeRef) Update(labels []storepb.Label, minTime int64, maxTime int64) {
s.mtx.RLock()
defer s.mtx.RUnlock()
func (s *storeRef) Update(labelSets []storepb.LabelSet, labels []storepb.Label, minTime int64, maxTime int64) {
if len(labelSets) == 0 && len(labels) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would hide it behind Metadata interface.

labelSets = []storepb.LabelSet{{Labels: labels}}
}

s.labels = labels
s.mtx.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

defer s.mtx.Unlock()

s.labelSets = labelSets
s.minTime = minTime
s.maxTime = maxTime
}

func (s *storeRef) Labels() []storepb.Label {
func (s *storeRef) LabelSets() []storepb.LabelSet {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.labels
return s.labelSets
}

func (s *storeRef) TimeRange() (int64, int64) {
Expand All @@ -194,7 +199,7 @@ func (s *storeRef) TimeRange() (int64, int64) {

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf("Addr: %s Labels: %v Mint: %d Maxt: %d", s.addr, s.Labels(), mint, maxt)
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, storepb.LabelSetsToString(s.LabelSets()), mint, maxt)
}

func (s *storeRef) Addr() string {
Expand All @@ -211,10 +216,11 @@ func (s *StoreSet) Update(ctx context.Context) {
healthyStores := s.getHealthyStores(ctx)

// Record the number of occurrences of external label combinations for current store slice.
externalLabelStores := map[string]int{}
externalLabelOccurrencesInStores := map[string]int{}
for _, st := range healthyStores {
externalLabelStores[externalLabelsFromStore(st)]++
externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++
}
level.Debug(s.logger).Log("msg", "updating healthy stores", "externalLabelOccurrencesInStores", fmt.Sprintf("%#+v", externalLabelOccurrencesInStores))

s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -238,10 +244,12 @@ func (s *StoreSet) Update(ctx context.Context) {
// No external labels means strictly store gateway or ruler and it is fine to have access to multiple instances of them.
//
// Sidecar will error out if it will be configured with empty external labels.
if len(store.Labels()) > 0 && externalLabelStores[externalLabelsFromStore(store)] != 1 {
externalLabels := externalLabelsFromStore(store)
storesWithExternalLabels := externalLabelOccurrencesInStores[externalLabels]
if len(store.LabelSets()) > 0 && storesWithExternalLabels != 1 {
store.close()
s.updateStoreStatus(store, errors.New(droppingStoreMessage))
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr)
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr, "extLset", externalLabels, "duplicates", storesWithExternalLabels)
continue
}

Expand All @@ -254,7 +262,7 @@ func (s *StoreSet) Update(ctx context.Context) {
s.updateStoreStatus(store, nil)
level.Info(s.logger).Log("msg", "adding new store to query storeset", "address", addr)
}
s.externalLabelStores = externalLabelStores
s.externalLabelOccurrencesInStores = externalLabelOccurrencesInStores
s.storeNodeConnections.Set(float64(len(s.stores)))
s.cleanUpStoreStatuses()
}
Expand Down Expand Up @@ -288,14 +296,14 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
store, ok := s.stores[addr]
if ok {
// Check existing store. Is it healthy? What are current metadata?
labels, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient)
labelSets, labels, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient)
if err != nil {
// Peer unhealthy. Do not include in healthy stores.
s.updateStoreStatus(store, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", err, "address", addr)
return
}
store.Update(labels, minTime, maxTime)
store.Update(labelSets, labels, minTime, maxTime)
} else {
// New store or was unhealthy and was removed in the past - create new one.
conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
Expand All @@ -315,7 +323,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
return
}
store.storeType = component.FromProto(resp.StoreType)
store.Update(resp.Labels, resp.MinTime, resp.MaxTime)
store.Update(resp.LabelSets, resp.Labels, resp.MinTime, resp.MaxTime)
}

mtx.Lock()
Expand All @@ -331,16 +339,21 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
}

func externalLabelsFromStore(store *storeRef) string {
tsdbLabels := labels.Labels{}
for _, l := range store.labels {
tsdbLabels = append(tsdbLabels, labels.Label{
Name: l.Name,
Value: l.Value,
})
tsdbLabelSetStrings := make([]string, 0, len(store.labelSets))
for _, ls := range store.labelSets {
tsdbLabels := labels.Labels{}
for _, l := range ls.Labels {
tsdbLabels = append(tsdbLabels, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(tsdbLabels)
tsdbLabelSetStrings = append(tsdbLabelSetStrings, tsdbLabels.String())
}
sort.Sort(tsdbLabels)
sort.Strings(tsdbLabelSetStrings)

return tsdbLabels.String()
return strings.Join(tsdbLabelSetStrings, ",")
}

func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
Expand All @@ -357,7 +370,7 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
status.LastCheck = time.Now()

if err == nil {
status.Labels = store.labels
status.LabelSets = store.labelSets
status.StoreType = store.storeType
status.MinTime = store.minTime
status.MaxTime = store.maxTime
Expand Down Expand Up @@ -385,8 +398,8 @@ func (s *StoreSet) externalLabelOccurrences() map[string]int {
s.mtx.RLock()
defer s.mtx.RUnlock()

r := make(map[string]int, len(s.externalLabelStores))
for k, v := range s.externalLabelStores {
r := make(map[string]int, len(s.externalLabelOccurrencesInStores))
for k, v := range s.externalLabelOccurrencesInStores {
r[k] = v
}

Expand Down
59 changes: 38 additions & 21 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package query

import (
"context"
"fmt"
"math"
"net"
"os"
"testing"
"time"

"sort"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/testutil"
"google.golang.org/grpc"
Expand Down Expand Up @@ -56,17 +60,23 @@ func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStor
}

for i := 0; i < numStores; i++ {
lsetFn := func(addr string) []storepb.Label {
lsetFn := func(addr string) []storepb.LabelSet {
if len(storesExtLabels) != numStores {
return []storepb.Label{
{
Name: "addr",
Value: addr,
return []storepb.LabelSet{{
Labels: []storepb.Label{
{
Name: "addr",
Value: addr,
},
},
}
}}
}
ls := storesExtLabels[i]
if len(ls) == 0 {
return []storepb.LabelSet{}
}

return storesExtLabels[i]
return []storepb.LabelSet{{Labels: storesExtLabels[i]}}
}

srv, addr, err := startStore(lsetFn)
Expand All @@ -82,14 +92,14 @@ func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStor
return st, nil
}

func startStore(lsetFn func(addr string) []storepb.Label) (*grpc.Server, string, error) {
func startStore(lsetFn func(addr string) []storepb.LabelSet) (*grpc.Server, string, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, "", err
}

srv := grpc.NewServer()
storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{Labels: lsetFn(listener.Addr().String())}})
storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{LabelSets: lsetFn(listener.Addr().String())}})
go func() {
_ = srv.Serve(listener)
}()
Expand Down Expand Up @@ -154,9 +164,9 @@ func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {

for addr, store := range storeSet.stores {
testutil.Equals(t, addr, store.addr)
testutil.Equals(t, 1, len(store.labels))
testutil.Equals(t, "addr", store.labels[0].Name)
testutil.Equals(t, addr, store.labels[0].Value)
testutil.Equals(t, 1, len(store.labelSets))
testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name)
testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value)
}

st.CloseOne(initialStoreAddr[0])
Expand All @@ -170,9 +180,9 @@ func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {
store, ok := storeSet.stores[addr]
testutil.Assert(t, ok, "addr exist")
testutil.Equals(t, addr, store.addr)
testutil.Equals(t, 1, len(store.labels))
testutil.Equals(t, "addr", store.labels[0].Name)
testutil.Equals(t, addr, store.labels[0].Value)
testutil.Equals(t, 1, len(store.labelSets))
testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name)
testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value)
}

func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
Expand All @@ -199,9 +209,9 @@ func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
store, ok := storeSet.stores[addr]
testutil.Assert(t, ok, "addr exist")
testutil.Equals(t, addr, store.addr)
testutil.Equals(t, 1, len(store.labels))
testutil.Equals(t, "addr", store.labels[0].Name)
testutil.Equals(t, addr, store.labels[0].Value)
testutil.Equals(t, 1, len(store.labelSets))
testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name)
testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value)
}

func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) {
Expand Down Expand Up @@ -259,7 +269,10 @@ func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) {

initialStoreAddr := st.StoreAddresses()

storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = level.NewFilter(logger, level.AllowDebug())
logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
storeSet := NewStoreSet(logger, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
storeSet.gRPCInfoCallTimeout = 2 * time.Second
defer storeSet.Close()

Expand All @@ -269,13 +282,17 @@ func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) {
storeSet.Update(context.Background())
storeSet.Update(context.Background())

testutil.Assert(t, len(storeSet.stores) == 5-2, "all services should respond just fine, but we expect duplicates being blocked.")
storeNum := len(storeSet.stores)
expectedStoreNum := 5 - 2
testutil.Assert(t, storeNum == expectedStoreNum, fmt.Sprintf("all services should respond just fine, but we expect duplicates being blocked. Expected %d stores, got %d", expectedStoreNum, storeNum))

// Sort result to be able to compare.

var existingStoreLabels [][]storepb.Label
for _, store := range storeSet.stores {
existingStoreLabels = append(existingStoreLabels, store.Labels())
for _, ls := range store.LabelSets() {
existingStoreLabels = append(existingStoreLabels, ls.Labels)
}
}
sort.Slice(existingStoreLabels, func(i, j int) bool {
return len(existingStoreLabels[i]) > len(existingStoreLabels[j])
Expand Down
Loading