diff --git a/api/cluster.go b/api/cluster.go index 8d181184ac..14a5149ccc 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -283,7 +283,11 @@ func (s *Server) indexTagDelSeries(ctx *middleware.Context, request models.Index return } - deleted := s.MetricIndex.DeleteTagged(request.OrgId, query) + deleted, err := s.MetricIndex.DeleteTagged(request.OrgId, query) + if err != nil { + response.Write(ctx, response.WrapErrorForTagDB(err)) + return + } res.Count += len(deleted) } diff --git a/api/graphite.go b/api/graphite.go index 09ed0056be..03a5f1d7c6 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -1278,7 +1278,12 @@ func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.Gr return } - deleted := s.MetricIndex.DeleteTagged(ctx.OrgId, query) + deleted, err := s.MetricIndex.DeleteTagged(ctx.OrgId, query) + if err != nil { + response.Write(ctx, response.WrapErrorForTagDB(err)) + return + } + res.Count += len(deleted) } } diff --git a/idx/bigtable/bigtable.go b/idx/bigtable/bigtable.go index 305c79cdfa..373a7e5a3b 100644 --- a/idx/bigtable/bigtable.go +++ b/idx/bigtable/bigtable.go @@ -8,6 +8,7 @@ import ( "cloud.google.com/go/bigtable" "github.com/grafana/metrictank/cluster" + "github.com/grafana/metrictank/expr/tagquery" "github.com/grafana/metrictank/idx" "github.com/grafana/metrictank/idx/memory" "github.com/grafana/metrictank/schema" @@ -482,18 +483,47 @@ func (b *BigtableIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error if err != nil { return defs, err } + + err = b.deleteDefs(defs) + if err != nil { + return nil, err + } + + statDeleteDuration.Value(time.Since(pre)) + return defs, err +} + +func (b *BigtableIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) { + pre := time.Now() + defs, err := b.MemoryIndex.DeleteTagged(orgId, query) + if err != nil { + return nil, err + } + + err = b.deleteDefs(defs) + if err != nil { + return nil, err + } + + statDeleteDuration.Value(time.Since(pre)) + return defs, err +} + +func (b *BigtableIdx) deleteDefs(defs []idx.Archive) error { + var err error + if b.cfg.UpdateBigtableIdx { for _, def := range defs { delErr := b.deleteDef(&def.MetricDefinition) // the last error encountered will be passed back to the caller if delErr != nil { - log.Errorf("bigtable-idx: Failed to delete def %s: %s", def.MetricDefinition.Id, err) + log.Errorf("bigtable-idx: Failed to delete def %s: %s", def.MetricDefinition.Id, delErr.Error()) err = delErr } } } - statDeleteDuration.Value(time.Since(pre)) - return defs, err + + return err } func (b *BigtableIdx) deleteDef(def *schema.MetricDefinition) error { diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index c281208a36..1438d1c821 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -12,6 +12,7 @@ import ( "github.com/gocql/gocql" "github.com/grafana/metrictank/cassandra" "github.com/grafana/metrictank/cluster" + "github.com/grafana/metrictank/expr/tagquery" "github.com/grafana/metrictank/idx" "github.com/grafana/metrictank/idx/memory" "github.com/grafana/metrictank/schema" @@ -668,16 +669,46 @@ func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) { if err != nil { return defs, err } + + err = c.deleteDefs(defs) + if err != nil { + return nil, err + } + + statDeleteDuration.Value(time.Since(pre)) + return defs, err +} + +func (c *CasIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) { + pre := time.Now() + defs, err := c.MemoryIndex.DeleteTagged(orgId, query) + if err != nil { + return nil, err + } + + err = c.deleteDefs(defs) + if err != nil { + return nil, err + } + + statDeleteDuration.Value(time.Since(pre)) + return defs, err +} + +func (c *CasIdx) deleteDefs(defs []idx.Archive) error { + var err error + if c.Config.updateCassIdx { for _, def := range defs { - err = c.deleteDef(def.Id, def.Partition) - if err != nil { - log.Errorf("cassandra-idx: %s", err.Error()) + delErr := c.deleteDef(def.Id, def.Partition) + if delErr != nil { + log.Errorf("cassandra-idx: %s", delErr.Error()) + err = delErr } } } - statDeleteDuration.Value(time.Since(pre)) - return defs, err + + return err } func (c *CasIdx) deleteDef(key schema.MKey, part int32) error { diff --git a/idx/idx.go b/idx/idx.go index 0960d05ec7..6cc90c5a61 100644 --- a/idx/idx.go +++ b/idx/idx.go @@ -149,7 +149,7 @@ type MetricIndex interface { // DeleteTagged deletes the series returned by the given query from the tag index // and also the DefById index. - DeleteTagged(orgId uint32, query tagquery.Query) []Archive + DeleteTagged(orgId uint32, query tagquery.Query) ([]Archive, error) // MetaTagRecordUpsert inserts, updates or deletes a meta record, depending on // whether it already exists or is new. The identity of a record is determined diff --git a/idx/memory/memory.go b/idx/memory/memory.go index 8b9ee0a2b8..2d09afe8ea 100755 --- a/idx/memory/memory.go +++ b/idx/memory/memory.go @@ -1755,10 +1755,10 @@ func (m *UnpartitionedMemoryIdx) List(orgId uint32) []idx.Archive { return defs } -func (m *UnpartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) []idx.Archive { +func (m *UnpartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) { if !TagSupport { log.Warn("memory-idx: received tag query, but tag support is disabled") - return nil + return nil, nil } queryCtx := NewTagQueryContext(query) @@ -1774,7 +1774,7 @@ func (m *UnpartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query m.Lock() defer m.Unlock() - return m.deleteTaggedByIdSet(orgId, ids) + return m.deleteTaggedByIdSet(orgId, ids), nil } // deleteTaggedByIdSet deletes a map of ids from the tag index and also the DefByIds diff --git a/idx/memory/memory_test.go b/idx/memory/memory_test.go index 8dd5eab2f7..4a6da39076 100644 --- a/idx/memory/memory_test.go +++ b/idx/memory/memory_test.go @@ -484,7 +484,8 @@ func testDeleteTagged(t *testing.T) { So(err, ShouldBeNil) query, err := tagquery.NewQueryFromStrings(tags.Strings(), 0) So(err, ShouldBeNil) - ids := ix.DeleteTagged(1, query) + ids, err := ix.DeleteTagged(1, query) + So(err, ShouldBeNil) So(ids, ShouldHaveLength, 1) So(ids[0].Id.String(), ShouldEqual, org1Series[3].Id) Convey("series should not be present in the metricDef index", func() { diff --git a/idx/memory/partitioned_idx.go b/idx/memory/partitioned_idx.go index 14e13d2467..776be5d482 100644 --- a/idx/memory/partitioned_idx.go +++ b/idx/memory/partitioned_idx.go @@ -485,19 +485,26 @@ func (p *PartitionedMemoryIdx) FindTagValuesWithQuery(orgId uint32, tag, prefix // DeleteTagged deletes the specified series from the tag index and also the // DefById index. -func (p *PartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) []idx.Archive { +func (p *PartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) { g, _ := errgroup.WithContext(context.Background()) result := make([][]idx.Archive, len(p.Partition)) var i int for _, m := range p.Partition { pos, m := i, m g.Go(func() error { - result[pos] = m.DeleteTagged(orgId, query) + var err error + result[pos], err = m.DeleteTagged(orgId, query) + if err != nil { + return err + } return nil }) i++ } - g.Wait() + err := g.Wait() + if err != nil { + return nil, err + } // get our total count, so we can allocate our response in one go. items := 0 @@ -510,7 +517,7 @@ func (p *PartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) response = append(response, r...) } - return response + return response, nil } // Used to rebuild the index from an existing set of metricDefinitions.