diff --git a/CHANGELOG.md b/CHANGELOG.md index 44abcbd5ed..bd1ab53dda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 +* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 ## 1.15.0 2023-04-19 diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index adfc99faf4..7912f28c51 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -150,10 +150,88 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur } // QueryRaw runs a query directly against the querier API. -func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) { - addr := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=%s", c.querierAddress, url.QueryEscape(query)) +func (c *Client) QueryRaw(query string, ts time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/query", c.querierAddress), + } + q := u.Query() + q.Set("query", query) - return c.query(addr) + if !ts.IsZero() { + q.Set("time", FormatTime(ts)) + } + u.RawQuery = q.Encode() + return c.query(u.String()) +} + +// SeriesRaw runs a series request directly against the querier API. +func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/series", c.querierAddress), + } + q := u.Query() + + for _, m := range matches { + q.Add("match[]", m) + } + + if !startTime.IsZero() { + q.Set("start", FormatTime(startTime)) + } + if !endTime.IsZero() { + q.Set("end", FormatTime(endTime)) + } + + u.RawQuery = q.Encode() + return c.query(u.String()) +} + +// LabelNamesRaw runs a label names request directly against the querier API. 
+func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/labels", c.querierAddress), + } + q := u.Query() + + for _, m := range matches { + q.Add("match[]", m) + } + + if !startTime.IsZero() { + q.Set("start", FormatTime(startTime)) + } + if !endTime.IsZero() { + q.Set("end", FormatTime(endTime)) + } + + u.RawQuery = q.Encode() + return c.query(u.String()) +} + +// LabelValuesRaw runs a label values request directly against the querier API. +func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/label/%s/values", c.querierAddress, label), + } + q := u.Query() + + for _, m := range matches { + q.Add("match[]", m) + } + + if !startTime.IsZero() { + q.Set("start", FormatTime(startTime)) + } + if !endTime.IsZero() { + q.Set("end", FormatTime(endTime)) + } + + u.RawQuery = q.Encode() + return c.query(u.String()) } // RemoteRead runs a remote read query. 
@@ -259,8 +337,8 @@ func (c *Client) LabelValues(label string, start, end time.Time, matches []strin } // LabelNames gets label names -func (c *Client) LabelNames(start, end time.Time) ([]string, error) { - result, _, err := c.querierClient.LabelNames(context.Background(), nil, start, end) +func (c *Client) LabelNames(start, end time.Time, matchers ...string) ([]string, error) { + result, _, err := c.querierClient.LabelNames(context.Background(), matchers, start, end) return result, err } diff --git a/integration/querier_test.go b/integration/querier_test.go index 651003a9ec..26d0e62c21 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -5,6 +5,7 @@ package integration import ( "fmt" + "net/http" "strconv" "strings" "testing" @@ -818,6 +819,90 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { assert.Contains(t, err.Error(), "500") } +func TestQuerierWithBlocksStorageLimits(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. 
+ distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + seriesTimestamp := time.Now() + series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) + series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2nd series is in the head. + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + + // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. 
+ storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + "-querier.max-fetched-series-per-query": "1", + }), "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + "-querier.max-fetched-series-per-query": "1", + }), "") + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) + + // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) + + // Query back the series. 
+ c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // We expect all queries hitting 422 exceeded series limit + resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), "max number of series limit") + + resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), "max number of series limit") +} + func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { const blockRangePeriod = 5 * time.Second diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 5177eb1973..cd93908cd6 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -293,7 +293,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on missing metric name for each user. if userID == 0 && cfg.testMissingMetricName { - res, body, err := c.QueryRaw("{instance=~\"hello.*\"}") + res, body, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now()) require.NoError(t, err) require.Equal(t, 422, res.StatusCode) require.Contains(t, string(body), "query must contain metric name") @@ -317,7 +317,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on Server-Timing header for each user. 
if userID == 0 && cfg.queryStatsEnabled { - res, _, err := c.QueryRaw("{instance=~\"hello.*\"}") + res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now()) require.NoError(t, err) require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 7b19321c32..1562dbf2ee 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -133,7 +133,6 @@ func TestVerticalShardingFuzz(t *testing.T) { opts := []promqlsmith.Option{ promqlsmith.WithEnableOffset(true), promqlsmith.WithEnableAtModifier(true), - promqlsmith.WithEnableVectorMatching(true), } ps := promqlsmith.New(rnd, lbls, opts...) diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go index 7cb0772213..c4d7937478 100644 --- a/integration/zone_aware_test.go +++ b/integration/zone_aware_test.go @@ -135,7 +135,7 @@ func TestZoneAwareReplication(t *testing.T) { require.NoError(t, ingester3.Kill()) // Query back any series => fail (either because of a timeout or 500) - result, _, err := client.QueryRaw("series_1") + result, _, err := client.QueryRaw("series_1", time.Now()) if !errors.Is(err, context.DeadlineExceeded) { require.NoError(t, err) require.Equal(t, 500, result.StatusCode) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 468c3b8e97..aa100bf683 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -639,6 +639,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } if err != nil { + s, ok := status.FromError(err) + if !ok { + s, ok = status.FromError(errors.Cause(err)) + } + + if ok { + if s.Code() == codes.ResourceExhausted { + return validation.LimitError(s.Message()) + } + } return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) } @@ -763,10 +773,11 @@ func (q *blocksStoreQuerier) 
fetchLabelNamesFromStore( namesResp, err := c.LabelNames(gCtx, req) if err != nil { if isRetryableError(err) { - level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label names from %s due to retryable error", c.RemoteAddress())) return nil } - return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) + + return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress()) } myQueriedBlocks := []ulid.ULID(nil) @@ -844,10 +855,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( valuesResp, err := c.LabelValues(gCtx, req) if err != nil { if isRetryableError(err) { - level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress())) return nil } - return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) + return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress()) } myQueriedBlocks := []ulid.ULID(nil) diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index 43ace739b5..c0a14a3723 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -3,13 +3,13 @@ package querier import ( "context" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/gogo/status" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + + "github.com/cortexproject/cortex/pkg/util/validation" ) // TranslateToPromqlAPIError converts error to one of promql.Errors for consumption in PromQL API.