From 1c186dd8b9aa35ba64b7d60e80814936f964566d Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 20 Dec 2021 14:52:14 +0800 Subject: [PATCH] Fix #4373 : add comment Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 2 + server/api/server.go | 2 +- server/config/config.go | 7 ++- server/middleware/self_protection.go | 79 ++++++++++++++++++++++------ server/server.go | 27 +++------- 5 files changed, 77 insertions(+), 40 deletions(-) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index fe14d5688d7..2c2354cf260 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -143,6 +143,7 @@ func GetIPAddrFromGRPCContext(ctx context.Context) string { return ip } +// GetPeerAddrFromGRPCContext return gRPC client real IP if gateway or proxy put real IP In func GetRealIPAddrFromGRPCContext(ctx context.Context) (string, bool) { md, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -155,6 +156,7 @@ func GetRealIPAddrFromGRPCContext(ctx context.Context) (string, bool) { return realIPs[0], true } +// GetPeerAddrFromGRPCContext return gRPC client IP which may be a proxy IP from peer info func GetPeerAddrFromGRPCContext(ctx context.Context) string { var addr string if pr, ok := peer.FromContext(ctx); ok { diff --git a/server/api/server.go b/server/api/server.go index 0e32dcd0fd6..df2ed9ddf56 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -37,7 +37,7 @@ func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.S router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr), - svr.SelfProtectionHandler, + serverapi.NewSelfProtector(svr), negroni.Wrap(r)), ) diff --git a/server/config/config.go b/server/config/config.go index 3a1b0e74b39..c02e94cd304 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -164,7 +164,7 @@ type Config struct { ReplicationMode ReplicationModeConfig `toml:"replication-mode" json:"replication-mode"` - SelfProtectionConfig SelfProtectionConfig `toml:"SelfProtectionConfig"` + SelfProtectionConfig SelfProtectionConfig `toml:"self-protection-config"` } // NewConfig creates a new config. @@ -267,10 +267,9 @@ var ( DefaultStoreLimit = StoreLimit{AddPeer: 15, RemovePeer: 15} // DefaultTiFlashStoreLimit is the default TiFlash store limit of add peer and remove peer. DefaultTiFlashStoreLimit = StoreLimit{AddPeer: 30, RemovePeer: 30} - + // ServiceSelfProtectionConfig is used for self protection mechanism DefaultServiceSelfProtectionConfig = []ServiceSelfprotectionConfig{} - - HTTPAPIServiceNames = map[string]string{} + // GRPCMethodServiceNames is used to get logic service name of the gRPC service GRPCMethodServiceNames = map[string]string{} ) diff --git a/server/middleware/self_protection.go b/server/middleware/self_protection.go index bbc04d3b6c2..671f3a7a864 100644 --- a/server/middleware/self_protection.go +++ b/server/middleware/self_protection.go @@ -45,7 +45,8 @@ var ( componentAnonymousValue = "anonymous" ) -// SelfProtectionHandler a +// SelfProtectionHandler is a framework to handle self protection mechanism +// Self-protection granularity is a logical service type SelfProtectionHandler struct { // grpcServiceNames is used to find the service name of grpc method GrpcServiceNames map[string]string @@ -53,6 +54,24 @@ type SelfProtectionHandler struct { ServiceHandlers map[string]*ServiceSelfProtectionHandler } +// MergeSelfProtectionConfig is used for when both the user configuration and the default configuration exist +func MergeSelfProtectionConfig(handlers map[string]*ServiceSelfProtectionHandler, highPriorityConfigs []config.ServiceSelfprotectionConfig, lowPriorityConfigs []config.ServiceSelfprotectionConfig) { + for i := range highPriorityConfigs { + serviceName := highPriorityConfigs[i].ServiceName + serviceSelfProtectionHandler := NewServiceSelfProtectionHandler(&highPriorityConfigs[i]) + handlers[serviceName] = serviceSelfProtectionHandler + } + for i := range lowPriorityConfigs { + serviceName := lowPriorityConfigs[i].ServiceName + if _, find := handlers[serviceName]; find { + continue + } + serviceSelfProtectionHandler := NewServiceSelfProtectionHandler(&lowPriorityConfigs[i]) + handlers[serviceName] = serviceSelfProtectionHandler + } +} + +// GetHTTPAPIServiceName return mux route name registered for ServiceName func (h *SelfProtectionHandler) GetHTTPAPIServiceName(req *http.Request) (string, bool) { route := mux.CurrentRoute(req) if route != nil { @@ -63,11 +82,13 @@ func (h *SelfProtectionHandler) GetHTTPAPIServiceName(req *http.Request) (string return "", false } +// GetGRPCServiceName return ServiceName by mapping gRPC method name func (h *SelfProtectionHandler) GetGRPCServiceName(method string) (string, bool) { serviceName, ok := h.GrpcServiceNames[method] return serviceName, ok } +// GetComponentNameOnHTTP return component name from Request Header func (h *SelfProtectionHandler) GetComponentNameOnHTTP(r *http.Request) string { componentName := r.Header.Get(componentSignatureKey) if componentName == "" { @@ -76,6 +97,7 @@ func (h *SelfProtectionHandler) GetComponentNameOnHTTP(r *http.Request) string { return componentName } +// GetComponentNameOnGRPC return component name from gRPC metadata func (h *SelfProtectionHandler) GetComponentNameOnGRPC(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if ok { @@ -87,6 +109,7 @@ func (h *SelfProtectionHandler) GetComponentNameOnGRPC(ctx context.Context) stri return componentAnonymousValue } +// SelfProtectionHandleHTTP is used to handle http api self protection func (h *SelfProtectionHandler) SelfProtectionHandleHTTP(req *http.Request) bool { serviceName, foundName := h.GetHTTPAPIServiceName(req) if !foundName { @@ -112,6 +135,7 @@ func (h *SelfProtectionHandler) SelfProtectionHandleHTTP(req *http.Request) bool return limitAllow } +// SelfProtectionHandleGRPC is used to handle gRPC self protection func (h *SelfProtectionHandler) SelfProtectionHandleGRPC(fullMethod string, ctx context.Context) bool { serviceName, foundName := h.GetGRPCServiceName(fullMethod) if !foundName { @@ -137,17 +161,20 @@ func (h *SelfProtectionHandler) SelfProtectionHandleGRPC(fullMethod string, ctx return limitAllow } +// ServiceSelfProtectionHandler currently includes QPS rate limiter and audit logger type ServiceSelfProtectionHandler struct { apiRateLimiter *APIRateLimiter auditLogger *AuditLogger } +// NewServiceSelfProtectionHandler return a new ServiceSelfProtectionHandler func NewServiceSelfProtectionHandler(config *config.ServiceSelfprotectionConfig) *ServiceSelfProtectionHandler { handler := &ServiceSelfProtectionHandler{} handler.Update(config) return handler } +// Update is used to update ServiceSelfProtectionHandler func (h *ServiceSelfProtectionHandler) Update(config *config.ServiceSelfprotectionConfig) { if h.apiRateLimiter == nil { h.apiRateLimiter = NewAPIRateLimiter(config) @@ -161,6 +188,7 @@ func (h *ServiceSelfProtectionHandler) Update(config *config.ServiceSelfprotecti } } +// RateLimitAllow is used to check whether the rate limit allow request process func (h *ServiceSelfProtectionHandler) RateLimitAllow(componentName string) bool { if h.apiRateLimiter == nil { return true @@ -168,6 +196,7 @@ func (h *ServiceSelfProtectionHandler) RateLimitAllow(componentName string) bool return h.apiRateLimiter.Allow(componentName) } +// EnableAudit is used to check Whether to enable the audit handle func (h *ServiceSelfProtectionHandler) EnableAudit() bool { if h.auditLogger == nil { return true @@ -175,6 +204,7 @@ func (h *ServiceSelfProtectionHandler) EnableAudit() bool { return h.auditLogger.Enable() } +// GetLogInfoFromHTTP return LogInfo from http.Request func GetLogInfoFromHTTP(req *http.Request, logInfo *LogInfo) { // Get IP logInfo.IP = apiutil.GetIPAddrFromHTTPRequest(req) @@ -186,16 +216,21 @@ func GetLogInfoFromHTTP(req *http.Request, logInfo *LogInfo) { req.Body = io.NopCloser(bytes.NewBuffer(buf)) } +// GetLogInfoFromHTTP return LogInfo from Context func GetLogInfoFromGRPC(ctx context.Context, logInfo *LogInfo) { // Get IP logInfo.IP = apiutil.GetIPAddrFromGRPCContext(ctx) // gRPC can't get Param in middware } +// AuditLog is a entrance to access AuditLoggor func (h *ServiceSelfProtectionHandler) AuditLog(logInfo *LogInfo) { h.auditLogger.Log(logInfo) } +// APIRateLimiter is used to limit unnecessary and excess request +// Currently support QPS rate limit by compoenent +// It depends on the rate.Limiter which implement a token-bucket algorithm type APIRateLimiter struct { mu sync.RWMutex @@ -207,12 +242,14 @@ type APIRateLimiter struct { componentQPSRateLimiter map[string]*rate.Limiter } +// NewAPIRateLimiter create a new api rate limiter func NewAPIRateLimiter(config *config.ServiceSelfprotectionConfig) *APIRateLimiter { limiter := &APIRateLimiter{} limiter.Update(config) return limiter } +// Update will replace all handler by service func (rl *APIRateLimiter) Update(config *config.ServiceSelfprotectionConfig) { rl.mu.Lock() defer rl.mu.Unlock() @@ -233,6 +270,7 @@ func (rl *APIRateLimiter) Update(config *config.ServiceSelfprotectionConfig) { } } +// QPSAllow firstly check component token bucket and then check total token bucket func (rl *APIRateLimiter) QPSAllow(componentName string) bool { if !rl.enableQPSLimit { return true @@ -249,12 +287,14 @@ func (rl *APIRateLimiter) QPSAllow(componentName string) bool { return isComponentQPSLimit && isTotalQPSLimit } +// Allow currentlt only supports QPS rate limit func (rl *APIRateLimiter) Allow(componentName string) bool { rl.mu.RLock() defer rl.mu.RUnlock() return rl.QPSAllow(componentName) } +// LogInfo stores needed api request info type LogInfo struct { ServiceName string Method string @@ -265,18 +305,24 @@ type LogInfo struct { RateLimitAllow bool } +// AuditLogger is used to record some information about the service for auditing when problems occur +// Currently it can be bonded two audit labels +// LoggerLabelLog("log") means AuditLogger will restore info in local file system +// LoggerLabelMonitored("Monitored") means AuditLogger will report info to promethus type AuditLogger struct { mu sync.RWMutex enableAudit bool labels map[string]bool } +// NewAuditLogger return a new AuditLogger func NewAuditLogger(config *config.ServiceSelfprotectionConfig) *AuditLogger { logger := &AuditLogger{} logger.Update(config) return logger } +// Update AuditLogger by config func (logger *AuditLogger) Update(config *config.ServiceSelfprotectionConfig) { logger.mu.Lock() defer logger.mu.Unlock() @@ -287,13 +333,17 @@ func (logger *AuditLogger) Update(config *config.ServiceSelfprotectionConfig) { } } +// Enable is used to check Whether to enable the audit handle func (logger *AuditLogger) Enable() bool { logger.mu.RLock() defer logger.mu.RUnlock() return logger.enableAudit } +// Log is used to handle log action func (logger *AuditLogger) Log(info *LogInfo) { + logger.mu.RLock() + defer logger.mu.RUnlock() if isLog, ok := logger.labels[LoggerLabelLog]; ok { if isLog { log.Info("service_audit_detailed", @@ -311,14 +361,7 @@ func (logger *AuditLogger) Log(info *LogInfo) { } } -func (h *SelfProtectionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { - if h.SelfProtectionHandleHTTP(r) { - next(w, r) - } else { - http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) - } -} - +// UnaryServerInterceptor returns a gRPC stream server interceptor to handle self protection in gRPC unary service func (h *SelfProtectionHandler) UnaryServerInterceptor() grpc.UnaryServerInterceptor { return grpc.UnaryServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { if h.SelfProtectionHandleGRPC(info.FullMethod, ctx) { @@ -328,6 +371,7 @@ func (h *SelfProtectionHandler) UnaryServerInterceptor() grpc.UnaryServerInterce }) } +// StreamServerInterceptor returns a gRPC stream server interceptor to handle self protection in gRPC stream service func (h *SelfProtectionHandler) StreamServerInterceptor() grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { if h.SelfProtectionHandleGRPC(info.FullMethod, stream.Context()) { @@ -337,16 +381,18 @@ func (h *SelfProtectionHandler) StreamServerInterceptor() grpc.StreamServerInter } } -// UserSignatureGRPCClientInterceptorBuilder add component user signature in gRPC -type UserSignatureGRPCClientInterceptorBuilder struct { +// ComponentSignatureGRPCClientInterceptorBuilder add component signature in gRPC +type ComponentSignatureGRPCClientInterceptorBuilder struct { component string } -func (builder *UserSignatureGRPCClientInterceptorBuilder) SetComponentName(component string) { +// SetComponentName set component name +func (builder *ComponentSignatureGRPCClientInterceptorBuilder) SetComponentName(component string) { builder.component = component } -func (builder *UserSignatureGRPCClientInterceptorBuilder) UnaryClientInterceptor() grpc.UnaryClientInterceptor { +// UnaryClientInterceptor return a ComponentSignature UnaryClientInterceptor +func (builder *ComponentSignatureGRPCClientInterceptorBuilder) UnaryClientInterceptor() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { headerData := metadata.Pairs(componentSignatureKey, builder.component) ctxH := metadata.NewOutgoingContext(ctx, headerData) @@ -355,7 +401,8 @@ func (builder *UserSignatureGRPCClientInterceptorBuilder) UnaryClientInterceptor } } -func (builder *UserSignatureGRPCClientInterceptorBuilder) StreamClientInterceptor() grpc.StreamClientInterceptor { +// StreamClientInterceptor return a ComponentSignature StreamClientInterceptor +func (builder *ComponentSignatureGRPCClientInterceptorBuilder) StreamClientInterceptor() grpc.StreamClientInterceptor { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { headerData := metadata.Pairs(componentSignatureKey, builder.component) ctxH := metadata.NewOutgoingContext(ctx, headerData) @@ -363,7 +410,8 @@ func (builder *UserSignatureGRPCClientInterceptorBuilder) StreamClientIntercepto } } -func (builder *UserSignatureGRPCClientInterceptorBuilder) UserSignatureDialOptions() []grpc.DialOption { +// UserSignatureDialOptions create opts with ComponentSignature Interceptors +func (builder *ComponentSignatureGRPCClientInterceptorBuilder) UserSignatureDialOptions() []grpc.DialOption { streamInterceptors := []grpc.StreamClientInterceptor{builder.StreamClientInterceptor()} unaryInterceptors := []grpc.UnaryClientInterceptor{builder.UnaryClientInterceptor()} opts := []grpc.DialOption{grpc.WithChainStreamInterceptor(streamInterceptors...), grpc.WithChainUnaryInterceptor(unaryInterceptors...)} @@ -376,6 +424,7 @@ type UserSignatureRoundTripper struct { Component string } +// RoundTrip is used to implement RoundTripper func (rt *UserSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { req.Header.Set(componentSignatureKey, rt.Component) // Send the request, get the response and the error diff --git a/server/server.go b/server/server.go index 1d022ca0756..00f54fe9b86 100644 --- a/server/server.go +++ b/server/server.go @@ -349,16 +349,19 @@ func (s *Server) startEtcd(ctx context.Context) error { return nil } +// NewSelfProtectionHandler returns a new SelfProtectionHandler with config func NewSelfProtectionHandler(server *Server) *middleware.SelfProtectionHandler { handler := &middleware.SelfProtectionHandler{ GrpcServiceNames: config.GRPCMethodServiceNames, ServiceHandlers: make(map[string]*middleware.ServiceSelfProtectionHandler), } - UpdateServiceHandlers(handler, server) + updateServiceHandlers(handler, server) return handler } -func UpdateServiceHandlers(h *middleware.SelfProtectionHandler, server *Server) { +// updateServiceHandlers update ServiceHandlers +// it will make a new map and merge user-defined handlers and dafault handlers with different priority according to enableUseDefault +func updateServiceHandlers(h *middleware.SelfProtectionHandler, server *Server) { if server == nil { return } @@ -373,26 +376,10 @@ func UpdateServiceHandlers(h *middleware.SelfProtectionHandler, server *Server) } // if enableUseDefault is 1, config defined by users has higher priority than dafault } else if enableUseDefault == 1 { - mergeSelfProtectionConfig(h.ServiceHandlers, server.GetConfig().SelfProtectionConfig.ServiceSelfprotectionConfig, config.DefaultServiceSelfProtectionConfig) + middleware.MergeSelfProtectionConfig(h.ServiceHandlers, server.GetConfig().SelfProtectionConfig.ServiceSelfprotectionConfig, config.DefaultServiceSelfProtectionConfig) // if enableUseDefault is 0, dafault config has higher priority than config defined by users } else { - mergeSelfProtectionConfig(h.ServiceHandlers, config.DefaultServiceSelfProtectionConfig, server.GetConfig().SelfProtectionConfig.ServiceSelfprotectionConfig) - } -} - -func mergeSelfProtectionConfig(handlers map[string]*middleware.ServiceSelfProtectionHandler, highPriorityConfigs []config.ServiceSelfprotectionConfig, lowPriorityConfigs []config.ServiceSelfprotectionConfig) { - for i := range highPriorityConfigs { - serviceName := highPriorityConfigs[i].ServiceName - serviceSelfProtectionHandler := middleware.NewServiceSelfProtectionHandler(&highPriorityConfigs[i]) - handlers[serviceName] = serviceSelfProtectionHandler - } - for i := range lowPriorityConfigs { - serviceName := lowPriorityConfigs[i].ServiceName - if _, find := handlers[serviceName]; find { - continue - } - serviceSelfProtectionHandler := middleware.NewServiceSelfProtectionHandler(&lowPriorityConfigs[i]) - handlers[serviceName] = serviceSelfProtectionHandler + middleware.MergeSelfProtectionConfig(h.ServiceHandlers, config.DefaultServiceSelfProtectionConfig, server.GetConfig().SelfProtectionConfig.ServiceSelfprotectionConfig) } }