From 56fb0eb8e49c66e6d4860a261d3ebb998fee0ce6 Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Tue, 12 Dec 2023 12:29:28 +0900 Subject: [PATCH] Update overall codes --- admin/auth.go | 13 ++++++--- client/auth.go | 7 ++--- client/client.go | 12 ++------- go.mod | 1 - server/rpc/connecthelper/logging.go | 8 +++--- server/rpc/connecthelper/status.go | 6 ++--- .../interceptors/{admin.go => admin_auth.go} | 27 +++++++++++++++---- server/rpc/interceptors/context.go | 14 +++++----- server/rpc/interceptors/default.go | 6 ++--- server/rpc/server.go | 2 +- server/rpc/server_test.go | 19 ++++--------- test/integration/tree_test.go | 4 +-- 12 files changed, 62 insertions(+), 57 deletions(-) rename server/rpc/interceptors/{admin.go => admin_auth.go} (84%) diff --git a/admin/auth.go b/admin/auth.go index 9d4de4407..9465588a6 100644 --- a/admin/auth.go +++ b/admin/auth.go @@ -42,7 +42,7 @@ func (i *AuthInterceptor) SetToken(token string) { i.token = token } -// WrapUnary creates a unary server interceptor for building additional context. +// WrapUnary creates a unary server interceptor for authorization. func (i *AuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func( ctx context.Context, @@ -55,17 +55,22 @@ func (i *AuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { } } -// WrapStreamingClient creates a stream client interceptor for building additional context. +// WrapStreamingClient creates a stream client interceptor for authorization. func (i *AuthInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func( ctx context.Context, spec connect.Spec, ) connect.StreamingClientConn { - return next(ctx, spec) + conn := next(ctx, spec) + + conn.RequestHeader().Add(types.AuthorizationKey, i.token) + conn.RequestHeader().Add(types.UserAgentKey, types.GoSDKType+"/"+version.Version) + + return conn } } -// WrapStreamingHandler creates a stream server interceptor for building additional context. +// WrapStreamingHandler creates a stream server interceptor for authorization. func (i *AuthInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func( ctx context.Context, diff --git a/client/auth.go b/client/auth.go index 843ca077a..affe39b60 100644 --- a/client/auth.go +++ b/client/auth.go @@ -39,7 +39,7 @@ func NewAuthInterceptor(apiKey, token string) *AuthInterceptor { } } -// WrapUnary creates a unary server interceptor for building additional context. +// WrapUnary creates a unary server interceptor for authorization. func (i *AuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func( ctx context.Context, @@ -53,20 +53,21 @@ func (i *AuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { } } -// WrapStreamingClient creates a stream client interceptor for building additional context. +// WrapStreamingClient creates a stream client interceptor for authorization. func (i *AuthInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func( ctx context.Context, spec connect.Spec, ) connect.StreamingClientConn { conn := next(ctx, spec) + conn.RequestHeader().Set(types.APIKeyKey, i.apiKey) return conn } } -// WrapStreamingHandler creates a stream server interceptor for building additional context. +// WrapStreamingHandler creates a stream server interceptor for authorization. func (i *AuthInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func( ctx context.Context, diff --git a/client/client.go b/client/client.go index bedfe4c1c..81906d9a7 100644 --- a/client/client.go +++ b/client/client.go @@ -426,26 +426,18 @@ func (c *Client) Watch( for stream.Receive() { pbResp := stream.Msg() - if err != nil { - return nil, err - } if _, err := handleResponse(pbResp, doc); err != nil { return nil, err } break } if err = stream.Err(); err != nil { - return nil, connect.NewError(connect.CodeUnavailable, err) + return nil, err } go func() { for stream.Receive() { pbResp := stream.Msg() - if err != nil { - rch <- WatchResponse{Err: err} - close(rch) - return - } resp, err := handleResponse(pbResp, doc) if err != nil { rch <- WatchResponse{Err: err} @@ -732,7 +724,7 @@ func newTLSConfigFromFile(certFile, serverNameOverride string) (*tls.Config, err return nil, fmt.Errorf("credentials: failed to append certificates") } - return &tls.Config{ServerName: serverNameOverride, RootCAs: cp, MinVersion: tls.VersionTLS13}, nil + return &tls.Config{ServerName: serverNameOverride, RootCAs: cp, MinVersion: tls.VersionTLS12}, nil } /** diff --git a/go.mod b/go.mod index a33b1db5e..1b71cea9b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/go-playground/universal-translator v0.18.0 github.com/go-playground/validator/v10 v10.11.1 github.com/golang-jwt/jwt v3.2.2+incompatible - github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/go-memdb v1.3.3 github.com/jedib0t/go-pretty/v6 v6.4.9 github.com/prometheus/client_golang v1.13.0 diff --git a/server/rpc/connecthelper/logging.go b/server/rpc/connecthelper/logging.go index a66a4d538..01359b480 100644 --- a/server/rpc/connecthelper/logging.go +++ b/server/rpc/connecthelper/logging.go @@ -14,7 +14,7 @@ * limitations under the License. */ -// Package connecthelper provides helper functions for gRPC. +// Package connecthelper provides helper functions for connectRPC. package connecthelper import ( @@ -44,7 +44,7 @@ func NewLoggingInterceptor() *LoggingInterceptor { return &LoggingInterceptor{} } -// WrapUnary creates a unary server interceptor for building additional context. +// WrapUnary creates a unary server interceptor for request logging. func (i *LoggingInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func( ctx context.Context, @@ -55,7 +55,7 @@ func (i *LoggingInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc } } -// WrapStreamingClient creates a stream client interceptor for building additional context. +// WrapStreamingClient creates a stream client interceptor for request logging. func (i *LoggingInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func( ctx context.Context, @@ -65,7 +65,7 @@ func (i *LoggingInterceptor) WrapStreamingClient(next connect.StreamingClientFun } } -// WrapStreamingHandler creates a stream server interceptor for building additional context. +// WrapStreamingHandler creates a stream server interceptor for request logging. func (i *LoggingInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func( ctx context.Context, diff --git a/server/rpc/connecthelper/status.go b/server/rpc/connecthelper/status.go index ec176a5e0..1cc22b89b 100644 --- a/server/rpc/connecthelper/status.go +++ b/server/rpc/connecthelper/status.go @@ -36,7 +36,7 @@ import ( "github.com/yorkie-team/yorkie/server/rpc/auth" ) -// errorToCode maps an error to gRPC status code. +// errorToCode maps an error to connectRPC status code. var errorToCode = map[error]connect.Code{ // InvalidArgument means the request is malformed. converter.ErrPackRequired: connect.CodeInvalidArgument, @@ -105,8 +105,8 @@ func detailsFromError(err error) (*errdetails.BadRequest, bool) { return br, true } -// ToStatusError returns a status.Error from the given logic error. If an error -// occurs while executing logic in API handler, gRPC status.error should be +// ToStatusError returns a connect.Error from the given logic error. If an error +// occurs while executing logic in API handler, connectRPC connect.error should be // returned so that the client can know more about the status of the request. func ToStatusError(err error) error { cause := err diff --git a/server/rpc/interceptors/admin.go b/server/rpc/interceptors/admin_auth.go similarity index 84% rename from server/rpc/interceptors/admin.go rename to server/rpc/interceptors/admin_auth.go index 39210a926..4940b19ac 100644 --- a/server/rpc/interceptors/admin.go +++ b/server/rpc/interceptors/admin_auth.go @@ -67,6 +67,15 @@ func (i *AdminAuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFu res, err := next(ctx, req) + // TODO(hackerwins, emplam27): Consider splitting between admin and sdk metrics. + sdkType, sdkVersion := connecthelper.SDKTypeAndVersion(req.Header()) + i.backend.Metrics.AddUserAgentWithEmptyProject( + i.backend.Config.Hostname, + sdkType, + sdkVersion, + req.Spec().Procedure, + ) + i.backend.Metrics.AddServerHandledCounter( "unary", strings.Split(req.Spec().Procedure, "/")[1], @@ -78,18 +87,17 @@ func (i *AdminAuthInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFu } } -// WrapStreamingClient creates a stream client interceptor for building additional context. +// WrapStreamingClient creates a stream client interceptor for authentication. func (i *AdminAuthInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func( ctx context.Context, spec connect.Spec, ) connect.StreamingClientConn { - conn := next(ctx, spec) - return conn + return next(ctx, spec) } } -// WrapStreamingHandler creates a stream server interceptor for building additional context. +// WrapStreamingHandler creates a stream server interceptor for authentication. func (i *AdminAuthInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func( ctx context.Context, @@ -109,6 +117,15 @@ func (i *AdminAuthInterceptor) WrapStreamingHandler(next connect.StreamingHandle err := next(ctx, conn) + // TODO(hackerwins, emplam27): Consider splitting between admin and sdk metrics. + sdkType, sdkVersion := connecthelper.SDKTypeAndVersion(conn.RequestHeader()) + i.backend.Metrics.AddUserAgentWithEmptyProject( + i.backend.Config.Hostname, + sdkType, + sdkVersion, + conn.Spec().Procedure, + ) + i.backend.Metrics.AddServerHandledCounter( "server_stream", strings.Split(conn.Spec().Procedure, "/")[1], @@ -135,7 +152,7 @@ func (i *AdminAuthInterceptor) authenticate( header http.Header, ) (*types.User, error) { authorization := header.Get(types.AuthorizationKey) - if len(authorization) == 0 { + if authorization == "" { return nil, grpcstatus.Errorf(codes.Unauthenticated, "authorization is not provided") } diff --git a/server/rpc/interceptors/context.go b/server/rpc/interceptors/context.go index cda06cb5b..753112570 100644 --- a/server/rpc/interceptors/context.go +++ b/server/rpc/interceptors/context.go @@ -68,6 +68,8 @@ func (i *ContextInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc return nil, err } + res, err := next(ctx, req) + sdkType, sdkVersion := connecthelper.SDKTypeAndVersion(req.Header()) i.backend.Metrics.AddUserAgent( i.backend.Config.Hostname, @@ -77,8 +79,6 @@ func (i *ContextInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc req.Spec().Procedure, ) - res, err := next(ctx, req) - i.backend.Metrics.AddServerHandledCounter( "unary", strings.Split(req.Spec().Procedure, "/")[1], @@ -115,6 +115,8 @@ func (i *ContextInterceptor) WrapStreamingHandler(next connect.StreamingHandlerF return err } + err = next(ctx, conn) + sdkType, sdkVersion := connecthelper.SDKTypeAndVersion(conn.RequestHeader()) i.backend.Metrics.AddUserAgent( i.backend.Config.Hostname, @@ -124,8 +126,6 @@ func (i *ContextInterceptor) WrapStreamingHandler(next connect.StreamingHandlerF conn.Spec().Procedure, ) - err = next(ctx, conn) - i.backend.Metrics.AddServerHandledCounter( "server_stream", strings.Split(conn.Spec().Procedure, "/")[1], @@ -148,15 +148,15 @@ func (i *ContextInterceptor) buildContext(ctx context.Context, header http.Heade md := metadata.Metadata{} apiKey := header.Get(types.APIKeyKey) - if len(apiKey) == 0 && !i.backend.Config.UseDefaultProject { + if apiKey == "" && !i.backend.Config.UseDefaultProject { return nil, connect.NewError(connect.CodeUnauthenticated, errors.New("api key is not provided")) } - if len(apiKey) > 0 { + if apiKey != "" { md.APIKey = apiKey } authorization := header.Get(types.AuthorizationKey) - if len(authorization) > 0 { + if authorization != "" { md.Authorization = authorization } ctx = metadata.With(ctx, md) diff --git a/server/rpc/interceptors/default.go b/server/rpc/interceptors/default.go index 2d521e2cb..acdfe8c4a 100644 --- a/server/rpc/interceptors/default.go +++ b/server/rpc/interceptors/default.go @@ -41,7 +41,7 @@ const ( SlowThreshold = 100 * gotime.Millisecond ) -// WrapUnary creates a unary server interceptor for building additional context. +// WrapUnary creates a unary server interceptor for default. func (i *DefaultInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func( ctx context.Context, @@ -62,7 +62,7 @@ func (i *DefaultInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc } } -// WrapStreamingClient creates a stream client interceptor for building additional context. +// WrapStreamingClient creates a stream client interceptor for default. func (i *DefaultInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func( ctx context.Context, @@ -72,7 +72,7 @@ func (i *DefaultInterceptor) WrapStreamingClient(next connect.StreamingClientFun } } -// WrapStreamingHandler creates a stream server interceptor for building additional context. +// WrapStreamingHandler creates a stream server interceptor for default. func (i *DefaultInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func( ctx context.Context, diff --git a/server/rpc/server.go b/server/rpc/server.go index 098be5c03..e18556032 100644 --- a/server/rpc/server.go +++ b/server/rpc/server.go @@ -113,7 +113,7 @@ func (s *Server) Shutdown(graceful bool) { func (s *Server) listenAndServe() error { go func() { - logging.DefaultLogger().Infof(fmt.Sprintf("serving rpc on %d", s.conf.Port)) + logging.DefaultLogger().Infof(fmt.Sprintf("serving RPC on %d", s.conf.Port)) s.httpServer.Handler = h2c.NewHandler( newCORS().Handler(s.serverMux), &http2.Server{}, diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index d5cb04eeb..cfb3286b6 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -77,7 +77,6 @@ func TestMain(m *testing.M) { ProjectInfoCacheSize: helper.ProjectInfoCacheSize, ProjectInfoCacheTTL: helper.ProjectInfoCacheTTL.String(), AdminTokenDuration: helper.AdminTokenDuration, - UseDefaultProject: true, }, &mongo.Config{ ConnectionURI: helper.MongoConnectionURI, YorkieDatabase: helper.TestDBName(), @@ -101,10 +100,10 @@ func TestMain(m *testing.M) { } testRPCServer, err = rpc.NewServer(&rpc.Config{ - Port: helper.RPCPort, - //MaxRequestBytes: helper.RPCMaxRequestBytes, - //MaxConnectionAge: helper.RPCMaxConnectionAge.String(), - //MaxConnectionAgeGrace: helper.RPCMaxConnectionAgeGrace.String(), + Port: helper.RPCPort, + MaxRequestBytes: helper.RPCMaxRequestBytes, + MaxConnectionAge: helper.RPCMaxConnectionAge.String(), + MaxConnectionAgeGrace: helper.RPCMaxConnectionAgeGrace.String(), }, be) if err != nil { log.Fatal(err) @@ -114,11 +113,7 @@ func TestMain(m *testing.M) { log.Fatalf("failed rpc listen: %s\n", err) } - //var dialOptions []grpc.DialOption authInterceptor := client.NewAuthInterceptor(project.PublicKey, "") - //dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(authInterceptor.Unary())) - //dialOptions = append(dialOptions, grpc.WithStreamInterceptor(authInterceptor.Stream())) - //dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) conn := http.DefaultClient testClient = v1connect.NewYorkieServiceClient( @@ -127,12 +122,7 @@ func TestMain(m *testing.M) { connect.WithInterceptors(authInterceptor), ) - //credentials := grpc.WithTransportCredentials(insecure.NewCredentials()) - //dialOptions = []grpc.DialOption{credentials} - testAdminAuthInterceptor = admin.NewAuthInterceptor("") - //dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(testAdminAuthInterceptor.Unary())) - //dialOptions = append(dialOptions, grpc.WithStreamInterceptor(testAdminAuthInterceptor.Stream())) adminConn := http.DefaultClient testAdminClient = v1connect.NewAdminServiceClient( @@ -675,6 +665,7 @@ func TestSDKRPCServerBackend(t *testing.T) { break } + // TODO(krapie): find a way to set timeout for stream //// wait for MaxConnectionAge + MaxConnectionAgeGrace //time.Sleep(helper.RPCMaxConnectionAge + helper.RPCMaxConnectionAgeGrace) // diff --git a/test/integration/tree_test.go b/test/integration/tree_test.go index ea10767a5..9e4629cfe 100644 --- a/test/integration/tree_test.go +++ b/test/integration/tree_test.go @@ -425,7 +425,7 @@ func TestTree(t *testing.T) { root.GetTree("t").EditBulk(3, 3, []*json.TreeNode{{ Type: "text", Value: "c", - }, { + }, &json.TreeNode{ Type: "text", Value: "d", }}, 0) @@ -451,7 +451,7 @@ func TestTree(t *testing.T) { root.GetTree("t").EditBulk(4, 4, []*json.TreeNode{{ Type: "p", Children: []json.TreeNode{{Type: "text", Value: "cd"}}, - }, { + }, &json.TreeNode{ Type: "i", Children: []json.TreeNode{{Type: "text", Value: "fg"}}, }}, 0)