From 7d8d11429bdcc9bd7400cc951ab48aaf013d2746 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 28 Aug 2024 15:03:52 +0300 Subject: [PATCH 1/4] receive: reduce locking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit blocksToDelete() should only be called when the TSDB is running so there's no point in locking and checking here. Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 3251c2729c..915f8dd241 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -238,13 +238,6 @@ type tenant struct { } func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { - t.mtx.RLock() - defer t.mtx.RUnlock() - - if t.tsdb == nil { - return nil - } - deletable := t.blocksToDeleteFn(t.tsdb)(blocks) if t.ship == nil { return deletable From b90ece565f3a9e800a7dfb592c339c99c43cead1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 28 Aug 2024 15:10:04 +0300 Subject: [PATCH 2/4] receive/multitsdb: remove shipper method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove shipper() method because we want to hold down the lock while shipper is doing its stuff. This is because Sync() is not concurrency safe and pruning might happen in the background. Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 915f8dd241..d27d385f90 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -288,12 +288,6 @@ func (t *tenant) exemplars() *exemplars.TSDB { return t.exemplarsTSDB } -func (t *tenant) shipper() *shipper.Shipper { - t.mtx.RLock() - defer t.mtx.RUnlock() - return t.ship -} - func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { t.readyS.Set(tenantTSDB) t.mtx.Lock() @@ -558,13 +552,16 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { for tenantID, tenant := range t.tenants { level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID) - s := tenant.shipper() + tenant.mtx.RLock() + s := tenant.ship if s == nil { + tenant.mtx.RUnlock() continue } wg.Add(1) go func() { up, err := s.Sync(ctx) + tenant.mtx.RUnlock() if err != nil { errmtx.Lock() merr.Add(errors.Wrap(err, "upload")) From e95fa4c7e0c006e16ac1e2ab2fad124b67859e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 28 Aug 2024 15:14:18 +0300 Subject: [PATCH 3/4] receive: fix linter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index d27d385f90..1bc147fe12 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -551,6 +551,8 @@ func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { ) for tenantID, tenant := range t.tenants { + tenant := tenant + level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID) tenant.mtx.RLock() s := tenant.ship From f1608c030935dba28459c7d113eac4d3bd949eb6 Mon Sep 17 00:00:00 2001 From: Harry John Date: Mon, 12 Aug 2024 11:15:28 -0700 Subject: [PATCH 4/4] Fix failing e2e test (#7620) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- test/e2e/receive_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 2c8cbb2fa5..ca0b631251 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -843,9 +843,9 @@ test_metric{a="2", b="2"} 1`) e2ethanos.AvalancheOptions{ MetricCount: "10", SeriesCount: "1", - MetricInterval: "30", - SeriesInterval: "3600", - ValueInterval: "3600", + MetricInterval: "3600", + SeriesInterval: "30", + ValueInterval: "30", RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), RemoteWriteInterval: "30s", @@ -881,9 +881,9 @@ test_metric{a="2", b="2"} 1`) e2ethanos.AvalancheOptions{ MetricCount: "10", SeriesCount: "1", - MetricInterval: "30", - SeriesInterval: "3600", - ValueInterval: "3600", + MetricInterval: "3600", + SeriesInterval: "30", + ValueInterval: "30", RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), RemoteWriteInterval: "30s",