[COST-3758] retry failed operator queries up to 5 times #195

Merged · merged 10 commits · Aug 8, 2023
Changes from 8 commits
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ else
KUSTOMIZE=$(shell which kustomize)
endif

NAMESPACE ?= ""
# Generate bundle manifests and metadata, then validate generated files.
bundle: manifests kustomize
mkdir -p koku-metrics-operator/$(VERSION)/
Expand All @@ -278,7 +279,7 @@ bundle: manifests kustomize
operator-sdk bundle validate ./bundle
cp -r ./bundle/ koku-metrics-operator/$(VERSION)/
cp bundle.Dockerfile koku-metrics-operator/$(VERSION)/Dockerfile
scripts/txt_replace.py $(VERSION) $(PREVIOUS_VERSION) ${IMAGE_SHA}
scripts/txt_replace.py $(VERSION) $(PREVIOUS_VERSION) ${IMAGE_SHA} --namespace=${NAMESPACE}

# Build the bundle image.
bundle-build:
Expand Down
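Because `NAMESPACE` defaults to an empty string, existing `make bundle` invocations are unaffected; a namespaced release would be invoked along the lines of `make bundle NAMESPACE=my-namespace` (the value is illustrative, and how `scripts/txt_replace.py` consumes `--namespace` is not shown in this diff).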
10 changes: 5 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func GenerateReports(cr *metricscfgv1beta1.MetricsConfig, dirCfg *dirconfig.Dire
// ################################################################################################################
log.Info("querying for node metrics")
nodeResults := mappedResults{}
if err := c.getQueryRangeResults(nodeQueries, &nodeResults); err != nil {
if err := c.getQueryRangeResults(nodeQueries, &nodeResults, 5); err != nil {
return err
}

Expand Down Expand Up @@ -274,7 +274,7 @@ func generateCostManagementReports(log gologr.Logger, c *PrometheusCollector, di

log.Info("querying for pod metrics")
podResults := mappedResults{}
if err := c.getQueryRangeResults(podQueries, &podResults); err != nil {
if err := c.getQueryRangeResults(podQueries, &podResults, 5); err != nil {
return err
}

Expand Down Expand Up @@ -314,7 +314,7 @@ func generateCostManagementReports(log gologr.Logger, c *PrometheusCollector, di

log.Info("querying for storage metrics")
volResults := mappedResults{}
if err := c.getQueryRangeResults(volQueries, &volResults); err != nil {
if err := c.getQueryRangeResults(volQueries, &volResults, 5); err != nil {
return err
}

Expand Down Expand Up @@ -346,7 +346,7 @@ func generateCostManagementReports(log gologr.Logger, c *PrometheusCollector, di

log.Info("querying for namespaces")
namespaceResults := mappedResults{}
if err := c.getQueryRangeResults(namespaceQueries, &namespaceResults); err != nil {
if err := c.getQueryRangeResults(namespaceQueries, &namespaceResults, 5); err != nil {
return err
}

Expand Down Expand Up @@ -381,7 +381,7 @@ func generateResourceOpimizationReports(log gologr.Logger, c *PrometheusCollecto
ts := c.TimeSeries.End
log.Info(fmt.Sprintf("querying for resource-optimization for ts: %+v", ts))
rosResults := mappedResults{}
if err := c.getQueryResults(ts, resourceOptimizationQueries, &rosResults); err != nil {
if err := c.getQueryResults(ts, resourceOptimizationQueries, &rosResults, 5); err != nil {
return err
}

Expand Down
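The literal `5` threaded through every call site is a per-report retry budget. As the `collector/prometheus.go` changes below show, it is decremented once per pass over the batch of failed queries rather than once per query, so each query is attempted at most six times (the initial pass plus five retries).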
43 changes: 37 additions & 6 deletions collector/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,23 @@ func (c *PrometheusCollector) GetPromConn(
return nil
}

func (c *PrometheusCollector) getQueryRangeResults(queries *querys, results *mappedResults) error {
func (c *PrometheusCollector) getQueryRangeResults(queries *querys, results *mappedResults, retries int) error {
log := log.WithName("getQueryRangeResults")

queriesToRetry := querys{}

for _, query := range *queries {
ctx, cancel := context.WithTimeout(context.Background(), c.ContextTimeout)
defer cancel()

queryResult, warnings, err := c.PromConn.QueryRange(ctx, query.QueryString, *c.TimeSeries)
if err != nil {
return fmt.Errorf("query: %s: error querying prometheus: %v", query.QueryString, err)
if retries > 0 {
log.Info(fmt.Sprintf("query `%s` failed, appending to queries to retry", query.Name))
queriesToRetry = append(queriesToRetry, query)
continue
} else {
return fmt.Errorf("query: %s: error querying prometheus: %v", query.QueryString, err)
}
}
if len(warnings) > 0 {
log.Info("query warnings", "Warnings", warnings)
Expand All @@ -224,22 +231,37 @@ func (c *PrometheusCollector) getQueryRangeResults(queries *querys, results *map
if !ok {
return fmt.Errorf("expected a matrix in response to query, got a %v", queryResult.Type())
}

results.iterateMatrix(matrix, query)
}

if len(queriesToRetry) > 0 {
retries--
waitTime := time.Duration(5-retries) * time.Second
samdoran marked this conversation as resolved.
Show resolved Hide resolved
log.Info(fmt.Sprintf("retrying failed queries after %s seconds", waitTime))
time.Sleep(waitTime)
return c.getQueryRangeResults(&queriesToRetry, results, retries)
}
return nil
}

func (c *PrometheusCollector) getQueryResults(ts time.Time, queries *querys, results *mappedResults) error {
func (c *PrometheusCollector) getQueryResults(ts time.Time, queries *querys, results *mappedResults, retries int) error {
log := log.WithName("getQueryResults")

queriesToRetry := querys{}

for _, query := range *queries {
ctx, cancel := context.WithTimeout(context.Background(), c.ContextTimeout)
defer cancel()

queryResult, warnings, err := c.PromConn.Query(ctx, query.QueryString, ts)
if err != nil {
return fmt.Errorf("query: %s: error querying prometheus: %v", query.QueryString, err)
if retries > 0 {
log.Info(fmt.Sprintf("query `%s` failed, appending to queries to retry", query.Name))
queriesToRetry = append(queriesToRetry, query)
continue
} else {
return fmt.Errorf("query: %s: error querying prometheus: %v", query.QueryString, err)
}
}
if len(warnings) > 0 {
log.Info("query warnings", "Warnings", warnings)
Expand All @@ -251,5 +273,14 @@ func (c *PrometheusCollector) getQueryResults(ts time.Time, queries *querys, res

results.iterateVector(vector, query)
}

if len(queriesToRetry) > 0 {
retries--
waitTime := time.Duration(5-retries) * time.Second
log.Info(fmt.Sprintf("retrying failed queries after %s seconds", waitTime))
time.Sleep(waitTime)
return c.getQueryResults(ts, &queriesToRetry, results, retries)
}

return nil
}
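Distilled, the retry scheme added above works like this: make one pass over all queries, accumulate any that fail, then recurse on just the failed set with a decremented budget and a linearly growing sleep. Below is a minimal, self-contained sketch of that pattern; the `query` type and `runQuery` are hypothetical stand-ins for the operator's `querys` slice and its Prometheus calls.

package main

import (
    "errors"
    "fmt"
    "time"
)

// query stands in for the operator's query type; runQuery stands in for a
// Prometheus call that may fail transiently.
type query struct{ Name string }

func runQuery(q query) error {
    return errors.New("transient failure") // always fails, to exercise the retries
}

// runWithRetries mirrors the pattern above: one pass over the batch, collect
// the failures, then retry only those with retries-1 after a (5-retries)-second
// sleep. With an initial budget of 5, the waits grow 1s, 2s, 3s, 4s, 5s.
func runWithRetries(queries []query, retries int) error {
    var toRetry []query
    for _, q := range queries {
        if err := runQuery(q); err != nil {
            if retries > 0 {
                toRetry = append(toRetry, q) // defer this query to the next pass
                continue
            }
            return fmt.Errorf("query %s: %v", q.Name, err) // budget exhausted
        }
    }
    if len(toRetry) > 0 {
        retries--
        wait := time.Duration(5-retries) * time.Second
        time.Sleep(wait)
        return runWithRetries(toRetry, retries)
    }
    return nil
}

func main() {
    err := runWithRetries([]query{{Name: "example-query"}}, 5)
    fmt.Println(err) // after six attempts: query example-query: transient failure
}

One consequence worth noting: because the wait is computed as `5-retries`, the backoff schedule assumes the initial budget is 5; a different budget would still terminate, just with a shifted wait sequence.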
18 changes: 9 additions & 9 deletions collector/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ func (m mockPrometheusConnectionPolling) Query(ctx context.Context, query string
return res.value, res.warnings, err
}

func TestGetQueryResultsSuccess(t *testing.T) {
func TestGetQueryRangeResultsSuccess(t *testing.T) {
c := PrometheusCollector{
TimeSeries: &promv1.Range{},
}
getQueryResultsErrorsTests := []struct {
getQueryRangeResultsSuccessTests := []struct {
name string
queries *querys
queriesResult mappedMockPromResult
Expand Down Expand Up @@ -197,30 +197,30 @@ func TestGetQueryResultsSuccess(t *testing.T) {
wantedError: nil,
},
}
for _, tt := range getQueryResultsErrorsTests {
for _, tt := range getQueryRangeResultsSuccessTests {
t.Run(tt.name, func(t *testing.T) {
c.PromConn = mockPrometheusConnection{
mappedResults: &tt.queriesResult,
t: t,
}
got := mappedResults{}
err := c.getQueryRangeResults(tt.queries, &got)
err := c.getQueryRangeResults(tt.queries, &got, 5)
if tt.wantedError == nil && err != nil {
t.Errorf("got unexpected error: %v", err)
}
if !reflect.DeepEqual(got, tt.wantedResult) {
t.Errorf("getQueryResults got:\n\t%s\n want:\n\t%s", got, tt.wantedResult)
t.Errorf("getQueryRangeResults got:\n\t%s\n want:\n\t%s", got, tt.wantedResult)
}
})
}
}

func TestGetQueryResultsError(t *testing.T) {
func TestGetQueryRangeResultsError(t *testing.T) {
c := PrometheusCollector{
ContextTimeout: defaultContextTimeout,
TimeSeries: &promv1.Range{},
}
getQueryResultsErrorsTests := []struct {
getQueryRangeResultsErrorsTests := []struct {
name string
queryResult *mockPromResult
wantedResult mappedResults
Expand Down Expand Up @@ -275,14 +275,14 @@ func TestGetQueryResultsError(t *testing.T) {
wantedError: errTest,
},
}
for _, tt := range getQueryResultsErrorsTests {
for _, tt := range getQueryRangeResultsErrorsTests {
t.Run(tt.name, func(t *testing.T) {
c.PromConn = mockPrometheusConnection{
singleResult: tt.queryResult,
t: t,
}
got := mappedResults{}
err := c.getQueryRangeResults(&querys{query{QueryString: "fake-query"}}, &got)
err := c.getQueryRangeResults(&querys{query{QueryString: "fake-query"}}, &got, 5)
if tt.wantedError != nil && err == nil {
t.Errorf("%s got: nil error, want: error", tt.name)
}
Expand Down
95 changes: 92 additions & 3 deletions controllers/kokumetricsconfig_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,47 @@ func TestConcatErrors(t *testing.T) {
}
}

func TestIsQueryNeeded(t *testing.T) {
isQueryNeededTests := []struct {
name string
time time.Time
count []int
want bool
}{
{
name: "time not in tracker",
time: time.Now(),
count: []int{-1},
want: true,
},
{
name: "time in tracker, retry required",
time: time.Now(),
count: []int{0, 1, 2, 3, 4},
want: true,
},
{
name: "time in tracker, retry count exceeded",
time: time.Now(),
count: []int{5, 6, 7, 8, 9},
want: false,
},
}
for _, tt := range isQueryNeededTests {
for _, count := range tt.count {
if count > -1 {
retryTracker[tt.time] = count
}
t.Run(tt.name, func(t *testing.T) {
got := isQueryNeeded(tt.time)
if got != tt.want {
t.Errorf("%s\ngot: %v\nwant: %v\n", tt.name, got, tt.want)
}
})
}
}
}

func setup() error {
type dirInfo struct {
dirName string
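Neither `retryTracker` nor `isQueryNeeded` appears elsewhere in this diff, so their shapes must be inferred. The following is a sketch consistent with the test above, assuming `retryTracker` is a package-level map from query timestamp to failure count; the real implementation in the controller may differ.

// Inferred sketch — not shown in this PR's diff. retryTracker records how
// many times the queries for a given timestamp have failed.
var retryTracker = map[time.Time]int{}

// isQueryNeeded reports whether the queries for timestamp t should still be
// attempted: true for untracked timestamps, and true while fewer than 5
// failures have been recorded, matching the test's count boundaries.
func isQueryNeeded(t time.Time) bool {
    count, ok := retryTracker[t]
    if !ok {
        return true
    }
    return count < 5
}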
Expand Down Expand Up @@ -252,6 +293,7 @@ var _ = Describe("MetricsConfigController - CRD Handling", func() {
GitCommit = "1234567"

setupRequired(ctx)
retentionPeriod = time.Duration(0)

promConnTester = func(promcoll *collector.PrometheusCollector) error { return nil }
promConnSetter = func(promcoll *collector.PrometheusCollector) error {
Expand Down Expand Up @@ -885,7 +927,6 @@ var _ = Describe("MetricsConfigController - CRD Handling", func() {

BeforeEach(func() {
r = &MetricsConfigReconciler{Client: k8sClient, apiReader: k8sManager.GetAPIReader()}
retentionPeriod = time.Duration(0)
Expect(retentionPeriod).To(Equal(time.Duration(0)))
})
It("configMap does not exist - uses 14 days", func() {
Expand Down Expand Up @@ -999,6 +1040,54 @@ var _ = Describe("MetricsConfigController - CRD Handling", func() {
Expect(got).ToNot(Equal(original.Add(-fourteenDayDuration)))
})

It("check the start time on old CR - failed query more than hour old", func() {
// cr.Spec.PrometheusConfig.CollectPreviousData != nil &&
// *cr.Spec.PrometheusConfig.CollectPreviousData &&
// cr.Status.Prometheus.LastQuerySuccessTime.IsZero()
original := time.Now().UTC().Truncate(time.Hour).Add(-2 * time.Hour)

cr := &metricscfgv1beta1.MetricsConfig{
Spec: metricscfgv1beta1.MetricsConfigSpec{
PrometheusConfig: metricscfgv1beta1.PrometheusSpec{
CollectPreviousData: &falseDef,
},
},
Status: metricscfgv1beta1.MetricsConfigStatus{
Prometheus: metricscfgv1beta1.PrometheusStatus{
LastQuerySuccessTime: metav1.Time{Time: original.Add(-1 * time.Hour)},
},
},
}

got, _ := getTimeRange(ctx, r, cr)
Expect(got).To(Equal(original))
Expect(got).ToNot(Equal(original.Add(-fourteenDayDuration)))
})

It("check the start time on old CR - failed query more than retention period old", func() {
// cr.Spec.PrometheusConfig.CollectPreviousData != nil &&
// *cr.Spec.PrometheusConfig.CollectPreviousData &&
// cr.Status.Prometheus.LastQuerySuccessTime.IsZero()
thirtyDaysOld := time.Now().UTC().Truncate(time.Hour).Add(-30 * 24 * time.Hour)
expected := now().UTC().Add(-1 * fourteenDayDuration).Truncate(24 * time.Hour)

cr := &metricscfgv1beta1.MetricsConfig{
Spec: metricscfgv1beta1.MetricsConfigSpec{
PrometheusConfig: metricscfgv1beta1.PrometheusSpec{
CollectPreviousData: &falseDef,
},
},
Status: metricscfgv1beta1.MetricsConfigStatus{
Prometheus: metricscfgv1beta1.PrometheusStatus{
LastQuerySuccessTime: metav1.Time{Time: thirtyDaysOld},
},
},
}

got, _ := getTimeRange(ctx, r, cr)
Expect(got).To(Equal(expected))
})

It("check the start time on new CR - previous data collection set to false", func() {
// cr.Spec.PrometheusConfig.CollectPreviousData != nil &&
// *cr.Spec.PrometheusConfig.CollectPreviousData &&
Expand Down Expand Up @@ -1085,7 +1174,7 @@ var _ = Describe("MetricsConfigController - CRD Handling", func() {
It("2day retention period - successfully queried but there was no data on first day, but data on second", func() {
resetReconciler(WithSecretOverride(true))

testConfigMap.Data = map[string]string{"config.yaml": "prometheusK8s:\n retention: 1d"}
testConfigMap.Data = map[string]string{"config.yaml": "prometheusK8s:\n retention: 2d"}
createObject(ctx, testConfigMap)

t := time.Now().UTC().Truncate(1 * time.Hour).Add(-1 * time.Hour)
Expand Down Expand Up @@ -1116,7 +1205,7 @@ var _ = Describe("MetricsConfigController - CRD Handling", func() {
resetReconciler(WithSecretOverride(true))
now = func() time.Time { return time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour) }

testConfigMap.Data = map[string]string{"config.yaml": "prometheusK8s:\n retention: 1d"}
testConfigMap.Data = map[string]string{"config.yaml": "prometheusK8s:\n retention: 2d"}
createObject(ctx, testConfigMap)

t := now().UTC().Truncate(1 * time.Hour).Add(-1 * time.Hour)
Expand Down
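The two new `getTimeRange` cases pin down how a stale `LastQuerySuccessTime` is handled: collection resumes at the hour after the last success, but never reaches back further than the 14-day maximum. `getTimeRange` itself is not part of this diff, so the following is only a reading consistent with both cases; the function name and signature here are hypothetical.

// Hypothetical distillation of the clamping the new test cases assert;
// the real getTimeRange takes the reconciler and CR and handles more cases.
func resumeStart(lastSuccess, now time.Time, maxLookback time.Duration) time.Time {
    start := lastSuccess.Add(time.Hour) // resume with the hour after the last success
    floor := now.Add(-maxLookback).Truncate(24 * time.Hour)
    if start.Before(floor) {
        return floor // a 30-day-old success still only looks back 14 days
    }
    return start
}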