Skip to content

Commit

Permalink
pkg/store/multitsdb.go: Make errors and warnings tenant aware
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Apr 16, 2020
1 parent 47dc82a commit fe81aec
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package store

import (
"context"
"fmt"
"sync"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -51,10 +52,10 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s
}

infos := make([]*storepb.InfoResponse, 0, len(stores))
for _, store := range stores {
for tenant, store := range stores {
info, err := store.Info(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "get info for tenant %s", tenant)
}
infos = append(infos, info)
}
Expand All @@ -81,7 +82,7 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s
return resp, nil
}

type seriesSetServer struct {
type tenantSeriesSetServer struct {
grpc.ServerStream

ctx context.Context
Expand All @@ -90,27 +91,31 @@ type seriesSetServer struct {
recv chan *storepb.Series
cur *storepb.Series

err error
err error
tenant string
}

func newSeriesSetServer(
func newTenantSeriesSetServer(
ctx context.Context,
tenant string,
warnCh warnSender,
) *seriesSetServer {
return &seriesSetServer{
) *tenantSeriesSetServer {
return &tenantSeriesSetServer{
ctx: ctx,
tenant: tenant,
warnCh: warnCh,
recv: make(chan *storepb.Series),
}
}

func (s *seriesSetServer) Context() context.Context {
func (s *tenantSeriesSetServer) Context() context.Context {
return s.ctx
}

func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
err := store.Series(r, s)
if err != nil {
err = errors.Wrapf(s.err, "get series for tenant %s", s.tenant)
if r.PartialResponseDisabled {
s.err = err
} else {
Expand All @@ -121,7 +126,7 @@ func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
close(s.recv)
}

func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error {
func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
series := r.GetSeries()
chunks := make([]storepb.AggrChunk, len(series.Chunks))
copy(chunks, series.Chunks)
Expand All @@ -132,19 +137,19 @@ func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error {
return nil
}

func (s *seriesSetServer) Next() (ok bool) {
func (s *tenantSeriesSetServer) Next() (ok bool) {
s.cur, ok = <-s.recv
return ok
}

func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
if s.cur == nil {
return nil, nil
}
return s.cur.Labels, s.cur.Chunks
}

func (s *seriesSetServer) Err() error {
func (s *tenantSeriesSetServer) Err() error {
return s.err
}

Expand Down Expand Up @@ -178,12 +183,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri

for tenant, store := range stores {
store := store
seriesCtx, closeSeries := context.WithCancel(gctx)
seriesCtx, cancelSeries := context.WithCancel(gctx)
seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{
"tenant": tenant,
})
defer closeSeries()
ss := newSeriesSetServer(seriesCtx, respSender)
defer cancelSeries()
ss := newTenantSeriesSetServer(seriesCtx, tenant, respSender)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -217,18 +222,18 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames
warnings := map[string]struct{}{}

stores := s.tsdbStores()
for _, store := range stores {
for tenant, store := range stores {
r, err := store.LabelNames(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "get label names for tenant %s", tenant)
}

for _, l := range r.Names {
names[l] = struct{}{}
}

for _, l := range r.Warnings {
warnings[l] = struct{}{}
warnings[prefixTenantWarning(tenant, l)] = struct{}{}
}
}

Expand All @@ -238,6 +243,10 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames
}, nil
}

func prefixTenantWarning(tenant, s string) string {
return fmt.Sprintf("[%s] %s", tenant, s)
}

func keys(m map[string]struct{}) []string {
res := make([]string, 0, len(m))
for k := range m {
Expand All @@ -253,18 +262,18 @@ func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValu
warnings := map[string]struct{}{}

stores := s.tsdbStores()
for _, store := range stores {
for tenant, store := range stores {
r, err := store.LabelValues(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "get label values for tenant %s", tenant)
}

for _, l := range r.Values {
values[l] = struct{}{}
}

for _, l := range r.Warnings {
warnings[l] = struct{}{}
warnings[prefixTenantWarning(tenant, l)] = struct{}{}
}
}

Expand Down

0 comments on commit fe81aec

Please sign in to comment.