Handle grpc code resource exhausted for store gateway (cortexproject#5286)

* handle grpc code resource exhausted for store gateway

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* try fixing test

Signed-off-by: Ben Ye <benye@amazon.com>

* try to fix E2E test

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* try again

Signed-off-by: Ben Ye <benye@amazon.com>

* fix message

Signed-off-by: Ben Ye <benye@amazon.com>

* remove labels API

Signed-off-by: Ben Ye <benye@amazon.com>

* remove logic to check string contains

Signed-off-by: Ben Ye <benye@amazon.com>

* make limiter vars private

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 authored and alanprot committed Apr 26, 2023
1 parent 92fcee2 commit 0f312ef
Showing 7 changed files with 192 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## master / unreleased
 
+## 1.15.1 2023-04-26
+
+* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
+
 ## 1.15.0 2023-04-19
 
 * [CHANGE] Storage: Make Max exemplars config per tenant instead of global configuration. #5080 #5122
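
For orientation, the sketch below shows the HTTP side of the behaviour this entry describes. It is not Cortex's actual translation code: the `limitError` type and `httpStatusFor` function are hypothetical stand-ins for `validation.LimitError` and the API-layer error translation, and the sketch only illustrates how a limit-style error can end up as a 422 (Unprocessable Entity) status.

```go
// Hypothetical sketch: how a "limit" error could surface as HTTP 422.
// limitError stands in for a type like validation.LimitError; the real
// translation lives in the querier/API layer, not in this snippet.
package main

import (
	"errors"
	"fmt"
	"net/http"
)

type limitError string

func (e limitError) Error() string { return string(e) }

// httpStatusFor maps query errors to an HTTP status code:
// limit violations become 422, everything else stays a 500.
func httpStatusFor(err error) int {
	var le limitError
	if errors.As(err, &le) {
		return http.StatusUnprocessableEntity // 422
	}
	return http.StatusInternalServerError
}

func main() {
	err := limitError("max number of series limit exceeded")
	fmt.Println(httpStatusFor(err)) // 422
}
```
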
88 changes: 83 additions & 5 deletions integration/e2ecortex/client.go
@@ -150,10 +150,88 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Duration
 }
 
 // QueryRaw runs a query directly against the querier API.
-func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
-    addr := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=%s", c.querierAddress, url.QueryEscape(query))
+func (c *Client) QueryRaw(query string, ts time.Time) (*http.Response, []byte, error) {
+    u := &url.URL{
+        Scheme: "http",
+        Path:   fmt.Sprintf("%s/api/prom/api/v1/query", c.querierAddress),
+    }
+    q := u.Query()
+    q.Set("query", query)
 
-    return c.query(addr)
+    if !ts.IsZero() {
+        q.Set("time", FormatTime(ts))
+    }
+    u.RawQuery = q.Encode()
+    return c.query(u.String())
 }
 
+// SeriesRaw runs a series request directly against the querier API.
+func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) {
+    u := &url.URL{
+        Scheme: "http",
+        Path:   fmt.Sprintf("%s/api/prom/api/v1/series", c.querierAddress),
+    }
+    q := u.Query()
+
+    for _, m := range matches {
+        q.Add("match[]", m)
+    }
+
+    if !startTime.IsZero() {
+        q.Set("start", FormatTime(startTime))
+    }
+    if !endTime.IsZero() {
+        q.Set("end", FormatTime(endTime))
+    }
+
+    u.RawQuery = q.Encode()
+    return c.query(u.String())
+}
+
+// LabelNamesRaw runs a label names request directly against the querier API.
+func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) {
+    u := &url.URL{
+        Scheme: "http",
+        Path:   fmt.Sprintf("%s/api/prom/api/v1/labels", c.querierAddress),
+    }
+    q := u.Query()
+
+    for _, m := range matches {
+        q.Add("match[]", m)
+    }
+
+    if !startTime.IsZero() {
+        q.Set("start", FormatTime(startTime))
+    }
+    if !endTime.IsZero() {
+        q.Set("end", FormatTime(endTime))
+    }
+
+    u.RawQuery = q.Encode()
+    return c.query(u.String())
+}
+
+// LabelValuesRaw runs a label values request directly against the querier API.
+func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) {
+    u := &url.URL{
+        Scheme: "http",
+        Path:   fmt.Sprintf("%s/api/prom/api/v1/label/%s/values", c.querierAddress, label),
+    }
+    q := u.Query()
+
+    for _, m := range matches {
+        q.Add("match[]", m)
+    }
+
+    if !startTime.IsZero() {
+        q.Set("start", FormatTime(startTime))
+    }
+    if !endTime.IsZero() {
+        q.Set("end", FormatTime(endTime))
+    }
+
+    u.RawQuery = q.Encode()
+    return c.query(u.String())
+}
+
 // RemoteRead runs a remote read query.
@@ -259,8 +337,8 @@ func (c *Client) LabelValues(label string, start, end time.Time, matches []string
 }
 
 // LabelNames gets label names
-func (c *Client) LabelNames(start, end time.Time) ([]string, error) {
-    result, _, err := c.querierClient.LabelNames(context.Background(), nil, start, end)
+func (c *Client) LabelNames(start, end time.Time, matchers ...string) ([]string, error) {
+    result, _, err := c.querierClient.LabelNames(context.Background(), matchers, start, end)
     return result, err
 }

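A short usage sketch for the extended test-client helpers above. The `queryAllEndpoints` helper, package name, and matcher values are illustrative only and assume a ready `e2ecortex.Client` pointed at a querier; the method signatures match the ones added in this file.

```go
package e2etests // hypothetical package name for this sketch

import (
	"testing"
	"time"

	"github.com/cortexproject/cortex/integration/e2ecortex"
	"github.com/stretchr/testify/require"
)

// queryAllEndpoints exercises the new raw client methods against a querier.
func queryAllEndpoints(t *testing.T, c *e2ecortex.Client) {
	now := time.Now()

	// Instant query evaluated at an explicit timestamp.
	resp, body, err := c.QueryRaw(`up`, now)
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.NotEmpty(t, body)

	// Series, label names and label values with matchers and a time range.
	_, _, err = c.SeriesRaw([]string{`{job="test"}`}, now.Add(-time.Hour), now)
	require.NoError(t, err)

	_, _, err = c.LabelNamesRaw([]string{`{job="test"}`}, now.Add(-time.Hour), now)
	require.NoError(t, err)

	_, _, err = c.LabelValuesRaw("job", []string{`{job="test"}`}, now.Add(-time.Hour), now)
	require.NoError(t, err)
}
```

Passing a zero `time.Time` to `QueryRaw` keeps the previous behaviour of omitting the `time` parameter from the request.
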
85 changes: 85 additions & 0 deletions integration/querier_test.go
@@ -5,6 +5,7 @@ package integration
 
 import (
     "fmt"
+    "net/http"
     "strconv"
     "strings"
     "testing"
@@ -818,6 +819,90 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {
     assert.Contains(t, err.Error(), "500")
 }
 
+func TestQuerierWithBlocksStorageLimits(t *testing.T) {
+    const blockRangePeriod = 5 * time.Second
+
+    s, err := e2e.NewScenario(networkName)
+    require.NoError(t, err)
+    defer s.Close()
+
+    // Configure the blocks storage to frequently compact TSDB head
+    // and ship blocks to the storage.
+    flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+        "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
+        "-blocks-storage.tsdb.ship-interval":       "1s",
+        "-blocks-storage.tsdb.retention-period":    ((blockRangePeriod * 2) - 1).String(),
+    })
+
+    // Start dependencies.
+    consul := e2edb.NewConsul()
+    minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+    require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+    // Start Cortex components for the write path.
+    distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+    ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+    require.NoError(t, s.StartAndWaitReady(distributor, ingester))
+
+    // Wait until the distributor has updated the ring.
+    require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+
+    // Push some series to Cortex.
+    c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
+    require.NoError(t, err)
+
+    seriesTimestamp := time.Now()
+    series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
+    series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"})
+    series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"})
+
+    res, err := c.Push(series1)
+    require.NoError(t, err)
+    require.Equal(t, 200, res.StatusCode)
+
+    res, err = c.Push(series2)
+    require.NoError(t, err)
+    require.Equal(t, 200, res.StatusCode)
+
+    // Wait until the TSDB head is compacted and shipped to the storage.
+    // The shipped block contains the 1st series, while the 2nd series is in the head.
+    require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
+    require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
+    require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
+    require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
+
+    // Start the querier and store-gateway, and configure them to sync blocks frequently enough for the consistency check to pass.
+    storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
+        "-blocks-storage.bucket-store.sync-interval": "5s",
+        "-querier.max-fetched-series-per-query":      "1",
+    }), "")
+    querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
+        "-blocks-storage.bucket-store.sync-interval": "5s",
+        "-querier.max-fetched-series-per-query":      "1",
+    }), "")
+    require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
+
+    // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
+    require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
+    require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+    require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
+
+    // Query back the series.
+    c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
+    require.NoError(t, err)
+
+    // We expect all queries to hit the 422 max series limit error.
+    resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp)
+    require.NoError(t, err)
+    require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
+    require.Contains(t, string(body), "max number of series limit")
+
+    resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp)
+    require.NoError(t, err)
+    require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
+    require.Contains(t, string(body), "max number of series limit")
+}
+
 func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
     const blockRangePeriod = 5 * time.Second
 
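The limits flags above are what force the failure: the store gateway applies `-querier.max-fetched-series-per-query` per request and reports a violation over gRPC. As a rough sketch of that side of the interaction (the `seriesLimiter` type and its error message are invented for illustration and are not the actual store-gateway limiter), a limiter might behave like this: once the limit is crossed it returns a `ResourceExhausted` status, which the querier change in this commit converts into the 422 asserted above.

```go
// Hedged sketch only: a per-query series limiter that reports violations as
// a gRPC ResourceExhausted status. The real limiter lives in the store
// gateway code path and may word its message differently.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type seriesLimiter struct {
	limit int
	count int
}

// add returns a ResourceExhausted status error once the limit is crossed.
func (l *seriesLimiter) add(n int) error {
	l.count += n
	if l.limit > 0 && l.count > l.limit {
		return status.Error(codes.ResourceExhausted,
			fmt.Sprintf("exceeded max number of series limit (limit: %d)", l.limit))
	}
	return nil
}

func main() {
	l := &seriesLimiter{limit: 1}
	fmt.Println(l.add(1)) // <nil>
	fmt.Println(l.add(1)) // rpc error: code = ResourceExhausted desc = exceeded max number of series limit (limit: 1)
}
```
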
4 changes: 2 additions & 2 deletions integration/query_frontend_test.go
@@ -293,7 +293,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
 
         // No need to repeat the test on missing metric name for each user.
         if userID == 0 && cfg.testMissingMetricName {
-            res, body, err := c.QueryRaw("{instance=~\"hello.*\"}")
+            res, body, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now())
             require.NoError(t, err)
             require.Equal(t, 422, res.StatusCode)
             require.Contains(t, string(body), "query must contain metric name")
@@ -317,7 +317,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
 
         // No need to repeat the test on Server-Timing header for each user.
         if userID == 0 && cfg.queryStatsEnabled {
-            res, _, err := c.QueryRaw("{instance=~\"hello.*\"}")
+            res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now())
             require.NoError(t, err)
             require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
         }
2 changes: 1 addition & 1 deletion integration/zone_aware_test.go
@@ -135,7 +135,7 @@ func TestZoneAwareReplication(t *testing.T) {
     require.NoError(t, ingester3.Kill())
 
     // Query back any series => fail (either because of a timeout or 500)
-    result, _, err := client.QueryRaw("series_1")
+    result, _, err := client.QueryRaw("series_1", time.Now())
     if !errors.Is(err, context.DeadlineExceeded) {
         require.NoError(t, err)
         require.Equal(t, 500, result.StatusCode)
19 changes: 15 additions & 4 deletions pkg/querier/blocks_store_queryable.go
@@ -639,6 +639,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
         }
 
         if err != nil {
+            s, ok := status.FromError(err)
+            if !ok {
+                s, ok = status.FromError(errors.Cause(err))
+            }
+
+            if ok {
+                if s.Code() == codes.ResourceExhausted {
+                    return validation.LimitError(s.Message())
+                }
+            }
             return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress())
         }
 
@@ -763,10 +773,11 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
         namesResp, err := c.LabelNames(gCtx, req)
         if err != nil {
             if isRetryableError(err) {
-                level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress()))
+                level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label names from %s due to retryable error", c.RemoteAddress()))
                 return nil
             }
-            return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress())
+
+            return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress())
         }
 
         myQueriedBlocks := []ulid.ULID(nil)
@@ -844,10 +855,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
         valuesResp, err := c.LabelValues(gCtx, req)
         if err != nil {
             if isRetryableError(err) {
-                level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress()))
+                level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress()))
                 return nil
             }
-            return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress())
+            return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress())
         }
 
         myQueriedBlocks := []ulid.ULID(nil)
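
The new branch in `fetchSeriesFromStores` checks `status.FromError` on the error and, failing that, on `errors.Cause(err)`, because wrapping with `github.com/pkg/errors` can hide the gRPC status from a direct check. Below is a minimal standalone sketch of the same pattern under stated assumptions: it uses `google.golang.org/grpc/status` (the production code may rely on the gogo variant), and `asLimitError` plus the string-based `limitError` type are hypothetical stand-ins for the real helpers.

```go
// Hedged sketch of the pattern above: recover a gRPC status from an error
// that may have been wrapped, and convert ResourceExhausted into a limit error.
package main

import (
	"fmt"

	"github.com/pkg/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// limitError mimics a string-based limit error such as validation.LimitError.
type limitError string

func (e limitError) Error() string { return string(e) }

// asLimitError checks the error itself and then its cause, since wrapping
// can hide the gRPC status from a direct check depending on the libraries used.
func asLimitError(err error) (limitError, bool) {
	s, ok := status.FromError(err)
	if !ok {
		s, ok = status.FromError(errors.Cause(err))
	}
	if ok && s.Code() == codes.ResourceExhausted {
		return limitError(s.Message()), true
	}
	return "", false
}

func main() {
	base := status.Error(codes.ResourceExhausted, "max number of series limit exceeded")
	wrapped := errors.Wrapf(base, "failed to receive series from %s", "store-gateway-1")

	if le, ok := asLimitError(wrapped); ok {
		fmt.Println("limit error:", le.Error())
	}
}
```

With a helper like this, the caller can surface the violation as a user-facing limit error instead of wrapping it into a generic 5xx failure.
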
4 changes: 2 additions & 2 deletions pkg/querier/error_translate_queryable.go
@@ -3,13 +3,13 @@ package querier
 import (
     "context"
 
-    "github.com/cortexproject/cortex/pkg/util/validation"
-
     "github.com/gogo/status"
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/promql"
     "github.com/prometheus/prometheus/storage"
+
+    "github.com/cortexproject/cortex/pkg/util/validation"
 )
 
 // TranslateToPromqlAPIError converts error to one of promql.Errors for consumption in PromQL API.
