Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/bosun: expr.execute refactor #1775

Merged
merged 4 commits into from
Jun 9, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/bosun/expr/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func timeESRequest(e *State, T miniprofiler.Timer, req *ElasticRequest) (resp *e
}
T.StepCustomTiming("elastic", "query", string(b), func() {
getFn := func() (interface{}, error) {
return e.elasticHosts.Query(req)
return e.ElasticHosts.Query(req)
}
var val interface{}
val, err = e.cache.Get(string(b), getFn)
Expand Down Expand Up @@ -314,14 +314,14 @@ func ESLS(e *State, T miniprofiler.Timer, indexRoot string) (*Results, error) {

func ESDaily(e *State, T miniprofiler.Timer, timeField, indexRoot, layout string) (*Results, error) {
var r Results
err := e.elasticHosts.InitClient()
err := e.ElasticHosts.InitClient()
if err != nil {
return &r, err
}
indexer := ESIndexer{}
indexer.TimeField = timeField
indexer.Generate = func(start, end *time.Time) ([]string, error) {
err := e.elasticHosts.InitClient()
err := e.ElasticHosts.InitClient()
if err != nil {
return []string{}, err
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func ESStat(e *State, T miniprofiler.Timer, indexer ESIndexer, keystring string,

func ESDateHistogram(e *State, T miniprofiler.Timer, indexer ESIndexer, keystring string, filter elastic.Query, interval, sduration, eduration, stat_field, rstat string, size int) (r *Results, err error) {
r = new(Results)
req, err := ESBaseQuery(e.now, indexer, e.elasticHosts, filter, sduration, eduration, size)
req, err := ESBaseQuery(e.now, indexer, filter, sduration, eduration, size)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func ESDateHistogram(e *State, T miniprofiler.Timer, indexer ESIndexer, keystrin
}

// ESBaseQuery builds the base query that both ESCount and ESStat share
func ESBaseQuery(now time.Time, indexer ESIndexer, l ElasticHosts, filter elastic.Query, sduration, eduration string, size int) (*ElasticRequest, error) {
func ESBaseQuery(now time.Time, indexer ESIndexer, filter elastic.Query, sduration, eduration string, size int) (*ElasticRequest, error) {
start, err := opentsdb.ParseDuration(sduration)
if err != nil {
return nil, err
Expand Down
51 changes: 25 additions & 26 deletions cmd/bosun/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,36 @@ type State struct {
now time.Time
cache *cache.Cache
enableComputations bool
unjoinedOk bool

autods int

*Contexts
// OpenTSDB
Search *search.Search
autods int
tsdbContext opentsdb.Context
tsdbQueries []opentsdb.Request
unjoinedOk bool
squelched func(tags opentsdb.TagSet) bool

// Bosun Internal

squelched func(tags opentsdb.TagSet) bool
Search *search.Search
History AlertStatusProvider

// Graphite
graphiteQueries []graphite.Request
graphiteContext graphite.Context

// LogstashElastic (for pre ES v2)
logstashQueries []elasticOld.SearchSource
logstashHosts LogstashElasticHosts

// Elastic (for post ES v2)
elasticQueries []elastic.SearchSource
elasticHosts ElasticHosts

// InfluxDB
InfluxConfig client.Config
}

History AlertStatusProvider
type Contexts struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1000

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Contexts may be a bit vague. How about Backends ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K

TSDBContext opentsdb.Context
GraphiteContext graphite.Context
LogstashHosts LogstashElasticHosts
ElasticHosts ElasticHosts
InfluxConfig client.Config
}

// Alert Status Provider is used to provide information about alert results.
Expand Down Expand Up @@ -85,26 +90,20 @@ func New(expr string, funcs ...map[string]parse.Func) (*Expr, error) {

// Execute applies a parse expression to the specified OpenTSDB context, and
// returns one result per group. T may be nil to ignore timings.
func (e *Expr) Execute(c opentsdb.Context, g graphite.Context, l LogstashElasticHosts, eh ElasticHosts, influxConfig client.Config, cache *cache.Cache, T miniprofiler.Timer, now time.Time, autods int, unjoinedOk bool, search *search.Search, squelched func(tags opentsdb.TagSet) bool, history AlertStatusProvider) (r *Results, queries []opentsdb.Request, err error) {
func (e *Expr) Execute(contexts *Contexts, cache *cache.Cache, T miniprofiler.Timer, now time.Time, autods int, unjoinedOk bool, search *search.Search, squelched func(tags opentsdb.TagSet) bool, history AlertStatusProvider) (r *Results, queries []opentsdb.Request, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still have 9 args? wow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can do better on a revision. Will look some more as long as you approve of the direction. That func signature has been making me mad for a while :P

if squelched == nil {
squelched = func(tags opentsdb.TagSet) bool {
return false
}
}
s := &State{
Expr: e,
cache: cache,
tsdbContext: c,
graphiteContext: g,
logstashHosts: l,
elasticHosts: eh,
InfluxConfig: influxConfig,
now: now,
autods: autods,
unjoinedOk: unjoinedOk,
Search: search,
squelched: squelched,
History: history,
Expr: e,
now: now,
autods: autods,
unjoinedOk: unjoinedOk,
Contexts: contexts,
Search: search,
squelched: squelched,
}
return e.ExecuteState(s, T)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/bosun/expr/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func timeGraphiteRequest(e *State, T miniprofiler.Timer, req *graphite.Request)
T.StepCustomTiming("graphite", "query", string(b), func() {
key := req.CacheKey()
getFn := func() (interface{}, error) {
return e.graphiteContext.Query(req)
return e.GraphiteContext.Query(req)
}
var val interface{}
val, err = e.cache.Get(key, getFn)
Expand Down
6 changes: 3 additions & 3 deletions cmd/bosun/expr/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func timeLSRequest(e *State, T miniprofiler.Timer, req *LogstashRequest) (resp *
b, _ := json.MarshalIndent(req.Source.Source(), "", " ")
T.StepCustomTiming("logstash", "query", string(b), func() {
getFn := func() (interface{}, error) {
return e.logstashHosts.Query(req)
return e.LogstashHosts.Query(req)
}
var val interface{}
val, err = e.cache.Get(string(b), getFn)
Expand Down Expand Up @@ -206,7 +206,7 @@ func LSStat(e *State, T miniprofiler.Timer, index_root, keystring, filter, field
// that Bosun can understand
func LSDateHistogram(e *State, T miniprofiler.Timer, index_root, keystring, filter, interval, sduration, eduration, stat_field, rstat string, size int) (r *Results, err error) {
r = new(Results)
req, err := LSBaseQuery(e.now, index_root, e.logstashHosts, keystring, filter, sduration, eduration, size)
req, err := LSBaseQuery(e.now, index_root, keystring, filter, sduration, eduration, size)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func processBucketItem(b *elastic.AggregationBucketHistogramItem, rstat string)
}

// LSBaseQuery builds the base query that both LSCount and LSStat share
func LSBaseQuery(now time.Time, indexRoot string, l LogstashElasticHosts, keystring string, filter, sduration, eduration string, size int) (*LogstashRequest, error) {
func LSBaseQuery(now time.Time, indexRoot string, keystring string, filter, sduration, eduration string, size int) (*LogstashRequest, error) {
start, err := opentsdb.ParseDuration(sduration)
if err != nil {
return nil, err
Expand Down
14 changes: 7 additions & 7 deletions cmd/bosun/expr/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func timeTSDBRequest(e *State, T miniprofiler.Timer, req *opentsdb.Request) (s o
for {
T.StepCustomTiming("tsdb", "query", string(b), func() {
getFn := func() (interface{}, error) {
return e.tsdbContext.Query(req)
return e.TSDBContext.Query(req)
}
var val interface{}
val, err = e.cache.Get(string(b), getFn)
Expand Down Expand Up @@ -113,11 +113,11 @@ func bandTSDB(e *State, T miniprofiler.Timer, query, duration, period string, nu
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.tsdbContext.Version())
q, err = opentsdb.ParseQuery(query, e.TSDBContext.Version())
if err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if !e.TSDBContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
Expand Down Expand Up @@ -308,11 +308,11 @@ func Over(e *State, T miniprofiler.Timer, query, duration, period string, num fl
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.tsdbContext.Version())
q, err = opentsdb.ParseQuery(query, e.TSDBContext.Version())
if err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if !e.TSDBContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
Expand Down Expand Up @@ -356,11 +356,11 @@ func Over(e *State, T miniprofiler.Timer, query, duration, period string, num fl

func Query(e *State, T miniprofiler.Timer, query, sduration, eduration string) (r *Results, err error) {
r = new(Results)
q, err := opentsdb.ParseQuery(query, e.tsdbContext.Version())
q, err := opentsdb.ParseQuery(query, e.TSDBContext.Version())
if q == nil && err != nil {
return
}
if !e.tsdbContext.Version().FilterSupport() {
if !e.TSDBContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
Expand Down
40 changes: 18 additions & 22 deletions cmd/bosun/sched/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import (
"bosun.org/cmd/bosun/conf"
"bosun.org/cmd/bosun/expr"
"bosun.org/collect"
"bosun.org/graphite"
"bosun.org/metadata"
"bosun.org/models"
"bosun.org/opentsdb"
"bosun.org/slog"
"github.com/MiniProfiler/go/miniprofiler"
"github.com/influxdata/influxdb/client"
)

func init() {
Expand Down Expand Up @@ -49,14 +47,9 @@ func NewIncident(ak models.AlertKey) *models.IncidentState {
}

type RunHistory struct {
Cache *cache.Cache
Start time.Time
Context opentsdb.Context
GraphiteContext graphite.Context
InfluxConfig client.Config
Logstash expr.LogstashElasticHosts
Elastic expr.ElasticHosts

Cache *cache.Cache
Start time.Time
Contexts *expr.Contexts
Events map[models.AlertKey]*models.Event
schedule *Schedule
}
Expand All @@ -70,17 +63,20 @@ func (rh *RunHistory) AtTime(t time.Time) *RunHistory {
}

func (s *Schedule) NewRunHistory(start time.Time, cache *cache.Cache) *RunHistory {
return &RunHistory{
Cache: cache,
Start: start,
Events: make(map[models.AlertKey]*models.Event),
Context: s.Conf.TSDBContext(),
GraphiteContext: s.Conf.GraphiteContext(),
InfluxConfig: s.Conf.InfluxConfig,
Logstash: s.Conf.LogstashElasticHosts,
Elastic: s.Conf.ElasticHosts,
schedule: s,
}
r := &RunHistory{
Cache: cache,
Start: start,
Events: make(map[models.AlertKey]*models.Event),
schedule: s,
Contexts: &expr.Contexts{
TSDBContext: s.Conf.TSDBContext(),
GraphiteContext: s.Conf.GraphiteContext(),
InfluxConfig: s.Conf.InfluxConfig,
LogstashHosts: s.Conf.LogstashElasticHosts,
ElasticHosts: s.Conf.ElasticHosts,
},
}
return r
}

// RunHistory processes an event history and triggers notifications if needed.
Expand Down Expand Up @@ -589,7 +585,7 @@ func (s *Schedule) executeExpr(T miniprofiler.Timer, rh *RunHistory, a *conf.Ale
if e == nil {
return nil, nil
}
results, _, err := e.Execute(rh.Context, rh.GraphiteContext, rh.Logstash, rh.Elastic, rh.InfluxConfig, rh.Cache, T, rh.Start, 0, a.UnjoinedOK, s.Search, s.Conf.AlertSquelched(a), s)
results, _, err := e.Execute(rh.Contexts, rh.Cache, T, rh.Start, 0, a.UnjoinedOK, s.Search, s.Conf.AlertSquelched(a), s)
return results, err
}

Expand Down
14 changes: 7 additions & 7 deletions cmd/bosun/sched/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (c *Context) evalExpr(e *expr.Expr, filter bool, series bool, autods int) (
if series && e.Root.Return() != models.TypeSeriesSet {
return nil, "", fmt.Errorf("need a series, got %T (%v)", e, e)
}
res, _, err := e.Execute(c.runHistory.Context, c.runHistory.GraphiteContext, c.runHistory.Logstash, c.runHistory.Elastic, c.runHistory.InfluxConfig, c.runHistory.Cache, nil, c.runHistory.Start, autods, c.Alert.UnjoinedOK, c.schedule.Search, c.schedule.Conf.AlertSquelched(c.Alert), c.schedule)
res, _, err := e.Execute(c.runHistory.Contexts, c.runHistory.Cache, nil, c.runHistory.Start, autods, c.Alert.UnjoinedOK, c.schedule.Search, c.schedule.Conf.AlertSquelched(c.Alert), c.schedule)
if err != nil {
return nil, "", fmt.Errorf("%s: %v", e, err)
}
Expand Down Expand Up @@ -483,11 +483,11 @@ func (c *Context) LSQuery(index_root, filter, sduration, eduration string, size
}

func (c *Context) LSQueryAll(index_root, keystring, filter, sduration, eduration string, size int) (interface{}, error) {
req, err := expr.LSBaseQuery(c.runHistory.Start, index_root, c.runHistory.Logstash, keystring, filter, sduration, eduration, size)
req, err := expr.LSBaseQuery(c.runHistory.Start, index_root, keystring, filter, sduration, eduration, size)
if err != nil {
return nil, err
}
results, err := c.runHistory.Logstash.Query(req)
results, err := c.runHistory.Contexts.LogstashHosts.Query(req)
if err != nil {
return nil, err
}
Expand All @@ -504,11 +504,11 @@ func (c *Context) LSQueryAll(index_root, keystring, filter, sduration, eduration

func (c *Context) ESQuery(indexRoot expr.ESIndexer, filter expr.ESQuery, sduration, eduration string, size int) (interface{}, error) {
newFilter := expr.ScopeES(c.Group(), filter.Query)
req, err := expr.ESBaseQuery(c.runHistory.Start, indexRoot, c.runHistory.Elastic, newFilter, sduration, eduration, size)
req, err := expr.ESBaseQuery(c.runHistory.Start, indexRoot, newFilter, sduration, eduration, size)
if err != nil {
return nil, err
}
results, err := c.runHistory.Elastic.Query(req)
results, err := c.runHistory.Contexts.ElasticHosts.Query(req)
if err != nil {
return nil, err
}
Expand All @@ -524,11 +524,11 @@ func (c *Context) ESQuery(indexRoot expr.ESIndexer, filter expr.ESQuery, sdurati
}

func (c *Context) ESQueryAll(indexRoot expr.ESIndexer, filter expr.ESQuery, sduration, eduration string, size int) (interface{}, error) {
req, err := expr.ESBaseQuery(c.runHistory.Start, indexRoot, c.runHistory.Elastic, filter.Query, sduration, eduration, size)
req, err := expr.ESBaseQuery(c.runHistory.Start, indexRoot, filter.Query, sduration, eduration, size)
if err != nil {
return nil, err
}
results, err := c.runHistory.Elastic.Query(req)
results, err := c.runHistory.Contexts.ElasticHosts.Query(req)
if err != nil {
return nil, err
}
Expand Down
14 changes: 8 additions & 6 deletions cmd/bosun/web/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ func ExprGraph(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (in
return nil, fmt.Errorf("egraph: requires an expression that returns a series")
}
// it may not strictly be necessary to recreate the contexts each time, but we do to be safe
tsdbContext := schedule.Conf.TSDBContext()
graphiteContext := schedule.Conf.GraphiteContext()
ls := schedule.Conf.LogstashElasticHosts
influx := schedule.Conf.InfluxConfig
es := schedule.Conf.ElasticHosts
res, _, err := e.Execute(tsdbContext, graphiteContext, ls, es, influx, cacheObj, t, now, autods, false, schedule.Search, nil, nil)
contexts := &expr.Contexts{
TSDBContext: schedule.Conf.TSDBContext(),
GraphiteContext: schedule.Conf.GraphiteContext(),
InfluxConfig: schedule.Conf.InfluxConfig,
LogstashHosts: schedule.Conf.LogstashElasticHosts,
ElasticHosts: schedule.Conf.ElasticHosts,
}
res, _, err := e.Execute(contexts, cacheObj, t, now, autods, false, schedule.Search, nil, nil)
if err != nil {
return nil, err
}
Expand Down
14 changes: 8 additions & 6 deletions cmd/bosun/web/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ func Expr(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (v inter
return nil, err
}
// it may not strictly be necessary to recreate the contexts each time, but we do to be safe
tsdbContext := schedule.Conf.TSDBContext()
graphiteContext := schedule.Conf.GraphiteContext()
ls := schedule.Conf.LogstashElasticHosts
influx := schedule.Conf.InfluxConfig
es := schedule.Conf.ElasticHosts
res, queries, err := e.Execute(tsdbContext, graphiteContext, ls, es, influx, cacheObj, t, now, 0, false, schedule.Search, nil, nil)
contexts := &expr.Contexts{
TSDBContext: schedule.Conf.TSDBContext(),
GraphiteContext: schedule.Conf.GraphiteContext(),
InfluxConfig: schedule.Conf.InfluxConfig,
LogstashHosts: schedule.Conf.LogstashElasticHosts,
ElasticHosts: schedule.Conf.ElasticHosts,
}
res, queries, err := e.Execute(contexts, cacheObj, t, now, 0, false, schedule.Search, nil, nil)
if err != nil {
return nil, err
}
Expand Down