diff --git a/Makefile b/Makefile index f81e2bba65..e8d31558ff 100644 --- a/Makefile +++ b/Makefile @@ -268,7 +268,7 @@ $(BIN)/protoc-gen-go: Makefile go.mod $(BIN)/protoc-gen-connect-go: Makefile go.mod @mkdir -p $(@D) - GOBIN=$(abspath $(@D)) $(GO) install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.14.0 + GOBIN=$(abspath $(@D)) $(GO) install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.16.2 $(BIN)/protoc-gen-connect-go-mux: Makefile go.mod @mkdir -p $(@D) diff --git a/api/go.mod b/api/go.mod index f784c4156b..8954335576 100644 --- a/api/go.mod +++ b/api/go.mod @@ -3,7 +3,7 @@ module github.com/grafana/pyroscope/api go 1.21 require ( - connectrpc.com/connect v1.14.0 + connectrpc.com/connect v1.16.2 github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 github.com/planetscale/vtprotobuf v0.6.0 diff --git a/api/go.sum b/api/go.sum index 18c47dc99c..a19aa1f4c4 100644 --- a/api/go.sum +++ b/api/go.sum @@ -1,5 +1,5 @@ -connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA= -connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s= +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= diff --git a/cmd/profilecli/query.go b/cmd/profilecli/query.go index 0e58f7adf1..aff60f16a3 100644 --- a/cmd/profilecli/query.go +++ b/cmd/profilecli/query.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" "github.com/grafana/pyroscope/api/gen/proto/go/storegateway/v1/storegatewayv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/operations" "github.com/k0kubun/pp/v3" "github.com/klauspost/compress/gzip" @@ -37,6 +38,7 @@ func (c *phlareClient) queryClient() querierv1connect.QuerierServiceClient { return querierv1connect.NewQuerierServiceClient( c.httpClient(), c.URL, + connectapi.DefaultClientOptions()..., ) } @@ -44,6 +46,7 @@ func (c *phlareClient) storeGatewayClient() storegatewayv1connect.StoreGatewaySe return storegatewayv1connect.NewStoreGatewayServiceClient( c.httpClient(), c.URL, + connectapi.DefaultClientOptions()..., ) } @@ -51,6 +54,7 @@ func (c *phlareClient) ingesterClient() ingesterv1connect.IngesterServiceClient return ingesterv1connect.NewIngesterServiceClient( c.httpClient(), c.URL, + connectapi.DefaultClientOptions()..., ) } diff --git a/cmd/profilecli/upload.go b/cmd/profilecli/upload.go index 9341d67631..19ca7a18f4 100644 --- a/cmd/profilecli/upload.go +++ b/cmd/profilecli/upload.go @@ -11,6 +11,7 @@ import ( pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/pprof" ) @@ -19,6 +20,7 @@ func (c *phlareClient) pusherClient() pushv1connect.PusherServiceClient { return pushv1connect.NewPusherServiceClient( c.httpClient(), c.URL, + connectapi.DefaultClientOptions()..., ) } diff --git a/ebpf/cmd/playground/main.go b/ebpf/cmd/playground/main.go index 3eb58097d9..8a6556dd0e 100644 --- a/ebpf/cmd/playground/main.go +++ b/ebpf/cmd/playground/main.go @@ -8,7 +8,6 @@ import ( "encoding/json" "flag" "fmt" - "github.com/prometheus/client_golang/prometheus" "os" "strconv" "strings" @@ -16,14 +15,16 @@ import ( "connectrpc.com/connect" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/pyroscope/ebpf/cpp/demangle" ebpfmetrics "github.com/grafana/pyroscope/ebpf/metrics" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" - "github.com/go-kit/log/level" pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" @@ -31,7 +32,6 @@ import ( "github.com/grafana/pyroscope/ebpf/pprof" "github.com/grafana/pyroscope/ebpf/sd" "github.com/grafana/pyroscope/ebpf/symtab" - commonconfig "github.com/prometheus/common/config" ) var configFile = flag.String("config", "", "config file path") diff --git a/ebpf/go.mod b/ebpf/go.mod index 225ed97cf7..a78a661d82 100644 --- a/ebpf/go.mod +++ b/ebpf/go.mod @@ -3,7 +3,7 @@ module github.com/grafana/pyroscope/ebpf go 1.21 require ( - connectrpc.com/connect v1.14.0 + connectrpc.com/connect v1.16.2 github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270 github.com/cespare/xxhash/v2 v2.2.0 github.com/cilium/ebpf v0.11.0 diff --git a/ebpf/go.sum b/ebpf/go.sum index 073aecb30e..01dbf90a0c 100644 --- a/ebpf/go.sum +++ b/ebpf/go.sum @@ -1,5 +1,5 @@ -connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA= -connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s= +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270 h1:JIxGEMs4E5Zb6R7z2C5IgecI0mkqS97WAEF31wUbYTM= github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270/go.mod h1:2XtVRGCw/HthOLxU0Qw6o6jSJrcEoOb2OCCl8gQYvGw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/go.mod b/go.mod index 17bf4da6f0..f4c34a3183 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/grafana/pyroscope go 1.21 require ( - connectrpc.com/connect v1.14.0 + connectrpc.com/connect v1.16.2 connectrpc.com/grpchealth v1.3.0 github.com/PuerkitoBio/goquery v1.8.1 github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 diff --git a/go.sum b/go.sum index 6b93cfecc6..63f8691f3f 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8= cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= -connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA= -connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s= +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= connectrpc.com/grpchealth v1.3.0 h1:FA3OIwAvuMokQIXQrY5LbIy8IenftksTP/lG4PbYN+E= connectrpc.com/grpchealth v1.3.0/go.mod h1:3vpqmX25/ir0gVgW6RdnCPPZRcR6HvqtXX5RNPmDXHM= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= diff --git a/go.work.sum b/go.work.sum index d717f02355..132718b1ae 100644 --- a/go.work.sum +++ b/go.work.sum @@ -481,6 +481,8 @@ cloud.google.com/go/workflows v1.12.3 h1:qocsqETmLAl34mSa01hKZjcqAvt699gaoFbooGG cloud.google.com/go/workflows v1.12.3/go.mod h1:fmOUeeqEwPzIU81foMjTRQIdwQHADi/vEr1cx9R1m5g= cloud.google.com/go/workflows v1.12.4 h1:uHNmUiatTbPQ4H1pabwfzpfEYD4BBnqDHqMm2IesOh4= cloud.google.com/go/workflows v1.12.4/go.mod h1:yQ7HUqOkdJK4duVtMeBCAOPiN1ZF1E9pAMX51vpwB/w= +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9 h1:VpgP7xuJadIUuKccphEpTJnWhS2jkQyMt6Y7pJCD7fY= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1/go.mod h1:RKUqNu35KJYcVG/fqTRqmuXJZYNhYkBrnC/hX7yGbTA= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= @@ -1122,6 +1124,7 @@ golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= @@ -1317,8 +1320,7 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= diff --git a/pkg/api/api.go b/pkg/api/api.go index 2dfc2cb1a8..aa48d1f545 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/pyroscope/api/gen/proto/go/version/v1/versionv1connect" "github.com/grafana/pyroscope/api/openapiv2" "github.com/grafana/pyroscope/pkg/adhocprofiles" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/compactor" "github.com/grafana/pyroscope/pkg/distributor" "github.com/grafana/pyroscope/pkg/frontend" @@ -198,7 +199,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL } func (a *API) RegisterTenantSettings(ts *settings.TenantSettings) { - settingsv1connect.RegisterSettingsServiceHandler(a.server.HTTP, ts, a.grpcAuthMiddleware, a.recoveryMiddleware) + settingsv1connect.RegisterSettingsServiceHandler(a.server.HTTP, ts, a.connectOptionsAuthRecovery()...) } // RegisterOverridesExporter registers the endpoints associated with the overrides exporter. @@ -214,7 +215,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor) { pyroscopeHandler := pyroscope.NewPyroscopeIngestHandler(d, a.logger) a.RegisterRoute("/ingest", pyroscopeHandler, true, true, "POST") a.RegisterRoute("/pyroscope/ingest", pyroscopeHandler, true, true, "POST") - pushv1connect.RegisterPusherServiceHandler(a.server.HTTP, d, a.grpcAuthMiddleware, a.recoveryMiddleware) + pushv1connect.RegisterPusherServiceHandler(a.server.HTTP, d, a.connectOptionsAuthRecovery()...) a.RegisterRoute("/distributor/ring", d, false, true, "GET", "POST") a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{ {Desc: "Ring status", Path: "/distributor/ring"}, @@ -244,8 +245,8 @@ type QuerierSvc interface { // RegisterQuerier registers the endpoints associated with the querier. func (a *API) RegisterQuerier(svc QuerierSvc) { - querierv1connect.RegisterQuerierServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.grpcLogMiddleware, a.recoveryMiddleware) - vcsv1connect.RegisterVCSServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.grpcLogMiddleware, a.recoveryMiddleware) + querierv1connect.RegisterQuerierServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthLogRecovery()...) + vcsv1connect.RegisterVCSServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthLogRecovery()...) } func (a *API) RegisterPyroscopeHandlers(client querierv1connect.QuerierServiceClient) { @@ -257,11 +258,11 @@ func (a *API) RegisterPyroscopeHandlers(client querierv1connect.QuerierServiceCl // RegisterIngester registers the endpoints associated with the ingester. func (a *API) RegisterIngester(svc *ingester.Ingester) { - ingesterv1connect.RegisterIngesterServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.recoveryMiddleware) + ingesterv1connect.RegisterIngesterServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthRecovery()...) } func (a *API) RegisterStoreGateway(svc *storegateway.StoreGateway) { - storegatewayv1connect.RegisterStoreGatewayServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.recoveryMiddleware) + storegatewayv1connect.RegisterStoreGatewayServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthRecovery()...) a.indexPage.AddLinks(defaultWeight, "Store-gateway", []IndexPageLink{ {Desc: "Ring status", Path: "/store-gateway/ring"}, @@ -282,18 +283,18 @@ func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) { // RegisterQueryFrontend registers the endpoints associated with the query frontend. func (a *API) RegisterQueryFrontend(frontendSvc *frontend.Frontend) { - frontendpbconnect.RegisterFrontendForQuerierHandler(a.server.HTTP, frontendSvc, a.grpcAuthMiddleware, a.recoveryMiddleware) + frontendpbconnect.RegisterFrontendForQuerierHandler(a.server.HTTP, frontendSvc, a.connectOptionsAuthRecovery()...) } // RegisterVersion registers the endpoints associated with the versions service. func (a *API) RegisterVersion(svc versionv1connect.VersionHandler) { - versionv1connect.RegisterVersionHandler(a.server.HTTP, svc, a.recoveryMiddleware) + versionv1connect.RegisterVersionHandler(a.server.HTTP, svc, a.connectOptionsRecovery()...) } // RegisterQueryScheduler registers the endpoints associated with the query scheduler. func (a *API) RegisterQueryScheduler(s *scheduler.Scheduler) { - schedulerpbconnect.RegisterSchedulerForFrontendHandler(a.server.HTTP, s, a.recoveryMiddleware) - schedulerpbconnect.RegisterSchedulerForQuerierHandler(a.server.HTTP, s, a.recoveryMiddleware) + schedulerpbconnect.RegisterSchedulerForFrontendHandler(a.server.HTTP, s, a.connectOptionsRecovery()...) + schedulerpbconnect.RegisterSchedulerForQuerierHandler(a.server.HTTP, s, a.connectOptionsRecovery()...) } // RegisterFlags registers api-related flags. @@ -312,5 +313,17 @@ func (a *API) RegisterAdmin(ad *operations.Admin) { } func (a *API) RegisterAdHocProfiles(ahp *adhocprofiles.AdHocProfiles) { - adhocprofilesv1connect.RegisterAdHocProfileServiceHandler(a.server.HTTP, ahp, a.grpcAuthMiddleware, a.recoveryMiddleware) + adhocprofilesv1connect.RegisterAdHocProfileServiceHandler(a.server.HTTP, ahp, a.connectOptionsAuthRecovery()...) +} + +func (a *API) connectOptionsRecovery() []connect.HandlerOption { + return append(connectapi.DefaultHandlerOptions(), a.recoveryMiddleware) +} + +func (a *API) connectOptionsAuthRecovery() []connect.HandlerOption { + return append(connectapi.DefaultHandlerOptions(), []connect.HandlerOption{a.grpcAuthMiddleware, a.recoveryMiddleware}...) +} + +func (a *API) connectOptionsAuthLogRecovery() []connect.HandlerOption { + return append(connectapi.DefaultHandlerOptions(), []connect.HandlerOption{a.grpcAuthMiddleware, a.grpcLogMiddleware, a.recoveryMiddleware}...) } diff --git a/pkg/api/connect/codec.go b/pkg/api/connect/codec.go new file mode 100644 index 0000000000..7e313a3095 --- /dev/null +++ b/pkg/api/connect/codec.go @@ -0,0 +1,99 @@ +package connectapi + +import ( + "fmt" + + "connectrpc.com/connect" + "google.golang.org/protobuf/proto" +) + +// Name is the name registered for the proto compressor. +const Name = "proto" + +var ProtoCodec connect.Codec = vtprotoCodec{} + +type vtprotoCodec struct{} + +// growcap scales up the capacity of a slice. +// +// Given a slice with a current capacity of oldcap and a desired +// capacity of wantcap, growcap returns a new capacity >= wantcap. +// +// The algorithm is mostly identical to the one used by append as of Go 1.14. +func growcap(oldcap, wantcap int) (newcap int) { + if wantcap > oldcap*2 { + newcap = wantcap + } else if oldcap < 1024 { + // The Go 1.14 runtime takes this case when len(s) < 1024, + // not when cap(s) < 1024. The difference doesn't seem + // significant here. + newcap = oldcap * 2 + } else { + newcap = oldcap + for 0 < newcap && newcap < wantcap { + newcap += newcap / 4 + } + if newcap <= 0 { + newcap = wantcap + } + } + return newcap +} + +type vtprotoMessage interface { + MarshalVT() ([]byte, error) + MarshalToSizedBufferVT([]byte) (int, error) + SizeVT() (n int) + UnmarshalVT([]byte) error +} + +func (vtprotoCodec) Marshal(v any) ([]byte, error) { + switch v := v.(type) { + case vtprotoMessage: + return v.MarshalVT() + case proto.Message: + return proto.Marshal(v) + default: + return nil, fmt.Errorf("failed to marshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v) + } +} + +func (vtprotoCodec) MarshalAppend(data []byte, v any) ([]byte, error) { + switch v := v.(type) { + case vtprotoMessage: + if v == nil { + return data, nil + } + + n := v.SizeVT() + if cap(data) < len(data)+n { + ndata := make([]byte, len(data), growcap(cap(data), len(data)+n)) + copy(ndata, data) + data = ndata + } + _, err := v.MarshalToSizedBufferVT(data[len(data) : len(data)+n]) + if err != nil { + return nil, err + } + return data[:len(data)+n], nil + case proto.Message: + return proto.MarshalOptions{}.MarshalAppend(data, v) + default: + return nil, fmt.Errorf("failed to marshalAppend, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v) + } +} + +func (vtprotoCodec) Unmarshal(data []byte, v any) error { + switch v := v.(type) { + case vtprotoMessage: + return v.UnmarshalVT(data) + case proto.Message: + return proto.Unmarshal(data, v) + default: + return fmt.Errorf("failed to unmarshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v) + } +} + +func (vtprotoCodec) Name() string { + return Name +} diff --git a/pkg/api/connect/connect.go b/pkg/api/connect/connect.go new file mode 100644 index 0000000000..aa0399ec37 --- /dev/null +++ b/pkg/api/connect/connect.go @@ -0,0 +1,17 @@ +package connectapi + +import ( + "connectrpc.com/connect" +) + +func DefaultClientOptions() []connect.ClientOption { + return []connect.ClientOption{ + connect.WithCodec(ProtoCodec), + } +} + +func DefaultHandlerOptions() []connect.HandlerOption { + return []connect.HandlerOption{ + connect.WithCodec(ProtoCodec), + } +} diff --git a/pkg/clientpool/store_gateway_client_pool.go b/pkg/clientpool/store_gateway_client_pool.go index 617f10afdc..a4a22c6856 100644 --- a/pkg/clientpool/store_gateway_client_pool.go +++ b/pkg/clientpool/store_gateway_client_pool.go @@ -18,7 +18,7 @@ import ( "github.com/grafana/pyroscope/pkg/util" ) -func NeStoreGatewayPool(ring ring.ReadRing, factory ring_client.PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger, options ...connect.ClientOption) *ring_client.Pool { +func NewStoreGatewayPool(ring ring.ReadRing, factory ring_client.PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger, options ...connect.ClientOption) *ring_client.Pool { if factory == nil { factory = newStoreGatewayPoolFactory(options...) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b4e834b057..8c1611f80e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -34,6 +34,7 @@ import ( googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/clientpool" "github.com/grafana/pyroscope/pkg/distributor/aggregator" distributormodel "github.com/grafana/pyroscope/pkg/distributor/model" @@ -126,6 +127,11 @@ type Limits interface { } func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, limits Limits, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) { + clientsOptions = append( + connectapi.DefaultClientOptions(), + clientsOptions..., + ) + clients := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "pyroscope", Name: "distributor_ingester_clients", diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 6fd9ccc5a3..f78464adb7 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -38,6 +38,7 @@ import ( pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/clientpool" "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/testhelper" @@ -52,6 +53,11 @@ var ringConfig = util.CommonRingConfig{ ListenPort: 8080, } +var ( + clientOptions = append(connectapi.DefaultClientOptions(), connect.WithInterceptors(tenant.NewAuthInterceptor(true))) + handlerOptions = append(connectapi.DefaultHandlerOptions(), connect.WithInterceptors(tenant.NewAuthInterceptor(true))) +) + type poolFactory struct { f func(addr string) (client.PoolClient, error) } @@ -72,12 +78,11 @@ func Test_ConnectPush(t *testing.T) { }}, newOverrides(t), nil, log.NewLogfmtLogger(os.Stdout)) require.NoError(t, err) - mux.Handle(pushv1connect.NewPusherServiceHandler(d, connect.WithInterceptors(tenant.NewAuthInterceptor(true)))) + mux.Handle(pushv1connect.NewPusherServiceHandler(d, handlerOptions...)) s := httptest.NewServer(mux) defer s.Close() - client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, connect.WithInterceptors(tenant.NewAuthInterceptor(true))) - + client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, clientOptions...) resp, err := client.Push(tenant.InjectTenantID(context.Background(), "foo"), connect.NewRequest(&pushv1.PushRequest{ Series: []*pushv1.RawProfileSeries{ { @@ -325,11 +330,11 @@ func Test_Limits(t *testing.T) { } m1 := metricsDump(expectedMetricDelta) - mux.Handle(pushv1connect.NewPusherServiceHandler(d, connect.WithInterceptors(tenant.NewAuthInterceptor(true)))) + mux.Handle(pushv1connect.NewPusherServiceHandler(d, handlerOptions...)) s := httptest.NewServer(mux) defer s.Close() - client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, connect.WithInterceptors(tenant.NewAuthInterceptor(true))) + client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, clientOptions...) resp, err := client.Push(tenant.InjectTenantID(context.Background(), "user-1"), connect.NewRequest(tc.pushReq)) require.Error(t, err) require.Equal(t, tc.expectedCode, connect.CodeOf(err)) @@ -676,11 +681,11 @@ func TestBadPushRequest(t *testing.T) { }}, newOverrides(t), nil, log.NewLogfmtLogger(os.Stdout)) require.NoError(t, err) - mux.Handle(pushv1connect.NewPusherServiceHandler(d, connect.WithInterceptors(tenant.NewAuthInterceptor(true)))) + mux.Handle(pushv1connect.NewPusherServiceHandler(d, handlerOptions...)) s := httptest.NewServer(mux) defer s.Close() - client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, connect.WithInterceptors(tenant.NewAuthInterceptor(true))) + client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, clientOptions...) _, err = client.Push(tenant.InjectTenantID(context.Background(), "foo"), connect.NewRequest(&pushv1.PushRequest{ Series: []*pushv1.RawProfileSeries{ @@ -757,11 +762,11 @@ func TestPush_ShuffleSharding(t *testing.T) { require.NoError(t, err) mux := http.NewServeMux() - mux.Handle(pushv1connect.NewPusherServiceHandler(d, connect.WithInterceptors(tenant.NewAuthInterceptor(true)))) + mux.Handle(pushv1connect.NewPusherServiceHandler(d, handlerOptions...)) s := httptest.NewServer(mux) defer s.Close() - client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, connect.WithInterceptors(tenant.NewAuthInterceptor(true))) + client := pushv1connect.NewPusherServiceClient(http.DefaultClient, s.URL, clientOptions...) // Empty profiles are discarded before sending to ingesters. var buf bytes.Buffer diff --git a/pkg/phlaredb/phlaredb_test.go b/pkg/phlaredb/phlaredb_test.go index b60cb3542b..1af6e2d172 100644 --- a/pkg/phlaredb/phlaredb_test.go +++ b/pkg/phlaredb/phlaredb_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect" pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" phlaremodel "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/testhelper" ) @@ -80,13 +81,13 @@ func (f *fakeBidiServerMergeProfilesStacktraces) Receive() (*ingestv1.MergeProfi func (q Queriers) ingesterClient() (ingesterv1connect.IngesterServiceClient, func()) { mux := http.NewServeMux() - mux.Handle(ingesterv1connect.NewIngesterServiceHandler(&ingesterHandlerPhlareDB{q})) + mux.Handle(ingesterv1connect.NewIngesterServiceHandler(&ingesterHandlerPhlareDB{q}, connectapi.DefaultHandlerOptions()...)) serv := testhelper.NewInMemoryServer(mux) var httpClient *http.Client = serv.Client() client := ingesterv1connect.NewIngesterServiceClient( - httpClient, serv.URL(), + httpClient, serv.URL(), connectapi.DefaultClientOptions()..., ) return client, serv.Close diff --git a/pkg/querier/grpc_handler.go b/pkg/querier/grpc_handler.go index 2daade44ad..be32c05282 100644 --- a/pkg/querier/grpc_handler.go +++ b/pkg/querier/grpc_handler.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" vcsv1connect "github.com/grafana/pyroscope/api/gen/proto/go/vcs/v1/vcsv1connect" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/util/connectgrpc" ) @@ -15,7 +16,7 @@ type QuerierSvc interface { func NewGRPCHandler(svc QuerierSvc) connectgrpc.GRPCHandler { mux := http.NewServeMux() - mux.Handle(querierv1connect.NewQuerierServiceHandler(svc)) - mux.Handle(vcsv1connect.NewVCSServiceHandler(svc)) + mux.Handle(querierv1connect.NewQuerierServiceHandler(svc, connectapi.DefaultHandlerOptions()...)) + mux.Handle(vcsv1connect.NewVCSServiceHandler(svc, connectapi.DefaultHandlerOptions()...)) return connectgrpc.NewHandler(mux) } diff --git a/pkg/querier/grpc_roundtripper.go b/pkg/querier/grpc_roundtripper.go index a51f2c3cdd..3b3d1f76eb 100644 --- a/pkg/querier/grpc_roundtripper.go +++ b/pkg/querier/grpc_roundtripper.go @@ -4,6 +4,7 @@ import ( "connectrpc.com/connect" "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/util/connectgrpc" ) @@ -11,6 +12,9 @@ func NewGRPCRoundTripper(transport connectgrpc.GRPCRoundTripper) querierv1connec return querierv1connect.NewQuerierServiceClient( connectgrpc.NewClient(transport), "http://httpgrpc", - connect.WithGRPCWeb(), + append( + connectapi.DefaultClientOptions(), + connect.WithGRPCWeb(), + )..., ) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 9aa54f294a..0dee90a9fd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -32,6 +32,7 @@ import ( querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/api/gen/proto/go/vcs/v1/vcsv1connect" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/clientpool" "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" @@ -100,6 +101,8 @@ type NewQuerierParams struct { } func New(params *NewQuerierParams) (*Querier, error) { + params.ClientOptions = append(connectapi.DefaultClientOptions(), params.ClientOptions...) + // disable gzip compression for querier-ingester communication as most of payload are not benefit from it. clientsMetrics := promauto.With(params.Reg).NewGauge(prometheus.GaugeOpts{ Namespace: "pyroscope", diff --git a/pkg/querier/store_gateway_querier.go b/pkg/querier/store_gateway_querier.go index 6a53350980..9bddf24626 100644 --- a/pkg/querier/store_gateway_querier.go +++ b/pkg/querier/store_gateway_querier.go @@ -85,7 +85,7 @@ func newStoreGatewayQuerier( Help: "The current number of store-gateway clients in the pool.", ConstLabels: map[string]string{"client": "querier"}, }) - pool := clientpool.NeStoreGatewayPool(storesRing, factory, clientsMetrics, logger, clientsOptions...) + pool := clientpool.NewStoreGatewayPool(storesRing, factory, clientsMetrics, logger, clientsOptions...) s := &StoreGatewayQuerier{ ring: storesRing, diff --git a/pkg/test/integration/cluster/cluster.go b/pkg/test/integration/cluster/cluster.go index 0edacbaa0c..744083270b 100644 --- a/pkg/test/integration/cluster/cluster.go +++ b/pkg/test/integration/cluster/cluster.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/cfg" "github.com/grafana/pyroscope/pkg/phlare" ) @@ -337,6 +338,7 @@ func (c *Cluster) QueryClient() querierv1connect.QuerierServiceClient { return querierv1connect.NewQuerierServiceClient( c.httpClient, "http://querier", + connectapi.DefaultClientOptions()..., ) } @@ -344,6 +346,7 @@ func (c *Cluster) PushClient() pushv1connect.PusherServiceClient { return pushv1connect.NewPusherServiceClient( c.httpClient, "http://push", + connectapi.DefaultClientOptions()..., ) } diff --git a/pkg/test/integration/helper.go b/pkg/test/integration/helper.go index 6de5a83071..5d22c00a6a 100644 --- a/pkg/test/integration/helper.go +++ b/pkg/test/integration/helper.go @@ -29,6 +29,7 @@ import ( querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1" "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/cfg" "github.com/grafana/pyroscope/pkg/og/structs/flamebearer" "github.com/grafana/pyroscope/pkg/phlare" @@ -340,6 +341,7 @@ func (b *RequestBuilder) QueryClient() querierv1connect.QuerierServiceClient { return querierv1connect.NewQuerierServiceClient( http.DefaultClient, b.url, + connectapi.DefaultClientOptions()..., ) } @@ -347,6 +349,7 @@ func (b *RequestBuilder) PushClient() pushv1connect.PusherServiceClient { return pushv1connect.NewPusherServiceClient( http.DefaultClient, b.url, + connectapi.DefaultClientOptions()..., ) }