Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Tagged delete #1902

Merged
merged 6 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/ccache.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *Server) ccacheDelete(ctx *middleware.Context, req models.CCacheDelete)
res.AddError(err)
code = http.StatusBadRequest
} else {
query, err := tagquery.NewQuery(expressions, 0)
query, err := tagquery.NewQuery(expressions, 0, 0)
if err != nil {
res.AddError(err)
code = http.StatusInternalServerError
Expand Down
44 changes: 39 additions & 5 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *Server) indexAutoCompleteTags(ctx *middleware.Context, req models.Index
expressions = append(expressions, "__tag^="+req.Prefix)
}

query, err := tagquery.NewQueryFromStrings(expressions, 0)
query, err := tagquery.NewQueryFromStrings(expressions, 0, 0)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *Server) indexAutoCompleteTagValues(ctx *middleware.Context, req models.
expressions = append(expressions, req.Tag+"^="+req.Prefix)
}

query, err := tagquery.NewQueryFromStrings(expressions, 0)
query, err := tagquery.NewQueryFromStrings(expressions, 0, 0)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
Expand Down Expand Up @@ -280,7 +280,7 @@ func (s *Server) indexTagDelSeries(ctx *middleware.Context, request models.Index
builder.Reset()
}

query, err := tagquery.NewQuery(expressions, 0)
query, err := tagquery.NewQuery(expressions, 0, 0)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
Expand All @@ -297,14 +297,48 @@ func (s *Server) indexTagDelSeries(ctx *middleware.Context, request models.Index
response.Write(ctx, response.NewMsgp(200, res))
}

func (s *Server) localTagDelByQuery(request models.IndexTagDelByQuery) (int, error) {
query, err := tagquery.NewQueryFromStrings(request.Expr, 0, request.OlderThan)
if err != nil {
return 0, err
}

if request.Execute {
deleted, err := s.MetricIndex.DeleteTagged(request.OrgId, query)
return len(deleted), err
}
matched, _ := s.MetricIndex.FindTerms(request.OrgId, []string{}, query)
return int(matched), nil
}

func (s *Server) IndexTagDelByQuery(ctx *middleware.Context, request models.IndexTagDelByQuery) {

res := models.IndexTagDelByQueryResp{}

// nothing to do on query nodes.
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, res))
return
}

var err error
res.Count, err = s.localTagDelByQuery(request)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}

response.Write(ctx, response.NewMsgp(200, res))
}

func (s *Server) IndexTagTerms(ctx *middleware.Context, req models.IndexTagTerms) {
// query nodes don't own any data.
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, &models.GraphiteTagTermsResp{}))
return
}

query, err := tagquery.NewQueryFromStrings(req.Expr, 0)
query, err := tagquery.NewQueryFromStrings(req.Expr, 0, 0)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
Expand All @@ -323,7 +357,7 @@ func (s *Server) indexFindByTag(ctx *middleware.Context, req models.IndexFindByT
return
}

query, err := tagquery.NewQueryFromStrings(req.Expr, req.From)
query, err := tagquery.NewQueryFromStrings(req.Expr, req.From, 0)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
Expand Down
40 changes: 39 additions & 1 deletion api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,7 +1412,7 @@ func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.Gr
builder.Reset()
}

query, err := tagquery.NewQuery(expressions, 0)
query, err := tagquery.NewQuery(expressions, 0, 0)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
Expand Down Expand Up @@ -1451,6 +1451,44 @@ func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.Gr
response.Write(ctx, response.NewJson(200, res, ""))
}

func (s *Server) graphiteTagDelByQuery(ctx *middleware.Context, request models.GraphiteTagDelByQuery) {
res := models.GraphiteTagDelByQueryResp{}

data := models.IndexTagDelByQuery{OrgId: ctx.OrgId, Expr: request.Expr, OlderThan: request.OlderThan, Execute: request.Execute}
responses, errors := s.queryAllPeers(ctx.Req.Context(), data, "clusterTagDelByQuery,", "/index/tags/delByQuery")

// nothing to do locally on query nodes.
if s.MetricIndex != nil {
matched, err := s.localTagDelByQuery(data)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}
res.Count += matched
res.Peers[cluster.Manager.ThisNode().GetName()] = matched
}

// if there are any errors, write one of them and return
for _, err := range errors {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}

res.Peers = make(map[string]int, len(responses))
peerResp := models.IndexTagDelSeriesResp{}
for peer, resp := range responses {
_, err := peerResp.UnmarshalMsg(resp.buf)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}
res.Count += peerResp.Count
res.Peers[peer] = peerResp.Count
}

response.Write(ctx, response.NewJson(200, res, ""))
}

// showPlan attempts to create a Plan given a /render target query.
// If the Plan creation is successful it returns 200, JSON marshaling of Plan.
// Otherwise, it returns 400, error details.
Expand Down
5 changes: 5 additions & 0 deletions api/models/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ type IndexFindByTagResp struct {
type IndexTagDelSeriesResp struct {
Count int
}

//go:generate msgp
type IndexTagDelByQueryResp struct {
Count int
}
103 changes: 103 additions & 0 deletions api/models/cluster_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 113 additions & 0 deletions api/models/cluster_gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading