Skip to content

Commit

Permalink
Rebase master && update storeset test
Browse files Browse the repository at this point in the history
Signed-off-by: jojohappy <sarahdj0917@gmail.com>
  • Loading branch information
jojohappy committed Oct 8, 2019
1 parent b52a238 commit a210289
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 65 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
selectorRelabelConf *pathOrContent,
selectorRelabelConf *extflag.PathOrContent,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down
26 changes: 7 additions & 19 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ func regCommonTracingFlags(app *kingpin.Application) *extflag.PathOrContent {
)
}

func regSelectorRelabelFlags(cmd *kingpin.CmdClause) *pathOrContent {
fileFlagName := "selector.relabel-config-file"
contentFlagName := "selector.relabel-config"
help := "Path to YAML file that contains seletor relabeling configuration. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config "

selectorRelabelConfFile := cmd.Flag(fileFlagName, help).PlaceHolder("<seletor.relabel-config-yaml-path>").String()

help = fmt.Sprintf("Alternative to '%s' flag. Relabeling configuration in YAML. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config", fileFlagName)

selectorRelabelConf := cmd.Flag(contentFlagName, help).PlaceHolder("<selector.relabel-config-yaml>").String()

return &pathOrContent{
fileFlagName: fileFlagName,
contentFlagName: contentFlagName,
required: false,

path: selectorRelabelConfFile,
content: selectorRelabelConf,
}
func regSelectorRelabelFlags(cmd *kingpin.CmdClause) *extflag.PathOrContent {
return extflag.RegisterPathOrContent(
cmd,
"selector.relabel-config",
"YAML file that contains seletor relabeling configuration. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config ",
false,
)
}
4 changes: 3 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)

// registerStore registers a store command.
Expand Down Expand Up @@ -116,7 +118,7 @@ func runStore(
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *pathOrContent,
selectorRelabelConf *extflag.PathOrContent,
) error {
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

Expand Down
22 changes: 11 additions & 11 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
)

var blockTooFreshSentinelError = errors.New("Block too fresh")
var blockDroppedInRelabelingError = errors.New("Block dropped in Relabeling")

// Syncer syncronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
Expand Down Expand Up @@ -210,9 +209,10 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
}

meta, err := c.downloadMeta(workCtx, id)
if err == blockTooFreshSentinelError || err == blockDroppedInRelabelingError {
if err == blockTooFreshSentinelError {
continue
}

if err != nil {
if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {
continue
Expand All @@ -221,6 +221,15 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return
}

// Check for block labels by relabeling.
// If output is empty, the block will be dropped.
lset := promlables.FromMap(meta.Thanos.Labels)
processedLabels := relabel.Process(lset, c.relabelConfig...)
if processedLabels == nil {
level.Warn(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
return
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
Expand Down Expand Up @@ -280,15 +289,6 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
}

// Check for block labels by relabeling.
// If output is empty, the block will be dropped.
lset := promlables.FromMap(meta.Thanos.Labels)
processedLabels := relabel.Process(lset, c.relabelConfig...)
if processedLabels == nil {
level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
return nil, blockDroppedInRelabelingError
}

// ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to
// avoid races when a block is only partially uploaded. This relates to all blocks, excluding:
// - repair created blocks
Expand Down
17 changes: 9 additions & 8 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func (s *StoreSet) Update(ctx context.Context) {
// Record the number of occurrences of external label combinations for current store slice.
externalLabelOccurrencesInStores := map[string]int{}
for _, st := range healthyStores {
externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++
if st.storeType != nil && (st.storeType.ToProto() == storepb.StoreType_QUERY ||
st.storeType.ToProto() == storepb.StoreType_SIDECAR) {
externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++
}
}
level.Debug(s.logger).Log("msg", "updating healthy stores", "externalLabelOccurrencesInStores", fmt.Sprintf("%#+v", externalLabelOccurrencesInStores))

Expand Down Expand Up @@ -252,7 +255,11 @@ func (s *StoreSet) Update(ctx context.Context) {
// Note: No external labels means strictly store gateway or ruler and it is fine to have access to multiple instances of them.
// Any other component will error out if it will be configured with empty external labels.
externalLabels := externalLabelsFromStore(store)
if len(store.LabelSets()) > 0 && externalLabelOccurrencesInStores[externalLabels] != 1 {
if len(store.LabelSets()) > 0 &&
store.storeType != nil &&
(store.storeType.ToProto() == storepb.StoreType_QUERY ||
store.storeType.ToProto() == storepb.StoreType_SIDECAR) &&
externalLabelOccurrencesInStores[externalLabels] != 1 {
store.close()
s.updateStoreStatus(store, errors.New(droppingStoreMessage))
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr, "extLset", externalLabels, "duplicates", externalLabelOccurrencesInStores[externalLabels])
Expand Down Expand Up @@ -360,12 +367,6 @@ func externalLabelsFromStore(store *storeRef) string {
}
sort.Strings(tsdbLabelSetStrings)

// Append the storeType to the end of list, because the store gateway will be exposed external labels,
// we allow the duplicate external labels with different components.
if store.storeType != nil {
tsdbLabelSetStrings = append(tsdbLabelSetStrings, store.storeType.String())
}

return strings.Join(tsdbLabelSetStrings, ",")
}

Expand Down
39 changes: 31 additions & 8 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"google.golang.org/grpc"
Expand All @@ -26,6 +27,11 @@ var testGRPCOpts = []grpc.DialOption{
grpc.WithInsecure(),
}

var (
emptyStoresExtLabels [][]storepb.Label
emptyStoresTypes []component.StoreAPI
)

type testStore struct {
info storepb.InfoResponse
}
Expand Down Expand Up @@ -54,7 +60,7 @@ type testStores struct {
srvs map[string]*grpc.Server
}

func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStores, error) {
func newTestStores(numStores int, storesExtLabels [][]storepb.Label, storesTypes []component.StoreAPI) (*testStores, error) {
st := &testStores{
srvs: map[string]*grpc.Server{},
}
Expand All @@ -79,7 +85,15 @@ func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStor
return []storepb.LabelSet{{Labels: storesExtLabels[i]}}
}

srv, addr, err := startStore(lsetFn)
storeTypeFn := func() storepb.StoreType {
if len(storesTypes) != numStores {
return component.Sidecar.ToProto()
}
st := storesTypes[i]
return st.ToProto()
}

srv, addr, err := startStore(lsetFn, storeTypeFn)
if err != nil {
// Close so far started servers.
st.Close()
Expand All @@ -92,14 +106,14 @@ func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStor
return st, nil
}

func startStore(lsetFn func(addr string) []storepb.LabelSet) (*grpc.Server, string, error) {
func startStore(lsetFn func(addr string) []storepb.LabelSet, storeTypeFn func() storepb.StoreType) (*grpc.Server, string, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, "", err
}

srv := grpc.NewServer()
storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{LabelSets: lsetFn(listener.Addr().String())}})
storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{LabelSets: lsetFn(listener.Addr().String()), StoreType: storeTypeFn()}})
go func() {
_ = srv.Serve(listener)
}()
Expand Down Expand Up @@ -144,7 +158,7 @@ func specsFromAddrFunc(addrs []string) func() []StoreSpec {
func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

st, err := newTestStores(2)
st, err := newTestStores(2, emptyStoresExtLabels, emptyStoresTypes)
testutil.Ok(t, err)
defer st.Close()

Expand Down Expand Up @@ -188,7 +202,7 @@ func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {
func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

st, err := newTestStores(2)
st, err := newTestStores(2, emptyStoresExtLabels, emptyStoresTypes)
testutil.Ok(t, err)
defer st.Close()

Expand Down Expand Up @@ -217,7 +231,7 @@ func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

st, err := newTestStores(2)
st, err := newTestStores(2, emptyStoresExtLabels, emptyStoresTypes)
testutil.Ok(t, err)
defer st.Close()

Expand Down Expand Up @@ -263,7 +277,16 @@ func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) {
},
}

st, err := newTestStores(6, storeExtLabels...)
storeTypes := []component.StoreAPI{
component.Query,
component.Sidecar,
component.Query,
component.Store,
component.Store,
component.Sidecar,
}

st, err := newTestStores(6, storeExtLabels, storeTypes)
testutil.Ok(t, err)
defer st.Close()

Expand Down
32 changes: 15 additions & 17 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
level.Debug(s.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
return os.RemoveAll(dir)
}
b.labels = labels.FromMap(processedLabels.Map())
b.labels = lset
sort.Sort(b.labels)

set, ok := s.blockSets[h]
Expand Down Expand Up @@ -542,25 +542,23 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
MaxTime: maxt,
}

if len(s.relabelConfig) != 0 {
labelSets := make(map[uint64][]storepb.Label, len(s.blocks))
for _, bs := range s.blocks {
ls := map[string]string{}
for _, l := range bs.labels {
ls[l.Name] = l.Value
}

res := []storepb.Label{}
for k, v := range ls {
res = append(res, storepb.Label{Name: k, Value: v})
}
labelSets[bs.labels.Hash()] = res
labelSets := make(map[uint64][]storepb.Label, len(s.blocks))
for _, bs := range s.blocks {
ls := map[string]string{}
for _, l := range bs.labels {
ls[l.Name] = l.Value
}

res.LabelSets = make([]storepb.LabelSet, 0, len(labelSets))
for _, v := range labelSets {
res.LabelSets = append(res.LabelSets, storepb.LabelSet{Labels: v})
res := []storepb.Label{}
for k, v := range ls {
res = append(res, storepb.Label{Name: k, Value: v})
}
labelSets[bs.labels.Hash()] = res
}

res.LabelSets = make([]storepb.LabelSet, 0, len(labelSets))
for _, v := range labelSets {
res.LabelSets = append(res.LabelSets, storepb.LabelSet{Labels: v})
}

return res, nil
Expand Down

0 comments on commit a210289

Please sign in to comment.