From 685506f01b4dd31de0d5d13575f9cd42301a4ddf Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 27 Feb 2024 19:25:02 +0800 Subject: [PATCH] fix: wait for ongoing queries to finish at close (#3030) --- pkg/phlaredb/block_querier.go | 54 ++++++++++++++++++++++++----------- pkg/phlaredb/sample_merge.go | 28 ++++++++++++++++++ 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 22d83dfafb..3da1f8cb4e 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -299,6 +299,7 @@ type singleBlockQuerier struct { tables []tableReader + queries sync.WaitGroup openLock sync.Mutex opened bool index *index.Reader @@ -353,10 +354,11 @@ func (b *singleBlockQuerier) ProfileTypes(ctx context.Context, req *connect.Requ sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes Block") defer sp.Finish() - err := b.Open(ctx) - if err != nil { + if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() values, err := b.index.LabelValues(phlaremodel.LabelNameProfileType) if err != nil { @@ -384,10 +386,11 @@ func (b *singleBlockQuerier) LabelValues(ctx context.Context, req *connect.Reque params := req.Msg - err := b.Open(ctx) - if err != nil { + if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() names, err := b.index.LabelNames() if err != nil { @@ -451,10 +454,11 @@ func (b *singleBlockQuerier) LabelNames(ctx context.Context, req *connect.Reques params := req.Msg - err := b.Open(ctx) - if err != nil { + if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() selectors, err := parseSelectors(params.Matchers) if err != nil { @@ -516,6 +520,12 @@ func (b *singleBlockQuerier) Close() error { b.openLock.Unlock() b.metrics.blockOpened.Dec() }() + + if !b.opened { + return nil + } + b.queries.Wait() + errs := multierror.New() if b.index != nil { err := b.index.Close() @@ -524,7 +534,6 @@ func (b *singleBlockQuerier) Close() error { errs.Add(err) } } - for _, t := range b.tables { if err := t.Close(); err != nil { errs.Add(err) @@ -1523,6 +1532,9 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() + matchers, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -1627,6 +1639,9 @@ func (b *singleBlockQuerier) SelectMergeByLabels( if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() + matchers, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -1704,6 +1719,9 @@ func (b *singleBlockQuerier) SelectMergeByStacktraces(ctx context.Context, param if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() + matchers, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -1770,6 +1788,9 @@ func (b *singleBlockQuerier) SelectMergeBySpans(ctx context.Context, params *ing if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() + matchers, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -1835,6 +1856,9 @@ func (b *singleBlockQuerier) SelectMergePprof(ctx context.Context, params *inges if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() + matchers, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -1902,10 +1926,11 @@ func (b *singleBlockQuerier) Series(ctx context.Context, params *ingestv1.Series sp, ctx := opentracing.StartSpanFromContext(ctx, "Series Block") defer sp.Finish() - err := b.Open(ctx) - if err != nil { + if err := b.Open(ctx); err != nil { return nil, err } + b.queries.Add(1) + defer b.queries.Done() selectors, err := parseSelectors(params.Matchers) if err != nil { @@ -2056,13 +2081,10 @@ func (q *singleBlockQuerier) openTSDBIndex(ctx context.Context) error { func (q *singleBlockQuerier) Open(ctx context.Context) error { q.openLock.Lock() defer q.openLock.Unlock() - - // already open - if q.opened { - return nil - } - if err := q.openFiles(ctx); err != nil { - return err + if !q.opened { + if err := q.openFiles(ctx); err != nil { + return err + } } q.metrics.blockOpened.Inc() q.opened = true diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go index 762f403942..801c385bdd 100644 --- a/pkg/phlaredb/sample_merge.go +++ b/pkg/phlaredb/sample_merge.go @@ -24,6 +24,13 @@ func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.I sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block") defer sp.Finish() sp.SetTag("block ULID", b.meta.ULID.String()) + + if err := b.Open(ctx); err != nil { + return nil, err + } + b.queries.Add(1) + defer b.queries.Done() + ctx = query.AddMetricsToContext(ctx, b.metrics.query) r := symdb.NewResolver(ctx, b.symbols) defer r.Release() @@ -37,6 +44,13 @@ func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[ sp, ctx := opentracing.StartSpanFromContext(ctx, "MergePprof - Block") defer sp.Finish() sp.SetTag("block ULID", b.meta.ULID.String()) + + if err := b.Open(ctx); err != nil { + return nil, err + } + b.queries.Add(1) + defer b.queries.Done() + ctx = query.AddMetricsToContext(ctx, b.metrics.query) r := symdb.NewResolver(ctx, b.symbols, symdb.WithResolverMaxNodes(maxNodes), @@ -52,6 +66,13 @@ func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterat sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block") defer sp.Finish() sp.SetTag("block ULID", b.meta.ULID.String()) + + if err := b.Open(ctx); err != nil { + return nil, err + } + b.queries.Add(1) + defer b.queries.Done() + ctx = query.AddMetricsToContext(ctx, b.metrics.query) if len(sts.GetCallSite()) == 0 { columnName := "TotalValue" @@ -70,6 +91,13 @@ func (b *singleBlockQuerier) MergeBySpans(ctx context.Context, rows iter.Iterato sp, _ := opentracing.StartSpanFromContext(ctx, "MergeBySpans - Block") defer sp.Finish() sp.SetTag("block ULID", b.meta.ULID.String()) + + if err := b.Open(ctx); err != nil { + return nil, err + } + b.queries.Add(1) + defer b.queries.Done() + ctx = query.AddMetricsToContext(ctx, b.metrics.query) r := symdb.NewResolver(ctx, b.symbols) defer r.Release()