From 2faf7b40ba576e647b4286892b5a559c00d5486c Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Fri, 8 Dec 2023 14:26:17 +0800 Subject: [PATCH 1/4] code beautify --- agent.go | 8 ++++---- core/motan.go | 4 ++-- filter/rateLimit.go | 20 ++++++++++---------- protocol/motanProtocol.go | 10 +++++++--- protocol/motanProtocol_test.go | 4 ++-- server/motanserver.go | 4 ++-- 6 files changed, 27 insertions(+), 23 deletions(-) diff --git a/agent.go b/agent.go index a4ce63b1..fe27f9d0 100644 --- a/agent.go +++ b/agent.go @@ -846,18 +846,18 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot } err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched) if err != nil { - putBackClusterSlice(matched) + releaseClusterSlice(matched) return } if len(matched) == 1 { c = matched[0].cluster - putBackClusterSlice(matched) + releaseClusterSlice(matched) return } matched = matched[:0] } err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version)) - putBackClusterSlice(matched) + releaseClusterSlice(matched) return } @@ -1266,7 +1266,7 @@ func (a *Agent) UnexportService(url *motan.URL) error { return nil } -func putBackClusterSlice(s []serviceMapItem) { +func releaseClusterSlice(s []serviceMapItem) { s = s[:0] clusterSlicePool.Put(s) } diff --git a/core/motan.go b/core/motan.go index 73b01900..8c6c4553 100644 --- a/core/motan.go +++ b/core/motan.go @@ -494,7 +494,7 @@ func GetMotanRequestFromPool() *MotanRequest { return requestPool.Get().(*MotanRequest) } -func PutMotanRequestBackPool(req *MotanRequest) { +func ReleaseMotanRequest(req *MotanRequest) { if req != nil { req.Method = "" req.RequestID = 0 @@ -649,7 +649,7 @@ func GetMotanResponseFromPool() *MotanResponse { return responsePool.Get().(*MotanResponse) } -func PutMotanResponseBackPool(resp *MotanResponse) { +func ReleaseMotanResponse(resp *MotanResponse) { if resp != nil { //resp.Reset() resp.RequestID = 0 diff --git a/filter/rateLimit.go b/filter/rateLimit.go index 85d5ac22..e98692b1 100644 --- a/filter/rateLimit.go +++ b/filter/rateLimit.go @@ -137,20 +137,20 @@ func (r *RateLimitFilter) GetType() int32 { func getKeyValue(key, value, prefix string) (string, float64, bool) { if strings.HasPrefix(key, prefix) { if temp := strings.Split(key, prefix); len(temp) == 2 { - if r, err := strconv.ParseFloat(value, 64); err == nil && temp[1] != "" && r > 0 { + r, err := strconv.ParseFloat(value, 64) + if err == nil && temp[1] != "" && r > 0 { return temp[1], r, true + } + if err != nil { + vlog.Warningf("[rateLimit] parse %s config error:%s", key, err.Error()) } else { - if err != nil { - vlog.Warningf("[rateLimit] parse %s config error:%s", key, err.Error()) - } else { - if r <= 0 { - vlog.Warningf("[rateLimit] parse %s config error: value is 0 or negative", key) - } - } - if temp[1] == "" { - vlog.Warningf("[rateLimit] parse %s config error: key is empty", key) + if r <= 0 { + vlog.Warningf("[rateLimit] parse %s config error: value is 0 or negative", key) } } + if temp[1] == "" { + vlog.Warningf("[rateLimit] parse %s config error: key is empty", key) + } } } return "", 0, false diff --git a/protocol/motanProtocol.go b/protocol/motanProtocol.go index 180fb021..0b2c8d14 100644 --- a/protocol/motanProtocol.go +++ b/protocol/motanProtocol.go @@ -361,6 +361,9 @@ func Decode(buf *bufio.Reader, readSlice *[]byte) (msg *Message, err error) { } func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *Message, start time.Time, err error) { + // readSlice is a slice used by reading header info from buf + // in order to reuse this slice, parameter rs is a slice pointer, so that + // the extension of *rs would also affect the slice outside readSlice := *rs // decode header _, err = io.ReadAtLeast(buf, readSlice[:HeaderLength], HeaderLength) @@ -373,6 +376,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M vlog.Errorf("wrong magic num:%d, err:%v", mn, err) return nil, start, ErrMagicNum } + // get a message from pool msg = messagePool.Get().(*Message) msg.Header.Magic = MotanMagic msg.Header.MsgType = readSlice[2] @@ -589,7 +593,7 @@ func ConvertToRequest(request *Message, serialize motan.Serialization) (motan.Re request.Header.SetGzip(false) } if !rc.Proxy && serialize == nil { - motan.PutMotanRequestBackPool(motanRequest) + motan.ReleaseMotanRequest(motanRequest) return nil, ErrSerializeNil } if len(motanRequest.Arguments) <= 0 { @@ -745,7 +749,7 @@ func ConvertToResponse(response *Message, serialize motan.Serialization) (motan. response.Header.SetGzip(false) } if !rc.Proxy && serialize == nil { - motan.PutMotanResponseBackPool(mres) + motan.ReleaseMotanResponse(mres) return nil, ErrSerializeNil } dv := &motan.DeserializableValue{Body: response.Body, Serialization: serialize} @@ -757,7 +761,7 @@ func ConvertToResponse(response *Message, serialize motan.Serialization) (motan. var exception *motan.Exception err := json.Unmarshal([]byte(e), &exception) if err != nil { - motan.PutMotanResponseBackPool(mres) + motan.ReleaseMotanResponse(mres) return nil, err } mres.Exception = exception diff --git a/protocol/motanProtocol_test.go b/protocol/motanProtocol_test.go index acc41513..0ce51292 100644 --- a/protocol/motanProtocol_test.go +++ b/protocol/motanProtocol_test.go @@ -277,7 +277,7 @@ func TestConvertToResponse(t *testing.T) { assertTrue(resp.GetAttachment(MPath) == "path", "response path", t) //assertTrue(resp.GetValue().(string) == "testbody", "response body", t) pMap[fmt.Sprintf("%p", resp)] = "1" - core.PutMotanResponseBackPool(resp.(*core.MotanResponse)) + core.ReleaseMotanResponse(resp.(*core.MotanResponse)) } assert.True(t, len(pMap) < 10000) } @@ -310,7 +310,7 @@ func TestConvertToRequest(t *testing.T) { assertTrue(req.GetAttachment(MMethod) == "method", "request method", t) assertTrue(req.GetAttachment(MPath) == "path", "request path", t) pMap[fmt.Sprintf("%p", req)] = "1" - core.PutMotanRequestBackPool(req.(*core.MotanRequest)) + core.ReleaseMotanRequest(req.(*core.MotanRequest)) } assert.True(t, len(pMap) < 10000) diff --git a/server/motanserver.go b/server/motanserver.go index 5eecb3a2..74bef77b 100644 --- a/server/motanserver.go +++ b/server/motanserver.go @@ -275,10 +275,10 @@ func (m *MotanServer) processV2(msg *mpro.Message, start time.Time, ip string, c mpro.PutMessageBackToPool(res) // 回收request if motanReq, ok := mreq.(*motan.MotanRequest); ok { - motan.PutMotanRequestBackPool(motanReq) + motan.ReleaseMotanRequest(motanReq) } if motanResp, ok := mres.(*motan.MotanResponse); ok { - motan.PutMotanResponseBackPool(motanResp) + motan.ReleaseMotanResponse(motanResp) } } From ff79f24ac7a15224c4bed1697c94c2f12c4b1674 Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Fri, 8 Dec 2023 15:12:10 +0800 Subject: [PATCH 2/4] beautify code --- agent.go | 6 ++---- dynamicConfig.go | 8 ++++---- protocol/motanProtocol.go | 20 ++++++++++---------- protocol/motanProtocol_test.go | 2 +- server/motanserver.go | 4 ++-- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/agent.go b/agent.go index 1f7a7ae5..40538c7a 100644 --- a/agent.go +++ b/agent.go @@ -138,7 +138,7 @@ func (a *Agent) RegisterCommandHandler(f CommandHandler) { a.commandHandlers = append(a.commandHandlers, f) } -func (a *Agent) GetDynamicRegistryInfo() *registrySnapInfoStorage { +func (a *Agent) GetDynamicRegistryInfo() *RegistrySnapInfoStorage { return a.configurer.getRegistryInfo() } @@ -840,6 +840,7 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot if cap(matched) < len(clusters) { matched = make([]serviceMapItem, 0, len(clusters)) } + releaseClusterSlice(matched) for i, rule := range search { if i == 0 { key = rule.cond @@ -848,18 +849,15 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot } err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched) if err != nil { - releaseClusterSlice(matched) return } if len(matched) == 1 { c = matched[0].cluster - releaseClusterSlice(matched) return } matched = matched[:0] } err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version)) - releaseClusterSlice(matched) return } diff --git a/dynamicConfig.go b/dynamicConfig.go index 98ca0538..afc10cc5 100644 --- a/dynamicConfig.go +++ b/dynamicConfig.go @@ -32,7 +32,7 @@ type DynamicConfigurer struct { agent *Agent } -type registrySnapInfoStorage struct { +type RegistrySnapInfoStorage struct { RegisterNodes []*core.URL `json:"register_nodes"` SubscribeNodes []*core.URL `json:"subscribe_nodes"` } @@ -60,7 +60,7 @@ func (c *DynamicConfigurer) doRecover() error { vlog.Warningln("Read configuration snapshot file error: " + err.Error()) return err } - registerSnapInfo := new(registrySnapInfoStorage) + registerSnapInfo := new(RegistrySnapInfoStorage) err = json.Unmarshal(bytes, registerSnapInfo) if err != nil { vlog.Errorln("Parse snapshot string error: " + err.Error()) @@ -162,8 +162,8 @@ func (c *DynamicConfigurer) saveSnapshot() { } } -func (c *DynamicConfigurer) getRegistryInfo() *registrySnapInfoStorage { - registrySnapInfo := registrySnapInfoStorage{} +func (c *DynamicConfigurer) getRegistryInfo() *RegistrySnapInfoStorage { + registrySnapInfo := RegistrySnapInfoStorage{} c.regLock.Lock() defer c.regLock.Unlock() diff --git a/protocol/motanProtocol.go b/protocol/motanProtocol.go index f90fceb8..2354bf83 100644 --- a/protocol/motanProtocol.go +++ b/protocol/motanProtocol.go @@ -392,13 +392,13 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M // decode meta _, err = io.ReadAtLeast(buf, readSlice[:4], 4) if err != nil { - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, err } metasize := int(binary.BigEndian.Uint32(readSlice[:4])) if metasize > maxContentLength { vlog.Errorf("meta over size. meta size:%d, max size:%d", metasize, maxContentLength) - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, ErrOverSize } if metasize > 0 { @@ -408,7 +408,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M } err := readBytes(buf, readSlice, metasize) if err != nil { - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, err } s, e := 0, 0 @@ -427,7 +427,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M } if k != "" { vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s", msg.Header, readSlice) - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, ErrMetadata } } @@ -435,13 +435,13 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M //decode body _, err = io.ReadAtLeast(buf, readSlice[:4], 4) if err != nil { - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, err } bodysize := int(binary.BigEndian.Uint32(readSlice[:4])) if bodysize > maxContentLength { vlog.Errorf("body over size. body size:%d, max size:%d", bodysize, maxContentLength) - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, ErrOverSize } @@ -455,7 +455,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M msg.Body = make([]byte, 0) } if err != nil { - PutMessageBackToPool(msg) + ReleaseMessage(msg) return nil, start, err } return msg, start, err @@ -708,13 +708,13 @@ func ConvertToResMessage(response motan.Response, serialize motan.Serialization) res.Body = b } else { vlog.Warningf("convert response value fail! serialized value not []byte. res:%+v", response) - PutMessageBackToPool(res) + ReleaseMessage(res) return nil, ErrSerializedData } } else { b, err := serialize.Serialize(response.GetValue()) if err != nil { - PutMessageBackToPool(res) + ReleaseMessage(res) return nil, err } res.Body = b @@ -785,7 +785,7 @@ func ExceptionToJSON(e *motan.Exception) string { return string(errmsg) } -func PutMessageBackToPool(msg *Message) { +func ReleaseMessage(msg *Message) { if msg != nil { //msg.Reset() msg.Type = 0 diff --git a/protocol/motanProtocol_test.go b/protocol/motanProtocol_test.go index 0ce51292..8a11f20a 100644 --- a/protocol/motanProtocol_test.go +++ b/protocol/motanProtocol_test.go @@ -221,7 +221,7 @@ func TestPool(t *testing.T) { assertTrue(cap(readSlice) > 200, "readSlice", t) assertTrue(len(newMsg.Body) == len(msg.Body), "body", t) assert.Nil(t, err) - PutMessageBackToPool(newMsg) + ReleaseMessage(newMsg) body1 := []byte("testbody") msg1 := &Message{Header: h, Metadata: meta, Body: body1} ebytes1 := msg1.Encode() diff --git a/server/motanserver.go b/server/motanserver.go index 5c3445b7..c038eaec 100644 --- a/server/motanserver.go +++ b/server/motanserver.go @@ -271,8 +271,8 @@ func (m *MotanServer) processV2(msg *mpro.Message, start time.Time, ip string, c tc.PutResSpan(&motan.Span{Name: motan.Send, Time: resSendTime}) } // 回收message - mpro.PutMessageBackToPool(msg) - mpro.PutMessageBackToPool(res) + mpro.ReleaseMessage(msg) + mpro.ReleaseMessage(res) // 回收request if motanReq, ok := mreq.(*motan.MotanRequest); ok { motan.ReleaseMotanRequest(motanReq) From b90bac90dadcbcc5d42eef2df266e6a1a62eadb9 Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Tue, 12 Dec 2023 17:43:51 +0800 Subject: [PATCH 3/4] beautify code, add unit test --- agent.go | 65 ++++-------- agent_test.go | 18 ++-- core/map.go | 1 - core/motan.go | 180 ++++++++++++++------------------- core/url.go | 18 ++++ filter/accessLog.go | 3 +- filter/filter.go | 12 +++ filter/metrics.go | 9 +- protocol/motanProtocol.go | 56 ++++------ protocol/motanProtocol_test.go | 124 +++++++++++++++++++++-- 10 files changed, 267 insertions(+), 219 deletions(-) diff --git a/agent.go b/agent.go index 40538c7a..3da0be9b 100644 --- a/agent.go +++ b/agent.go @@ -45,11 +45,6 @@ var ( setAgentLock sync.Mutex notFoundProviderCount int64 = 0 defaultInitClusterTimeout int64 = 10000 //ms - clusterSlicePool = sync.Pool{ - New: func() interface{} { - return make([]serviceMapItem, 0, 5) - }, - } ) type Agent struct { @@ -442,7 +437,7 @@ func (a *Agent) initHTTPClusters() { } httpCluster := cluster.NewHTTPCluster(url, true, a.Context, a.extFactory) if httpCluster == nil { - vlog.Errorf("—Create http cluster %s failed", id) + vlog.Errorf("Create http cluster %s failed", id) continue } // here the domain has value @@ -820,44 +815,32 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot group := request.GetAttachment(mpro.MGroup) version := request.GetAttachment(mpro.MVersion) protocol := request.GetAttachment(mpro.MProxyProtocol) + if service == "" { + err = fmt.Errorf("empty service is not supported. info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version) + return + } serviceItemArrI, exists := a.agent.serviceMap.Load(service) if !exists { - err = fmt.Errorf("cluster not found. cluster:%s, %s", service, getReqInfo(service, group, protocol, version)) + err = fmt.Errorf("cluster not found. info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version) return } - search := []struct { - tip string - cond string - condFn func(u *motan.URL) string - }{ - {"service", service, func(u *motan.URL) string { return u.Path }}, - {"group", group, func(u *motan.URL) string { return u.Group }}, - {"protocol", protocol, func(u *motan.URL) string { return u.Protocol }}, - {"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }}, - } clusters := serviceItemArrI.([]serviceMapItem) - matched := clusterSlicePool.Get().([]serviceMapItem) - if cap(matched) < len(clusters) { - matched = make([]serviceMapItem, 0, len(clusters)) - } - releaseClusterSlice(matched) - for i, rule := range search { - if i == 0 { - key = rule.cond - } else { - key += "_" + rule.cond - } - err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched) - if err != nil { - return - } - if len(matched) == 1 { - c = matched[0].cluster + if len(clusters) == 1 { + //TODO: add strict mode to avoid incorrect group call + c = clusters[0].cluster + return + } + if group == "" { + err = fmt.Errorf("multiple clusters are matched with service: %s, but the group is empty", service) + return + } + for _, j := range clusters { + if j.url.IsMatch(service, group, protocol, version) { + c = j.cluster return } - matched = matched[:0] } - err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version)) + err = fmt.Errorf("no cluster matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version) return } @@ -1231,11 +1214,6 @@ func urlExist(url *motan.URL, urls map[string]*motan.URL) bool { return false } -func getReqInfo(service, group, protocol, version string) string { - return fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}", - service, group, protocol, version) -} - func (a *Agent) SubscribeService(url *motan.URL) error { if urlExist(url, a.Context.RefersURLs) { return fmt.Errorf("url exist, ignore subscribe, url: %s", url.GetIdentity()) @@ -1265,8 +1243,3 @@ func (a *Agent) UnexportService(url *motan.URL) error { } return nil } - -func releaseClusterSlice(s []serviceMapItem) { - s = s[:0] - clusterSlicePool.Put(s) -} diff --git a/agent_test.go b/agent_test.go index 55b8929d..fc413a7c 100644 --- a/agent_test.go +++ b/agent_test.go @@ -413,15 +413,18 @@ func TestAgent_InitCall(t *testing.T) { version string except string }{ + // 只传service,且只有一个cluster,findcCluster 会正常返回 {"test0", "", "", "", "No refers for request"}, - {"test-1", "111", "222", "333", "cluster not found. cluster:test-1"}, - {"test3", "", "", "", "empty group is not supported"}, + {"test0", "g0", "", "", "No refers for request"}, + {"test0", "g0", "http", "", "No refers for request"}, + {"test0", "g0", "", "1.3", "No refers for request"}, + {"test-1", "111", "222", "333", "cluster not found"}, {"test", "g2", "", "", "No refers for request"}, - {"test", "g1", "", "", "empty protocol is not supported"}, {"test", "g1", "motan2", "", "No refers for request"}, - {"test", "g1", "motan", "", "empty version is not supported"}, {"test", "g1", "http", "1.3", "No refers for request"}, - {"test", "g1", "http", "1.2", "less condition to select cluster"}, + {"test", "b", "c", "d", "no cluster matches the request"}, + // 同一个service有多个cluster可以匹配,但是group没有传 + {"test", "", "c", "d", "multiple clusters are matched with service"}, } { request.ServiceName = v.service request.SetAttachment(mpro.MGroup, v.group) @@ -479,10 +482,9 @@ func TestAgent_InitCall(t *testing.T) { version string except string }{ - {"test3", "111", "222", "333", "cluster not found. cluster:test3"}, - {"test4", "", "", "", "empty group is not supported"}, + {"test3", "111", "222", "333", "cluster not found. info: {service: test3"}, {"test5", "", "", "", "No refers for request"}, - {"helloService2", "", "", "", "cluster not found. cluster:helloService2"}, + {"helloService2", "", "", "", "cluster not found. info: {service: helloService2"}, } { request = newRequest(v.service, "") request.SetAttachment(mpro.MGroup, v.group) diff --git a/core/map.go b/core/map.go index cd6a4c43..5d5502e8 100644 --- a/core/map.go +++ b/core/map.go @@ -27,7 +27,6 @@ func (m *StringMap) Store(key, value string) { } func (m *StringMap) Reset() { - //TODO: 这个地方是否应该加锁呢? m.mu.Lock() for k := range m.innerMap { delete(m.innerMap, k) diff --git a/core/motan.go b/core/motan.go index b1ea3d42..a04bee48 100644 --- a/core/motan.go +++ b/core/motan.go @@ -400,26 +400,6 @@ type RPCContext struct { RemoteAddr string // remote address } -func ResetRPCContext(c *RPCContext) { - if c != nil { - c.ExtFactory = nil - c.OriginalMessage = nil - c.Oneway = false - c.Proxy = false - c.GzipSize = 0 - c.BodySize = 0 - c.SerializeNum = 0 - c.Serialized = false - c.AsyncCall = false - c.Result = nil - c.Reply = nil - c.FinishHandlers = c.FinishHandlers[:0] - c.Tc = nil - c.IsMotanV1 = false - c.RemoteAddr = "" - } -} - func (c *RPCContext) Reset() { c.ExtFactory = nil c.OriginalMessage = nil @@ -490,131 +470,125 @@ type MotanRequest struct { mu sync.Mutex } -func GetMotanRequestFromPool() *MotanRequest { +func AcquireMotanRequest() *MotanRequest { return requestPool.Get().(*MotanRequest) } func ReleaseMotanRequest(req *MotanRequest) { if req != nil { - req.Method = "" - req.RequestID = 0 - req.ServiceName = "" - req.MethodDesc = "" - ResetRPCContext(req.RPCContext) - req.Attachment = nil - req.Arguments = req.Arguments[:0] + req.Reset() requestPool.Put(req) } } // Reset reset motan request -func (m *MotanRequest) Reset() { - m.Method = "" - m.RequestID = 0 - m.ServiceName = "" - m.MethodDesc = "" - m.RPCContext.Reset() - m.Attachment = nil - m.Arguments = m.Arguments[:0] +func (req *MotanRequest) Reset() { + req.Method = "" + req.RequestID = 0 + req.ServiceName = "" + req.MethodDesc = "" + req.RPCContext.Reset() + req.Attachment = nil + req.Arguments = req.Arguments[:0] } // GetAttachment GetAttachment -func (m *MotanRequest) GetAttachment(key string) string { - if m.Attachment == nil { +func (req *MotanRequest) GetAttachment(key string) string { + if req.Attachment == nil { return "" } - return m.Attachment.LoadOrEmpty(key) + return req.Attachment.LoadOrEmpty(key) } // SetAttachment : SetAttachment -func (m *MotanRequest) SetAttachment(key string, value string) { - m.GetAttachments().Store(key, value) +func (req *MotanRequest) SetAttachment(key string, value string) { + req.GetAttachments().Store(key, value) } // GetServiceName GetServiceName -func (m *MotanRequest) GetServiceName() string { - return m.ServiceName +func (req *MotanRequest) GetServiceName() string { + return req.ServiceName } // GetMethod GetMethod -func (m *MotanRequest) GetMethod() string { - return m.Method +func (req *MotanRequest) GetMethod() string { + return req.Method } // GetMethodDesc GetMethodDesc -func (m *MotanRequest) GetMethodDesc() string { - return m.MethodDesc +func (req *MotanRequest) GetMethodDesc() string { + return req.MethodDesc } -func (m *MotanRequest) GetArguments() []interface{} { - return m.Arguments +func (req *MotanRequest) GetArguments() []interface{} { + return req.Arguments } -func (m *MotanRequest) GetRequestID() uint64 { - return m.RequestID +func (req *MotanRequest) GetRequestID() uint64 { + return req.RequestID } -func (m *MotanRequest) SetArguments(arguments []interface{}) { - m.Arguments = arguments +func (req *MotanRequest) SetArguments(arguments []interface{}) { + req.Arguments = arguments } -func (m *MotanRequest) GetAttachments() *StringMap { - attachment := (*StringMap)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&m.Attachment)))) +func (req *MotanRequest) GetAttachments() *StringMap { + attachment := (*StringMap)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&req.Attachment)))) if attachment != nil { return attachment } - m.mu.Lock() - defer m.mu.Unlock() - if m.Attachment == nil { + req.mu.Lock() + defer req.mu.Unlock() + if req.Attachment == nil { attachment = NewStringMap(DefaultAttachmentSize) - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&m.Attachment)), unsafe.Pointer(attachment)) + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&req.Attachment)), unsafe.Pointer(attachment)) } else { - attachment = m.Attachment + attachment = req.Attachment } return attachment } -func (m *MotanRequest) GetRPCContext(canCreate bool) *RPCContext { - if m.RPCContext == nil && canCreate { - m.RPCContext = &RPCContext{} +func (req *MotanRequest) GetRPCContext(canCreate bool) *RPCContext { + if req.RPCContext == nil && canCreate { + req.RPCContext = &RPCContext{} } - return m.RPCContext + return req.RPCContext } -func (m *MotanRequest) Clone() interface{} { +func (req *MotanRequest) Clone() interface{} { newRequest := &MotanRequest{ - RequestID: m.RequestID, - ServiceName: m.ServiceName, - Method: m.Method, - MethodDesc: m.MethodDesc, - Arguments: m.Arguments, + RequestID: req.RequestID, + ServiceName: req.ServiceName, + Method: req.Method, + MethodDesc: req.MethodDesc, + Arguments: req.Arguments, } - if m.Attachment != nil { - newRequest.Attachment = m.Attachment.Copy() + if req.Attachment != nil { + newRequest.Attachment = req.Attachment.Copy() } - if m.RPCContext != nil { + if req.RPCContext != nil { newRequest.RPCContext = &RPCContext{ - ExtFactory: m.RPCContext.ExtFactory, - Oneway: m.RPCContext.Oneway, - Proxy: m.RPCContext.Proxy, - GzipSize: m.RPCContext.GzipSize, - SerializeNum: m.RPCContext.SerializeNum, - Serialized: m.RPCContext.Serialized, - AsyncCall: m.RPCContext.AsyncCall, - Result: m.RPCContext.Result, - Reply: m.RPCContext.Reply, - RequestSendTime: m.RPCContext.RequestSendTime, - RequestReceiveTime: m.RPCContext.RequestReceiveTime, - ResponseSendTime: m.RPCContext.ResponseSendTime, - ResponseReceiveTime: m.RPCContext.ResponseReceiveTime, - FinishHandlers: m.RPCContext.FinishHandlers, - Tc: m.RPCContext.Tc, + ExtFactory: req.RPCContext.ExtFactory, + Oneway: req.RPCContext.Oneway, + Proxy: req.RPCContext.Proxy, + GzipSize: req.RPCContext.GzipSize, + SerializeNum: req.RPCContext.SerializeNum, + Serialized: req.RPCContext.Serialized, + AsyncCall: req.RPCContext.AsyncCall, + Result: req.RPCContext.Result, + Reply: req.RPCContext.Reply, + RequestSendTime: req.RPCContext.RequestSendTime, + RequestReceiveTime: req.RPCContext.RequestReceiveTime, + ResponseSendTime: req.RPCContext.ResponseSendTime, + ResponseReceiveTime: req.RPCContext.ResponseReceiveTime, + FinishHandlers: req.RPCContext.FinishHandlers, + Tc: req.RPCContext.Tc, } - if m.RPCContext.OriginalMessage != nil { - if oldMessage, ok := m.RPCContext.OriginalMessage.(Cloneable); ok { + if req.RPCContext.OriginalMessage != nil { + if oldMessage, ok := req.RPCContext.OriginalMessage.(Cloneable); ok { newRequest.RPCContext.OriginalMessage = oldMessage.Clone() } else { - newRequest.RPCContext.OriginalMessage = m.RPCContext.OriginalMessage + newRequest.RPCContext.OriginalMessage = req.RPCContext.OriginalMessage } } } @@ -623,14 +597,14 @@ func (m *MotanRequest) Clone() interface{} { // ProcessDeserializable : DeserializableValue to real params according toType // some serialization can deserialize without toType, so nil toType can be accepted in these serializations -func (m *MotanRequest) ProcessDeserializable(toTypes []interface{}) error { - if m.GetArguments() != nil && len(m.GetArguments()) == 1 { - if d, ok := m.GetArguments()[0].(*DeserializableValue); ok { +func (req *MotanRequest) ProcessDeserializable(toTypes []interface{}) error { + if req.GetArguments() != nil && len(req.GetArguments()) == 1 { + if d, ok := req.GetArguments()[0].(*DeserializableValue); ok { v, err := d.DeserializeMulti(toTypes) if err != nil { return err } - m.SetArguments(v) + req.SetArguments(v) } } return nil @@ -646,20 +620,14 @@ type MotanResponse struct { mu sync.Mutex } -func GetMotanResponseFromPool() *MotanResponse { +func AcquireMotanResponse() *MotanResponse { return responsePool.Get().(*MotanResponse) } -func ReleaseMotanResponse(resp *MotanResponse) { - if resp != nil { - //resp.Reset() - resp.RequestID = 0 - resp.Value = nil - resp.Exception = nil - resp.ProcessTime = 0 - resp.Attachment = nil - ResetRPCContext(resp.RPCContext) - responsePool.Put(resp) +func ReleaseMotanResponse(m *MotanResponse) { + if m != nil { + m.Reset() + responsePool.Put(m) } } diff --git a/core/url.go b/core/url.go index 8978a9f2..dfcbb989 100644 --- a/core/url.go +++ b/core/url.go @@ -45,6 +45,24 @@ func (u *URL) GetIdentity() string { return idt } +// IsMatch is a tool function for comparing parameters: service, group, protocol and version +// with URL. This function is compatible with the situation that some of the parameters is empty +func (u *URL) IsMatch(service, group, protocol, version string) bool { + if u.Path != service { + return false + } + if group != "" && u.Group != group { + return false + } + if protocol != "" && u.Protocol != protocol { + return false + } + if version != "" && u.GetParam(VersionKey, "") != version { + return false + } + return true +} + func (u *URL) GetIdentityWithRegistry() string { id := u.GetIdentity() registryId := u.GetParam(RegistryKey, "") diff --git a/filter/accessLog.go b/filter/accessLog.go index ee980087..a78c9562 100644 --- a/filter/accessLog.go +++ b/filter/accessLog.go @@ -38,12 +38,11 @@ func (t *AccessLogFilter) Filter(caller motan.Caller, request motan.Request) mot case motan.Provider: role = serverAgentRole ip = request.GetAttachment(motan.HostKey) - start = request.GetRPCContext(true).RequestReceiveTime case motan.EndPoint: role = clientAgentRole ip = caller.GetURL().Host - start = time.Now() } + start = getFilterStartTime(caller, request) response := t.GetNext().Filter(caller, request) address := ip + ":" + caller.GetURL().GetPortStr() if _, ok := caller.(motan.Provider); ok { diff --git a/filter/filter.go b/filter/filter.go index 0126f291..38552ac8 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -2,6 +2,7 @@ package filter import ( motan "github.com/weibocom/motan-go/core" + "time" ) // ext name @@ -59,3 +60,14 @@ func RegistDefaultFilters(extFactory motan.ExtensionFactory) { return &ClusterCircuitBreakerFilter{} }) } + +func getFilterStartTime(caller motan.Caller, request motan.Request) time.Time { + switch caller.(type) { + case motan.Provider: + return request.GetRPCContext(true).RequestReceiveTime + case motan.EndPoint: + return time.Now() + default: + return time.Now() + } +} diff --git a/filter/metrics.go b/filter/metrics.go index f74df941..c69b41fd 100644 --- a/filter/metrics.go +++ b/filter/metrics.go @@ -56,15 +56,8 @@ func (m *MetricsFilter) GetNext() motan.EndPointFilter { } func (m *MetricsFilter) Filter(caller motan.Caller, request motan.Request) motan.Response { - var start time.Time - switch caller.(type) { - case motan.Provider: - start = request.GetRPCContext(true).RequestReceiveTime - case motan.EndPoint: - start = time.Now() - } + start := getFilterStartTime(caller, request) response := m.GetNext().Filter(caller, request) - proxy := false provider := false ctx := request.GetRPCContext(false) diff --git a/protocol/motanProtocol.go b/protocol/motanProtocol.go index 2354bf83..dc19f51b 100644 --- a/protocol/motanProtocol.go +++ b/protocol/motanProtocol.go @@ -68,16 +68,6 @@ func (h *Header) Reset() { h.RequestID = 0 } -func ResetHeader(h *Header) { - if h != nil { - h.Magic = 0 - h.MsgType = 0 - h.VersionStatus = 0 - h.Serialize = 0 - h.RequestID = 0 - } -} - func (h *Header) Clone() *Header { return &Header{ Magic: h.Magic, @@ -360,10 +350,10 @@ func Decode(buf *bufio.Reader, readSlice *[]byte) (msg *Message, err error) { return msg, err } +// DecodeWithTime the parameter rs is a slice pointer, so the +// extension of *rs will affect the slice outside in order to +// reuse the readSlice which is used by reading header info from buf func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *Message, start time.Time, err error) { - // readSlice is a slice used by reading header info from buf - // in order to reuse this slice, parameter rs is a slice pointer, so that - // the extension of *rs would also affect the slice outside readSlice := *rs // decode header _, err = io.ReadAtLeast(buf, readSlice[:HeaderLength], HeaderLength) @@ -377,7 +367,12 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M return nil, start, ErrMagicNum } // get a message from pool - msg = messagePool.Get().(*Message) + msg = AcquireMessage() + defer func() { + if err != nil { + ReleaseMessage(msg) + } + }() msg.Header.Magic = MotanMagic msg.Header.MsgType = readSlice[2] msg.Header.VersionStatus = readSlice[3] @@ -392,13 +387,11 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M // decode meta _, err = io.ReadAtLeast(buf, readSlice[:4], 4) if err != nil { - ReleaseMessage(msg) return nil, start, err } metasize := int(binary.BigEndian.Uint32(readSlice[:4])) if metasize > maxContentLength { vlog.Errorf("meta over size. meta size:%d, max size:%d", metasize, maxContentLength) - ReleaseMessage(msg) return nil, start, ErrOverSize } if metasize > 0 { @@ -406,9 +399,8 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M readSlice = make([]byte, metasize) *rs = readSlice } - err := readBytes(buf, readSlice, metasize) + err = readBytes(buf, readSlice, metasize) if err != nil { - ReleaseMessage(msg) return nil, start, err } s, e := 0, 0 @@ -427,7 +419,6 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M } if k != "" { vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s", msg.Header, readSlice) - ReleaseMessage(msg) return nil, start, ErrMetadata } } @@ -435,13 +426,11 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M //decode body _, err = io.ReadAtLeast(buf, readSlice[:4], 4) if err != nil { - ReleaseMessage(msg) return nil, start, err } bodysize := int(binary.BigEndian.Uint32(readSlice[:4])) if bodysize > maxContentLength { vlog.Errorf("body over size. body size:%d, max size:%d", bodysize, maxContentLength) - ReleaseMessage(msg) return nil, start, ErrOverSize } @@ -455,7 +444,6 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M msg.Body = make([]byte, 0) } if err != nil { - ReleaseMessage(msg) return nil, start, err } return msg, start, err @@ -567,7 +555,7 @@ func DecodeGzip(data []byte) (ret []byte, err error) { // ConvertToRequest convert motan2 protocol request message to motan Request func ConvertToRequest(request *Message, serialize motan.Serialization) (motan.Request, error) { - motanRequest := motan.GetMotanRequestFromPool() + motanRequest := motan.AcquireMotanRequest() motanRequest.RequestID = request.Header.RequestID if idStr, ok := request.Metadata.Load(MRequestID); !ok { if request.Header.IsProxy() { @@ -596,12 +584,7 @@ func ConvertToRequest(request *Message, serialize motan.Serialization) (motan.Re motan.ReleaseMotanRequest(motanRequest) return nil, ErrSerializeNil } - if len(motanRequest.Arguments) <= 0 { - motanRequest.Arguments = []interface{}{&motan.DeserializableValue{Body: request.Body, Serialization: serialize}} - } else { - motanRequest.Arguments[0].(*motan.DeserializableValue).Body = request.Body - motanRequest.Arguments[0].(*motan.DeserializableValue).Serialization = serialize - } + motanRequest.Arguments = append(motanRequest.Arguments, &motan.DeserializableValue{Body: request.Body, Serialization: serialize}) } return motanRequest, nil } @@ -681,8 +664,7 @@ func ConvertToResMessage(response motan.Response, serialize motan.Serialization) return msg, nil } } - - res := messagePool.Get().(*Message) + res := AcquireMessage() var msgType int if response.GetException() != nil { msgType = Exception @@ -733,7 +715,7 @@ func ConvertToResMessage(response motan.Response, serialize motan.Serialization) // ConvertToResponse convert protocol response to motan Response func ConvertToResponse(response *Message, serialize motan.Serialization) (motan.Response, error) { - mres := motan.GetMotanResponseFromPool() + mres := motan.AcquireMotanResponse() rc := mres.GetRPCContext(true) rc.Proxy = response.Header.IsProxy() mres.RequestID = response.Header.RequestID @@ -785,13 +767,13 @@ func ExceptionToJSON(e *motan.Exception) string { return string(errmsg) } +func AcquireMessage() *Message { + return messagePool.Get().(*Message) +} + func ReleaseMessage(msg *Message) { if msg != nil { - //msg.Reset() - msg.Type = 0 - msg.Body = msg.Body[:0] - ResetHeader(msg.Header) - msg.Metadata.Reset() + msg.Reset() messagePool.Put(msg) } } diff --git a/protocol/motanProtocol_test.go b/protocol/motanProtocol_test.go index 8a11f20a..a71a3745 100644 --- a/protocol/motanProtocol_test.go +++ b/protocol/motanProtocol_test.go @@ -148,9 +148,6 @@ func TestEncode(t *testing.T) { fmt.Println("len:", ebytes.Len()) readSlice := make([]byte, 100) - //for i := 0; i < len(readSlice); i++ { - // readSlice[i] = 't' - //} newMsg, err := Decode(bufio.NewReader(ebytes), &readSlice) if newMsg == nil { t.Fatalf("encode message fail") @@ -175,7 +172,6 @@ func TestEncode(t *testing.T) { if !newMsg.Header.IsGzip() { t.Fatalf("encode message fail") } - nb, err := DecodeGzip(newMsg.Body) if err != nil { t.Errorf("decode gzip fail. err:%v", err) @@ -254,7 +250,8 @@ func TestConvertToResponse(t *testing.T) { h.SetStatus(6) h.SetOneWay(true) h.SetSerialize(6) - h.SetGzip(true) + h.SetStatus(0) + h.SetGzip(false) h.SetHeartbeat(true) h.SetProxy(true) h.SetRequest(true) @@ -267,7 +264,7 @@ func TestConvertToResponse(t *testing.T) { meta.Store(MPath, "path") body := []byte("testbody") msg := &Message{Header: h, Metadata: meta, Body: body} - + // To test convert when the method use pool pMap := make(map[string]string) for i := 0; i < 10000; i++ { resp, err := ConvertToResponse(msg, &serialize.SimpleSerialization{}) @@ -275,21 +272,74 @@ func TestConvertToResponse(t *testing.T) { assertTrue(resp.GetAttachment(MGroup) == "group", "response group", t) assertTrue(resp.GetAttachment(MMethod) == "method", "response method", t) assertTrue(resp.GetAttachment(MPath) == "path", "response path", t) - //assertTrue(resp.GetValue().(string) == "testbody", "response body", t) + assertTrue(string(resp.GetValue().(*core.DeserializableValue).Body) == "testbody", "response body", t) pMap[fmt.Sprintf("%p", resp)] = "1" core.ReleaseMotanResponse(resp.(*core.MotanResponse)) } + // check if responses are reused assert.True(t, len(pMap) < 10000) + // To test if convert is correct when the method is called in concurrent situation + h1 := &Header{} + h1.SetVersion(Version2) + h1.SetStatus(1) + h1.SetOneWay(true) + h1.SetSerialize(6) + h1.SetGzip(false) + h1.SetHeartbeat(true) + h1.SetProxy(true) + h1.SetRequest(true) + h1.Magic = MotanMagic + h1.RequestID = 1234456 + meta1 := core.NewStringMap(0) + meta1.Store("k2", "v2") + meta1.Store(MGroup, "group1") + meta1.Store(MMethod, "method1") + meta1.Store(MPath, "path1") + meta1.Store(MException, `{"errcode": 0, "errmsg": "test exception", "errtype": 1}`) + msg1 := &Message{Header: h1, Metadata: meta1, Body: nil} + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + time.Sleep(time.Millisecond * 500) + for i := 0; i < 10000; i++ { + resp, err := ConvertToResponse(msg, &serialize.SimpleSerialization{}) + assertTrue(resp.GetRequestID() == 2349789, "request id is incorrect", t) + assertTrue(err == nil, "convert to response err", t) + assertTrue(resp.GetAttachment(MGroup) == "group", "response group", t) + assertTrue(resp.GetAttachment(MMethod) == "method", "response method", t) + assertTrue(resp.GetAttachment(MPath) == "path", "response path", t) + assertTrue(string(resp.GetValue().(*core.DeserializableValue).Body) == "testbody", "response body", t) + core.ReleaseMotanResponse(resp.(*core.MotanResponse)) + } + wg.Done() + }() + go func() { + time.Sleep(time.Millisecond * 500) + for i := 0; i < 10000; i++ { + resp, err := ConvertToResponse(msg1, &serialize.SimpleSerialization{}) + assertTrue(resp.GetRequestID() == 1234456, "request id is incorrect", t) + assertTrue(err == nil, "convert to response err", t) + assertTrue(resp.GetAttachment(MGroup) == "group1", "response group", t) + assertTrue(resp.GetAttachment(MMethod) == "method1", "response method", t) + assertTrue(resp.GetAttachment(MPath) == "path1", "response path", t) + assertTrue(resp.GetValue() == nil, "response body", t) + assertTrue(resp.GetException().ErrMsg == "test exception", "response exception error message", t) + assertTrue(resp.GetException().ErrCode == 0, "response exception error code", t) + assertTrue(resp.GetException().ErrType == 1, "response exception error type", t) + core.ReleaseMotanResponse(resp.(*core.MotanResponse)) + } + wg.Done() + }() + wg.Wait() } -// TODO convert func TestConvertToRequest(t *testing.T) { h := &Header{} h.SetVersion(Version2) h.SetStatus(6) h.SetOneWay(true) h.SetSerialize(6) - h.SetGzip(true) + h.SetGzip(false) h.SetHeartbeat(true) h.SetProxy(true) h.SetRequest(true) @@ -303,19 +353,71 @@ func TestConvertToRequest(t *testing.T) { body := []byte("testbody") msg := &Message{Header: h, Metadata: meta, Body: body} pMap := make(map[string]string) + // To test convert when the method use pool for i := 0; i < 10000; i++ { req, err := ConvertToRequest(msg, &serialize.SimpleSerialization{}) + assertTrue(req.GetRequestID() == 2349789, "request id", t) assertTrue(err == nil, "conver to request err", t) assertTrue(req.GetAttachment(MGroup) == "group", "request group", t) assertTrue(req.GetAttachment(MMethod) == "method", "request method", t) assertTrue(req.GetAttachment(MPath) == "path", "request path", t) + assertTrue(len(req.GetArguments()) == 1, "request argument", t) pMap[fmt.Sprintf("%p", req)] = "1" core.ReleaseMotanRequest(req.(*core.MotanRequest)) } + // check if requests are reused assert.True(t, len(pMap) < 10000) - - req, err := ConvertToRequest(msg, &serialize.SimpleSerialization{}) + // To test if convert is correct when the method is called in concurrent situation + h1 := &Header{} + h1.SetVersion(Version2) + h1.SetStatus(6) + h1.SetOneWay(true) + h1.SetSerialize(6) + h1.SetGzip(false) + h1.SetHeartbeat(true) + h1.SetProxy(true) + h1.SetRequest(true) + h1.Magic = MotanMagic + h1.RequestID = 1234456 + meta1 := core.NewStringMap(0) + meta1.Store("k2", "v2") + meta1.Store(MGroup, "group1") + meta1.Store(MMethod, "method1") + meta1.Store(MPath, "path1") + msg1 := &Message{Header: h1, Metadata: meta1, Body: nil} + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + time.Sleep(time.Millisecond * 500) + for i := 0; i < 10000; i++ { + req, err := ConvertToRequest(msg, &serialize.SimpleSerialization{}) + assertTrue(req.GetRequestID() == 2349789, "request id is incorrect", t) + assertTrue(err == nil, "conver to request err", t) + assertTrue(req.GetAttachment(MGroup) == "group", "request group", t) + assertTrue(req.GetAttachment(MMethod) == "method", "request method", t) + assertTrue(req.GetAttachment(MPath) == "path", "request path", t) + assertTrue(len(req.GetArguments()) == 1, "request argument", t) + core.ReleaseMotanRequest(req.(*core.MotanRequest)) + } + wg.Done() + }() + go func() { + time.Sleep(time.Millisecond * 500) + for i := 0; i < 10000; i++ { + req, err := ConvertToRequest(msg1, &serialize.SimpleSerialization{}) + assertTrue(req.GetRequestID() == 1234456, "request id is incorrect", t) + assertTrue(err == nil, "conver to request err", t) + assertTrue(req.GetAttachment(MGroup) == "group1", "request group", t) + assertTrue(req.GetAttachment(MMethod) == "method1", "request method", t) + assertTrue(req.GetAttachment(MPath) == "path1", "request path", t) + assertTrue(len(req.GetArguments()) == 0, "request argument", t) + core.ReleaseMotanRequest(req.(*core.MotanRequest)) + } + wg.Done() + }() + wg.Wait() // test request clone + req, err := ConvertToRequest(msg, &serialize.SimpleSerialization{}) cloneReq := req.Clone().(core.Request) assertTrue(err == nil, "conver to request err", t) assertTrue(cloneReq.GetAttachment(MGroup) == "group", "clone request group", t) From 0796175a494d97128592cceeb0d74924faa6fa5b Mon Sep 17 00:00:00 2001 From: liangwei3 Date: Wed, 13 Dec 2023 16:34:51 +0800 Subject: [PATCH 4/4] fix message pool bug --- protocol/motanProtocol.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocol/motanProtocol.go b/protocol/motanProtocol.go index dc19f51b..48f9e695 100644 --- a/protocol/motanProtocol.go +++ b/protocol/motanProtocol.go @@ -378,6 +378,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M msg.Header.VersionStatus = readSlice[3] version := msg.Header.GetVersion() if version != Version2 { // TODO 不再验证 + err = ErrVersion vlog.Errorf("unsupported protocol version number: %d", version) return nil, start, ErrVersion } @@ -391,6 +392,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M } metasize := int(binary.BigEndian.Uint32(readSlice[:4])) if metasize > maxContentLength { + err = ErrOverSize vlog.Errorf("meta over size. meta size:%d, max size:%d", metasize, maxContentLength) return nil, start, ErrOverSize } @@ -418,6 +420,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M } } if k != "" { + err = ErrMetadata vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s", msg.Header, readSlice) return nil, start, ErrMetadata } @@ -430,6 +433,7 @@ func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *M } bodysize := int(binary.BigEndian.Uint32(readSlice[:4])) if bodysize > maxContentLength { + err = ErrOverSize vlog.Errorf("body over size. body size:%d, max size:%d", bodysize, maxContentLength) return nil, start, ErrOverSize }