diff --git a/api/gen/proto/go/ingester/v1/ingester.pb.go b/api/gen/proto/go/ingester/v1/ingester.pb.go index 92fcd7ab72..571d8b6ded 100644 --- a/api/gen/proto/go/ingester/v1/ingester.pb.go +++ b/api/gen/proto/go/ingester/v1/ingester.pb.go @@ -1884,7 +1884,7 @@ var file_ingester_v1_ingester_proto_rawDesc = []byte{ 0x1c, 0x0a, 0x18, 0x4d, 0x45, 0x52, 0x47, 0x45, 0x5f, 0x46, 0x4f, 0x52, 0x4d, 0x41, 0x54, 0x5f, 0x53, 0x54, 0x41, 0x43, 0x4b, 0x54, 0x52, 0x41, 0x43, 0x45, 0x53, 0x10, 0x01, 0x12, 0x15, 0x0a, 0x11, 0x4d, 0x45, 0x52, 0x47, 0x45, 0x5f, 0x46, 0x4f, 0x52, 0x4d, 0x41, 0x54, 0x5f, 0x54, 0x52, - 0x45, 0x45, 0x10, 0x02, 0x32, 0xdc, 0x07, 0x0a, 0x0f, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, + 0x45, 0x45, 0x10, 0x02, 0x32, 0xb6, 0x08, 0x0a, 0x0f, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x50, 0x75, 0x73, 0x68, 0x12, 0x14, 0x2e, 0x70, 0x75, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x75, 0x73, 0x68, 0x2e, 0x76, 0x31, @@ -1946,19 +1946,24 @@ var file_ingester_v1_ingester_proto_rawDesc = []byte{ 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x42, 0xb3, 0x01, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x69, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x42, 0x0d, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, - 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, - 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, - 0x2f, 0x76, 0x31, 0x3b, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x76, 0x31, 0xa2, 0x02, - 0x03, 0x49, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x2e, - 0x56, 0x31, 0xca, 0x02, 0x0b, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x5c, 0x56, 0x31, - 0xe2, 0x02, 0x17, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x5c, 0x56, 0x31, 0x5c, 0x47, - 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0c, 0x49, 0x6e, 0x67, - 0x65, 0x73, 0x74, 0x65, 0x72, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x65, 0x22, 0x00, 0x12, 0x58, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, + 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x20, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, + 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x61, 0x74, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, + 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x53, 0x74, + 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0xb3, 0x01, + 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x76, + 0x31, 0x42, 0x0d, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x72, 0x61, 0x66, 0x61, 0x6e, 
0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, + 0x6f, 0x2f, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x3b, 0x69, 0x6e, + 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x49, 0x58, 0x58, 0xaa, 0x02, + 0x0b, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0b, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x17, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x65, 0x72, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0c, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x72, 0x3a, + 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2013,9 +2018,11 @@ var file_ingester_v1_ingester_proto_goTypes = []interface{}{ (*v11.PushRequest)(nil), // 34: push.v1.PushRequest (*v1.LabelValuesRequest)(nil), // 35: types.v1.LabelValuesRequest (*v1.LabelNamesRequest)(nil), // 36: types.v1.LabelNamesRequest - (*v11.PushResponse)(nil), // 37: push.v1.PushResponse - (*v1.LabelValuesResponse)(nil), // 38: types.v1.LabelValuesResponse - (*v1.LabelNamesResponse)(nil), // 39: types.v1.LabelNamesResponse + (*v1.GetProfileStatsRequest)(nil), // 37: types.v1.GetProfileStatsRequest + (*v11.PushResponse)(nil), // 38: push.v1.PushResponse + (*v1.LabelValuesResponse)(nil), // 39: types.v1.LabelValuesResponse + (*v1.LabelNamesResponse)(nil), // 40: types.v1.LabelNamesResponse + (*v1.GetProfileStatsResponse)(nil), // 41: types.v1.GetProfileStatsResponse } var file_ingester_v1_ingester_proto_depIdxs = []int32{ 27, // 0: ingester.v1.ProfileTypesResponse.profile_types:type_name -> types.v1.ProfileType @@ -2058,19 +2065,21 @@ var file_ingester_v1_ingester_proto_depIdxs = []int32{ 21, // 37: ingester.v1.IngesterService.MergeProfilesPprof:input_type -> ingester.v1.MergeProfilesPprofRequest 12, // 38: ingester.v1.IngesterService.MergeSpanProfile:input_type -> ingester.v1.MergeSpanProfileRequest 23, // 39: ingester.v1.IngesterService.BlockMetadata:input_type -> ingester.v1.BlockMetadataRequest - 37, // 40: ingester.v1.IngesterService.Push:output_type -> push.v1.PushResponse - 38, // 41: ingester.v1.IngesterService.LabelValues:output_type -> types.v1.LabelValuesResponse - 39, // 42: ingester.v1.IngesterService.LabelNames:output_type -> types.v1.LabelNamesResponse - 2, // 43: ingester.v1.IngesterService.ProfileTypes:output_type -> ingester.v1.ProfileTypesResponse - 4, // 44: ingester.v1.IngesterService.Series:output_type -> ingester.v1.SeriesResponse - 6, // 45: ingester.v1.IngesterService.Flush:output_type -> ingester.v1.FlushResponse - 10, // 46: ingester.v1.IngesterService.MergeProfilesStacktraces:output_type -> ingester.v1.MergeProfilesStacktracesResponse - 20, // 47: ingester.v1.IngesterService.MergeProfilesLabels:output_type -> ingester.v1.MergeProfilesLabelsResponse - 22, // 48: ingester.v1.IngesterService.MergeProfilesPprof:output_type -> ingester.v1.MergeProfilesPprofResponse - 13, // 49: ingester.v1.IngesterService.MergeSpanProfile:output_type -> ingester.v1.MergeSpanProfileResponse - 24, // 50: ingester.v1.IngesterService.BlockMetadata:output_type -> ingester.v1.BlockMetadataResponse - 40, // [40:51] is the sub-list for method output_type - 29, // [29:40] is the sub-list for method input_type + 37, // 40: ingester.v1.IngesterService.GetProfileStats:input_type -> types.v1.GetProfileStatsRequest + 38, // 41: 
ingester.v1.IngesterService.Push:output_type -> push.v1.PushResponse + 39, // 42: ingester.v1.IngesterService.LabelValues:output_type -> types.v1.LabelValuesResponse + 40, // 43: ingester.v1.IngesterService.LabelNames:output_type -> types.v1.LabelNamesResponse + 2, // 44: ingester.v1.IngesterService.ProfileTypes:output_type -> ingester.v1.ProfileTypesResponse + 4, // 45: ingester.v1.IngesterService.Series:output_type -> ingester.v1.SeriesResponse + 6, // 46: ingester.v1.IngesterService.Flush:output_type -> ingester.v1.FlushResponse + 10, // 47: ingester.v1.IngesterService.MergeProfilesStacktraces:output_type -> ingester.v1.MergeProfilesStacktracesResponse + 20, // 48: ingester.v1.IngesterService.MergeProfilesLabels:output_type -> ingester.v1.MergeProfilesLabelsResponse + 22, // 49: ingester.v1.IngesterService.MergeProfilesPprof:output_type -> ingester.v1.MergeProfilesPprofResponse + 13, // 50: ingester.v1.IngesterService.MergeSpanProfile:output_type -> ingester.v1.MergeSpanProfileResponse + 24, // 51: ingester.v1.IngesterService.BlockMetadata:output_type -> ingester.v1.BlockMetadataResponse + 41, // 52: ingester.v1.IngesterService.GetProfileStats:output_type -> types.v1.GetProfileStatsResponse + 41, // [41:53] is the sub-list for method output_type + 29, // [29:41] is the sub-list for method input_type 29, // [29:29] is the sub-list for extension type_name 29, // [29:29] is the sub-list for extension extendee 0, // [0:29] is the sub-list for field type_name diff --git a/api/gen/proto/go/ingester/v1/ingester_vtproto.pb.go b/api/gen/proto/go/ingester/v1/ingester_vtproto.pb.go index c48b8c33ea..e65538cb4a 100644 --- a/api/gen/proto/go/ingester/v1/ingester_vtproto.pb.go +++ b/api/gen/proto/go/ingester/v1/ingester_vtproto.pb.go @@ -1571,6 +1571,8 @@ type IngesterServiceClient interface { MergeProfilesPprof(ctx context.Context, opts ...grpc.CallOption) (IngesterService_MergeProfilesPprofClient, error) MergeSpanProfile(ctx context.Context, opts ...grpc.CallOption) (IngesterService_MergeSpanProfileClient, error) BlockMetadata(ctx context.Context, in *BlockMetadataRequest, opts ...grpc.CallOption) (*BlockMetadataResponse, error) + // GetProfileStats returns profile stats for the current tenant. + GetProfileStats(ctx context.Context, in *v1.GetProfileStatsRequest, opts ...grpc.CallOption) (*v1.GetProfileStatsResponse, error) } type ingesterServiceClient struct { @@ -1768,6 +1770,15 @@ func (c *ingesterServiceClient) BlockMetadata(ctx context.Context, in *BlockMeta return out, nil } +func (c *ingesterServiceClient) GetProfileStats(ctx context.Context, in *v1.GetProfileStatsRequest, opts ...grpc.CallOption) (*v1.GetProfileStatsResponse, error) { + out := new(v1.GetProfileStatsResponse) + err := c.cc.Invoke(ctx, "/ingester.v1.IngesterService/GetProfileStats", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // IngesterServiceServer is the server API for IngesterService service. // All implementations must embed UnimplementedIngesterServiceServer // for forward compatibility @@ -1785,6 +1796,8 @@ type IngesterServiceServer interface { MergeProfilesPprof(IngesterService_MergeProfilesPprofServer) error MergeSpanProfile(IngesterService_MergeSpanProfileServer) error BlockMetadata(context.Context, *BlockMetadataRequest) (*BlockMetadataResponse, error) + // GetProfileStats returns profile stats for the current tenant. 
+ GetProfileStats(context.Context, *v1.GetProfileStatsRequest) (*v1.GetProfileStatsResponse, error) mustEmbedUnimplementedIngesterServiceServer() } @@ -1825,6 +1838,9 @@ func (UnimplementedIngesterServiceServer) MergeSpanProfile(IngesterService_Merge func (UnimplementedIngesterServiceServer) BlockMetadata(context.Context, *BlockMetadataRequest) (*BlockMetadataResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method BlockMetadata not implemented") } +func (UnimplementedIngesterServiceServer) GetProfileStats(context.Context, *v1.GetProfileStatsRequest) (*v1.GetProfileStatsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProfileStats not implemented") +} func (UnimplementedIngesterServiceServer) mustEmbedUnimplementedIngesterServiceServer() {} // UnsafeIngesterServiceServer may be embedded to opt out of forward compatibility for this service. @@ -2068,6 +2084,24 @@ func _IngesterService_BlockMetadata_Handler(srv interface{}, ctx context.Context return interceptor(ctx, in, info, handler) } +func _IngesterService_GetProfileStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(v1.GetProfileStatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IngesterServiceServer).GetProfileStats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ingester.v1.IngesterService/GetProfileStats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IngesterServiceServer).GetProfileStats(ctx, req.(*v1.GetProfileStatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // IngesterService_ServiceDesc is the grpc.ServiceDesc for IngesterService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -2103,6 +2137,10 @@ var IngesterService_ServiceDesc = grpc.ServiceDesc{ MethodName: "BlockMetadata", Handler: _IngesterService_BlockMetadata_Handler, }, + { + MethodName: "GetProfileStats", + Handler: _IngesterService_GetProfileStats_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.go b/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.go index 06d8779d7f..0540374078 100644 --- a/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.go +++ b/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.go @@ -65,6 +65,9 @@ const ( // IngesterServiceBlockMetadataProcedure is the fully-qualified name of the IngesterService's // BlockMetadata RPC. IngesterServiceBlockMetadataProcedure = "/ingester.v1.IngesterService/BlockMetadata" + // IngesterServiceGetProfileStatsProcedure is the fully-qualified name of the IngesterService's + // GetProfileStats RPC. + IngesterServiceGetProfileStatsProcedure = "/ingester.v1.IngesterService/GetProfileStats" ) // These variables are the protoreflect.Descriptor objects for the RPCs defined in this package. 
@@ -81,6 +84,7 @@ var ( ingesterServiceMergeProfilesPprofMethodDescriptor = ingesterServiceServiceDescriptor.Methods().ByName("MergeProfilesPprof") ingesterServiceMergeSpanProfileMethodDescriptor = ingesterServiceServiceDescriptor.Methods().ByName("MergeSpanProfile") ingesterServiceBlockMetadataMethodDescriptor = ingesterServiceServiceDescriptor.Methods().ByName("BlockMetadata") + ingesterServiceGetProfileStatsMethodDescriptor = ingesterServiceServiceDescriptor.Methods().ByName("GetProfileStats") ) // IngesterServiceClient is a client for the ingester.v1.IngesterService service. @@ -98,6 +102,8 @@ type IngesterServiceClient interface { MergeProfilesPprof(context.Context) *connect.BidiStreamForClient[v1.MergeProfilesPprofRequest, v1.MergeProfilesPprofResponse] MergeSpanProfile(context.Context) *connect.BidiStreamForClient[v1.MergeSpanProfileRequest, v1.MergeSpanProfileResponse] BlockMetadata(context.Context, *connect.Request[v1.BlockMetadataRequest]) (*connect.Response[v1.BlockMetadataResponse], error) + // GetProfileStats returns profile stats for the current tenant. + GetProfileStats(context.Context, *connect.Request[v12.GetProfileStatsRequest]) (*connect.Response[v12.GetProfileStatsResponse], error) } // NewIngesterServiceClient constructs a client for the ingester.v1.IngesterService service. By @@ -176,6 +182,12 @@ func NewIngesterServiceClient(httpClient connect.HTTPClient, baseURL string, opt connect.WithSchema(ingesterServiceBlockMetadataMethodDescriptor), connect.WithClientOptions(opts...), ), + getProfileStats: connect.NewClient[v12.GetProfileStatsRequest, v12.GetProfileStatsResponse]( + httpClient, + baseURL+IngesterServiceGetProfileStatsProcedure, + connect.WithSchema(ingesterServiceGetProfileStatsMethodDescriptor), + connect.WithClientOptions(opts...), + ), } } @@ -192,6 +204,7 @@ type ingesterServiceClient struct { mergeProfilesPprof *connect.Client[v1.MergeProfilesPprofRequest, v1.MergeProfilesPprofResponse] mergeSpanProfile *connect.Client[v1.MergeSpanProfileRequest, v1.MergeSpanProfileResponse] blockMetadata *connect.Client[v1.BlockMetadataRequest, v1.BlockMetadataResponse] + getProfileStats *connect.Client[v12.GetProfileStatsRequest, v12.GetProfileStatsResponse] } // Push calls ingester.v1.IngesterService.Push. @@ -249,6 +262,11 @@ func (c *ingesterServiceClient) BlockMetadata(ctx context.Context, req *connect. return c.blockMetadata.CallUnary(ctx, req) } +// GetProfileStats calls ingester.v1.IngesterService.GetProfileStats. +func (c *ingesterServiceClient) GetProfileStats(ctx context.Context, req *connect.Request[v12.GetProfileStatsRequest]) (*connect.Response[v12.GetProfileStatsResponse], error) { + return c.getProfileStats.CallUnary(ctx, req) +} + // IngesterServiceHandler is an implementation of the ingester.v1.IngesterService service. type IngesterServiceHandler interface { Push(context.Context, *connect.Request[v11.PushRequest]) (*connect.Response[v11.PushResponse], error) @@ -264,6 +282,8 @@ type IngesterServiceHandler interface { MergeProfilesPprof(context.Context, *connect.BidiStream[v1.MergeProfilesPprofRequest, v1.MergeProfilesPprofResponse]) error MergeSpanProfile(context.Context, *connect.BidiStream[v1.MergeSpanProfileRequest, v1.MergeSpanProfileResponse]) error BlockMetadata(context.Context, *connect.Request[v1.BlockMetadataRequest]) (*connect.Response[v1.BlockMetadataResponse], error) + // GetProfileStats returns profile stats for the current tenant. 
+ GetProfileStats(context.Context, *connect.Request[v12.GetProfileStatsRequest]) (*connect.Response[v12.GetProfileStatsResponse], error) } // NewIngesterServiceHandler builds an HTTP handler from the service implementation. It returns the @@ -338,6 +358,12 @@ func NewIngesterServiceHandler(svc IngesterServiceHandler, opts ...connect.Handl connect.WithSchema(ingesterServiceBlockMetadataMethodDescriptor), connect.WithHandlerOptions(opts...), ) + ingesterServiceGetProfileStatsHandler := connect.NewUnaryHandler( + IngesterServiceGetProfileStatsProcedure, + svc.GetProfileStats, + connect.WithSchema(ingesterServiceGetProfileStatsMethodDescriptor), + connect.WithHandlerOptions(opts...), + ) return "/ingester.v1.IngesterService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case IngesterServicePushProcedure: @@ -362,6 +388,8 @@ func NewIngesterServiceHandler(svc IngesterServiceHandler, opts ...connect.Handl ingesterServiceMergeSpanProfileHandler.ServeHTTP(w, r) case IngesterServiceBlockMetadataProcedure: ingesterServiceBlockMetadataHandler.ServeHTTP(w, r) + case IngesterServiceGetProfileStatsProcedure: + ingesterServiceGetProfileStatsHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -414,3 +442,7 @@ func (UnimplementedIngesterServiceHandler) MergeSpanProfile(context.Context, *co func (UnimplementedIngesterServiceHandler) BlockMetadata(context.Context, *connect.Request[v1.BlockMetadataRequest]) (*connect.Response[v1.BlockMetadataResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("ingester.v1.IngesterService.BlockMetadata is not implemented")) } + +func (UnimplementedIngesterServiceHandler) GetProfileStats(context.Context, *connect.Request[v12.GetProfileStatsRequest]) (*connect.Response[v12.GetProfileStatsResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("ingester.v1.IngesterService.GetProfileStats is not implemented")) +} diff --git a/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.mux.go b/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.mux.go index 7a2444b750..96ac649a6f 100644 --- a/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.mux.go +++ b/api/gen/proto/go/ingester/v1/ingesterv1connect/ingester.connect.mux.go @@ -74,4 +74,9 @@ func RegisterIngesterServiceHandler(mux *mux.Router, svc IngesterServiceHandler, svc.BlockMetadata, opts..., )) + mux.Handle("/ingester.v1.IngesterService/GetProfileStats", connect.NewUnaryHandler( + "/ingester.v1.IngesterService/GetProfileStats", + svc.GetProfileStats, + opts..., + )) } diff --git a/api/gen/proto/go/querier/v1/querier.pb.go b/api/gen/proto/go/querier/v1/querier.pb.go index f4df6bf2d2..df58bc561c 100644 --- a/api/gen/proto/go/querier/v1/querier.pb.go +++ b/api/gen/proto/go/querier/v1/querier.pb.go @@ -1219,7 +1219,7 @@ var file_querier_v1_querier_proto_rawDesc = []byte{ 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x69, 0x65, 0x73, 0x52, 0x06, 0x73, - 0x65, 0x72, 0x69, 0x65, 0x73, 0x32, 0x8c, 0x06, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, + 0x65, 0x72, 0x69, 0x65, 0x73, 0x32, 0xe6, 0x06, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x53, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x79, 0x70, 0x65, 0x73, 
0x12, 0x1f, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x79, 0x70, @@ -1268,18 +1268,24 @@ var file_querier_v1_querier_proto_rawDesc = []byte{ 0x17, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0xab, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x71, 0x75, 0x65, - 0x72, 0x69, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x42, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, - 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, - 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2f, 0x76, - 0x31, 0x3b, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x51, 0x58, - 0x58, 0xaa, 0x02, 0x0a, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2e, 0x56, 0x31, 0xca, 0x02, - 0x0a, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x51, 0x75, - 0x65, 0x72, 0x69, 0x65, 0x72, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, - 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x3a, 0x3a, - 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x58, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, + 0x6c, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x20, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, + 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x61, + 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x74, 0x79, 0x70, 0x65, + 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x53, + 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0xab, + 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2e, 0x76, + 0x31, 0x42, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, + 0x01, 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, + 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, + 0x2f, 0x71, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x3b, 0x71, 0x75, 0x65, 0x72, + 0x69, 0x65, 0x72, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x51, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x51, 0x75, + 0x65, 0x72, 0x69, 0x65, 0x72, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x51, 0x75, 0x65, 0x72, 0x69, + 0x65, 0x72, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x16, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x5c, + 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, + 0x0b, 0x51, 0x75, 0x65, 0x72, 0x69, 0x65, 0x72, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1319,9 +1325,11 @@ var file_querier_v1_querier_proto_goTypes = []interface{}{ (*v1.Series)(nil), // 20: types.v1.Series (*v1.LabelValuesRequest)(nil), // 21: types.v1.LabelValuesRequest 
(*v1.LabelNamesRequest)(nil), // 22: types.v1.LabelNamesRequest - (*v1.LabelValuesResponse)(nil), // 23: types.v1.LabelValuesResponse - (*v1.LabelNamesResponse)(nil), // 24: types.v1.LabelNamesResponse - (*v11.Profile)(nil), // 25: google.v1.Profile + (*v1.GetProfileStatsRequest)(nil), // 23: types.v1.GetProfileStatsRequest + (*v1.LabelValuesResponse)(nil), // 24: types.v1.LabelValuesResponse + (*v1.LabelNamesResponse)(nil), // 25: types.v1.LabelNamesResponse + (*v11.Profile)(nil), // 26: google.v1.Profile + (*v1.GetProfileStatsResponse)(nil), // 27: types.v1.GetProfileStatsResponse } var file_querier_v1_querier_proto_depIdxs = []int32{ 16, // 0: querier.v1.ProfileTypesResponse.profile_types:type_name -> types.v1.ProfileType @@ -1346,17 +1354,19 @@ var file_querier_v1_querier_proto_depIdxs = []int32{ 13, // 19: querier.v1.QuerierService.SelectMergeProfile:input_type -> querier.v1.SelectMergeProfileRequest 14, // 20: querier.v1.QuerierService.SelectSeries:input_type -> querier.v1.SelectSeriesRequest 8, // 21: querier.v1.QuerierService.Diff:input_type -> querier.v1.DiffRequest - 1, // 22: querier.v1.QuerierService.ProfileTypes:output_type -> querier.v1.ProfileTypesResponse - 23, // 23: querier.v1.QuerierService.LabelValues:output_type -> types.v1.LabelValuesResponse - 24, // 24: querier.v1.QuerierService.LabelNames:output_type -> types.v1.LabelNamesResponse - 3, // 25: querier.v1.QuerierService.Series:output_type -> querier.v1.SeriesResponse - 5, // 26: querier.v1.QuerierService.SelectMergeStacktraces:output_type -> querier.v1.SelectMergeStacktracesResponse - 7, // 27: querier.v1.QuerierService.SelectMergeSpanProfile:output_type -> querier.v1.SelectMergeSpanProfileResponse - 25, // 28: querier.v1.QuerierService.SelectMergeProfile:output_type -> google.v1.Profile - 15, // 29: querier.v1.QuerierService.SelectSeries:output_type -> querier.v1.SelectSeriesResponse - 9, // 30: querier.v1.QuerierService.Diff:output_type -> querier.v1.DiffResponse - 22, // [22:31] is the sub-list for method output_type - 13, // [13:22] is the sub-list for method input_type + 23, // 22: querier.v1.QuerierService.GetProfileStats:input_type -> types.v1.GetProfileStatsRequest + 1, // 23: querier.v1.QuerierService.ProfileTypes:output_type -> querier.v1.ProfileTypesResponse + 24, // 24: querier.v1.QuerierService.LabelValues:output_type -> types.v1.LabelValuesResponse + 25, // 25: querier.v1.QuerierService.LabelNames:output_type -> types.v1.LabelNamesResponse + 3, // 26: querier.v1.QuerierService.Series:output_type -> querier.v1.SeriesResponse + 5, // 27: querier.v1.QuerierService.SelectMergeStacktraces:output_type -> querier.v1.SelectMergeStacktracesResponse + 7, // 28: querier.v1.QuerierService.SelectMergeSpanProfile:output_type -> querier.v1.SelectMergeSpanProfileResponse + 26, // 29: querier.v1.QuerierService.SelectMergeProfile:output_type -> google.v1.Profile + 15, // 30: querier.v1.QuerierService.SelectSeries:output_type -> querier.v1.SelectSeriesResponse + 9, // 31: querier.v1.QuerierService.Diff:output_type -> querier.v1.DiffResponse + 27, // 32: querier.v1.QuerierService.GetProfileStats:output_type -> types.v1.GetProfileStatsResponse + 23, // [23:33] is the sub-list for method output_type + 13, // [13:23] is the sub-list for method input_type 13, // [13:13] is the sub-list for extension type_name 13, // [13:13] is the sub-list for extension extendee 0, // [0:13] is the sub-list for field type_name diff --git a/api/gen/proto/go/querier/v1/querier_vtproto.pb.go b/api/gen/proto/go/querier/v1/querier_vtproto.pb.go 
index 22716ac25f..ce2014cc6b 100644 --- a/api/gen/proto/go/querier/v1/querier_vtproto.pb.go +++ b/api/gen/proto/go/querier/v1/querier_vtproto.pb.go @@ -1001,13 +1001,16 @@ type QuerierServiceClient interface { Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (*SeriesResponse, error) // SelectMergeStacktraces returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeStacktraces(ctx context.Context, in *SelectMergeStacktracesRequest, opts ...grpc.CallOption) (*SelectMergeStacktracesResponse, error) - // SelectMergeSpans returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. + // SelectMergeSpanProfile returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeSpanProfile(ctx context.Context, in *SelectMergeSpanProfileRequest, opts ...grpc.CallOption) (*SelectMergeSpanProfileResponse, error) // SelectMergeProfile returns matching profiles aggregated in pprof format. It will contain all information stored (so including filenames and line number, if ingested). SelectMergeProfile(ctx context.Context, in *SelectMergeProfileRequest, opts ...grpc.CallOption) (*v11.Profile, error) // SelectSeries returns a time series for the total sum of the requested profiles. SelectSeries(ctx context.Context, in *SelectSeriesRequest, opts ...grpc.CallOption) (*SelectSeriesResponse, error) + // Diff returns a diff of two profiles Diff(ctx context.Context, in *DiffRequest, opts ...grpc.CallOption) (*DiffResponse, error) + // GetProfileStats returns profile stats for the current tenant. + GetProfileStats(ctx context.Context, in *v1.GetProfileStatsRequest, opts ...grpc.CallOption) (*v1.GetProfileStatsResponse, error) } type querierServiceClient struct { @@ -1099,6 +1102,15 @@ func (c *querierServiceClient) Diff(ctx context.Context, in *DiffRequest, opts . return out, nil } +func (c *querierServiceClient) GetProfileStats(ctx context.Context, in *v1.GetProfileStatsRequest, opts ...grpc.CallOption) (*v1.GetProfileStatsResponse, error) { + out := new(v1.GetProfileStatsResponse) + err := c.cc.Invoke(ctx, "/querier.v1.QuerierService/GetProfileStats", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // QuerierServiceServer is the server API for QuerierService service. // All implementations must embed UnimplementedQuerierServiceServer // for forward compatibility @@ -1113,13 +1125,16 @@ type QuerierServiceServer interface { Series(context.Context, *SeriesRequest) (*SeriesResponse, error) // SelectMergeStacktraces returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeStacktraces(context.Context, *SelectMergeStacktracesRequest) (*SelectMergeStacktracesResponse, error) - // SelectMergeSpans returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. + // SelectMergeSpanProfile returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. 
SelectMergeSpanProfile(context.Context, *SelectMergeSpanProfileRequest) (*SelectMergeSpanProfileResponse, error) // SelectMergeProfile returns matching profiles aggregated in pprof format. It will contain all information stored (so including filenames and line number, if ingested). SelectMergeProfile(context.Context, *SelectMergeProfileRequest) (*v11.Profile, error) // SelectSeries returns a time series for the total sum of the requested profiles. SelectSeries(context.Context, *SelectSeriesRequest) (*SelectSeriesResponse, error) + // Diff returns a diff of two profiles Diff(context.Context, *DiffRequest) (*DiffResponse, error) + // GetProfileStats returns profile stats for the current tenant. + GetProfileStats(context.Context, *v1.GetProfileStatsRequest) (*v1.GetProfileStatsResponse, error) mustEmbedUnimplementedQuerierServiceServer() } @@ -1154,6 +1169,9 @@ func (UnimplementedQuerierServiceServer) SelectSeries(context.Context, *SelectSe func (UnimplementedQuerierServiceServer) Diff(context.Context, *DiffRequest) (*DiffResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Diff not implemented") } +func (UnimplementedQuerierServiceServer) GetProfileStats(context.Context, *v1.GetProfileStatsRequest) (*v1.GetProfileStatsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProfileStats not implemented") +} func (UnimplementedQuerierServiceServer) mustEmbedUnimplementedQuerierServiceServer() {} // UnsafeQuerierServiceServer may be embedded to opt out of forward compatibility for this service. @@ -1329,6 +1347,24 @@ func _QuerierService_Diff_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _QuerierService_GetProfileStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(v1.GetProfileStatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QuerierServiceServer).GetProfileStats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/querier.v1.QuerierService/GetProfileStats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QuerierServiceServer).GetProfileStats(ctx, req.(*v1.GetProfileStatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // QuerierService_ServiceDesc is the grpc.ServiceDesc for QuerierService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -1372,6 +1408,10 @@ var QuerierService_ServiceDesc = grpc.ServiceDesc{ MethodName: "Diff", Handler: _QuerierService_Diff_Handler, }, + { + MethodName: "GetProfileStats", + Handler: _QuerierService_GetProfileStats_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "querier/v1/querier.proto", diff --git a/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.go b/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.go index f1da998895..6ee89563c0 100644 --- a/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.go +++ b/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.go @@ -60,6 +60,9 @@ const ( QuerierServiceSelectSeriesProcedure = "/querier.v1.QuerierService/SelectSeries" // QuerierServiceDiffProcedure is the fully-qualified name of the QuerierService's Diff RPC. 
QuerierServiceDiffProcedure = "/querier.v1.QuerierService/Diff" + // QuerierServiceGetProfileStatsProcedure is the fully-qualified name of the QuerierService's + // GetProfileStats RPC. + QuerierServiceGetProfileStatsProcedure = "/querier.v1.QuerierService/GetProfileStats" ) // These variables are the protoreflect.Descriptor objects for the RPCs defined in this package. @@ -74,6 +77,7 @@ var ( querierServiceSelectMergeProfileMethodDescriptor = querierServiceServiceDescriptor.Methods().ByName("SelectMergeProfile") querierServiceSelectSeriesMethodDescriptor = querierServiceServiceDescriptor.Methods().ByName("SelectSeries") querierServiceDiffMethodDescriptor = querierServiceServiceDescriptor.Methods().ByName("Diff") + querierServiceGetProfileStatsMethodDescriptor = querierServiceServiceDescriptor.Methods().ByName("GetProfileStats") ) // QuerierServiceClient is a client for the querier.v1.QuerierService service. @@ -88,13 +92,16 @@ type QuerierServiceClient interface { Series(context.Context, *connect.Request[v1.SeriesRequest]) (*connect.Response[v1.SeriesResponse], error) // SelectMergeStacktraces returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeStacktraces(context.Context, *connect.Request[v1.SelectMergeStacktracesRequest]) (*connect.Response[v1.SelectMergeStacktracesResponse], error) - // SelectMergeSpans returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. + // SelectMergeSpanProfile returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeSpanProfile(context.Context, *connect.Request[v1.SelectMergeSpanProfileRequest]) (*connect.Response[v1.SelectMergeSpanProfileResponse], error) // SelectMergeProfile returns matching profiles aggregated in pprof format. It will contain all information stored (so including filenames and line number, if ingested). SelectMergeProfile(context.Context, *connect.Request[v1.SelectMergeProfileRequest]) (*connect.Response[v12.Profile], error) // SelectSeries returns a time series for the total sum of the requested profiles. SelectSeries(context.Context, *connect.Request[v1.SelectSeriesRequest]) (*connect.Response[v1.SelectSeriesResponse], error) + // Diff returns a diff of two profiles Diff(context.Context, *connect.Request[v1.DiffRequest]) (*connect.Response[v1.DiffResponse], error) + // GetProfileStats returns profile stats for the current tenant. + GetProfileStats(context.Context, *connect.Request[v11.GetProfileStatsRequest]) (*connect.Response[v11.GetProfileStatsResponse], error) } // NewQuerierServiceClient constructs a client for the querier.v1.QuerierService service. 
By @@ -161,6 +168,12 @@ func NewQuerierServiceClient(httpClient connect.HTTPClient, baseURL string, opts connect.WithSchema(querierServiceDiffMethodDescriptor), connect.WithClientOptions(opts...), ), + getProfileStats: connect.NewClient[v11.GetProfileStatsRequest, v11.GetProfileStatsResponse]( + httpClient, + baseURL+QuerierServiceGetProfileStatsProcedure, + connect.WithSchema(querierServiceGetProfileStatsMethodDescriptor), + connect.WithClientOptions(opts...), + ), } } @@ -175,6 +188,7 @@ type querierServiceClient struct { selectMergeProfile *connect.Client[v1.SelectMergeProfileRequest, v12.Profile] selectSeries *connect.Client[v1.SelectSeriesRequest, v1.SelectSeriesResponse] diff *connect.Client[v1.DiffRequest, v1.DiffResponse] + getProfileStats *connect.Client[v11.GetProfileStatsRequest, v11.GetProfileStatsResponse] } // ProfileTypes calls querier.v1.QuerierService.ProfileTypes. @@ -222,6 +236,11 @@ func (c *querierServiceClient) Diff(ctx context.Context, req *connect.Request[v1 return c.diff.CallUnary(ctx, req) } +// GetProfileStats calls querier.v1.QuerierService.GetProfileStats. +func (c *querierServiceClient) GetProfileStats(ctx context.Context, req *connect.Request[v11.GetProfileStatsRequest]) (*connect.Response[v11.GetProfileStatsResponse], error) { + return c.getProfileStats.CallUnary(ctx, req) +} + // QuerierServiceHandler is an implementation of the querier.v1.QuerierService service. type QuerierServiceHandler interface { // ProfileType returns a list of the existing profile types. @@ -234,13 +253,16 @@ type QuerierServiceHandler interface { Series(context.Context, *connect.Request[v1.SeriesRequest]) (*connect.Response[v1.SeriesResponse], error) // SelectMergeStacktraces returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeStacktraces(context.Context, *connect.Request[v1.SelectMergeStacktracesRequest]) (*connect.Response[v1.SelectMergeStacktracesResponse], error) - // SelectMergeSpans returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. + // SelectMergeSpanProfile returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. SelectMergeSpanProfile(context.Context, *connect.Request[v1.SelectMergeSpanProfileRequest]) (*connect.Response[v1.SelectMergeSpanProfileResponse], error) // SelectMergeProfile returns matching profiles aggregated in pprof format. It will contain all information stored (so including filenames and line number, if ingested). SelectMergeProfile(context.Context, *connect.Request[v1.SelectMergeProfileRequest]) (*connect.Response[v12.Profile], error) // SelectSeries returns a time series for the total sum of the requested profiles. SelectSeries(context.Context, *connect.Request[v1.SelectSeriesRequest]) (*connect.Response[v1.SelectSeriesResponse], error) + // Diff returns a diff of two profiles Diff(context.Context, *connect.Request[v1.DiffRequest]) (*connect.Response[v1.DiffResponse], error) + // GetProfileStats returns profile stats for the current tenant. + GetProfileStats(context.Context, *connect.Request[v11.GetProfileStatsRequest]) (*connect.Response[v11.GetProfileStatsResponse], error) } // NewQuerierServiceHandler builds an HTTP handler from the service implementation. 
It returns the @@ -303,6 +325,12 @@ func NewQuerierServiceHandler(svc QuerierServiceHandler, opts ...connect.Handler connect.WithSchema(querierServiceDiffMethodDescriptor), connect.WithHandlerOptions(opts...), ) + querierServiceGetProfileStatsHandler := connect.NewUnaryHandler( + QuerierServiceGetProfileStatsProcedure, + svc.GetProfileStats, + connect.WithSchema(querierServiceGetProfileStatsMethodDescriptor), + connect.WithHandlerOptions(opts...), + ) return "/querier.v1.QuerierService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case QuerierServiceProfileTypesProcedure: @@ -323,6 +351,8 @@ func NewQuerierServiceHandler(svc QuerierServiceHandler, opts ...connect.Handler querierServiceSelectSeriesHandler.ServeHTTP(w, r) case QuerierServiceDiffProcedure: querierServiceDiffHandler.ServeHTTP(w, r) + case QuerierServiceGetProfileStatsProcedure: + querierServiceGetProfileStatsHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -367,3 +397,7 @@ func (UnimplementedQuerierServiceHandler) SelectSeries(context.Context, *connect func (UnimplementedQuerierServiceHandler) Diff(context.Context, *connect.Request[v1.DiffRequest]) (*connect.Response[v1.DiffResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("querier.v1.QuerierService.Diff is not implemented")) } + +func (UnimplementedQuerierServiceHandler) GetProfileStats(context.Context, *connect.Request[v11.GetProfileStatsRequest]) (*connect.Response[v11.GetProfileStatsResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("querier.v1.QuerierService.GetProfileStats is not implemented")) +} diff --git a/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.mux.go b/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.mux.go index 1e442d43cc..f94c88c3c9 100644 --- a/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.mux.go +++ b/api/gen/proto/go/querier/v1/querierv1connect/querier.connect.mux.go @@ -64,4 +64,9 @@ func RegisterQuerierServiceHandler(mux *mux.Router, svc QuerierServiceHandler, o svc.Diff, opts..., )) + mux.Handle("/querier.v1.QuerierService/GetProfileStats", connect.NewUnaryHandler( + "/querier.v1.QuerierService/GetProfileStats", + svc.GetProfileStats, + opts..., + )) } diff --git a/api/gen/proto/go/types/v1/types.pb.go b/api/gen/proto/go/types/v1/types.pb.go index 189376bb8c..31ea12b556 100644 --- a/api/gen/proto/go/types/v1/types.pb.go +++ b/api/gen/proto/go/types/v1/types.pb.go @@ -835,6 +835,110 @@ func (x *Location) GetName() string { return "" } +type GetProfileStatsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetProfileStatsRequest) Reset() { + *x = GetProfileStatsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_types_v1_types_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProfileStatsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProfileStatsRequest) ProtoMessage() {} + +func (x *GetProfileStatsRequest) ProtoReflect() protoreflect.Message { + mi := &file_types_v1_types_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProfileStatsRequest.ProtoReflect.Descriptor instead. 
+func (*GetProfileStatsRequest) Descriptor() ([]byte, []int) { + return file_types_v1_types_proto_rawDescGZIP(), []int{13} +} + +type GetProfileStatsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Whether we received any data at any time in the past. + DataIngested bool `protobuf:"varint,1,opt,name=data_ingested,json=dataIngested,proto3" json:"data_ingested,omitempty"` + // Milliseconds since epoch. + OldestProfileTime int64 `protobuf:"varint,2,opt,name=oldest_profile_time,json=oldestProfileTime,proto3" json:"oldest_profile_time,omitempty"` + // Milliseconds since epoch. + NewestProfileTime int64 `protobuf:"varint,3,opt,name=newest_profile_time,json=newestProfileTime,proto3" json:"newest_profile_time,omitempty"` +} + +func (x *GetProfileStatsResponse) Reset() { + *x = GetProfileStatsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_types_v1_types_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProfileStatsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProfileStatsResponse) ProtoMessage() {} + +func (x *GetProfileStatsResponse) ProtoReflect() protoreflect.Message { + mi := &file_types_v1_types_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProfileStatsResponse.ProtoReflect.Descriptor instead. +func (*GetProfileStatsResponse) Descriptor() ([]byte, []int) { + return file_types_v1_types_proto_rawDescGZIP(), []int{14} +} + +func (x *GetProfileStatsResponse) GetDataIngested() bool { + if x != nil { + return x.DataIngested + } + return false +} + +func (x *GetProfileStatsResponse) GetOldestProfileTime() int64 { + if x != nil { + return x.OldestProfileTime + } + return 0 +} + +func (x *GetProfileStatsResponse) GetNewestProfileTime() int64 { + if x != nil { + return x.NewestProfileTime + } + return 0 +} + var File_types_v1_types_proto protoreflect.FileDescriptor var file_types_v1_types_proto_rawDesc = []byte{ @@ -910,24 +1014,36 @@ var file_types_v1_types_proto_rawDesc = []byte{ 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x53, 0x69, 0x74, 0x65, 0x22, 0x1e, 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x2a, 0x6b, 0x0a, 0x19, 0x54, 0x69, 0x6d, 0x65, 0x53, - 0x65, 0x72, 0x69, 0x65, 0x73, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x20, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, - 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, - 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x55, 0x4d, 0x10, 0x00, 0x12, 0x28, 0x0a, 0x24, 0x54, 0x49, - 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, - 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x56, 0x45, 0x52, 0x41, - 0x47, 0x45, 0x10, 0x01, 0x42, 0x9b, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x79, 0x70, - 0x65, 0x73, 0x2e, 0x76, 0x31, 0x42, 0x0a, 0x54, 0x79, 0x70, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x50, 0x01, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 
0x63, 0x6f, 0x6d, 0x2f, - 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, - 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x67, 0x6f, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x79, 0x70, 0x65, - 0x73, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, - 0x73, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0xe2, - 0x02, 0x14, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, 0x65, 0x73, 0x3a, 0x3a, - 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x18, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x50, 0x72, + 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x64, 0x61, 0x74, 0x61, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, + 0x65, 0x64, 0x12, 0x2e, 0x0a, 0x13, 0x6f, 0x6c, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, + 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x11, 0x6f, 0x6c, 0x64, 0x65, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x69, + 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x6e, 0x65, 0x77, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, + 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x11, 0x6e, 0x65, 0x77, 0x65, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x69, + 0x6d, 0x65, 0x2a, 0x6b, 0x0a, 0x19, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x65, 0x72, 0x69, 0x65, 0x73, + 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x24, 0x0a, 0x20, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, + 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, + 0x53, 0x55, 0x4d, 0x10, 0x00, 0x12, 0x28, 0x0a, 0x24, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, + 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x56, 0x45, 0x52, 0x41, 0x47, 0x45, 0x10, 0x01, 0x42, + 0x9b, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, 0x31, + 0x42, 0x0a, 0x54, 0x79, 0x70, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3e, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, + 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x79, + 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x79, 0x70, 0x65, 0x73, 0x76, 0x31, 0xa2, 0x02, + 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x56, 0x31, 0xca, + 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x54, 0x79, 0x70, + 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 
0x70, 0x65, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -943,22 +1059,24 @@ func file_types_v1_types_proto_rawDescGZIP() []byte { } var file_types_v1_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_types_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_types_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_types_v1_types_proto_goTypes = []interface{}{ - (TimeSeriesAggregationType)(0), // 0: types.v1.TimeSeriesAggregationType - (*LabelPair)(nil), // 1: types.v1.LabelPair - (*ProfileType)(nil), // 2: types.v1.ProfileType - (*Labels)(nil), // 3: types.v1.Labels - (*Series)(nil), // 4: types.v1.Series - (*Point)(nil), // 5: types.v1.Point - (*LabelValuesRequest)(nil), // 6: types.v1.LabelValuesRequest - (*LabelValuesResponse)(nil), // 7: types.v1.LabelValuesResponse - (*LabelNamesRequest)(nil), // 8: types.v1.LabelNamesRequest - (*LabelNamesResponse)(nil), // 9: types.v1.LabelNamesResponse - (*BlockInfo)(nil), // 10: types.v1.BlockInfo - (*BlockCompaction)(nil), // 11: types.v1.BlockCompaction - (*StackTraceSelector)(nil), // 12: types.v1.StackTraceSelector - (*Location)(nil), // 13: types.v1.Location + (TimeSeriesAggregationType)(0), // 0: types.v1.TimeSeriesAggregationType + (*LabelPair)(nil), // 1: types.v1.LabelPair + (*ProfileType)(nil), // 2: types.v1.ProfileType + (*Labels)(nil), // 3: types.v1.Labels + (*Series)(nil), // 4: types.v1.Series + (*Point)(nil), // 5: types.v1.Point + (*LabelValuesRequest)(nil), // 6: types.v1.LabelValuesRequest + (*LabelValuesResponse)(nil), // 7: types.v1.LabelValuesResponse + (*LabelNamesRequest)(nil), // 8: types.v1.LabelNamesRequest + (*LabelNamesResponse)(nil), // 9: types.v1.LabelNamesResponse + (*BlockInfo)(nil), // 10: types.v1.BlockInfo + (*BlockCompaction)(nil), // 11: types.v1.BlockCompaction + (*StackTraceSelector)(nil), // 12: types.v1.StackTraceSelector + (*Location)(nil), // 13: types.v1.Location + (*GetProfileStatsRequest)(nil), // 14: types.v1.GetProfileStatsRequest + (*GetProfileStatsResponse)(nil), // 15: types.v1.GetProfileStatsResponse } var file_types_v1_types_proto_depIdxs = []int32{ 1, // 0: types.v1.Labels.labels:type_name -> types.v1.LabelPair @@ -1136,6 +1254,30 @@ func file_types_v1_types_proto_init() { return nil } } + file_types_v1_types_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProfileStatsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_types_v1_types_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProfileStatsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1143,7 +1285,7 @@ func file_types_v1_types_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_types_v1_types_proto_rawDesc, NumEnums: 1, - NumMessages: 13, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/api/gen/proto/go/types/v1/types_vtproto.pb.go b/api/gen/proto/go/types/v1/types_vtproto.pb.go index fcf433d240..b2ce869a6a 100644 --- a/api/gen/proto/go/types/v1/types_vtproto.pb.go +++ b/api/gen/proto/go/types/v1/types_vtproto.pb.go @@ -323,6 +323,42 @@ func (m *Location) CloneMessageVT() proto.Message { return m.CloneVT() } +func (m *GetProfileStatsRequest) CloneVT() 
*GetProfileStatsRequest { + if m == nil { + return (*GetProfileStatsRequest)(nil) + } + r := &GetProfileStatsRequest{} + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *GetProfileStatsRequest) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *GetProfileStatsResponse) CloneVT() *GetProfileStatsResponse { + if m == nil { + return (*GetProfileStatsResponse)(nil) + } + r := &GetProfileStatsResponse{ + DataIngested: m.DataIngested, + OldestProfileTime: m.OldestProfileTime, + NewestProfileTime: m.NewestProfileTime, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *GetProfileStatsResponse) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (this *LabelPair) EqualVT(that *LabelPair) bool { if this == that { return true @@ -733,6 +769,47 @@ func (this *Location) EqualMessageVT(thatMsg proto.Message) bool { } return this.EqualVT(that) } +func (this *GetProfileStatsRequest) EqualVT(that *GetProfileStatsRequest) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *GetProfileStatsRequest) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*GetProfileStatsRequest) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *GetProfileStatsResponse) EqualVT(that *GetProfileStatsResponse) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.DataIngested != that.DataIngested { + return false + } + if this.OldestProfileTime != that.OldestProfileTime { + return false + } + if this.NewestProfileTime != that.NewestProfileTime { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *GetProfileStatsResponse) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*GetProfileStatsResponse) + if !ok { + return false + } + return this.EqualVT(that) +} func (m *LabelPair) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1409,6 +1486,92 @@ func (m *Location) MarshalToSizedBufferVT(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *GetProfileStatsRequest) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetProfileStatsRequest) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *GetProfileStatsRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + return len(dAtA) - i, nil +} + +func (m *GetProfileStatsResponse) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetProfileStatsResponse) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m 
*GetProfileStatsResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.NewestProfileTime != 0 { + i = encodeVarint(dAtA, i, uint64(m.NewestProfileTime)) + i-- + dAtA[i] = 0x18 + } + if m.OldestProfileTime != 0 { + i = encodeVarint(dAtA, i, uint64(m.OldestProfileTime)) + i-- + dAtA[i] = 0x10 + } + if m.DataIngested { + i-- + if m.DataIngested { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarint(dAtA []byte, offset int, v uint64) int { offset -= sov(v) base := offset @@ -1691,6 +1854,35 @@ func (m *Location) SizeVT() (n int) { return n } +func (m *GetProfileStatsRequest) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += len(m.unknownFields) + return n +} + +func (m *GetProfileStatsResponse) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.DataIngested { + n += 2 + } + if m.OldestProfileTime != 0 { + n += 1 + sov(uint64(m.OldestProfileTime)) + } + if m.NewestProfileTime != 0 { + n += 1 + sov(uint64(m.NewestProfileTime)) + } + n += len(m.unknownFields) + return n +} + func sov(x uint64) (n int) { return (bits.Len64(x|1) + 6) / 7 } @@ -3273,6 +3465,166 @@ func (m *Location) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *GetProfileStatsRequest) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetProfileStatsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetProfileStatsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetProfileStatsResponse) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetProfileStatsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetProfileStatsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DataIngested", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.DataIngested = bool(v != 0) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OldestProfileTime", wireType) + } + m.OldestProfileTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.OldestProfileTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NewestProfileTime", wireType) + } + m.NewestProfileTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NewestProfileTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skip(dAtA []byte) (n int, err error) { l := len(dAtA) diff --git a/api/ingester/v1/ingester.proto b/api/ingester/v1/ingester.proto index 752c39b532..b0063fcf76 100644 --- a/api/ingester/v1/ingester.proto +++ b/api/ingester/v1/ingester.proto @@ -20,6 +20,8 @@ service IngesterService { rpc MergeProfilesPprof(stream MergeProfilesPprofRequest) returns (stream MergeProfilesPprofResponse) {} rpc MergeSpanProfile(stream MergeSpanProfileRequest) returns (stream MergeSpanProfileResponse) {} rpc BlockMetadata(BlockMetadataRequest) returns (BlockMetadataResponse) {} + // GetProfileStats returns profile stats for the current tenant. 
+ rpc GetProfileStats(types.v1.GetProfileStatsRequest) returns (types.v1.GetProfileStatsResponse) {} } message ProfileTypesRequest { diff --git a/api/openapiv2/gen/phlare.swagger.json b/api/openapiv2/gen/phlare.swagger.json index 892c42b99f..e2bb139201 100644 --- a/api/openapiv2/gen/phlare.swagger.json +++ b/api/openapiv2/gen/phlare.swagger.json @@ -713,6 +713,25 @@ } } }, + "v1GetProfileStatsResponse": { + "type": "object", + "properties": { + "dataIngested": { + "type": "boolean", + "description": "Whether we received any data at any time in the past." + }, + "oldestProfileTime": { + "type": "string", + "format": "int64", + "description": "Milliseconds since epoch." + }, + "newestProfileTime": { + "type": "string", + "format": "int64", + "description": "Milliseconds since epoch." + } + } + }, "v1GetSettingsResponse": { "type": "object", "properties": { diff --git a/api/querier/v1/querier.proto b/api/querier/v1/querier.proto index 3d63b41835..8a6fca2c9d 100644 --- a/api/querier/v1/querier.proto +++ b/api/querier/v1/querier.proto @@ -16,14 +16,18 @@ service QuerierService { rpc Series(SeriesRequest) returns (SeriesResponse) {} // SelectMergeStacktraces returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. rpc SelectMergeStacktraces(SelectMergeStacktracesRequest) returns (SelectMergeStacktracesResponse) {} - // SelectMergeSpans returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. + // SelectMergeSpanProfile returns matching profiles aggregated in a flamegraph format. It will combine samples from within the same callstack, with each element being grouped by its function name. rpc SelectMergeSpanProfile(SelectMergeSpanProfileRequest) returns (SelectMergeSpanProfileResponse) {} // SelectMergeProfile returns matching profiles aggregated in pprof format. It will contain all information stored (so including filenames and line number, if ingested). rpc SelectMergeProfile(SelectMergeProfileRequest) returns (google.v1.Profile) {} // SelectSeries returns a time series for the total sum of the requested profiles. rpc SelectSeries(SelectSeriesRequest) returns (SelectSeriesResponse) {} + // Diff returns a diff of two profiles rpc Diff(DiffRequest) returns (DiffResponse) {} + + // GetProfileStats returns profile stats for the current tenant. + rpc GetProfileStats(types.v1.GetProfileStatsRequest) returns (types.v1.GetProfileStatsResponse) {} } message ProfileTypesRequest { diff --git a/api/types/v1/types.proto b/api/types/v1/types.proto index 7a0174b157..09012a1319 100644 --- a/api/types/v1/types.proto +++ b/api/types/v1/types.proto @@ -83,3 +83,14 @@ message StackTraceSelector { message Location { string name = 1; } + +message GetProfileStatsRequest {} + +message GetProfileStatsResponse { + // Whether we received any data at any time in the past. + bool data_ingested = 1; + // Milliseconds since epoch. + int64 oldest_profile_time = 2; + // Milliseconds since epoch. 
+ int64 newest_profile_time = 3; +} diff --git a/pkg/frontend/frontend_get_profile_stats.go b/pkg/frontend/frontend_get_profile_stats.go new file mode 100644 index 0000000000..0720379804 --- /dev/null +++ b/pkg/frontend/frontend_get_profile_stats.go @@ -0,0 +1,23 @@ +package frontend + +import ( + "context" + + "connectrpc.com/connect" + "github.com/opentracing/opentracing-go" + + "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/pkg/util/connectgrpc" +) + +func (f *Frontend) GetProfileStats(ctx context.Context, + c *connect.Request[typesv1.GetProfileStatsRequest]) ( + *connect.Response[typesv1.GetProfileStatsResponse], error, +) { + opentracing.SpanFromContext(ctx) + + ctx = connectgrpc.WithProcedure(ctx, querierv1connect.QuerierServiceGetProfileStatsProcedure) + res, err := connectgrpc.RoundTripUnary[typesv1.GetProfileStatsRequest, typesv1.GetProfileStatsResponse](ctx, f, c) + return res, err +} diff --git a/pkg/ingester/query.go b/pkg/ingester/query.go index 727bb666ef..cb03e1c6e1 100644 --- a/pkg/ingester/query.go +++ b/pkg/ingester/query.go @@ -67,3 +67,10 @@ func (i *Ingester) MergeSpanProfile(ctx context.Context, stream *connect.BidiStr return instance.MergeSpanProfile(ctx, stream) }) } + +// GetProfileStats returns +func (i *Ingester) GetProfileStats(ctx context.Context, req *connect.Request[typesv1.GetProfileStatsRequest]) (*connect.Response[typesv1.GetProfileStatsResponse], error) { + return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[typesv1.GetProfileStatsResponse], error) { + return instance.GetProfileStats(ctx, req) + }) +} diff --git a/pkg/phlare/modules.go b/pkg/phlare/modules.go index 90a61d6350..f446742960 100644 --- a/pkg/phlare/modules.go +++ b/pkg/phlare/modules.go @@ -7,6 +7,7 @@ import ( "os" "time" + "connectrpc.com/connect" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/dns" @@ -245,20 +246,18 @@ func (f *Phlare) setupWorkerTimeout() { } func (f *Phlare) initQuerier() (services.Service, error) { - var ( - storeGatewayQuerier *querier.StoreGatewayQuerier - err error - ) - - // if a storage bucket is configure we need to create a store gateway querier - if f.storageBucket != nil { - storeGatewayQuerier, err = querier.NewStoreGatewayQuerier(f.Cfg.StoreGateway, nil, f.Overrides, log.With(f.logger, "component", "store-gateway-querier"), f.reg, f.auth) - if err != nil { - return nil, err - } - } - - querierSvc, err := querier.New(f.Cfg.Querier, f.ring, nil, storeGatewayQuerier, f.reg, log.With(f.logger, "component", "querier"), f.auth) + newQuerierParams := &querier.NewQuerierParams{ + Cfg: f.Cfg.Querier, + StoreGatewayCfg: f.Cfg.StoreGateway, + Overrides: f.Overrides, + CfgProvider: f.Overrides, + StorageBucket: f.storageBucket, + IngestersRing: f.ring, + Reg: f.reg, + Logger: log.With(f.logger, "component", "querier"), + ClientOptions: []connect.ClientOption{f.auth}, + } + querierSvc, err := querier.New(newQuerierParams) if err != nil { return nil, err } @@ -267,12 +266,12 @@ func (f *Phlare) initQuerier() (services.Service, error) { f.API.RegisterPyroscopeHandlers(querierSvc) f.API.RegisterQuerier(querierSvc) } - worker, err := worker.NewQuerierWorker(f.Cfg.Worker, querier.NewGRPCHandler(querierSvc), log.With(f.logger, "component", "querier-worker"), f.reg) + qWorker, err := worker.NewQuerierWorker(f.Cfg.Worker, querier.NewGRPCHandler(querierSvc), log.With(f.logger, "component", 
"querier-worker"), f.reg) if err != nil { return nil, err } - sm, err := services.NewManager(querierSvc, worker) + sm, err := services.NewManager(querierSvc, qWorker) if err != nil { return nil, err } diff --git a/pkg/phlaredb/phlaredb.go b/pkg/phlaredb/phlaredb.go index e7cd7dc9c5..a6d4f81103 100644 --- a/pkg/phlaredb/phlaredb.go +++ b/pkg/phlaredb/phlaredb.go @@ -2,8 +2,10 @@ package phlaredb import ( "context" + "errors" "flag" "fmt" + "math" "os" "path/filepath" "sync" @@ -524,3 +526,57 @@ func (f *PhlareDB) BlockMetadata(ctx context.Context, req *connect.Request[inges return connect.NewResponse(&result), nil } + +func (f *PhlareDB) GetProfileStats(ctx context.Context, req *connect.Request[typesv1.GetProfileStatsRequest]) (*connect.Response[typesv1.GetProfileStatsResponse], error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "PhlareDB GetProfileStats") + defer sp.Finish() + + minTimes := make([]model.Time, 0) + maxTimes := make([]model.Time, 0) + + f.headLock.RLock() + for _, h := range f.heads { + minT, maxT := h.Bounds() + minTimes = append(minTimes, minT) + maxTimes = append(maxTimes, maxT) + } + for _, h := range f.flushing { + minT, maxT := h.Bounds() + minTimes = append(minTimes, minT) + maxTimes = append(maxTimes, maxT) + } + f.headLock.RUnlock() + + f.blockQuerier.queriersLock.RLock() + for _, q := range f.blockQuerier.queriers { + minT, maxT := q.Bounds() + minTimes = append(minTimes, minT) + maxTimes = append(maxTimes, maxT) + } + f.blockQuerier.queriersLock.RUnlock() + + response, err := getProfileStatsFromBounds(minTimes, maxTimes) + return connect.NewResponse(response), err +} + +func getProfileStatsFromBounds(minTimes, maxTimes []model.Time) (*typesv1.GetProfileStatsResponse, error) { + if len(minTimes) != len(maxTimes) { + return nil, errors.New("minTimes and maxTimes differ in length") + } + response := &typesv1.GetProfileStatsResponse{ + DataIngested: len(minTimes) > 0, + OldestProfileTime: math.MaxInt64, + NewestProfileTime: math.MinInt64, + } + + for i, minTime := range minTimes { + maxTime := maxTimes[i] + if response.OldestProfileTime > minTime.Time().UnixMilli() { + response.OldestProfileTime = minTime.Time().UnixMilli() + } + if response.NewestProfileTime < maxTime.Time().UnixMilli() { + response.NewestProfileTime = maxTime.Time().UnixMilli() + } + } + return response, nil +} diff --git a/pkg/phlaredb/phlaredb_test.go b/pkg/phlaredb/phlaredb_test.go index e872c83f8d..3615b85ffa 100644 --- a/pkg/phlaredb/phlaredb_test.go +++ b/pkg/phlaredb/phlaredb_test.go @@ -2,6 +2,7 @@ package phlaredb import ( "context" + "math" "net/http" "os" "testing" @@ -140,6 +141,10 @@ func (i *ingesterHandlerPhlareDB) BlockMetadata(context.Context, *connect.Reques return nil, errors.New("not implemented") } +func (i *ingesterHandlerPhlareDB) GetProfileStats(context.Context, *connect.Request[typesv1.GetProfileStatsRequest]) (*connect.Response[typesv1.GetProfileStatsResponse], error) { + return nil, errors.New("not implemented") +} + func TestMergeProfilesStacktraces(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) @@ -565,3 +570,46 @@ func Test_endRangeForTimestamp(t *testing.T) { }) } } + +func Test_getProfileStatsFromMetas(t *testing.T) { + tests := []struct { + name string + minTimes []model.Time + maxTimes []model.Time + want *typesv1.GetProfileStatsResponse + }{ + { + name: "no metas should result in no data ingested", + minTimes: []model.Time{}, + maxTimes: []model.Time{}, + want: &typesv1.GetProfileStatsResponse{ + DataIngested: false, + OldestProfileTime: 
math.MaxInt64, + NewestProfileTime: math.MinInt64, + }, + }, + { + name: "valid metas should result in data ingested", + minTimes: []model.Time{ + model.TimeFromUnix(1710161819), + model.TimeFromUnix(1710171819), + }, + maxTimes: []model.Time{ + model.TimeFromUnix(1710172239), + model.TimeFromUnix(1710174239), + }, + want: &typesv1.GetProfileStatsResponse{ + DataIngested: true, + OldestProfileTime: 1710161819000, + NewestProfileTime: 1710174239000, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + response, err := getProfileStatsFromBounds(tt.minTimes, tt.maxTimes) + require.NoError(t, err) + assert.Equalf(t, tt.want, response, "getProfileStatsFromBounds(%v, %v)", tt.minTimes, tt.maxTimes) + }) + } +} diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index ab3fed189f..aa35bbb3de 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -32,6 +32,7 @@ type IngesterQueryClient interface { MergeProfilesPprof(ctx context.Context) clientpool.BidiClientMergeProfilesPprof MergeSpanProfile(ctx context.Context) clientpool.BidiClientMergeSpanProfile BlockMetadata(ctx context.Context, req *connect.Request[ingestv1.BlockMetadataRequest]) (*connect.Response[ingestv1.BlockMetadataResponse], error) + GetProfileStats(ctx context.Context, req *connect.Request[typesv1.GetProfileStatsRequest]) (*connect.Response[typesv1.GetProfileStatsResponse], error) } // IngesterQuerier helps with querying the ingesters. diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7412bdbbbc..1ebf7025d9 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "math" "sort" "strings" "sync" @@ -15,6 +16,7 @@ import ( "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" + "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" @@ -29,14 +31,18 @@ import ( ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1" querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" - vcsv1connect "github.com/grafana/pyroscope/api/gen/proto/go/vcs/v1/vcsv1connect" + "github.com/grafana/pyroscope/api/gen/proto/go/vcs/v1/vcsv1connect" "github.com/grafana/pyroscope/pkg/clientpool" "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" + phlareobj "github.com/grafana/pyroscope/pkg/objstore" + "github.com/grafana/pyroscope/pkg/phlaredb/bucketindex" "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/querier/vcs" - "github.com/grafana/pyroscope/pkg/util/math" + "github.com/grafana/pyroscope/pkg/storegateway" + pmath "github.com/grafana/pyroscope/pkg/util/math" "github.com/grafana/pyroscope/pkg/util/spanlogger" + "github.com/grafana/pyroscope/pkg/validation" ) type Config struct { @@ -62,6 +68,9 @@ type Querier struct { storeGatewayQuerier *StoreGatewayQuerier vcsv1connect.VCSServiceHandler + + storageBucket phlareobj.Bucket + tenantConfigProvider phlareobj.TenantConfigProvider } // TODO(kolesnikovae): For backwards compatibility. @@ -71,25 +80,56 @@ type Querier struct { // querier frontend sets the limit. 
const maxNodesDefault = int64(2048) -func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, storeGatewayQuerier *StoreGatewayQuerier, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Querier, error) { +type NewQuerierParams struct { + Cfg Config + StoreGatewayCfg storegateway.Config + Overrides *validation.Overrides + StorageBucket phlareobj.Bucket + CfgProvider phlareobj.TenantConfigProvider + IngestersRing ring.ReadRing + PoolFactory ring_client.PoolFactory + Reg prometheus.Registerer + Logger log.Logger + ClientOptions []connect.ClientOption +} + +func New(params *NewQuerierParams) (*Querier, error) { // disable gzip compression for querier-ingester communication as most of payload are not benefit from it. - clientsMetrics := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + clientsMetrics := promauto.With(params.Reg).NewGauge(prometheus.GaugeOpts{ Namespace: "pyroscope", Name: "querier_ingester_clients", Help: "The current number of ingester clients.", }) + // if a storage bucket is configured we need to create a store gateway querier + var storeGatewayQuerier *StoreGatewayQuerier + var err error + if params.StorageBucket != nil { + storeGatewayQuerier, err = newStoreGatewayQuerier( + params.StoreGatewayCfg, + params.PoolFactory, + params.Overrides, + log.With(params.Logger, "component", "store-gateway-querier"), + params.Reg, + params.ClientOptions...) + if err != nil { + return nil, err + } + } + q := &Querier{ - cfg: cfg, - logger: logger, + cfg: params.Cfg, + logger: params.Logger, ingesterQuerier: NewIngesterQuerier( - clientpool.NewIngesterPool(cfg.PoolConfig, ingestersRing, factory, clientsMetrics, logger, clientsOptions...), - ingestersRing, + clientpool.NewIngesterPool(params.Cfg.PoolConfig, params.IngestersRing, params.PoolFactory, clientsMetrics, params.Logger, params.ClientOptions...), + params.IngestersRing, ), - storeGatewayQuerier: storeGatewayQuerier, - VCSServiceHandler: vcs.New(logger), + storeGatewayQuerier: storeGatewayQuerier, + VCSServiceHandler: vcs.New(params.Logger), + storageBucket: params.StorageBucket, + tenantConfigProvider: params.CfgProvider, } - var err error + svcs := []services.Service{q.ingesterQuerier.pool} if storeGatewayQuerier != nil { svcs = append(svcs, storeGatewayQuerier) @@ -475,6 +515,63 @@ func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffR }), nil } +func (q *Querier) GetProfileStats(ctx context.Context, req *connect.Request[typesv1.GetProfileStatsRequest]) (*connect.Response[typesv1.GetProfileStatsResponse], error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "GetProfileStats") + defer sp.Finish() + + responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) (*typesv1.GetProfileStatsResponse, error) { + response, err := ic.GetProfileStats(childCtx, connect.NewRequest(&typesv1.GetProfileStatsRequest{})) + if err != nil { + return nil, err + } + return response.Msg, nil + }) + if err != nil { + return nil, err + } + + response := &typesv1.GetProfileStatsResponse{ + DataIngested: false, + OldestProfileTime: math.MaxInt64, + NewestProfileTime: math.MinInt64, + } + for _, r := range responses { + response.DataIngested = response.DataIngested || r.response.DataIngested + if r.response.OldestProfileTime < response.OldestProfileTime { + response.OldestProfileTime = r.response.OldestProfileTime + } + if r.response.NewestProfileTime > response.NewestProfileTime { + 
response.NewestProfileTime = r.response.NewestProfileTime + } + } + + if q.storageBucket != nil { + tenantId, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + index, err := bucketindex.ReadIndex(ctx, q.storageBucket, tenantId, q.tenantConfigProvider, q.logger) + if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) { + return nil, err + } + if index != nil && len(index.Blocks) > 0 { + // assuming blocks are ordered by time in ascending order + // ignoring deleted blocks as we only need the overall time range of blocks + minTime := index.Blocks[0].MinTime.Time().UnixMilli() + if minTime < response.OldestProfileTime { + response.OldestProfileTime = minTime + } + maxTime := index.Blocks[len(index.Blocks)-1].MaxTime.Time().UnixMilli() + if maxTime > response.NewestProfileTime { + response.NewestProfileTime = maxTime + } + response.DataIngested = true + } + } + + return connect.NewResponse(response), nil +} + func (q *Querier) SelectMergeStacktraces(ctx context.Context, req *connect.Request[querierv1.SelectMergeStacktracesRequest]) (*connect.Response[querierv1.SelectMergeStacktracesResponse], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMergeStacktraces") level.Info(spanlogger.FromContext(ctx, q.logger)).Log( @@ -713,10 +810,10 @@ func splitQueryToStores(start, end model.Time, now model.Time, queryStoreAfter t queries.queryStoreAfter = queryStoreAfter cutOff := now.Add(-queryStoreAfter) if start.Before(cutOff) { - queries.storeGateway = storeQuery{shouldQuery: true, start: start, end: math.Min(cutOff, end)} + queries.storeGateway = storeQuery{shouldQuery: true, start: start, end: pmath.Min(cutOff, end)} } if end.After(cutOff) { - queries.ingester = storeQuery{shouldQuery: true, start: math.Max(cutOff, start), end: end} + queries.ingester = storeQuery{shouldQuery: true, start: pmath.Max(cutOff, start), end: end} // Note that the ranges must not overlap. 
if queries.storeGateway.shouldQuery { queries.ingester.start++ diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 7f84828925..5aed2e56e3 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -2,9 +2,12 @@ package querier import ( "bytes" + "compress/gzip" "context" + "encoding/json" "errors" "os" + "path" "sort" "testing" "time" @@ -13,6 +16,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/google/pprof/profile" + "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring/client" "github.com/prometheus/common/model" @@ -27,9 +31,15 @@ import ( "github.com/grafana/pyroscope/pkg/clientpool" "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" + objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client" + "github.com/grafana/pyroscope/pkg/objstore/providers/filesystem" + "github.com/grafana/pyroscope/pkg/phlaredb/bucketindex" "github.com/grafana/pyroscope/pkg/pprof" pprofth "github.com/grafana/pyroscope/pkg/pprof/testhelper" + "github.com/grafana/pyroscope/pkg/storegateway" + "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/testhelper" + "github.com/grafana/pyroscope/pkg/util" ) type poolFactory struct { @@ -41,45 +51,52 @@ func (p poolFactory) FromInstance(desc ring.InstanceDesc) (client.PoolClient, er } func Test_QuerySampleType(t *testing.T) { - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.On("LabelValues", mock.Anything, mock.Anything). - Return(connect.NewResponse(&typesv1.LabelValuesResponse{ - Names: []string{ - "foo::::", - "bar::::", - }, - }), nil) - case "2": - q.On("LabelValues", mock.Anything, mock.Anything). - Return(connect.NewResponse(&typesv1.LabelValuesResponse{ - Names: []string{ - "bar::::", - "buzz::::", - }, - }), nil) - case "3": - q.On("LabelValues", mock.Anything, mock.Anything). - Return(connect.NewResponse(&typesv1.LabelValuesResponse{ - Names: []string{ - "buzz::::", - "foo::::", - }, - }), nil) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) - + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{f: func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.On("LabelValues", mock.Anything, mock.Anything). + Return(connect.NewResponse(&typesv1.LabelValuesResponse{ + Names: []string{ + "foo::::", + "bar::::", + }, + }), nil) + case "2": + q.On("LabelValues", mock.Anything, mock.Anything). + Return(connect.NewResponse(&typesv1.LabelValuesResponse{ + Names: []string{ + "bar::::", + "buzz::::", + }, + }), nil) + case "3": + q.On("LabelValues", mock.Anything, mock.Anything). 
+ Return(connect.NewResponse(&typesv1.LabelValuesResponse{ + Names: []string{ + "buzz::::", + "foo::::", + }, + }), nil) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) + out, err := querier.ProfileTypes(context.Background(), connect.NewRequest(&querierv1.ProfileTypesRequest{})) + require.NoError(t, err) + ids := make([]string, 0, len(out.Msg.ProfileTypes)) for _, pt := range out.Msg.ProfileTypes { ids = append(ids, pt.ID) @@ -90,24 +107,29 @@ func Test_QuerySampleType(t *testing.T) { func Test_QueryLabelValues(t *testing.T) { req := connect.NewRequest(&typesv1.LabelValuesRequest{Name: "foo"}) - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.On("LabelValues", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelValuesResponse{Names: []string{"foo", "bar"}}), nil) - case "2": - q.On("LabelValues", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelValuesResponse{Names: []string{"bar", "buzz"}}), nil) - case "3": - q.On("LabelValues", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelValuesResponse{Names: []string{"buzz", "foo"}}), nil) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{f: func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.On("LabelValues", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelValuesResponse{Names: []string{"foo", "bar"}}), nil) + case "2": + q.On("LabelValues", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelValuesResponse{Names: []string{"bar", "buzz"}}), nil) + case "3": + q.On("LabelValues", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelValuesResponse{Names: []string{"buzz", "foo"}}), nil) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) out, err := querier.LabelValues(context.Background(), req) @@ -117,24 +139,29 @@ func Test_QueryLabelValues(t *testing.T) { func Test_QueryLabelNames(t *testing.T) { req := connect.NewRequest(&typesv1.LabelNamesRequest{}) - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.On("LabelNames", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelNamesResponse{Names: []string{"foo", "bar"}}), nil) - case "2": - q.On("LabelNames", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelNamesResponse{Names: []string{"bar", "buzz"}}), nil) - case "3": - q.On("LabelNames", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelNamesResponse{Names: []string{"buzz", "foo"}}), nil) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) + querier, err := 
New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{f: func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.On("LabelNames", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelNamesResponse{Names: []string{"foo", "bar"}}), nil) + case "2": + q.On("LabelNames", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelNamesResponse{Names: []string{"bar", "buzz"}}), nil) + case "3": + q.On("LabelNames", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.LabelNamesResponse{Names: []string{"buzz", "foo"}}), nil) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) out, err := querier.LabelNames(context.Background(), req) @@ -146,28 +173,33 @@ func Test_Series(t *testing.T) { foobarlabels := phlaremodel.NewLabelsBuilder(nil).Set("foo", "bar") foobuzzlabels := phlaremodel.NewLabelsBuilder(nil).Set("foo", "buzz") req := connect.NewRequest(&querierv1.SeriesRequest{Matchers: []string{`{foo="bar"}`}}) - ingesterReponse := connect.NewResponse(&ingestv1.SeriesResponse{LabelsSet: []*typesv1.Labels{ + ingesterResponse := connect.NewResponse(&ingestv1.SeriesResponse{LabelsSet: []*typesv1.Labels{ {Labels: foobarlabels.Labels()}, {Labels: foobuzzlabels.Labels()}, }}) - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.On("Series", mock.Anything, mock.Anything).Return(ingesterReponse, nil) - case "2": - q.On("Series", mock.Anything, mock.Anything).Return(ingesterReponse, nil) - case "3": - q.On("Series", mock.Anything, mock.Anything).Return(ingesterReponse, nil) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.On("Series", mock.Anything, mock.Anything).Return(ingesterResponse, nil) + case "2": + q.On("Series", mock.Anything, mock.Anything).Return(ingesterResponse, nil) + case "3": + q.On("Series", mock.Anything, mock.Anything).Return(ingesterResponse, nil) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) out, err := querier.Series(context.Background(), req) @@ -273,24 +305,29 @@ func Test_SelectMergeStacktraces(t *testing.T) { }, }, }) - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.mockMergeStacktraces(bidi1, []string{"a", "d"}, tc.blockSelect) - case "2": - q.mockMergeStacktraces(bidi2, []string{"b", "d"}, tc.blockSelect) - case "3": - 
q.mockMergeStacktraces(bidi3, []string{"c", "d"}, tc.blockSelect) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.mockMergeStacktraces(bidi1, []string{"a", "d"}, tc.blockSelect) + case "2": + q.mockMergeStacktraces(bidi2, []string{"b", "d"}, tc.blockSelect) + case "3": + q.mockMergeStacktraces(bidi3, []string{"c", "d"}, tc.blockSelect) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) flame, err := querier.SelectMergeStacktraces(context.Background(), req) require.NoError(t, err) @@ -390,32 +427,37 @@ func Test_SelectMergeProfiles(t *testing.T) { }, }, }) - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.mockMergeProfile(bidi1, []string{"a", "d"}, tc.blockSelect) - case "2": - q.mockMergeProfile(bidi2, []string{"b", "d"}, tc.blockSelect) - case "3": - q.mockMergeProfile(bidi3, []string{"c", "d"}, tc.blockSelect) - } - switch addr { - case "1": - q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi1) - case "2": - q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi2) - case "3": - q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi3) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{f: func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.mockMergeProfile(bidi1, []string{"a", "d"}, tc.blockSelect) + case "2": + q.mockMergeProfile(bidi2, []string{"b", "d"}, tc.blockSelect) + case "3": + q.mockMergeProfile(bidi3, []string{"c", "d"}, tc.blockSelect) + } + switch addr { + case "1": + q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi1) + case "2": + q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi2) + case "3": + q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi3) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) res, err := querier.SelectMergeProfile(context.Background(), req) require.NoError(t, err) @@ -524,24 +566,29 @@ func TestSelectSeries(t *testing.T) { }, }, }, &typesv1.Series{Labels: foobarlabels, Points: []*typesv1.Point{{Value: 1, Timestamp: 1}, {Value: 2, Timestamp: 2}}}) - querier, err := New(Config{ - PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, - }, testhelper.NewMockRing([]ring.InstanceDesc{ - {Addr: "1"}, - {Addr: "2"}, - {Addr: "3"}, - }, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) { - q := newFakeQuerier() - switch addr { - case "1": - q.mockMergeLabels(bidi1, []string{"a", "d"}, tc.blockSelect) - case "2": - q.mockMergeLabels(bidi2, []string{"b", "d"}, 
tc.blockSelect) - case "3": - q.mockMergeLabels(bidi3, []string{"c", "d"}, tc.blockSelect) - } - return q, nil - }}, nil, nil, log.NewLogfmtLogger(os.Stdout)) + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{f: func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.mockMergeLabels(bidi1, []string{"a", "d"}, tc.blockSelect) + case "2": + q.mockMergeLabels(bidi2, []string{"b", "d"}, tc.blockSelect) + case "3": + q.mockMergeLabels(bidi3, []string{"c", "d"}, tc.blockSelect) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + }) require.NoError(t, err) res, err := querier.SelectSeries(context.Background(), req) require.NoError(t, err) @@ -685,6 +732,22 @@ func (f *fakeQuerierIngester) BlockMetadata(ctx context.Context, req *connect.Re return res, err } +func (f *fakeQuerierIngester) GetProfileStats(ctx context.Context, req *connect.Request[typesv1.GetProfileStatsRequest]) (*connect.Response[typesv1.GetProfileStatsResponse], error) { + var ( + args = f.Called(ctx, req) + res *connect.Response[typesv1.GetProfileStatsResponse] + err error + ) + if args[0] != nil { + res = args[0].(*connect.Response[typesv1.GetProfileStatsResponse]) + } + if args[1] != nil { + err = args.Get(1).(error) + } + + return res, err +} + type testProfile struct { Ts int64 Labels *typesv1.Labels @@ -1302,6 +1365,98 @@ func Test_splitQueryToStores(t *testing.T) { } } +func Test_GetProfileStats(t *testing.T) { + ctx := tenant.InjectTenantID(context.Background(), "1234") + + dbPath := t.TempDir() + localBucket, err := objstoreclient.NewBucket(ctx, objstoreclient.Config{ + StorageBackendConfig: objstoreclient.StorageBackendConfig{ + Backend: objstoreclient.Filesystem, + Filesystem: filesystem.Config{ + Directory: dbPath, + }, + }, + StoragePrefix: "testdata", + }, "") + require.NoError(t, err) + + index := bucketindex.Index{Blocks: []*bucketindex.Block{{ + MinTime: 0, + MaxTime: 3, + }}, + Version: bucketindex.IndexVersion3, + } + indexJson, err := json.Marshal(index) + require.NoError(t, err) + + var gzipContent bytes.Buffer + gzip := gzip.NewWriter(&gzipContent) + gzip.Name = bucketindex.IndexFilename + _, err = gzip.Write(indexJson) + gzip.Close() + require.NoError(t, err) + + err = localBucket.Upload(ctx, path.Join("1234", "phlaredb", bucketindex.IndexCompressedFilename), &gzipContent) + require.NoError(t, err) + + req := connect.NewRequest(&typesv1.GetProfileStatsRequest{}) + querier, err := New(&NewQuerierParams{ + Cfg: Config{ + PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond}, + }, + IngestersRing: testhelper.NewMockRing([]ring.InstanceDesc{ + {Addr: "1"}, + {Addr: "2"}, + {Addr: "3"}, + }, 3), + PoolFactory: &poolFactory{f: func(addr string) (client.PoolClient, error) { + q := newFakeQuerier() + switch addr { + case "1": + q.On("GetProfileStats", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.GetProfileStatsResponse{ + DataIngested: true, + OldestProfileTime: 1, + NewestProfileTime: 4, + }), nil) + case "2": + q.On("GetProfileStats", mock.Anything, mock.Anything).Return(connect.NewResponse(&typesv1.GetProfileStatsResponse{ + DataIngested: true, + OldestProfileTime: 1, + NewestProfileTime: 5, + }), nil) + case "3": + q.On("GetProfileStats", mock.Anything, 
mock.Anything).Return(connect.NewResponse(&typesv1.GetProfileStatsResponse{ + DataIngested: true, + OldestProfileTime: 2, + NewestProfileTime: 5, + }), nil) + } + return q, nil + }}, + Logger: log.NewLogfmtLogger(os.Stdout), + StorageBucket: localBucket, + StoreGatewayCfg: storegateway.Config{ + ShardingRing: storegateway.RingConfig{ + Ring: util.CommonRingConfig{ + KVStore: kv.Config{ + Store: "inmemory", + }, + }, + ReplicationFactor: 1, + }, + }, + }) + + require.NoError(t, err) + out, err := querier.GetProfileStats(ctx, req) + require.NoError(t, err) + require.Equal(t, &typesv1.GetProfileStatsResponse{ + DataIngested: true, + OldestProfileTime: 0, + NewestProfileTime: 5, + }, out.Msg) +} + // The code below can be useful for testing deduping directly to a cluster. // func TestDedupeLive(t *testing.T) { // clients, err := createClients(context.Background()) diff --git a/pkg/querier/store_gateway_querier.go b/pkg/querier/store_gateway_querier.go index c111299076..1e7f4cb1fc 100644 --- a/pkg/querier/store_gateway_querier.go +++ b/pkg/querier/store_gateway_querier.go @@ -56,7 +56,7 @@ type StoreGatewayQuerier struct { subservicesWatcher *services.FailureWatcher } -func NewStoreGatewayQuerier( +func newStoreGatewayQuerier( gatewayCfg storegateway.Config, factory ring_client.PoolFactory, limits StoreGatewayLimits,
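// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the change set above): a round trip through
// the vtproto helpers generated for the new GetProfileStatsResponse message.
// Per the generated MarshalToSizedBufferVT above, data_ingested is written as
// field 1 (tag 0x08), oldest_profile_time as field 2 (tag 0x10) and
// newest_profile_time as field 3 (tag 0x18); UnmarshalVT and EqualVT come from
// the same generated file. The timestamp values are arbitrary milliseconds
// chosen only for this example.
// ---------------------------------------------------------------------------
package main

import (
	"fmt"

	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

func main() {
	in := &typesv1.GetProfileStatsResponse{
		DataIngested:      true,
		OldestProfileTime: 1710161819000, // milliseconds since epoch
		NewestProfileTime: 1710174239000, // milliseconds since epoch
	}

	// Encode with the generated vtproto marshaler.
	buf, err := in.MarshalVT()
	if err != nil {
		panic(err)
	}

	// Decode into a fresh message with the generated unmarshaler.
	out := &typesv1.GetProfileStatsResponse{}
	if err := out.UnmarshalVT(buf); err != nil {
		panic(err)
	}

	// The generated EqualVT should report the round trip as lossless.
	fmt.Println("encoded bytes:", len(buf), "round trip equal:", in.EqualVT(out))
}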
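// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the change set above): one way a client
// could exercise the new GetProfileStats RPC through the connect-generated
// QuerierService client once this change is in place. The base URL, the tenant
// ID and the use of http.DefaultClient are assumptions made for the example
// only; the method name follows the GetProfileStats RPC added to querier.proto
// above.
// ---------------------------------------------------------------------------
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"connectrpc.com/connect"

	"github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect"
	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

func main() {
	// Assumed local Pyroscope endpoint; adjust to the actual deployment.
	client := querierv1connect.NewQuerierServiceClient(http.DefaultClient, "http://localhost:4040")

	req := connect.NewRequest(&typesv1.GetProfileStatsRequest{})
	// In multi-tenant setups the tenant is usually selected via this header;
	// "1234" is a placeholder value.
	req.Header().Set("X-Scope-OrgID", "1234")

	res, err := client.GetProfileStats(context.Background(), req)
	if err != nil {
		panic(err)
	}

	// oldest_profile_time and newest_profile_time are milliseconds since epoch.
	fmt.Println("data ingested:", res.Msg.DataIngested)
	fmt.Println("oldest profile:", time.UnixMilli(res.Msg.OldestProfileTime).UTC())
	fmt.Println("newest profile:", time.UnixMilli(res.Msg.NewestProfileTime).UTC())
}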