diff --git a/pkg/kb_agent/handlers/grpc_handler.go b/pkg/kb_agent/handlers/grpc_handler.go index 25c913d9570..72d7307a248 100644 --- a/pkg/kb_agent/handlers/grpc_handler.go +++ b/pkg/kb_agent/handlers/grpc_handler.go @@ -22,6 +22,9 @@ package handlers import ( "context" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/kb_agent/plugin" + viper "github.com/apecloud/kubeblocks/pkg/viperx" "github.com/go-logr/logr" "github.com/pkg/errors" ctrl "sigs.k8s.io/controller-runtime" @@ -30,17 +33,30 @@ import ( ) type GRPCHandler struct { - Logger logr.Logger + Logger logr.Logger + grpcClient plugin.GrpcClient } var _ Handler = &GRPCHandler{} func NewGRPCHandler(properties map[string]string) (*GRPCHandler, error) { logger := ctrl.Log.WithName("GRPC handler") + host := viper.GetString(constant.KBEnvPodIP) + if h, ok := properties["host"]; ok && h != "" { + host = h + } + port, ok := properties["port"] + if !ok || port == "" { + return &GRPCHandler{logger, nil}, nil + } + grpcClient, err := plugin.NewGRPCClient(host, port) + if err != nil { + return nil, errors.Wrap(err, "new client failed") + } h := &GRPCHandler{ - Logger: logger, + Logger: logger, + grpcClient: grpcClient, } - return h, nil } @@ -48,6 +64,22 @@ func (h *GRPCHandler) Do(ctx context.Context, setting util.HandlerSpec, args map if setting.GPRC == nil { return nil, errors.New("grpc setting is nil") } - // TODO: implement grpc handler - return nil, ErrNotImplemented + if args == nil { + return nil, errors.New("args is nil") + } + parameters, err := util.WrapperArgs(args) + if err != nil { + return nil, errors.Wrap(err, "wrapper args failed") + } + request := &plugin.Request{ + MethodName: args["methodName"].(string), + Parameters: parameters, + } + client := h.grpcClient + result, err := client.Call(ctx, request) + if err != nil { + return nil, errors.Wrap(err, "grpc call failed") + } + resp := &Response{Message: result.Message} + return resp, nil } diff --git a/pkg/kb_agent/handlers/grpc_handler_test.go b/pkg/kb_agent/handlers/grpc_handler_test.go index fee324cdf27..91d0fc032b0 100644 --- a/pkg/kb_agent/handlers/grpc_handler_test.go +++ b/pkg/kb_agent/handlers/grpc_handler_test.go @@ -21,39 +21,112 @@ package handlers import ( "context" + "flag" + "log" + "net" "testing" - + + "github.com/apecloud/kubeblocks/pkg/kb_agent/plugin" + "google.golang.org/grpc" + "github.com/apecloud/kubeblocks/pkg/kb_agent/util" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) func TestNewGRPCHandler(t *testing.T) { - handler, err := NewGRPCHandler(nil) + handler, err := NewGRPCHandler(map[string]string{ + "host": "localhost", + "port": "50051", + }) assert.NotNil(t, handler) assert.Nil(t, err) } func TestGRPCHandlerDo(t *testing.T) { ctx := context.Background() - handler := &GRPCHandler{} + go func() { + flag.Parse() + lis, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + plugin.RegisterGrpcServer(s, &MockGrpcServer{}) + log.Printf("server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + setting := util.HandlerSpec{ + GPRC: map[string]string{ + "host": "localhost", + "port": "50051", + }, + } t.Run("grpc handler is nil", func(t *testing.T) { setting := util.HandlerSpec{ GPRC: nil, } + handler := &GRPCHandler{} do, err := handler.Do(ctx, setting, nil) assert.Nil(t, do) assert.NotNil(t, err) assert.Error(t, err, errors.New("grpc setting is nil")) }) - t.Run("grpc handler is not nil but not implemented", func(t *testing.T) { - setting := util.HandlerSpec{ - GPRC: map[string]string{"test": "test"}, - } + t.Run("grpc args is nil", func(t *testing.T) { + handler, err := NewGRPCHandler(setting.GPRC) + assert.Nil(t, err) do, err := handler.Do(ctx, setting, nil) assert.Nil(t, do) assert.NotNil(t, err) - assert.Error(t, err, ErrNotImplemented) + assert.Error(t, err, errors.New("args is nil")) + }) + + t.Run("grpc do success", func(t *testing.T) { + handler, err := NewGRPCHandler(setting.GPRC) + assert.Nil(t, err) + args := map[string]interface{}{ + "methodName": "test", + "username": "admin", + "password": "admin", + } + result, err := handler.Do(ctx, setting, args) + assert.NotNil(t, result) + assert.Nil(t, err) + assert.Equal(t, "methodName : test", result.Message) }) + + t.Run("grpc do not implemented", func(t *testing.T) { + handler, err := NewGRPCHandler(setting.GPRC) + assert.Nil(t, err) + args := map[string]interface{}{ + "methodName": "notImplemented", + "username": "admin", + } + result, err := handler.Do(ctx, setting, args) + assert.Nil(t, result) + assert.NotNil(t, err) + assert.Error(t, err, errors.New("not implemented")) + }) +} + +type MockGrpcServer struct { + *plugin.UnimplementedGrpcServer +} + +func (s *MockGrpcServer) Call(ctx context.Context, in *plugin.Request) (*plugin.Response, error) { + methodName := in.MethodName + parameters := in.Parameters + m, _ := util.ParseArgs(parameters) + for k, v := range m { + log.Printf("key: %s, value: %s", k, v) + } + switch methodName { + case "test": + return &plugin.Response{Message: "methodName : " + methodName}, nil + default: + return nil, errors.New("not implemented") + } } diff --git a/pkg/kb_agent/httpserver/apis_test.go b/pkg/kb_agent/httpserver/apis_test.go index 04ccb6f7cf9..e5372402686 100644 --- a/pkg/kb_agent/httpserver/apis_test.go +++ b/pkg/kb_agent/httpserver/apis_test.go @@ -71,16 +71,6 @@ func TestActionHandler(t *testing.T) { assert.JSONEq(t, `{"errorCode":"ERR_MALFORMED_REQUEST_DATA","message":"no action in request"}`, string(reqCtx.Response.Body())) }) - t.Run("action not implemented", func(t *testing.T) { - reqCtx := &fasthttp.RequestCtx{} - reqCtx.Request.Header.SetMethod(fasthttp.MethodPost) - reqCtx.Request.Header.SetContentType("application/json") - reqCtx.Request.SetBody([]byte(`{"action":"test"}`)) - actionHandler(reqCtx) - assert.Equal(t, fasthttp.StatusNotImplemented, reqCtx.Response.StatusCode()) - assert.JSONEq(t, `{"errorCode":"ERR_ACTION_FAILED","message":"action exec failed: not implemented"}`, string(reqCtx.Response.Body())) - }) - t.Run("action exec failed", func(t *testing.T) { msg := "action exec failed" reqCtx := &fasthttp.RequestCtx{} diff --git a/pkg/kb_agent/plugin/grpc.pb.go b/pkg/kb_agent/plugin/grpc.pb.go new file mode 100644 index 00000000000..beaaba9bd99 --- /dev/null +++ b/pkg/kb_agent/plugin/grpc.pb.go @@ -0,0 +1,328 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.25.2 +// source: grpc.proto + +package plugin + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MethodName string `protobuf:"bytes,1,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"` + Parameters []*Parameter `protobuf:"bytes,2,rep,name=parameters,proto3" json:"parameters,omitempty"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_grpc_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_grpc_proto_rawDescGZIP(), []int{0} +} + +func (x *Request) GetMethodName() string { + if x != nil { + return x.MethodName + } + return "" +} + +func (x *Request) GetParameters() []*Parameter { + if x != nil { + return x.Parameters + } + return nil +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + Parameters []*Parameter `protobuf:"bytes,2,rep,name=parameters,proto3" json:"parameters,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_grpc_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_grpc_proto_rawDescGZIP(), []int{1} +} + +func (x *Response) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *Response) GetParameters() []*Parameter { + if x != nil { + return x.Parameters + } + return nil +} + +type Parameter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *Parameter) Reset() { + *x = Parameter{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Parameter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Parameter) ProtoMessage() {} + +func (x *Parameter) ProtoReflect() protoreflect.Message { + mi := &file_grpc_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Parameter.ProtoReflect.Descriptor instead. +func (*Parameter) Descriptor() ([]byte, []int) { + return file_grpc_proto_rawDescGZIP(), []int{2} +} + +func (x *Parameter) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *Parameter) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_grpc_proto protoreflect.FileDescriptor + +var file_grpc_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x70, 0x6c, + 0x75, 0x67, 0x69, 0x6e, 0x22, 0x5d, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1f, 0x0a, 0x0b, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, + 0x12, 0x31, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x50, 0x61, + 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, + 0x65, 0x72, 0x73, 0x22, 0x57, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x31, 0x0a, 0x0a, 0x70, 0x61, 0x72, + 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, + 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, + 0x52, 0x0a, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x22, 0x33, 0x0a, 0x09, + 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x32, 0x33, 0x0a, 0x04, 0x47, 0x72, 0x70, 0x63, 0x12, 0x2b, 0x0a, 0x04, 0x43, 0x61, 0x6c, + 0x6c, 0x12, 0x0f, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x65, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x6b, 0x75, + 0x62, 0x65, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6b, 0x62, 0x2d, + 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x73, 0x2f, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_grpc_proto_rawDescOnce sync.Once + file_grpc_proto_rawDescData = file_grpc_proto_rawDesc +) + +func file_grpc_proto_rawDescGZIP() []byte { + file_grpc_proto_rawDescOnce.Do(func() { + file_grpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_proto_rawDescData) + }) + return file_grpc_proto_rawDescData +} + +var file_grpc_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_grpc_proto_goTypes = []interface{}{ + (*Request)(nil), // 0: plugin.Request + (*Response)(nil), // 1: plugin.Response + (*Parameter)(nil), // 2: plugin.Parameter +} +var file_grpc_proto_depIdxs = []int32{ + 2, // 0: plugin.Request.parameters:type_name -> plugin.Parameter + 2, // 1: plugin.Response.parameters:type_name -> plugin.Parameter + 0, // 2: plugin.Grpc.Call:input_type -> plugin.Request + 1, // 3: plugin.Grpc.Call:output_type -> plugin.Response + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_grpc_proto_init() } +func file_grpc_proto_init() { + if File_grpc_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_grpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Parameter); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_grpc_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_grpc_proto_goTypes, + DependencyIndexes: file_grpc_proto_depIdxs, + MessageInfos: file_grpc_proto_msgTypes, + }.Build() + File_grpc_proto = out.File + file_grpc_proto_rawDesc = nil + file_grpc_proto_goTypes = nil + file_grpc_proto_depIdxs = nil +} diff --git a/pkg/kb_agent/plugin/grpc_client.go b/pkg/kb_agent/plugin/grpc_client.go new file mode 100644 index 00000000000..64e026955b2 --- /dev/null +++ b/pkg/kb_agent/plugin/grpc_client.go @@ -0,0 +1,38 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package plugin + +import ( + "fmt" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func NewGRPCClient(host, port string) (GrpcClient, error) { + addr := fmt.Sprintf("%s:%s", host, port) + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, errors.Wrap(err, "grpc: failed to dial") + } + client := NewGrpcClient(conn) + return client, nil +} diff --git a/pkg/kb_agent/plugin/grpc_grpc.pb.go b/pkg/kb_agent/plugin/grpc_grpc.pb.go new file mode 100644 index 00000000000..4cd38d46712 --- /dev/null +++ b/pkg/kb_agent/plugin/grpc_grpc.pb.go @@ -0,0 +1,125 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v4.25.2 +// source: grpc.proto + +package plugin + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// GrpcClient is the client API for Grpc service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type GrpcClient interface { + Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) +} + +type grpcClient struct { + cc grpc.ClientConnInterface +} + +func NewGrpcClient(cc grpc.ClientConnInterface) GrpcClient { + return &grpcClient{cc} +} + +func (c *grpcClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/plugin.Grpc/Call", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// GrpcServer is the server API for Grpc service. +// All implementations must embed UnimplementedGrpcServer +// for forward compatibility +type GrpcServer interface { + Call(context.Context, *Request) (*Response, error) + mustEmbedUnimplementedGrpcServer() +} + +// UnimplementedGrpcServer must be embedded to have forward compatible implementations. +type UnimplementedGrpcServer struct { +} + +func (UnimplementedGrpcServer) Call(context.Context, *Request) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Call not implemented") +} +func (UnimplementedGrpcServer) mustEmbedUnimplementedGrpcServer() {} + +// UnsafeGrpcServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to GrpcServer will +// result in compilation errors. +type UnsafeGrpcServer interface { + mustEmbedUnimplementedGrpcServer() +} + +func RegisterGrpcServer(s grpc.ServiceRegistrar, srv GrpcServer) { + s.RegisterService(&Grpc_ServiceDesc, srv) +} + +func _Grpc_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GrpcServer).Call(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/plugin.Grpc/Call", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GrpcServer).Call(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + +// Grpc_ServiceDesc is the grpc.ServiceDesc for Grpc service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Grpc_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "plugin.Grpc", + HandlerType: (*GrpcServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Call", + Handler: _Grpc_Call_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "grpc.proto", +} diff --git a/pkg/kb_agent/plugin/proto/Makefile b/pkg/kb_agent/plugin/proto/Makefile new file mode 100644 index 00000000000..8a0a630aea9 --- /dev/null +++ b/pkg/kb_agent/plugin/proto/Makefile @@ -0,0 +1,97 @@ + +all: build + +######################################################################## +## GOLANG ## +######################################################################## + +# If GOPATH isn't defined then set its default location. +ifeq (,$(strip $(GOPATH))) +GOPATH := $(HOME)/go +else +# If GOPATH is already set then update GOPATH to be its own +# first element. +GOPATH := $(word 1,$(subst :, ,$(GOPATH))) +endif +export GOPATH + +GOBIN := $(shell go env GOBIN) +ifeq (,$(strip $(GOBIN))) +GOBIN := $(GOPATH)/bin +endif + + +######################################################################## +## PROTOC ## +######################################################################## + +# Only set PROTOC_VER if it has an empty value. +ifeq (,$(strip $(PROTOC_VER))) +PROTOC_VER := 25.2 +endif + +PROTOC_OS := $(shell uname -s) +ifeq (Darwin,$(PROTOC_OS)) +PROTOC_OS := osx +endif + +PROTOC_ARCH := $(shell uname -m) +ifeq (i386,$(PROTOC_ARCH)) +PROTOC_ARCH := x86_32 +else ifeq (arm64,$(PROTOC_ARCH)) +PROTOC_ARCH := aarch_64 +endif + +PROTOC_ZIP := protoc-$(PROTOC_VER)-$(PROTOC_OS)-$(PROTOC_ARCH).zip +PROTOC_URL := https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VER)/$(PROTOC_ZIP) +PROTOC_TMP_DIR := .protoc +PROTOC := $(PROTOC_TMP_DIR)/bin/protoc + +$(GOBIN)/protoc-gen-go: ../../../../go.mod + go install google.golang.org/protobuf/cmd/protoc-gen-go +$(GOBIN)/protoc-gen-go-grpc: + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0 + +$(PROTOC): + -mkdir -p "$(PROTOC_TMP_DIR)" && \ + curl -L $(PROTOC_URL) -o "$(PROTOC_TMP_DIR)/$(PROTOC_ZIP)" && \ + unzip "$(PROTOC_TMP_DIR)/$(PROTOC_ZIP)" -d "$(PROTOC_TMP_DIR)" && \ + chmod 0755 "$@" + stat "$@" > /dev/null 2>&1 + +PROTOC_ALL := $(GOBIN)/protoc-gen-go $(GOBIN)/protoc-gen-go-grpc $(PROTOC) + +######################################################################## +## PATH ## +######################################################################## + +# Update PATH with GOBIN. This enables the protoc binary to discover +# the protoc-gen-go binary +export PATH := $(GOBIN):$(PATH) + + +######################################################################## +## BUILD ## +######################################################################## +DB_PLUGIN_PROTO := ./grpc.proto +DB_PLUGIN_PKG_SUB := ../../plugin +DB_PLUGIN_GO := $(DB_PLUGIN_PKG_SUB)/grpc.pb.go +DB_PLUGIN_GRPC := $(DB_PLUGIN_PKG_SUB)/grpc_grpc.pb.go + +# This recipe generates the go language bindings +$(DB_PLUGIN_GO) $(DB_PLUGIN_GRPC): $(DB_PLUGIN_PROTO) $(PROTOC_ALL) + @mkdir -p "$(@D)" + $(PROTOC) -I./ --go-grpc_out=$(DB_PLUGIN_PKG_SUB) --go_out=$(DB_PLUGIN_PKG_SUB) \ + --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative \ + "$(. +*/ +syntax = "proto3"; + + +option go_package = "github.com/apecloud/kubeblocks/pkg/kb-agent/handlers/plugin"; + +package plugin; + +service Grpc { + rpc Call (Request) returns (Response) {} +} + +message Request { + string method_name = 1; + repeated Parameter parameters = 2; +} + +message Response { + string message = 1; + repeated Parameter parameters = 2; +} + +message Parameter { + string key = 1; + string value = 2; +} diff --git a/pkg/kb_agent/util/grpc_permeters.go b/pkg/kb_agent/util/grpc_permeters.go new file mode 100644 index 00000000000..93faad4cd6d --- /dev/null +++ b/pkg/kb_agent/util/grpc_permeters.go @@ -0,0 +1,60 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package util + +import ( + "fmt" + + "github.com/apecloud/kubeblocks/pkg/kb_agent/plugin" +) + +// WrapperArgs Used to convert map[string]interface{} to parameter +func WrapperArgs(args map[string]interface{}) ([]*plugin.Parameter, error) { + var parameters []*plugin.Parameter + for key, value := range args { + param, err := CreateParameter(key, value) + if err != nil { + return nil, err + } + parameters = append(parameters, param) + } + return parameters, nil +} + +// ParseArgs Used to parse args to map[string]string +func ParseArgs(parameters []*plugin.Parameter) (map[string]string, error) { + m := map[string]string{} + for _, param := range parameters { + m[param.GetKey()] = param.GetValue() + } + return m, nil +} + +// CreateParameter Used to create parameter +func CreateParameter(key string, value interface{}) (*plugin.Parameter, error) { + param := &plugin.Parameter{Key: key} + switch v := value.(type) { + case string: + param.Value = v + default: + return nil, fmt.Errorf("unsupported type: %T", v) + } + return param, nil +} diff --git a/pkg/kb_agent/util/grpc_permeters_test.go b/pkg/kb_agent/util/grpc_permeters_test.go new file mode 100644 index 00000000000..462a0dec130 --- /dev/null +++ b/pkg/kb_agent/util/grpc_permeters_test.go @@ -0,0 +1,84 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package util + +import ( + "testing" + + "github.com/apecloud/kubeblocks/pkg/kb_agent/plugin" + "github.com/stretchr/testify/assert" +) + +func TestWrapperArgs(t *testing.T) { + t.Run("supported type", func(t *testing.T) { + args := map[string]interface{}{ + "key1": "value1", + } + parameters, err := WrapperArgs(args) + assert.Nil(t, err) + assert.NotNil(t, parameters) + assert.Equal(t, "key1", parameters[0].GetKey()) + assert.Equal(t, "value1", parameters[0].GetValue()) + }) + t.Run("unsupported type", func(t *testing.T) { + args := map[string]interface{}{ + "key1": 1, + } + wrapperArgs, err := WrapperArgs(args) + assert.NotNil(t, err) + assert.Nil(t, wrapperArgs) + }) +} + +func TestCreateParameter(t *testing.T) { + t.Run("supported type", func(t *testing.T) { + param, err := CreateParameter("key", "value") + assert.Nil(t, err) + assert.NotNil(t, param) + assert.Equal(t, "key", param.GetKey()) + assert.Equal(t, "value", param.GetValue()) + }) + t.Run("unsupported type", func(t *testing.T) { + param, err := CreateParameter("key", 1) + assert.NotNil(t, err) + assert.Nil(t, param) + }) +} + +func TestParseArgs(t *testing.T) { + t.Run("parse map", func(t *testing.T) { + parameters := []*plugin.Parameter{ + { + Key: "key1", + Value: "value1", + }, + { + Key: "key2", + Value: "value2", + }, + } + args, err := ParseArgs(parameters) + assert.Nil(t, err) + assert.NotNil(t, args) + assert.Equal(t, 2, len(args)) + assert.Equal(t, "value1", args["key1"]) + assert.Equal(t, "value2", args["key2"]) + }) +}