From 59f5948e585c06617a534baf2320ea80bb50168c Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 4 Jun 2024 16:08:31 -0700 Subject: [PATCH 1/5] return http status codes from ingester Signed-off-by: Callum Styan --- pkg/ingester/ingester.go | 42 ++++++++++++++++++++++++++++++++++++++++ pkg/ingester/instance.go | 41 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 41b358906e0a1..6facee991c3e9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + server_util "github.com/grafana/loki/v3/pkg/util/server" "math/rand" "net/http" "os" @@ -22,6 +23,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/ring" @@ -1041,6 +1043,15 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration { // GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { + var status int + gcr, err := i.getChunkIDs(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return gcr, err +} + +// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. +func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { orgID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1168,6 +1179,14 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp // Series queries the ingester for log stream identifiers (label sets) matching a set of matchers func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + var status int + sr, err := i.series(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return sr, err +} + +func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1331,6 +1350,13 @@ func (i *Ingester) getInstances() []*instance { // Tail logs matching given query func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error { + var status int + err := i.tail(req, queryServer) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return err +} +func (i *Ingester) tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error { select { case <-i.tailersQuit: return errors.New("Ingester is stopping") @@ -1376,6 +1402,14 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ // TailersCount returns count of active tail requests from a user func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { + var status int + tcr, err := i.tailersCount(ctx) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return tcr, err +} + +func (i *Ingester) tailersCount(ctx context.Context) (*logproto.TailersCountResponse, error) { instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1431,6 +1465,14 @@ func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFiel // GetDetectedLabels returns map of detected labels and unique values from this ingester func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) { + var status int + lvr, err := i.getDetectedLabels(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return lvr, err +} + +func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7f1ec78601fff..4ab4c232734e9 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -49,6 +49,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/deletion" util_log "github.com/grafana/loki/v3/pkg/util/log" mathutil "github.com/grafana/loki/v3/pkg/util/math" + server_util "github.com/grafana/loki/v3/pkg/util/server" "github.com/grafana/loki/v3/pkg/validation" ) @@ -441,6 +442,14 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels } func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { + var status int + it, err := i.query(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return it, err +} + +func (i *instance) query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { expr, err := req.LogSelector() if err != nil { return nil, err @@ -495,6 +504,14 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E } func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { + var status int + it, err := i.querySample(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return it, err +} + +func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { expr, err := req.Expr() if err != nil { return nil, err @@ -556,6 +573,14 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams // If label matchers are given only the matching streams are fetched from the index. // The label names or values are then retrieved from those matching streams. func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) { + var status int + lr, err := i.label(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return lr, err +} + +func (i *instance) label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) { if len(matchers) == 0 { var labels []string if req.Values { @@ -709,6 +734,14 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo } func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { + var status int + isr, err := i.getStats(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return isr, err +} + +func (i *instance) getStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { matchers, err := syntax.ParseMatchers(req.Matchers, true) if err != nil { return nil, err @@ -765,6 +798,14 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest } func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { + var status int + vr, err := i.getVolume(ctx, req) + status, err = server_util.ClientHTTPStatusAndError(err) + err = httpgrpc.Errorf(status, "%s", err.Error()) + return vr, err +} + +func (i *instance) getVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { matchers, err := syntax.ParseMatchers(req.Matchers, true) if err != nil && req.Matchers != seriesvolume.MatchAny { return nil, err From a2511639ae5322be5c6c3daf2c089d40e6e62bc0 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 5 Jun 2024 12:52:42 -0700 Subject: [PATCH 2/5] simplify the error wrapping Signed-off-by: Callum Styan --- pkg/ingester/ingester.go | 21 +++++---------------- pkg/ingester/instance.go | 20 +++++--------------- pkg/util/server/error.go | 9 +++++++++ 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6facee991c3e9..a0c652505da90 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -23,7 +23,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" - "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/ring" @@ -1043,10 +1042,8 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration { // GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { - var status int gcr, err := i.getChunkIDs(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return gcr, err } @@ -1179,10 +1176,8 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp // Series queries the ingester for log stream identifiers (label sets) matching a set of matchers func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { - var status int sr, err := i.series(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return sr, err } @@ -1350,10 +1345,8 @@ func (i *Ingester) getInstances() []*instance { // Tail logs matching given query func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error { - var status int err := i.tail(req, queryServer) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return err } func (i *Ingester) tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error { @@ -1402,10 +1395,8 @@ func (i *Ingester) tail(req *logproto.TailRequest, queryServer logproto.Querier_ // TailersCount returns count of active tail requests from a user func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { - var status int tcr, err := i.tailersCount(ctx) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return tcr, err } @@ -1465,10 +1456,8 @@ func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFiel // GetDetectedLabels returns map of detected labels and unique values from this ingester func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) { - var status int lvr, err := i.getDetectedLabels(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return lvr, err } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 4ab4c232734e9..e0f2c0b993d11 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -442,10 +442,8 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels } func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { - var status int it, err := i.query(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return it, err } @@ -504,10 +502,8 @@ func (i *instance) query(ctx context.Context, req logql.SelectLogParams) (iter.E } func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { - var status int it, err := i.querySample(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return it, err } @@ -573,10 +569,8 @@ func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams // If label matchers are given only the matching streams are fetched from the index. // The label names or values are then retrieved from those matching streams. func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) { - var status int lr, err := i.label(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return lr, err } @@ -734,10 +728,8 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo } func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { - var status int isr, err := i.getStats(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return isr, err } @@ -798,10 +790,8 @@ func (i *instance) getStats(ctx context.Context, req *logproto.IndexStatsRequest } func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { - var status int vr, err := i.getVolume(ctx, req) - status, err = server_util.ClientHTTPStatusAndError(err) - err = httpgrpc.Errorf(status, "%s", err.Error()) + err = server_util.ClientGrpcStatusAndError(err) return vr, err } diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index c120a79176f85..7326f7cecb6cf 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -27,6 +27,15 @@ const ( ErrDeadlineExceeded = "Request timed out, decrease the duration of the request or add more label matchers (prefer exact match over regex match) to reduce the amount of data processed." ) +func ClientGrpcStatusAndError(err error) error { + if err == nil { + return nil + } + + status, newErr := ClientHTTPStatusAndError(err) + return httpgrpc.Errorf(status, "%s", newErr.Error()) +} + // WriteError write a go error with the correct status code. func WriteError(err error, w http.ResponseWriter) { status, cerr := ClientHTTPStatusAndError(err) From b97e8a6009f1cff19fd4557a56eaf57bcb70e8b3 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 5 Jun 2024 13:24:39 -0700 Subject: [PATCH 3/5] pass matchers Signed-off-by: Callum Styan --- pkg/ingester/instance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index e0f2c0b993d11..ecef3f10347b8 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -569,7 +569,7 @@ func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams // If label matchers are given only the matching streams are fetched from the index. // The label names or values are then retrieved from those matching streams. func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) { - lr, err := i.label(ctx, req) + lr, err := i.label(ctx, req, matchers...) err = server_util.ClientGrpcStatusAndError(err) return lr, err } From 225d24f8151f2f9d307e26904b8217650c18f8ef Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 5 Jun 2024 13:34:19 -0700 Subject: [PATCH 4/5] fix lint Signed-off-by: Callum Styan --- pkg/ingester/ingester.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index a0c652505da90..c79471620e46e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - server_util "github.com/grafana/loki/v3/pkg/util/server" "math/rand" "net/http" "os" @@ -35,6 +34,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc/health/grpc_health_v1" + server_util "github.com/grafana/loki/v3/pkg/util/server" + "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" @@ -1119,21 +1120,28 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp pprof.Do(ctx, pprof.Labels("path", "read", "type", "labels", "tenant", userID), func(c context.Context) { resp, err = instance.Label(ctx, req, matchers...) if err != nil { + fmt.Println("error: ", err) return } if req.Start == nil { + fmt.Println("asdf") + return } // Only continue if the active index type is one of async index store types or QueryStore flag is true. asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore { + fmt.Println("qwer") + return } var cs storage.Store var ok bool if cs, ok = i.store.(storage.Store); !ok { + fmt.Println("1234") + return } @@ -1145,6 +1153,8 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now()) if start.After(*req.End) { // The request is older than we are allowed to query the store, just return what we have. + fmt.Println("ttttt") + return } from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) @@ -1152,11 +1162,15 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp if req.Values { storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...) if err != nil { + fmt.Println("error2: ", err) + return } } else { storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...) if err != nil { + fmt.Println("error3: ", err) + return } } @@ -1168,6 +1182,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp if err != nil { return nil, err } + fmt.Println("resp values: ", resp.Values) return &logproto.LabelResponse{ Values: util.MergeStringLists(resp.Values, storeValues), From a4cc4e2757a74d279522acf251ef035a1aab1ab8 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 6 Jun 2024 10:24:46 -0700 Subject: [PATCH 5/5] remove unintended println Signed-off-by: Callum Styan --- pkg/ingester/ingester.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c79471620e46e..f892a2a7a89ce 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1120,28 +1120,21 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp pprof.Do(ctx, pprof.Labels("path", "read", "type", "labels", "tenant", userID), func(c context.Context) { resp, err = instance.Label(ctx, req, matchers...) if err != nil { - fmt.Println("error: ", err) return } if req.Start == nil { - fmt.Println("asdf") - return } // Only continue if the active index type is one of async index store types or QueryStore flag is true. asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore { - fmt.Println("qwer") - return } var cs storage.Store var ok bool if cs, ok = i.store.(storage.Store); !ok { - fmt.Println("1234") - return } @@ -1153,8 +1146,6 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now()) if start.After(*req.End) { // The request is older than we are allowed to query the store, just return what we have. - fmt.Println("ttttt") - return } from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) @@ -1162,15 +1153,11 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp if req.Values { storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...) if err != nil { - fmt.Println("error2: ", err) - return } } else { storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...) if err != nil { - fmt.Println("error3: ", err) - return } } @@ -1182,7 +1169,6 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp if err != nil { return nil, err } - fmt.Println("resp values: ", resp.Values) return &logproto.LabelResponse{ Values: util.MergeStringLists(resp.Values, storeValues),