Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

update cass/bt index when deleting tagged metrics #1657

Merged
merged 2 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ func (s *Server) indexTagDelSeries(ctx *middleware.Context, request models.Index
return
}

deleted := s.MetricIndex.DeleteTagged(request.OrgId, query)
deleted, err := s.MetricIndex.DeleteTagged(request.OrgId, query)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}
res.Count += len(deleted)
}

Expand Down
7 changes: 6 additions & 1 deletion api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,12 @@ func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.Gr
return
}

deleted := s.MetricIndex.DeleteTagged(ctx.OrgId, query)
deleted, err := s.MetricIndex.DeleteTagged(ctx.OrgId, query)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}

res.Count += len(deleted)
}
}
Expand Down
24 changes: 24 additions & 0 deletions idx/bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"cloud.google.com/go/bigtable"
"github.com/grafana/metrictank/cluster"
"github.com/grafana/metrictank/expr/tagquery"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/schema"
Expand Down Expand Up @@ -492,6 +493,29 @@ func (b *BigtableIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error
}
}
}

statDeleteDuration.Value(time.Since(pre))
return defs, err
}

func (b *BigtableIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) {
pre := time.Now()
defs, err := b.MemoryIndex.DeleteTagged(orgId, query)
if err != nil {
return nil, err
}

if b.cfg.UpdateBigtableIdx {
for _, def := range defs {
delErr := b.deleteDef(&def.MetricDefinition)
// the last error encountered will be passed back to the caller
if delErr != nil {
log.Errorf("bigtable-idx: Failed to delete def %s: %s", def.MetricDefinition.Id, delErr.Error())
err = delErr
}
}
}

statDeleteDuration.Value(time.Since(pre))
return defs, err
}
Expand Down
22 changes: 22 additions & 0 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gocql/gocql"
"github.com/grafana/metrictank/cassandra"
"github.com/grafana/metrictank/cluster"
"github.com/grafana/metrictank/expr/tagquery"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/schema"
Expand Down Expand Up @@ -680,6 +681,27 @@ func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
return defs, err
}

func (c *CasIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) {
pre := time.Now()
defs, err := c.MemoryIndex.DeleteTagged(orgId, query)
if err != nil {
return nil, err
}

if c.Config.updateCassIdx {
for _, def := range defs {
delErr := c.deleteDef(def.Id, def.Partition)
if delErr != nil {
log.Errorf("cassandra-idx: %s", delErr.Error())
err = delErr
}
}
}

statDeleteDuration.Value(time.Since(pre))
return defs, err
}

func (c *CasIdx) deleteDef(key schema.MKey, part int32) error {
pre := time.Now()
attempts := 0
Expand Down
2 changes: 1 addition & 1 deletion idx/idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type MetricIndex interface {

// DeleteTagged deletes the series returned by the given query from the tag index
// and also the DefById index.
DeleteTagged(orgId uint32, query tagquery.Query) []Archive
DeleteTagged(orgId uint32, query tagquery.Query) ([]Archive, error)

// MetaTagRecordUpsert inserts, updates or deletes a meta record, depending on
// whether it already exists or is new. The identity of a record is determined
Expand Down
6 changes: 3 additions & 3 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,10 +1755,10 @@ func (m *UnpartitionedMemoryIdx) List(orgId uint32) []idx.Archive {
return defs
}

func (m *UnpartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) []idx.Archive {
func (m *UnpartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) {
if !TagSupport {
log.Warn("memory-idx: received tag query, but tag support is disabled")
return nil
return nil, nil
}

queryCtx := NewTagQueryContext(query)
Expand All @@ -1774,7 +1774,7 @@ func (m *UnpartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query

m.Lock()
defer m.Unlock()
return m.deleteTaggedByIdSet(orgId, ids)
return m.deleteTaggedByIdSet(orgId, ids), nil
}

// deleteTaggedByIdSet deletes a map of ids from the tag index and also the DefByIds
Expand Down
3 changes: 2 additions & 1 deletion idx/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ func testDeleteTagged(t *testing.T) {
So(err, ShouldBeNil)
query, err := tagquery.NewQueryFromStrings(tags.Strings(), 0)
So(err, ShouldBeNil)
ids := ix.DeleteTagged(1, query)
ids, err := ix.DeleteTagged(1, query)
So(err, ShouldBeNil)
So(ids, ShouldHaveLength, 1)
So(ids[0].Id.String(), ShouldEqual, org1Series[3].Id)
Convey("series should not be present in the metricDef index", func() {
Expand Down
15 changes: 11 additions & 4 deletions idx/memory/partitioned_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,19 +485,26 @@ func (p *PartitionedMemoryIdx) FindTagValuesWithQuery(orgId uint32, tag, prefix

// DeleteTagged deletes the specified series from the tag index and also the
// DefById index.
func (p *PartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) []idx.Archive {
func (p *PartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query) ([]idx.Archive, error) {
g, _ := errgroup.WithContext(context.Background())
result := make([][]idx.Archive, len(p.Partition))
var i int
for _, m := range p.Partition {
pos, m := i, m
g.Go(func() error {
result[pos] = m.DeleteTagged(orgId, query)
var err error
result[pos], err = m.DeleteTagged(orgId, query)
if err != nil {
return err
}
return nil
})
i++
}
g.Wait()
err := g.Wait()
if err != nil {
return nil, err
}

// get our total count, so we can allocate our response in one go.
items := 0
Expand All @@ -510,7 +517,7 @@ func (p *PartitionedMemoryIdx) DeleteTagged(orgId uint32, query tagquery.Query)
response = append(response, r...)
}

return response
return response, nil
}

// Used to rebuild the index from an existing set of metricDefinitions.
Expand Down