Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't fetch indices , add indices to cache, and add esmonthly func #1931

Merged
merged 2 commits into from
Oct 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 65 additions & 54 deletions cmd/bosun/expr/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bosun.org/models"
"bosun.org/opentsdb"
"github.com/MiniProfiler/go/miniprofiler"
"github.com/jinzhu/now"
elastic "gopkg.in/olivere/elastic.v3"
)

Expand Down Expand Up @@ -51,11 +52,14 @@ var Elastic = map[string]parse.Func{
F: ESIndicies,
},
"esdaily": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString},
VArgs: true,
VArgsPos: 1,
Return: models.TypeESIndexer,
F: ESDaily,
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString},
Return: models.TypeESIndexer,
F: ESDaily,
},
"esmonthly": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString},
Return: models.TypeESIndexer,
F: ESMonthly,
},
"esls": {
Args: []models.FuncType{models.TypeString},
Expand Down Expand Up @@ -265,37 +269,47 @@ func (e ElasticHosts) Query(r *ElasticRequest) (*elastic.SearchResult, error) {
if err != nil {
return nil, err
}
indicies, err := r.Indexer.Generate(r.Start, r.End)
if err != nil {
return nil, err
}
s.Index(indicies...)
return s.SearchSource(r.Source).Do()
s.Index(r.Indices...)
// With IgnoreUnavailable there can be gaps in the indices (i.e. missing days) and we will not error
// If no indices match than there will be no successful shards and and error is returned in that case
s.IgnoreUnavailable(true)
res, err := s.SearchSource(r.Source).Do()
if err != nil {
return nil, err
}
if res.Shards == nil {
return nil, fmt.Errorf("no shard info in reply, should not be here please file issue")
}
if res.Shards.Successful == 0 {
return nil, fmt.Errorf("no successful shards in result, perhaps the index does exist, total shards: %v, failed shards: %v", res.Shards.Total, res.Shards.Failed)
}
return res, nil
}

// ElasticRequest is a container for the information needed to query elasticsearch or a date
// histogram.
type ElasticRequest struct {
Indexer ESIndexer
Indices []string
Start *time.Time
End *time.Time
Source *elastic.SearchSource // This the object that we build queries in
}

// CacheKey returns the text of the elastic query. That text is the indentifer for
// the query in the cache
// the query in the cache. It is a combination of the indices queries and the json query content
func (r *ElasticRequest) CacheKey() (string, error) {
s, err := r.Source.Source()
if err != nil {
return "", err
}
var str string
var ok bool
str, ok = s.(string)
if !ok {
return "", fmt.Errorf("failed to generate string representation of search source for cache key: %s", s)
b, err := json.Marshal(s)
if err != nil {
return "", fmt.Errorf("failed to generate json representation of search source for cache key: %s", s)
}
return str, nil
return fmt.Sprintf("%v\n%s", r.Indices, b), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably use string(b) here. Don't want [42 31 13] in you key.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additionally, what does the source have that causes it to be a different key? What varies?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ask because it surprises me that something like getting a cache key even has a possibility of erroring.

}

// timeESRequest execute the elasticsearch query (which may set or hit cache) and returns
Expand All @@ -311,28 +325,32 @@ func timeESRequest(e *State, T miniprofiler.Timer, req *ElasticRequest) (resp *e
if err != nil {
return resp, err
}
T.StepCustomTiming("elastic", "query", string(b), func() {
key, err := req.CacheKey()
if err != nil {
return nil, err
}
T.StepCustomTiming("elastic", "query", fmt.Sprintf("%v\n%s", req.Indices, b), func() {
getFn := func() (interface{}, error) {
return e.ElasticHosts.Query(req)
}
var val interface{}
val, err = e.Cache.Get(string(b), getFn)
val, err = e.Cache.Get(key, getFn)
resp = val.(*elastic.SearchResult)
})
return
}

func ESIndicies(e *State, T miniprofiler.Timer, timeField string, literalIndices ...string) (*Results, error) {
func ESIndicies(e *State, T miniprofiler.Timer, timeField string, literalIndices ...string) *Results {
var r Results
indexer := ESIndexer{}
// Don't check for existing indexes in this case, just pass through and let elastic return
// an error at query time if the index does not exist
indexer.Generate = func(start, end *time.Time) ([]string, error) {
return literalIndices, nil
indexer.Generate = func(start, end *time.Time) []string {
return literalIndices
}
indexer.TimeField = timeField
r.Results = append(r.Results, &Result{Value: indexer})
return &r, nil
return &r
}

func ESLS(e *State, T miniprofiler.Timer, indexRoot string) (*Results, error) {
Expand All @@ -341,41 +359,33 @@ func ESLS(e *State, T miniprofiler.Timer, indexRoot string) (*Results, error) {

func ESDaily(e *State, T miniprofiler.Timer, timeField, indexRoot, layout string) (*Results, error) {
var r Results
err := e.ElasticHosts.InitClient()
if err != nil {
return &r, err
}
indexer := ESIndexer{}
indexer.TimeField = timeField
indexer.Generate = func(start, end *time.Time) ([]string, error) {
err := e.ElasticHosts.InitClient()
if err != nil {
return []string{}, err
}
indices, err := esClient.IndexNames()
if err != nil {
return []string{}, err
}
trunStart := start.Truncate(time.Hour * 24)
trunEnd := end.Truncate(time.Hour*24).AddDate(0, 0, 1)
var selectedIndices []string
for _, index := range indices {
date := strings.TrimPrefix(index, indexRoot)
if !strings.HasPrefix(index, indexRoot) {
continue
}
d, err := time.Parse(layout, date)
if err != nil {
continue
}
if !d.Before(trunStart) && !d.After(trunEnd) {
selectedIndices = append(selectedIndices, index)
}
indexer.Generate = func(start, end *time.Time) []string {
var indices []string
truncStart := now.New(*start).BeginningOfDay()
truncEnd := now.New(*end).BeginningOfDay()
for d := truncStart; !d.After(truncEnd); d = d.AddDate(0, 0, 1) {
indices = append(indices, fmt.Sprintf("%v%v", indexRoot, d.Format(layout)))
}
if len(selectedIndices) == 0 {
return selectedIndices, fmt.Errorf("no elastic indices available during this time range, index[%s], start/end [%s|%s]", indexRoot, start.Format("2006.01.02"), end.Format("2006.01.02"))
return indices
}
r.Results = append(r.Results, &Result{Value: indexer})
return &r, nil
}

func ESMonthly(e *State, T miniprofiler.Timer, timeField, indexRoot, layout string) (*Results, error) {
var r Results
indexer := ESIndexer{}
indexer.TimeField = timeField
indexer.Generate = func(start, end *time.Time) []string {
var indices []string
truncStart := now.New(*start).BeginningOfMonth()
truncEnd := now.New(*end).BeginningOfMonth()
for d := truncStart; !d.After(truncEnd); d = d.AddDate(0, 1, 0) {
indices = append(indices, fmt.Sprintf("%v%v", indexRoot, d.Format(layout)))
}
return selectedIndices, nil
return indices
}
r.Results = append(r.Results, &Result{Value: indexer})
return &r, nil
Expand Down Expand Up @@ -511,8 +521,9 @@ func ESBaseQuery(now time.Time, indexer ESIndexer, filter elastic.Query, sdurati
}
st := now.Add(time.Duration(-start))
en := now.Add(time.Duration(-end))
indices := indexer.Generate(&st, &en)
r := ElasticRequest{
Indexer: indexer,
Indices: indices,
Start: &st,
End: &en,
Source: elastic.NewSearchSource().Size(size),
Expand Down
2 changes: 1 addition & 1 deletion cmd/bosun/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (e ESQuery) MarshalJSON() ([]byte, error) {

type ESIndexer struct {
TimeField string
Generate func(startDuration, endDuration *time.Time) ([]string, error)
Generate func(startDuration, endDuration *time.Time) []string
}

func (e ESIndexer) Type() models.FuncType { return models.TypeESIndexer }
Expand Down