Skip to content

Commit

Permalink
fix: wait for ongoing queries to finish at close (#3030)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Feb 27, 2024
1 parent c286ca2 commit 685506f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 16 deletions.
54 changes: 38 additions & 16 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ type singleBlockQuerier struct {

tables []tableReader

queries sync.WaitGroup
openLock sync.Mutex
opened bool
index *index.Reader
Expand Down Expand Up @@ -353,10 +354,11 @@ func (b *singleBlockQuerier) ProfileTypes(ctx context.Context, req *connect.Requ
sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes Block")
defer sp.Finish()

err := b.Open(ctx)
if err != nil {
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

values, err := b.index.LabelValues(phlaremodel.LabelNameProfileType)
if err != nil {
Expand Down Expand Up @@ -384,10 +386,11 @@ func (b *singleBlockQuerier) LabelValues(ctx context.Context, req *connect.Reque

params := req.Msg

err := b.Open(ctx)
if err != nil {
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

names, err := b.index.LabelNames()
if err != nil {
Expand Down Expand Up @@ -451,10 +454,11 @@ func (b *singleBlockQuerier) LabelNames(ctx context.Context, req *connect.Reques

params := req.Msg

err := b.Open(ctx)
if err != nil {
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

selectors, err := parseSelectors(params.Matchers)
if err != nil {
Expand Down Expand Up @@ -516,6 +520,12 @@ func (b *singleBlockQuerier) Close() error {
b.openLock.Unlock()
b.metrics.blockOpened.Dec()
}()

if !b.opened {
return nil
}
b.queries.Wait()

errs := multierror.New()
if b.index != nil {
err := b.index.Close()
Expand All @@ -524,7 +534,6 @@ func (b *singleBlockQuerier) Close() error {
errs.Add(err)
}
}

for _, t := range b.tables {
if err := t.Close(); err != nil {
errs.Add(err)
Expand Down Expand Up @@ -1523,6 +1532,9 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

matchers, err := parser.ParseMetricSelector(params.LabelSelector)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
Expand Down Expand Up @@ -1627,6 +1639,9 @@ func (b *singleBlockQuerier) SelectMergeByLabels(
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

matchers, err := parser.ParseMetricSelector(params.LabelSelector)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
Expand Down Expand Up @@ -1704,6 +1719,9 @@ func (b *singleBlockQuerier) SelectMergeByStacktraces(ctx context.Context, param
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

matchers, err := parser.ParseMetricSelector(params.LabelSelector)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
Expand Down Expand Up @@ -1770,6 +1788,9 @@ func (b *singleBlockQuerier) SelectMergeBySpans(ctx context.Context, params *ing
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

matchers, err := parser.ParseMetricSelector(params.LabelSelector)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
Expand Down Expand Up @@ -1835,6 +1856,9 @@ func (b *singleBlockQuerier) SelectMergePprof(ctx context.Context, params *inges
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

matchers, err := parser.ParseMetricSelector(params.LabelSelector)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
Expand Down Expand Up @@ -1902,10 +1926,11 @@ func (b *singleBlockQuerier) Series(ctx context.Context, params *ingestv1.Series
sp, ctx := opentracing.StartSpanFromContext(ctx, "Series Block")
defer sp.Finish()

err := b.Open(ctx)
if err != nil {
if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

selectors, err := parseSelectors(params.Matchers)
if err != nil {
Expand Down Expand Up @@ -2056,13 +2081,10 @@ func (q *singleBlockQuerier) openTSDBIndex(ctx context.Context) error {
func (q *singleBlockQuerier) Open(ctx context.Context) error {
q.openLock.Lock()
defer q.openLock.Unlock()

// already open
if q.opened {
return nil
}
if err := q.openFiles(ctx); err != nil {
return err
if !q.opened {
if err := q.openFiles(ctx); err != nil {
return err
}
}
q.metrics.blockOpened.Inc()
q.opened = true
Expand Down
28 changes: 28 additions & 0 deletions pkg/phlaredb/sample_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.I
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
defer sp.Finish()
sp.SetTag("block ULID", b.meta.ULID.String())

if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

ctx = query.AddMetricsToContext(ctx, b.metrics.query)
r := symdb.NewResolver(ctx, b.symbols)
defer r.Release()
Expand All @@ -37,6 +44,13 @@ func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergePprof - Block")
defer sp.Finish()
sp.SetTag("block ULID", b.meta.ULID.String())

if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

ctx = query.AddMetricsToContext(ctx, b.metrics.query)
r := symdb.NewResolver(ctx, b.symbols,
symdb.WithResolverMaxNodes(maxNodes),
Expand All @@ -52,6 +66,13 @@ func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterat
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block")
defer sp.Finish()
sp.SetTag("block ULID", b.meta.ULID.String())

if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

ctx = query.AddMetricsToContext(ctx, b.metrics.query)
if len(sts.GetCallSite()) == 0 {
columnName := "TotalValue"
Expand All @@ -70,6 +91,13 @@ func (b *singleBlockQuerier) MergeBySpans(ctx context.Context, rows iter.Iterato
sp, _ := opentracing.StartSpanFromContext(ctx, "MergeBySpans - Block")
defer sp.Finish()
sp.SetTag("block ULID", b.meta.ULID.String())

if err := b.Open(ctx); err != nil {
return nil, err
}
b.queries.Add(1)
defer b.queries.Done()

ctx = query.AddMetricsToContext(ctx, b.metrics.query)
r := symdb.NewResolver(ctx, b.symbols)
defer r.Release()
Expand Down

0 comments on commit 685506f

Please sign in to comment.