diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 3a80907a88..847cf1721b 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -5,6 +5,7 @@ package store import ( "context" + "fmt" "sync" "github.com/go-kit/kit/log" @@ -51,10 +52,10 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s } infos := make([]*storepb.InfoResponse, 0, len(stores)) - for _, store := range stores { + for tenant, store := range stores { info, err := store.Info(ctx, req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "get info for tenant %s", tenant) } infos = append(infos, info) } @@ -81,7 +82,7 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s return resp, nil } -type seriesSetServer struct { +type tenantSeriesSetServer struct { grpc.ServerStream ctx context.Context @@ -90,27 +91,31 @@ type seriesSetServer struct { recv chan *storepb.Series cur *storepb.Series - err error + err error + tenant string } -func newSeriesSetServer( +func newTenantSeriesSetServer( ctx context.Context, + tenant string, warnCh warnSender, -) *seriesSetServer { - return &seriesSetServer{ +) *tenantSeriesSetServer { + return &tenantSeriesSetServer{ ctx: ctx, + tenant: tenant, warnCh: warnCh, recv: make(chan *storepb.Series), } } -func (s *seriesSetServer) Context() context.Context { +func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx } -func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { +func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { err := store.Series(r, s) if err != nil { + err = errors.Wrapf(s.err, "get series for tenant %s", s.tenant) if r.PartialResponseDisabled { s.err = err } else { @@ -121,7 +126,7 @@ func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { close(s.recv) } -func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error { +func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error { series := r.GetSeries() chunks := make([]storepb.AggrChunk, len(series.Chunks)) copy(chunks, series.Chunks) @@ -132,19 +137,19 @@ func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error { return nil } -func (s *seriesSetServer) Next() (ok bool) { +func (s *tenantSeriesSetServer) Next() (ok bool) { s.cur, ok = <-s.recv return ok } -func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) { +func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) { if s.cur == nil { return nil, nil } return s.cur.Labels, s.cur.Chunks } -func (s *seriesSetServer) Err() error { +func (s *tenantSeriesSetServer) Err() error { return s.err } @@ -178,12 +183,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri for tenant, store := range stores { store := store - seriesCtx, closeSeries := context.WithCancel(gctx) + seriesCtx, cancelSeries := context.WithCancel(gctx) seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{ "tenant": tenant, }) - defer closeSeries() - ss := newSeriesSetServer(seriesCtx, respSender) + defer cancelSeries() + ss := newTenantSeriesSetServer(seriesCtx, tenant, respSender) wg.Add(1) go func() { defer wg.Done() @@ -217,10 +222,10 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames warnings := map[string]struct{}{} stores := s.tsdbStores() - for _, store := range stores { + for tenant, store := range stores { r, err := store.LabelNames(ctx, req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "get label names for tenant %s", tenant) } for _, l := range r.Names { @@ -228,7 +233,7 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames } for _, l := range r.Warnings { - warnings[l] = struct{}{} + warnings[prefixTenantWarning(tenant, l)] = struct{}{} } } @@ -238,6 +243,10 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames }, nil } +func prefixTenantWarning(tenant, s string) string { + return fmt.Sprintf("[%s] %s", tenant, s) +} + func keys(m map[string]struct{}) []string { res := make([]string, 0, len(m)) for k := range m { @@ -253,10 +262,10 @@ func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValu warnings := map[string]struct{}{} stores := s.tsdbStores() - for _, store := range stores { + for tenant, store := range stores { r, err := store.LabelValues(ctx, req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "get label values for tenant %s", tenant) } for _, l := range r.Values { @@ -264,7 +273,7 @@ func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValu } for _, l := range r.Warnings { - warnings[l] = struct{}{} + warnings[prefixTenantWarning(tenant, l)] = struct{}{} } }