Skip to content

Commit

Permalink
receive/multitsdb: don't forget to close the store
Browse files Browse the repository at this point in the history
Don't forget to close the store. Trying to reproduce prod problems but
cannot trigger them :/

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Aug 28, 2024
1 parent e0221e6 commit 7542f2d
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 20 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
15*time.Second,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func runRule(
}
infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()}
if tsdbDB != nil {
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset)
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset, 15*time.Second)
infoOptions = append(
infoOptions,
info.WithLabelSetFunc(func() []labelpb.ZLabelSet {
Expand Down
17 changes: 10 additions & 7 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(t, db), 2, timeout),
engineFactory: ef,
defaultEngine: PromqlEnginePrometheus,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
Expand Down Expand Up @@ -648,7 +648,7 @@ func TestQueryExplainEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(t, db), 2, timeout),
engineFactory: ef,
defaultEngine: PromqlEnginePrometheus,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
Expand Down Expand Up @@ -712,7 +712,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(t, db), 2, timeout),
engineFactory: ef,
defaultEngine: PromqlEnginePrometheus,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
Expand Down Expand Up @@ -777,10 +777,13 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
}
}

func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
func newProxyStoreWithTSDBStore(t *testing.T, db store.TSDBReader) *store.ProxyStore {
tsdbStore := store.NewTSDBStore(nil, db, component.Query, nil, 15*time.Second)
t.Cleanup(tsdbStore.Close)

c := &storetestutil.TestClient{
Name: "1",
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), 0),
StoreClient: storepb.ServerAsClient(tsdbStore, 0),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
}

Expand Down Expand Up @@ -886,7 +889,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(t, db), 2, timeout),
engineFactory: ef,
defaultEngine: PromqlEnginePrometheus,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
Expand All @@ -902,7 +905,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(t, db), 2, timeout),
engineFactory: ef,
defaultEngine: PromqlEnginePrometheus,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
Expand Down
5 changes: 4 additions & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ func TestQuerier_Proxy(t *testing.T) {
m, err := storepb.PromMatchersToMatchers(st.matchers...)
testutil.Ok(t, err)

tsdbStore := store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil, 15*time.Second)
t.Cleanup(tsdbStore.Close)

// TODO(bwplotka): Parse external labels.
clients = append(clients, &storetestutil.TestClient{
Name: fmt.Sprintf("store number %v", i),
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0),
StoreClient: storepb.ServerAsClient(selectedStore(tsdbStore, m, st.mint, st.maxt), 0),
MinTime: st.mint,
MaxTime: st.maxt,
})
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m, &WriterOptions{})
Expand Down
14 changes: 12 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type MultiTSDB struct {
allowOutOfOrderUpload bool
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig
filterInterval time.Duration
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -81,6 +82,7 @@ func NewMultiTSDB(
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
hashFunc metadata.HashFunc,
filterInterval time.Duration,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
Expand All @@ -98,6 +100,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
filterInterval: filterInterval,
}
}

Expand Down Expand Up @@ -398,6 +401,7 @@ func (t *MultiTSDB) Close() error {
level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id)
continue
}
tenant.storeTSDB.Close()
level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id)
merr.Add(db.Close())
}
Expand Down Expand Up @@ -564,14 +568,20 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) {
)

for tenantID, tenant := range t.tenants {
tenant := tenant

level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID)
s := tenant.shipper()

tenant.mtx.RLock()
s := tenant.ship
if s == nil {
tenant.mtx.RUnlock()
continue
}
wg.Add(1)
go func() {
up, err := s.Sync(ctx)
tenant.mtx.RUnlock()
if err != nil {
errmtx.Lock()
merr.Add(errors.Wrap(err, "upload"))
Expand Down Expand Up @@ -736,7 +746,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset))
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, t.filterInterval), s, ship, exemplars.NewTSDB(s, lset))
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
70 changes: 70 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -57,6 +58,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -141,6 +143,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -184,6 +187,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -415,6 +419,65 @@ func checkExemplarsResponse(t *testing.T, expected, data []exemplarspb.ExemplarD
}
}

// TestMultiTSDBPruneWithMetricNameFilter hammers a MultiTSDB with concurrent
// sample appends and Prune calls for a fixed period, so that races between
// tenant pruning and the per-tenant store's background work can surface
// (run with -race for the most value).
//
// The last NewMultiTSDB argument is the filter interval; it is set to 1ns so
// that the periodic update driven by it fires as often as possible.
func TestMultiTSDBPruneWithMetricNameFilter(t *testing.T) {
	dir := t.TempDir()

	m := NewMultiTSDB(dir, log.NewLogfmtLogger(os.Stderr), prometheus.NewRegistry(),
		&tsdb.Options{
			MinBlockDuration:  (2 * time.Hour).Milliseconds(),
			MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
			RetentionDuration: (6 * time.Hour).Milliseconds(),
		},
		labels.FromStrings("replica", "test"),
		"tenant_id",
		nil,
		false,
		metadata.NoneFunc,
		1*time.Nanosecond,
	)
	// Runs on the test goroutine after both workers have been joined below.
	defer func() { testutil.Ok(t, m.Close()) }()

	wg := &sync.WaitGroup{}
	ctx, cancel := context.WithCancel(context.Background())
	// Release the context even if the test unwinds before the explicit cancel.
	defer cancel()

	// Writer: keeps appending samples to a tenant that is old enough to be
	// pruned, tolerating "not ready" errors while the tenant is (re)created.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				for i := 0; i < 100; i++ {
					err := appendSample(m, "deleted-tenant", time.UnixMilli(int64(10+i)))
					if errors.Is(err, tsdb.ErrNotReady) || errors.Is(err, ErrNotReady) {
						continue
					}
					if err != nil {
						// Fatal assertions (FailNow) must only be called from the
						// goroutine running the test, so report and stop this worker.
						t.Errorf("appending sample: %v", err)
						return
					}
				}
			}
		}
	}()

	// Pruner: calls Prune in a tight loop until the context is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				if err := m.Prune(ctx); err != nil {
					// See the note in the writer goroutine: no FailNow off the test goroutine.
					t.Errorf("pruning: %v", err)
					return
				}
			}
		}
	}()

	// Let both workers contend for a while before shutting them down.
	time.Sleep(15 * time.Second)

	cancel()
	wg.Wait()
}

func TestMultiTSDBPrune(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -451,6 +514,7 @@ func TestMultiTSDBPrune(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -520,6 +584,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -581,6 +646,7 @@ func TestAlignedHeadFlush(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -655,6 +721,7 @@ func TestMultiTSDBStats(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -684,6 +751,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -725,6 +793,7 @@ func TestProxyLabelValues(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -815,6 +884,7 @@ func BenchmarkMultiTSDB(b *testing.B) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
defer func() { testutil.Ok(b, m.Close()) }()

Expand Down
1 change: 1 addition & 0 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB {
bucket,
false,
metadata.NoneFunc,
15*time.Second,
)

return m
Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func TestWriter(t *testing.T) {
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })

Expand Down Expand Up @@ -436,6 +437,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
nil,
false,
metadata.NoneFunc,
15*time.Second,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })

Expand Down
6 changes: 5 additions & 1 deletion pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,11 @@ func TestTSDBStore_Acceptance(t *testing.T) {
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)
st := NewTSDBStore(nil, db, component.Rule, extLset, 15*time.Second)

tt.Cleanup(st.Close)

return st
}

testStoreAPIsAcceptance(t, startStore)
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ReadWriteTSDBStore struct {

// NewTSDBStore creates a new TSDBStore.
// NOTE: Given lset has to be sorted.
func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore {
func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, updateInterval time.Duration) *TSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -91,7 +91,7 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
MetricNameFilter: filter.NewCuckooFilterMetricNameFilter(1000000),
}

t := time.NewTicker(15 * time.Second)
t := time.NewTicker(updateInterval)
ctx, cancel := context.WithCancel(context.Background())
updateMetricNames := func() {
vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{
Expand Down
Loading

0 comments on commit 7542f2d

Please sign in to comment.