Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for prometheus datasource #242

Merged
merged 1 commit into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/craned/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.IntVar(&o.DataSourcePromConfig.QueryConcurrency, "prometheus-query-concurrency", 10, "prometheus query concurrency")
flags.BoolVar(&o.DataSourcePromConfig.InsecureSkipVerify, "prometheus-insecure-skip-verify", false, "prometheus insecure skip verify")
flags.DurationVar(&o.DataSourcePromConfig.KeepAlive, "prometheus-keepalive", 60*time.Second, "prometheus keep alive")
flags.DurationVar(&o.DataSourcePromConfig.Timeout, "prometheus-timeout", 60*time.Second, "prometheus timeout")
flags.DurationVar(&o.DataSourcePromConfig.Timeout, "prometheus-timeout", 3*time.Minute, "prometheus timeout")
flags.BoolVar(&o.DataSourcePromConfig.BRateLimit, "prometheus-bratelimit", false, "prometheus bratelimit")
flags.IntVar(&o.DataSourcePromConfig.MaxPointsLimitPerTimeSeries, "prometheus-maxpoints", 11000, "prometheus max points limit per time series")
flags.StringVar(&o.DataSourceMockConfig.SeedFile, "seed-file", "", "mock provider seed file")
Expand Down
37 changes: 6 additions & 31 deletions pkg/prediction/percentile/prediction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package percentile
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -173,8 +172,6 @@ func (p *percentilePrediction) QueryRealtimePredictedValuesOnce(ctx context.Cont
// process is a stateless function that estimates a metric series by constructing a histogram and then deriving the estimation data from it.
func (p *percentilePrediction) process(namer metricnaming.MetricNamer, config config.Config) ([]*common.TimeSeries, error) {
var predictedTimeSeriesList []*common.TimeSeries
maxAttempts := 10
attempts := 0
var historyTimeSeriesList []*common.TimeSeries
var err error
queryExpr := namer.BuildUniqueKey()
Expand All @@ -184,19 +181,9 @@ func (p *percentilePrediction) process(namer metricnaming.MetricNamer, config co
}
klog.V(4).Infof("process analyzing metric namer: %v, config: %+v", namer.BuildUniqueKey(), *cfg)

for attempts < maxAttempts {
historyTimeSeriesList, err = p.queryHistoryTimeSeries(namer, cfg)
if err != nil {
attempts++
t := time.Second * time.Duration(math.Pow(2., float64(attempts)))
klog.ErrorS(err, "Failed to get time series.", "queryExpr", queryExpr, "attempts", attempts)
time.Sleep(t)
} else {
break
}
}
if attempts == maxAttempts {
klog.Errorf("After attempting %d times, still cannot get history time series for query expression '%s'.", maxAttempts, queryExpr)
historyTimeSeriesList, err = p.queryHistoryTimeSeries(namer, cfg)
if err != nil {
klog.Errorf("Failed to query history time series for query expression '%s'.", queryExpr)
return nil, err
}

Expand Down Expand Up @@ -414,23 +401,11 @@ func (p *percentilePrediction) init(namer metricnaming.MetricNamer) error {
queryExpr := namer.BuildUniqueKey()
cfg := p.a.GetConfig(queryExpr)
// Query history data for prediction
maxAttempts := 10
attempts := 0
var historyTimeSeriesList []*common.TimeSeries
var err error
for attempts < maxAttempts {
historyTimeSeriesList, err = p.queryHistoryTimeSeries(namer, cfg)
if err != nil {
attempts++
t := time.Second * time.Duration(math.Pow(2., float64(attempts)))
klog.ErrorS(err, "Failed to get time series.", "queryExpr", queryExpr, "attempts", attempts)
time.Sleep(t)
} else {
break
}
}
if attempts == maxAttempts {
klog.Errorf("After attempting %d times, still cannot get history time series for query expression '%s'.", maxAttempts, queryExpr)
historyTimeSeriesList, err = p.queryHistoryTimeSeries(namer, cfg)
if err != nil {
klog.Errorf("Failed to query history time series for query expression '%s'.", queryExpr)
return err
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/providers/prom/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ func (c *context) QueryRangeSync(ctx gocontext.Context, query string, start, end
}
shards := c.computeShards(query, &r)
if len(shards.windows) <= 1 {
klog.V(4).InfoS("Prom query directly", "query", query)
var ts []*common.TimeSeries
results, warnings, err := c.api.QueryRange(ctx, query, r)
if len(warnings) != 0 {
klog.V(4).InfoS("prom query range warnings", "warnings", warnings)
klog.V(4).InfoS("Prom query range warnings", "warnings", warnings)
}
// TODO: parse err to determine its max limit dynamically
if err != nil {
return ts, err
}
if klog.V(10).Enabled() {
klog.V(10).InfoS("prom query range result", "result", results.String(), "resultsType", results.Type())
if klog.V(7).Enabled() {
klog.V(7).InfoS("Prom query range result", "query", query, "result", results.String(), "resultsType", results.Type())
}

return c.convertPromResultsToTimeSeries(results)
Expand All @@ -74,27 +75,29 @@ func (c *context) QuerySync(ctx gocontext.Context, query string) ([]*common.Time
var ts []*common.TimeSeries
results, warnings, err := c.api.Query(ctx, query, time.Now())
if len(warnings) != 0 {
klog.InfoS("prom query warnings", "warnings", warnings)
klog.InfoS("Prom query warnings", "warnings", warnings)
}
if err != nil {
return ts, err
}
klog.V(8).InfoS("prom query result", "result", results.String(), "resultsType", results.Type())
klog.V(8).InfoS("Prom query result", "result", results.String(), "resultsType", results.Type())
return c.convertPromResultsToTimeSeries(results)

}

func (c *context) queryByShards(ctx gocontext.Context, queryShards *QueryShards) ([]*common.TimeSeries, error) {
klog.V(4).InfoS("Prom query range by shards", "query", queryShards.query)
resultsCh := make(chan *QueryShardResult, len(queryShards.windows))
var wg sync.WaitGroup
for _, window := range queryShards.windows {
wg.Add(1)
go func(ctx gocontext.Context, window *promapiv1.Range) {
defer runtime.HandleCrash()
defer wg.Done()
klog.V(6).InfoS("Prom query range by shards", "query", queryShards.query, "window", window)
value, warnings, err := c.api.QueryRange(ctx, queryShards.query, *window)
if len(warnings) != 0 {
klog.V(4).InfoS("prom query range warnings", "warnings", warnings, "window", window, "query", queryShards.query)
klog.V(4).InfoS("Prom query range warnings", "warnings", warnings, "window", window, "query", queryShards.query)
}
if err != nil {
resultsCh <- &QueryShardResult{
Expand Down Expand Up @@ -126,6 +129,7 @@ func (c *context) queryByShards(ctx gocontext.Context, queryShards *QueryShards)
wg.Wait()
close(resultsCh)

klog.V(4).InfoS("Prom query range by shards, all shards query done", "query", queryShards.query)
var errs []error
resultsMap := make(map[string]*common.TimeSeries)
var results []*common.TimeSeries
Expand Down
17 changes: 11 additions & 6 deletions pkg/providers/prom/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type prom struct {
ctx *context
ctx *context
config *providers.PromConfig
}

// NewProvider returns a Prometheus data provider.
Expand All @@ -26,7 +27,7 @@ func NewProvider(config *providers.PromConfig) (providers.Interface, error) {

ctx := NewContext(client, config.MaxPointsLimitPerTimeSeries)

return &prom{ctx: ctx}, nil
return &prom{ctx: ctx, config: config}, nil
}

func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Time, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) {
Expand All @@ -36,12 +37,14 @@ func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Ti
klog.Errorf("Failed to BuildQuery: %v", err)
return nil, err
}
timeSeries, err := p.ctx.QueryRangeSync(gocontext.TODO(), promQuery.Prometheus.Query, startTime, endTime, step)
klog.V(6).Infof("QueryTimeSeries metricNamer %v, timeout: %v", namer.BuildUniqueKey(), p.config.Timeout)
timeoutCtx, cancelFunc := gocontext.WithTimeout(gocontext.Background(), p.config.Timeout)
defer cancelFunc()
timeSeries, err := p.ctx.QueryRangeSync(timeoutCtx, promQuery.Prometheus.Query, startTime, endTime, step)
if err != nil {
klog.Errorf("Failed to QueryTimeSeries: %v, metricNamer: %v, query: %v", err, namer.BuildUniqueKey(), promQuery.Prometheus.Query)
return nil, err
}
klog.V(6).Infof("QueryTimeSeries metricNamer %v", namer.BuildUniqueKey())
return timeSeries, nil
}

Expand All @@ -56,11 +59,13 @@ func (p *prom) QueryLatestTimeSeries(namer metricnaming.MetricNamer) ([]*common.
//end := time.Now()
// multiply by 2 to avoid having no latest data
//start := end.Add(-step * 2)
timeSeries, err := p.ctx.QuerySync(gocontext.TODO(), promQuery.Prometheus.Query)
klog.V(6).Infof("QueryLatestTimeSeries metricNamer %v, timeout: %v", namer.BuildUniqueKey(), p.config.Timeout)
timeoutCtx, cancelFunc := gocontext.WithTimeout(gocontext.Background(), p.config.Timeout)
defer cancelFunc()
timeSeries, err := p.ctx.QuerySync(timeoutCtx, promQuery.Prometheus.Query)
if err != nil {
klog.Errorf("Failed to QueryLatestTimeSeries: %v, metricNamer: %v, query: %v", err, namer.BuildUniqueKey(), promQuery.Prometheus.Query)
return nil, err
}
klog.V(6).Infof("QueryLatestTimeSeries metricNamer %v", namer.BuildUniqueKey())
return timeSeries, nil
}
4 changes: 2 additions & 2 deletions pkg/recommend/advisor/resource_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (a *ResourceRequestAdvisor) Advise(proposed *types.ProposedRecommendation)
}

mericNamer := ResourceToContainerMetricNamer(namespace, pod.Name, c.Name, corev1.ResourceCPU)
klog.V(8).Infof("CPU query for resource request recommendation: %s", mericNamer.BuildUniqueKey())
klog.V(6).Infof("CPU query for resource request recommendation: %s", mericNamer.BuildUniqueKey())
cpuConfig := makeCpuConfig(a.ConfigProperties)
tsList, err := utils.QueryPredictedValuesOnce(a.Recommendation, p,
fmt.Sprintf(callerFormat, a.Recommendation.UID), cpuConfig, mericNamer)
Expand All @@ -122,7 +122,7 @@ func (a *ResourceRequestAdvisor) Advise(proposed *types.ProposedRecommendation)
cr.Target[corev1.ResourceCPU] = resource.NewMilliQuantity(v, resource.DecimalSI).String()

mericNamer = ResourceToContainerMetricNamer(namespace, pod.Name, c.Name, corev1.ResourceMemory)
klog.V(8).Infof("Memory query for resource request recommendation: %s", mericNamer.BuildUniqueKey())
klog.V(6).Infof("Memory query for resource request recommendation: %s", mericNamer.BuildUniqueKey())
memConfig := makeMemConfig(a.ConfigProperties)
tsList, err = utils.QueryPredictedValuesOnce(a.Recommendation, p,
fmt.Sprintf(callerFormat, a.Recommendation.UID), memConfig, mericNamer)
Expand Down