diff --git a/agent.go b/agent.go index 541d8365..17884ad4 100644 --- a/agent.go +++ b/agent.go @@ -45,11 +45,6 @@ var ( setAgentLock sync.Mutex notFoundProviderCount int64 = 0 defaultInitClusterTimeout int64 = 10000 //ms - clusterSlicePool = sync.Pool{ - New: func() interface{} { - return make([]serviceMapItem, 0, 5) - }, - } ) type Agent struct { @@ -799,28 +794,32 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) { } return res } -func (a *agentMessageHandler) fillMatch(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string, match *[]serviceMapItem) error { +func (a *agentMessageHandler) matchRule(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string) (foundClusters []serviceMapItem, err error) { if cond == "" { - return fmt.Errorf("empty %s is not supported", typ) + err = fmt.Errorf("empty %s is not supported", typ) + return } for _, item := range data { if f(item.url) == cond { - *match = append(*match, item) + foundClusters = append(foundClusters, item) } } - if len(*match) == 0 { - return fmt.Errorf("cluster not found. cluster:%s", key) + if len(foundClusters) == 0 { + err = fmt.Errorf("cluster not found. cluster:%s", key) + return } - return nil + return } func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) { service := request.GetServiceName() group := request.GetAttachment(mpro.MGroup) version := request.GetAttachment(mpro.MVersion) protocol := request.GetAttachment(mpro.MProxyProtocol) + reqInfo := fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}", + service, group, protocol, version) serviceItemArrI, exists := a.agent.serviceMap.Load(service) if !exists { - err = fmt.Errorf("cluster not found. cluster:%s, %s", service, getReqInfo(service, group, protocol, version)) + err = fmt.Errorf("cluster not found. cluster:%s, %s", service, reqInfo) return } search := []struct { @@ -833,31 +832,23 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot {"protocol", protocol, func(u *motan.URL) string { return u.Protocol }}, {"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }}, } - clusters := serviceItemArrI.([]serviceMapItem) - matched := clusterSlicePool.Get().([]serviceMapItem) - if cap(matched) < len(clusters) { - matched = make([]serviceMapItem, 0, len(clusters)) - } + foundClusters := serviceItemArrI.([]serviceMapItem) for i, rule := range search { if i == 0 { key = rule.cond } else { key += "_" + rule.cond } - err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched) + foundClusters, err = a.matchRule(rule.tip, rule.cond, key, foundClusters, rule.condFn) if err != nil { - putBackClusterSlice(matched) return } - if len(matched) == 1 { - c = matched[0].cluster - putBackClusterSlice(matched) + if len(foundClusters) == 1 { + c = foundClusters[0].cluster return } - matched = matched[:0] } - err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version)) - putBackClusterSlice(matched) + err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, reqInfo) return } @@ -1231,11 +1222,6 @@ func urlExist(url *motan.URL, urls map[string]*motan.URL) bool { return false } -func getReqInfo(service, group, protocol, version string) string { - return fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}", - service, group, protocol, version) -} - func (a *Agent) SubscribeService(url *motan.URL) error { if urlExist(url, a.Context.RefersURLs) { return fmt.Errorf("url exist, ignore subscribe, url: %s", url.GetIdentity()) @@ -1265,8 +1251,3 @@ func (a *Agent) UnexportService(url *motan.URL) error { } return nil } - -func putBackClusterSlice(s []serviceMapItem) { - s = s[:0] - clusterSlicePool.Put(s) -} diff --git a/core/bytes.go b/core/bytes.go index 457ff2d3..604c6d57 100644 --- a/core/bytes.go +++ b/core/bytes.go @@ -4,17 +4,6 @@ import ( "encoding/binary" "errors" "io" - "math/rand" - "sync" - "time" -) - -var ( - maxReuseBufSize = 204800 - discardRatio = 0.1 - bytesBufferPool = sync.Pool{New: func() interface{} { - return new(BytesBuffer) - }} ) // BytesBuffer is a variable-sized buffer of bytes with Read and Write methods. @@ -37,16 +26,10 @@ func NewBytesBuffer(initsize int) *BytesBuffer { // NewBytesBufferWithOrder create a empty BytesBuffer with initial size and byte order func NewBytesBufferWithOrder(initsize int, order binary.ByteOrder) *BytesBuffer { - bb := AcquireBytesBuffer() - if bb.buf == nil { - bb.buf = make([]byte, initsize) - } - if bb.temp == nil { - bb.temp = make([]byte, 8) + return &BytesBuffer{buf: make([]byte, initsize), + order: order, + temp: make([]byte, 8), } - bb.order = order - - return bb } // CreateBytesBuffer create a BytesBuffer from data bytes @@ -95,16 +78,6 @@ func (b *BytesBuffer) WriteByte(c byte) { b.wpos++ } -// WriteString write a str string append the BytesBuffer, and the wpos will increase len(str) -func (b *BytesBuffer) WriteString(str string) { - l := len(str) - if len(b.buf) < b.wpos+l { - b.grow(l) - } - copy(b.buf[b.wpos:], str) - b.wpos += l -} - // Write write a byte array append the BytesBuffer, and the wpos will increase len(bytes) func (b *BytesBuffer) Write(bytes []byte) { l := len(bytes) @@ -289,29 +262,3 @@ func (b *BytesBuffer) Remain() int { return b.wpos - b.rpos } func (b *BytesBuffer) Len() int { return b.wpos - 0 } func (b *BytesBuffer) Cap() int { return cap(b.buf) } - -func hitDiscard() bool { - r := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(100) - if float64(r)/100 < discardRatio { - return true - } - return false -} - -func AcquireBytesBuffer() *BytesBuffer { - b := bytesBufferPool.Get() - if b == nil { - return &BytesBuffer{} - } - return b.(*BytesBuffer) -} - -func ReleaseBytesBuffer(b *BytesBuffer) { - if b != nil { - //if cap(b.buf) > maxReuseBufSize && hitDiscard() { - // return - //} - b.Reset() - bytesBufferPool.Put(b) - } -} diff --git a/core/constants.go b/core/constants.go index c873cb63..20ec7e76 100644 --- a/core/constants.go +++ b/core/constants.go @@ -129,7 +129,3 @@ const ( EUnkonwnMsg = 1003 EConvertMsg = 1004 ) - -const ( - DefaultDecodeLength = 100 -) diff --git a/core/map.go b/core/map.go index 7434eb89..4aa2bcf3 100644 --- a/core/map.go +++ b/core/map.go @@ -26,15 +26,6 @@ func (m *StringMap) Store(key, value string) { m.mu.Unlock() } -func (m *StringMap) Reset() { - //TODO: 这个地方是否应该加锁呢? - m.mu.Lock() - for k := range m.innerMap { - delete(m.innerMap, k) - } - m.mu.Unlock() -} - func (m *StringMap) Delete(key string) { m.mu.Lock() delete(m.innerMap, key) @@ -57,8 +48,17 @@ func (m *StringMap) LoadOrEmpty(key string) string { // If f returns false, range stops the iteration func (m *StringMap) Range(f func(k, v string) bool) { m.mu.RLock() - defer m.mu.RUnlock() - for k, v := range m.innerMap { + keys := make([]string, 0, len(m.innerMap)) + for k := range m.innerMap { + keys = append(keys, k) + } + m.mu.RUnlock() + + for _, k := range keys { + v, ok := m.Load(k) + if !ok { + continue + } if !f(k, v) { break } diff --git a/core/motan.go b/core/motan.go index 8c0903aa..adfd9016 100644 --- a/core/motan.go +++ b/core/motan.go @@ -14,20 +14,7 @@ import ( type taskHandler func() -var ( - refreshTaskPool = make(chan taskHandler, 100) - requestPool = sync.Pool{New: func() interface{} { - return &MotanRequest{ - RPCContext: &RPCContext{}, - Arguments: []interface{}{}, - } - }} - responsePool = sync.Pool{New: func() interface{} { - return &MotanResponse{ - RPCContext: &RPCContext{}, - } - }} -) +var refreshTaskPool = make(chan taskHandler, 100) func init() { go func() { @@ -41,8 +28,10 @@ func init() { } const ( - DefaultAttachmentSize = 16 - ProtocolLocal = "local" + DefaultAttachmentSize = 16 + DefaultRPCContextMetaSize = 8 + + ProtocolLocal = "local" ) var ( @@ -384,6 +373,8 @@ type RPCContext struct { AsyncCall bool Result *AsyncResult Reply interface{} + + Meta *StringMap // various time, it's owned by motan request context RequestSendTime time.Time RequestReceiveTime time.Time @@ -400,44 +391,6 @@ type RPCContext struct { RemoteAddr string // remote address } -func ResetRPCContext(c *RPCContext) { - if c != nil { - c.ExtFactory = nil - c.OriginalMessage = nil - c.Oneway = false - c.Proxy = false - c.GzipSize = 0 - c.BodySize = 0 - c.SerializeNum = 0 - c.Serialized = false - c.AsyncCall = false - c.Result = nil - c.Reply = nil - c.FinishHandlers = c.FinishHandlers[:0] - c.Tc = nil - c.IsMotanV1 = false - c.RemoteAddr = "" - } -} - -func (c *RPCContext) Reset() { - c.ExtFactory = nil - c.OriginalMessage = nil - c.Oneway = false - c.Proxy = false - c.GzipSize = 0 - c.BodySize = 0 - c.SerializeNum = 0 - c.Serialized = false - c.AsyncCall = false - c.Result = nil - c.Reply = nil - c.FinishHandlers = c.FinishHandlers[:0] - c.Tc = nil - c.IsMotanV1 = false - c.RemoteAddr = "" -} - func (c *RPCContext) AddFinishHandler(handler FinishHandler) { c.FinishHandlers = append(c.FinishHandlers, handler) } @@ -490,34 +443,6 @@ type MotanRequest struct { mu sync.Mutex } -func GetMotanRequestFromPool() *MotanRequest { - return requestPool.Get().(*MotanRequest) -} - -func PutMotanRequestBackPool(req *MotanRequest) { - if req != nil { - req.Method = "" - req.RequestID = 0 - req.ServiceName = "" - req.MethodDesc = "" - ResetRPCContext(req.RPCContext) - req.Attachment = nil - req.Arguments = req.Arguments[:0] - requestPool.Put(req) - } -} - -// Reset: Reset -func (m *MotanRequest) Reset() { - m.Method = "" - m.RequestID = 0 - m.ServiceName = "" - m.MethodDesc = "" - m.RPCContext.Reset() - m.Attachment = nil - m.Arguments = m.Arguments[:0] -} - // GetAttachment GetAttachment func (m *MotanRequest) GetAttachment(key string) string { if m.Attachment == nil { @@ -575,7 +500,9 @@ func (m *MotanRequest) GetAttachments() *StringMap { func (m *MotanRequest) GetRPCContext(canCreate bool) *RPCContext { if m.RPCContext == nil && canCreate { - m.RPCContext = &RPCContext{} + m.RPCContext = &RPCContext{ + Meta: NewStringMap(DefaultRPCContextMetaSize), + } } return m.RPCContext } @@ -602,6 +529,7 @@ func (m *MotanRequest) Clone() interface{} { AsyncCall: m.RPCContext.AsyncCall, Result: m.RPCContext.Result, Reply: m.RPCContext.Reply, + Meta: m.RPCContext.Meta, RequestSendTime: m.RPCContext.RequestSendTime, RequestReceiveTime: m.RPCContext.RequestReceiveTime, ResponseSendTime: m.RPCContext.ResponseSendTime, @@ -645,32 +573,6 @@ type MotanResponse struct { mu sync.Mutex } -func GetMotanResponseFromPool() *MotanResponse { - return responsePool.Get().(*MotanResponse) -} - -func PutMotanResponseBackPool(resp *MotanResponse) { - if resp != nil { - //resp.Reset() - resp.RequestID = 0 - resp.Value = nil - resp.Exception = nil - resp.ProcessTime = 0 - resp.Attachment = nil - ResetRPCContext(resp.RPCContext) - responsePool.Put(resp) - } -} - -func (m *MotanResponse) Reset() { - m.RequestID = 0 - m.Value = nil - m.Exception = nil - m.ProcessTime = 0 - m.Attachment = nil - m.RPCContext.Reset() -} - func (m *MotanResponse) GetAttachment(key string) string { if m.Attachment == nil { return "" @@ -716,7 +618,9 @@ func (m *MotanResponse) GetAttachments() *StringMap { func (m *MotanResponse) GetRPCContext(canCreate bool) *RPCContext { if m.RPCContext == nil && canCreate { - m.RPCContext = &RPCContext{} + m.RPCContext = &RPCContext{ + Meta: NewStringMap(DefaultRPCContextMetaSize), + } } return m.RPCContext } diff --git a/endpoint/motanCommonEndpoint.go b/endpoint/motanCommonEndpoint.go index 0c857bed..220587a3 100644 --- a/endpoint/motanCommonEndpoint.go +++ b/endpoint/motanCommonEndpoint.go @@ -14,12 +14,6 @@ import ( "time" ) -var ( - streamPool = sync.Pool{New: func() interface{} { - return new(Stream) - }} -) - // MotanCommonEndpoint supports motan v1, v2 protocols type MotanCommonEndpoint struct { url *motan.URL @@ -357,25 +351,11 @@ type Stream struct { isClose atomic.Value // bool isHeartbeat bool // for heartbeat heartbeatVersion int // for heartbeat - sendTimer *time.Timer - recvTimer *time.Timer - release bool // concurrency issues for stream timeout and channel recv msg -} - -func (s *Stream) Reset() { - s.channel = nil - s.req = nil - s.res = nil - s.rc = nil } func (s *Stream) Send() (err error) { - if s.sendTimer == nil { - s.sendTimer = time.NewTimer(s.deadline.Sub(time.Now())) - } else { - s.sendTimer.Reset(s.deadline.Sub(time.Now())) - } - defer s.sendTimer.Stop() + timer := time.NewTimer(s.deadline.Sub(time.Now())) + defer timer.Stop() var bytes []byte var msg *mpro.Message @@ -406,15 +386,13 @@ func (s *Stream) Send() (err error) { } } - ready := sendReady{} if msg != nil { // encode v2 message - ready.bytesBuffer = msg.Encode() - bytes = ready.bytesBuffer.Bytes() + bytes = msg.Encode().Bytes() } - ready.data = bytes if s.rc != nil && s.rc.Tc != nil { s.rc.Tc.PutReqSpan(&motan.Span{Name: motan.Encode, Addr: s.channel.address, Time: time.Now()}) } + ready := sendReady{data: bytes} select { case s.channel.sendCh <- ready: if s.rc != nil { @@ -425,7 +403,7 @@ func (s *Stream) Send() (err error) { } } return nil - case <-s.sendTimer.C: + case <-timer.C: return ErrSendRequestTimeout case <-s.channel.shutdownCh: return ErrChannelShutdown @@ -435,16 +413,10 @@ func (s *Stream) Send() (err error) { // Recv sync recv func (s *Stream) Recv() (motan.Response, error) { defer func() { - if s.Close() { - s.release = true - } + s.Close() }() - if s.recvTimer == nil { - s.recvTimer = time.NewTimer(s.deadline.Sub(time.Now())) - } else { - s.recvTimer.Reset(s.deadline.Sub(time.Now())) - } - defer s.recvTimer.Stop() + timer := time.NewTimer(s.deadline.Sub(time.Now())) + defer timer.Stop() select { case <-s.recvNotifyCh: msg := s.res @@ -452,16 +424,17 @@ func (s *Stream) Recv() (motan.Response, error) { return nil, errors.New("recv err: recvMsg is nil") } return msg, nil - case <-s.recvTimer.C: - s.release = false + case <-timer.C: return nil, ErrRecvRequestTimeout case <-s.channel.shutdownCh: - s.release = false return nil, ErrChannelShutdown } } func (s *Stream) notify(msg interface{}, t time.Time) { + defer func() { + s.Close() + }() decodeTime := time.Now() var res motan.Response var v2Msg *mpro.Message @@ -508,9 +481,6 @@ func (s *Stream) notify(msg interface{}, t time.Time) { s.rc.Tc.PutResSpan(&motan.Span{Name: motan.Convert, Addr: s.channel.address, Time: time.Now()}) } if s.rc.AsyncCall { - defer func() { - s.Close() - }() result := s.rc.Result if err != nil { result.Error = err @@ -537,18 +507,15 @@ func (c *Channel) NewStream(req motan.Request, rc *motan.RPCContext) (*Stream, e if c.IsClosed() { return nil, ErrChannelShutdown } - s := AcquireStream() - s.streamId = GenerateRequestID() - s.channel = c - s.isHeartbeat = false - s.req = req - if s.recvNotifyCh == nil { - s.recvNotifyCh = make(chan struct{}, 1) + s := &Stream{ + streamId: GenerateRequestID(), + channel: c, + req: req, + recvNotifyCh: make(chan struct{}, 1), + deadline: time.Now().Add(defaultRequestTimeout), // default deadline + rc: rc, } - s.deadline = time.Now().Add(defaultRequestTimeout) - s.rc = rc s.isClose.Store(false) - s.release = true c.streamLock.Lock() c.streams[s.streamId] = s c.streamLock.Unlock() @@ -559,41 +526,34 @@ func (c *Channel) NewHeartbeatStream(heartbeatVersion int) (*Stream, error) { if c.IsClosed() { return nil, ErrChannelShutdown } - s := AcquireStream() - s.streamId = GenerateRequestID() - s.channel = c - s.isHeartbeat = true - s.heartbeatVersion = heartbeatVersion - if s.recvNotifyCh == nil { - s.recvNotifyCh = make(chan struct{}, 1) + s := &Stream{ + streamId: GenerateRequestID(), + channel: c, + isHeartbeat: true, + heartbeatVersion: heartbeatVersion, + recvNotifyCh: make(chan struct{}, 1), + deadline: time.Now().Add(defaultRequestTimeout), } - s.deadline = time.Now().Add(defaultRequestTimeout) s.isClose.Store(false) - s.release = true c.heartbeatLock.Lock() c.heartbeats[s.streamId] = s c.heartbeatLock.Unlock() return s, nil } -func (s *Stream) Close() bool { - var exist bool - if !s.isClose.Swap(true).(bool) { +func (s *Stream) Close() { + if !s.isClose.Load().(bool) { if s.isHeartbeat { s.channel.heartbeatLock.Lock() - if _, exist = s.channel.heartbeats[s.streamId]; exist { - delete(s.channel.heartbeats, s.streamId) - } + delete(s.channel.heartbeats, s.streamId) s.channel.heartbeatLock.Unlock() } else { s.channel.streamLock.Lock() - if _, exist = s.channel.heartbeats[s.streamId]; exist { - delete(s.channel.streams, s.streamId) - } + delete(s.channel.streams, s.streamId) s.channel.streamLock.Unlock() } + s.isClose.Store(true) } - return exist } // Call send request to the server. @@ -604,12 +564,6 @@ func (c *Channel) Call(req motan.Request, deadline time.Duration, rc *motan.RPCC if err != nil { return nil, err } - defer func() { - if rc == nil || !rc.AsyncCall { - ReleaseStream(stream) - } - }() - stream.SetDeadline(deadline) err = stream.Send() if err != nil { @@ -626,9 +580,6 @@ func (c *Channel) HeartBeat(heartbeatVersion int) (motan.Response, error) { if err != nil { return nil, err } - defer func() { - ReleaseStream(stream) - }() err = stream.Send() if err != nil { return nil, err @@ -650,7 +601,6 @@ func (c *Channel) recv() { } func (c *Channel) recvLoop() error { - decodeBuf := make([]byte, motan.DefaultDecodeLength) for { v, err := mpro.CheckMotanVersion(c.bufRead) if err != nil { @@ -661,7 +611,7 @@ func (c *Channel) recvLoop() error { if v == mpro.Version1 { msg, t, err = mpro.ReadV1Message(c.bufRead, c.config.MaxContentLength) } else if v == mpro.Version2 { - msg, t, err = mpro.DecodeWithTime(c.bufRead, &decodeBuf, c.config.MaxContentLength) + msg, t, err = mpro.DecodeWithTime(c.bufRead, c.config.MaxContentLength) } else { vlog.Warningf("unsupported motan version! version:%d con:%s.", v, c.conn.RemoteAddr().String()) err = mpro.ErrVersion @@ -697,12 +647,10 @@ func (c *Channel) handleMsg(msg interface{}, t time.Time) { if isHeartbeat { c.heartbeatLock.Lock() stream = c.heartbeats[rid] - delete(c.heartbeats, rid) c.heartbeatLock.Unlock() } else { c.streamLock.Lock() stream = c.streams[rid] - delete(c.streams, rid) c.streamLock.Unlock() } if stream == nil { @@ -732,9 +680,6 @@ func (c *Channel) send() { sent += n } } - if ready.bytesBuffer != nil { - motan.ReleaseBytesBuffer(ready.bytesBuffer) - } case <-c.shutdownCh: return } @@ -900,18 +845,3 @@ func buildChannel(conn net.Conn, config *ChannelConfig, serialization motan.Seri return channel } - -func AcquireStream() *Stream { - v := streamPool.Get() - if v == nil { - return &Stream{} - } - return v.(*Stream) -} - -func ReleaseStream(stream *Stream) { - if stream != nil && stream.release { - stream.Reset() - streamPool.Put(stream) - } -} diff --git a/endpoint/motanEndpoint.go b/endpoint/motanEndpoint.go index 90082b85..44c0ee50 100644 --- a/endpoint/motanEndpoint.go +++ b/endpoint/motanEndpoint.go @@ -32,10 +32,7 @@ var ( defaultAsyncResponse = &motan.MotanResponse{Attachment: motan.NewStringMap(motan.DefaultAttachmentSize), RPCContext: &motan.RPCContext{AsyncCall: true}} - errPanic = errors.New("panic error") - v2StreamPool = sync.Pool{New: func() interface{} { - return new(V2Stream) - }} + errPanic = errors.New("panic error") ) type MotanEndpoint struct { @@ -145,8 +142,9 @@ func (m *MotanEndpoint) Call(request motan.Request) motan.Response { m.recordErrAndKeepalive() return m.defaultErrMotanResponse(request, "motanEndpoint error: channels is null") } + startTime := time.Now().UnixNano() if rc.AsyncCall { - rc.Result.StartTime = time.Now().UnixNano() + rc.Result.StartTime = startTime } // get a channel channel, err := m.channels.Get() @@ -382,9 +380,8 @@ type V2Channel struct { } type V2Stream struct { - channel *V2Channel - sendMsg *mpro.Message - streamId uint64 + channel *V2Channel + sendMsg *mpro.Message // recv msg recvMsg *mpro.Message recvNotifyCh chan struct{} @@ -394,31 +391,17 @@ type V2Stream struct { rc *motan.RPCContext isClose atomic.Value // bool isHeartBeat bool - sendTimer *time.Timer - recvTimer *time.Timer - release bool // concurrency issues for stream timeout and channel recv msg -} - -func (s *V2Stream) Reset() { - s.channel = nil - s.sendMsg = nil - s.recvMsg = nil - s.rc = nil } func (s *V2Stream) Send() error { - if s.sendTimer == nil { - s.sendTimer = time.NewTimer(s.deadline.Sub(time.Now())) - } else { - s.sendTimer.Reset(s.deadline.Sub(time.Now())) - } - defer s.sendTimer.Stop() + timer := time.NewTimer(s.deadline.Sub(time.Now())) + defer timer.Stop() buf := s.sendMsg.Encode() if s.rc != nil && s.rc.Tc != nil { s.rc.Tc.PutReqSpan(&motan.Span{Name: motan.Encode, Addr: s.channel.address, Time: time.Now()}) } - ready := sendReady{data: buf.Bytes(), bytesBuffer: buf} + ready := sendReady{data: buf.Bytes()} select { case s.channel.sendCh <- ready: if s.rc != nil { @@ -429,7 +412,7 @@ func (s *V2Stream) Send() error { } } return nil - case <-s.sendTimer.C: + case <-timer.C: return ErrSendRequestTimeout case <-s.channel.shutdownCh: return ErrChannelShutdown @@ -439,16 +422,10 @@ func (s *V2Stream) Send() error { // Recv sync recv func (s *V2Stream) Recv() (*mpro.Message, error) { defer func() { - if s.Close() { - s.release = true - } + s.Close() }() - if s.recvTimer == nil { - s.recvTimer = time.NewTimer(s.deadline.Sub(time.Now())) - } else { - s.recvTimer.Reset(s.deadline.Sub(time.Now())) - } - defer s.recvTimer.Stop() + timer := time.NewTimer(s.deadline.Sub(time.Now())) + defer timer.Stop() select { case <-s.recvNotifyCh: msg := s.recvMsg @@ -456,16 +433,17 @@ func (s *V2Stream) Recv() (*mpro.Message, error) { return nil, errors.New("recv err: recvMsg is nil") } return msg, nil - case <-s.recvTimer.C: - s.release = false + case <-timer.C: return nil, ErrRecvRequestTimeout case <-s.channel.shutdownCh: - s.release = false return nil, ErrChannelShutdown } } func (s *V2Stream) notify(msg *mpro.Message, t time.Time) { + defer func() { + s.Close() + }() if s.rc != nil { s.rc.ResponseReceiveTime = t if s.rc.Tc != nil { @@ -473,9 +451,6 @@ func (s *V2Stream) notify(msg *mpro.Message, t time.Time) { s.rc.Tc.PutResSpan(&motan.Span{Name: motan.Decode, Time: time.Now()}) } if s.rc.AsyncCall { - defer func() { - s.Close() - }() msg.Header.SetProxy(s.rc.Proxy) result := s.rc.Result response, err := mpro.ConvertToResponse(msg, s.channel.serialization) @@ -512,19 +487,16 @@ func (c *V2Channel) NewStream(msg *mpro.Message, rc *motan.RPCContext) (*V2Strea if c.IsClosed() { return nil, ErrChannelShutdown } - - s := AcquireV2Stream() - s.channel = c - s.sendMsg = msg - if s.recvNotifyCh == nil { - s.recvNotifyCh = make(chan struct{}, 1) + s := &V2Stream{ + channel: c, + sendMsg: msg, + recvNotifyCh: make(chan struct{}, 1), + deadline: time.Now().Add(1 * time.Second), + rc: rc, } - s.deadline = time.Now().Add(1 * time.Second) - s.rc = rc s.isClose.Store(false) // RequestID is communication identifier, it is own by channel msg.Header.RequestID = GenerateRequestID() - s.streamId = msg.Header.RequestID if msg.Header.IsHeartbeat() { c.heartbeatLock.Lock() c.heartbeats[msg.Header.RequestID] = s @@ -534,35 +506,27 @@ func (c *V2Channel) NewStream(msg *mpro.Message, rc *motan.RPCContext) (*V2Strea c.streamLock.Lock() c.streams[msg.Header.RequestID] = s c.streamLock.Unlock() - s.isHeartBeat = false } - s.release = true return s, nil } -func (s *V2Stream) Close() bool { - var exist bool - if !s.isClose.Swap(true).(bool) { +func (s *V2Stream) Close() { + if !s.isClose.Load().(bool) { if s.isHeartBeat { s.channel.heartbeatLock.Lock() - if _, exist = s.channel.heartbeats[s.streamId]; exist { - delete(s.channel.heartbeats, s.streamId) - } + delete(s.channel.heartbeats, s.sendMsg.Header.RequestID) s.channel.heartbeatLock.Unlock() } else { s.channel.streamLock.Lock() - if _, exist = s.channel.heartbeats[s.streamId]; exist { - delete(s.channel.streams, s.streamId) - } + delete(s.channel.streams, s.sendMsg.Header.RequestID) s.channel.streamLock.Unlock() } + s.isClose.Store(true) } - return exist } type sendReady struct { - data []byte - bytesBuffer *motan.BytesBuffer + data []byte } func (c *V2Channel) Call(msg *mpro.Message, deadline time.Duration, rc *motan.RPCContext) (*mpro.Message, error) { @@ -570,12 +534,6 @@ func (c *V2Channel) Call(msg *mpro.Message, deadline time.Duration, rc *motan.RP if err != nil { return nil, err } - defer func() { - if rc == nil || !rc.AsyncCall { - ReleaseV2Stream(stream) - } - }() - stream.SetDeadline(deadline) if err := stream.Send(); err != nil { return nil, err @@ -600,9 +558,8 @@ func (c *V2Channel) recv() { } func (c *V2Channel) recvLoop() error { - readSlice := make([]byte, motan.DefaultDecodeLength) for { - res, t, err := mpro.DecodeWithTime(c.bufRead, &readSlice, c.config.MaxContentLength) + res, t, err := mpro.DecodeWithTime(c.bufRead, c.config.MaxContentLength) if err != nil { return err } @@ -640,7 +597,6 @@ func (c *V2Channel) send() { sent += n } } - motan.ReleaseBytesBuffer(ready.bytesBuffer) case <-c.shutdownCh: return } @@ -650,7 +606,6 @@ func (c *V2Channel) send() { func (c *V2Channel) handleHeartbeat(msg *mpro.Message, t time.Time) error { c.heartbeatLock.Lock() stream := c.heartbeats[msg.Header.RequestID] - delete(c.heartbeats, msg.Header.RequestID) c.heartbeatLock.Unlock() if stream == nil { vlog.Warningf("handle heartbeat message, missing stream: %d, ep:%s", msg.Header.RequestID, c.address) @@ -663,7 +618,6 @@ func (c *V2Channel) handleHeartbeat(msg *mpro.Message, t time.Time) error { func (c *V2Channel) handleMessage(msg *mpro.Message, t time.Time) error { c.streamLock.Lock() stream := c.streams[msg.Header.RequestID] - delete(c.streams, msg.Header.RequestID) c.streamLock.Unlock() if stream == nil { vlog.Warningf("handle recv message, missing stream: %d, ep:%s", msg.Header.RequestID, c.address) @@ -845,18 +799,3 @@ func GetDefaultMotanEPAsynInit() bool { } return res.(bool) } - -func AcquireV2Stream() *V2Stream { - v := v2StreamPool.Get() - if v == nil { - return &V2Stream{} - } - return v.(*V2Stream) -} - -func ReleaseV2Stream(stream *V2Stream) { - if stream != nil && stream.release { - stream.Reset() - v2StreamPool.Put(stream) - } -} diff --git a/endpoint/motanEndpoint_test.go b/endpoint/motanEndpoint_test.go index 1fd71939..9bb0efe6 100644 --- a/endpoint/motanEndpoint_test.go +++ b/endpoint/motanEndpoint_test.go @@ -21,7 +21,7 @@ func TestMain(m *testing.M) { m.Run() } -// TODO more UT +//TODO more UT func TestGetName(t *testing.T) { url := &motan.URL{Port: 8989, Protocol: "motan2"} url.PutParam(motan.TimeOutKey, "100") @@ -269,8 +269,7 @@ func handle(netListen net.Listener) { func handleConnection(conn net.Conn, timeout int) { buf := bufio.NewReader(conn) - readSlice := make([]byte, 100) - msg, _, err := protocol.DecodeWithTime(buf, &readSlice, 10*1024*1024) + msg, _, err := protocol.DecodeWithTime(buf, 10*1024*1024) if err != nil { time.Sleep(time.Millisecond * 1000) conn.Close() diff --git a/filter/accessLog.go b/filter/accessLog.go index ee980087..06c1e6e7 100644 --- a/filter/accessLog.go +++ b/filter/accessLog.go @@ -2,10 +2,11 @@ package filter import ( "encoding/json" - motan "github.com/weibocom/motan-go/core" - "github.com/weibocom/motan-go/log" "strconv" "time" + + motan "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/log" ) const ( @@ -33,17 +34,15 @@ func (t *AccessLogFilter) NewFilter(url *motan.URL) motan.Filter { func (t *AccessLogFilter) Filter(caller motan.Caller, request motan.Request) motan.Response { role := defaultRole var ip string - var start time.Time switch caller.(type) { case motan.Provider: role = serverAgentRole ip = request.GetAttachment(motan.HostKey) - start = request.GetRPCContext(true).RequestReceiveTime case motan.EndPoint: role = clientAgentRole ip = caller.GetURL().Host - start = time.Now() } + start := time.Now() response := t.GetNext().Filter(caller, request) address := ip + ":" + caller.GetURL().GetPortStr() if _, ok := caller.(motan.Provider); ok { @@ -82,6 +81,9 @@ func doAccessLog(filterName string, role string, address string, totalTime int64 // response code should be same as upstream responseCode := "" metaUpstreamCode, _ := response.GetAttachments().Load(motan.MetaUpstreamCode) + if resCtx.Meta != nil { + responseCode = resCtx.Meta.LoadOrEmpty(motan.MetaUpstreamCode) + } var exceptionData []byte if exception != nil { exceptionData, _ = json.Marshal(exception) @@ -92,23 +94,21 @@ func doAccessLog(filterName string, role string, address string, totalTime int64 responseCode = "200" } } - - logEntity := vlog.AcquireAccessLogEntity() - logEntity.FilterName = filterName - logEntity.Role = role - logEntity.RequestID = response.GetRequestID() - logEntity.Service = request.GetServiceName() - logEntity.Method = request.GetMethod() - logEntity.RemoteAddress = address - logEntity.Desc = request.GetMethodDesc() - logEntity.ReqSize = reqCtx.BodySize - logEntity.ResSize = resCtx.BodySize - logEntity.BizTime = response.GetProcessTime() //ms - logEntity.TotalTime = totalTime //ms - logEntity.ResponseCode = responseCode - logEntity.Success = exception == nil - logEntity.Exception = string(exceptionData) - logEntity.UpstreamCode = metaUpstreamCode - - vlog.AccessLog(logEntity) + vlog.AccessLog(&vlog.AccessLogEntity{ + FilterName: filterName, + Role: role, + RequestID: response.GetRequestID(), + Service: request.GetServiceName(), + Method: request.GetMethod(), + RemoteAddress: address, + Desc: request.GetMethodDesc(), + ReqSize: reqCtx.BodySize, + ResSize: resCtx.BodySize, + BizTime: response.GetProcessTime(), //ms + TotalTime: totalTime, //ms + ResponseCode: responseCode, + Success: exception == nil, + Exception: string(exceptionData), + UpstreamCode: metaUpstreamCode, + }) } diff --git a/filter/clusterMetrics.go b/filter/clusterMetrics.go index 75a6e435..bbbbcfbe 100644 --- a/filter/clusterMetrics.go +++ b/filter/clusterMetrics.go @@ -4,6 +4,7 @@ import ( "time" motan "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/metrics" "github.com/weibocom/motan-go/protocol" ) @@ -56,11 +57,11 @@ func (c *ClusterMetricsFilter) Filter(haStrategy motan.HaStrategy, loadBalance m if ctx != nil && ctx.Proxy { role = "motan-client-agent" } - //key := metrics.Escape(role) + - // ":" + metrics.Escape(request.GetAttachment(protocol.MSource)) + - // ":" + metrics.Escape(request.GetMethod()) - keys := []string{role, request.GetAttachment(protocol.MSource), request.GetMethod()} - addMetricWithKeys(request.GetAttachment(protocol.MGroup), ".cluster", - request.GetAttachment(protocol.MPath), keys, time.Since(start).Nanoseconds()/1e6, response) + key := metrics.Escape(role) + + ":" + metrics.Escape(request.GetAttachment(protocol.MSource)) + + ":" + metrics.Escape(request.GetMethod()) + addMetric(metrics.Escape(request.GetAttachment(protocol.MGroup))+".cluster", + metrics.Escape(request.GetAttachment(protocol.MPath)), + key, time.Since(start).Nanoseconds()/1e6, response) return response } diff --git a/filter/metrics.go b/filter/metrics.go index 1bb0fdb4..848eac88 100644 --- a/filter/metrics.go +++ b/filter/metrics.go @@ -1,10 +1,11 @@ package filter import ( + "time" + motan "github.com/weibocom/motan-go/core" "github.com/weibocom/motan-go/metrics" "github.com/weibocom/motan-go/protocol" - "time" ) const ( @@ -56,13 +57,7 @@ func (m *MetricsFilter) GetNext() motan.EndPointFilter { } func (m *MetricsFilter) Filter(caller motan.Caller, request motan.Request) motan.Response { - var start time.Time - switch caller.(type) { - case motan.Provider: - start = request.GetRPCContext(true).RequestReceiveTime - case motan.EndPoint: - start = time.Now() - } + start := time.Now() response := m.GetNext().Filter(caller, request) proxy := false @@ -91,30 +86,30 @@ func (m *MetricsFilter) Filter(caller motan.Caller, request motan.Request) motan if provider { application = caller.GetURL().GetParam(motan.ApplicationKey, "") } - //key := metrics.Escape(role) + - // ":" + metrics.Escape(application) + - // ":" + metrics.Escape(request.GetMethod()) - keys := []string{role, application, request.GetMethod()} - addMetricWithKeys(request.GetAttachment(protocol.MGroup), "", request.GetAttachment(protocol.MPath), - keys, time.Since(start).Nanoseconds()/1e6, response) + key := metrics.Escape(role) + + ":" + metrics.Escape(application) + + ":" + metrics.Escape(request.GetMethod()) + addMetric(metrics.Escape(request.GetAttachment(protocol.MGroup)), + metrics.Escape(request.GetAttachment(protocol.MPath)), + key, time.Since(start).Nanoseconds()/1e6, response) return response } -func addMetricWithKeys(group, groupSuffix string, service string, keys []string, cost int64, response motan.Response) { - metrics.AddCounterWithKeys(group, "", service, keys, MetricsTotalCountSuffix, 1) //total_count - if response.GetException() != nil { //err_count +func addMetric(group string, service string, key string, cost int64, response motan.Response) { + metrics.AddCounter(group, service, key+MetricsTotalCountSuffix, 1) //total_count + if response.GetException() != nil { //err_count exception := response.GetException() if exception.ErrType == motan.BizException { - metrics.AddCounterWithKeys(group, groupSuffix, service, keys, MetricsBizErrorCountSuffix, 1) + metrics.AddCounter(group, service, key+MetricsBizErrorCountSuffix, 1) } else { - metrics.AddCounterWithKeys(group, groupSuffix, service, keys, MetricsOtherErrorCountSuffix, 1) + metrics.AddCounter(group, service, key+MetricsOtherErrorCountSuffix, 1) } } - metrics.AddCounterWithKeys(group, groupSuffix, service, keys, metrics.ElapseTimeSuffix(cost), 1) + metrics.AddCounter(group, service, key+metrics.ElapseTimeSuffix(cost), 1) if cost > 200 { - metrics.AddCounterWithKeys(group, groupSuffix, service, keys, MetricsSlowCountSuffix, 1) + metrics.AddCounter(group, service, key+MetricsSlowCountSuffix, 1) } - metrics.AddHistogramsWithKeys(group, groupSuffix, service, keys, "", cost) + metrics.AddHistograms(group, service, key, cost) } func (m *MetricsFilter) SetContext(context *motan.Context) { diff --git a/filter/metrics_test.go b/filter/metrics_test.go index b0a9d18f..f643882f 100644 --- a/filter/metrics_test.go +++ b/filter/metrics_test.go @@ -41,28 +41,25 @@ func TestMetricsFilter(t *testing.T) { name string caller motan.Caller request motan.Request - keys []string + key string }{ - {name: "proxyClient", caller: ep, request: request, keys: []string{"motan-client-agent", application, testMethod}}, - {name: "proxyServer", caller: provider, request: request, keys: []string{"motan-server-agent", application, testMethod}}, - {name: "Client", caller: ep, request: request2, keys: []string{"motan-client", application, testMethod}}, - {name: "Server", caller: provider, request: request2, keys: []string{"motan-server", application, testMethod}}, - } - var getKeysStr = func(keys []string) string { - return metrics.Escape(keys[0]) + ":" + metrics.Escape(keys[1]) + ":" + metrics.Escape(keys[2]) + {name: "proxyClient", caller: ep, request: request, key: "motan-client-agent:" + metrics.Escape(application) + ":" + metrics.Escape(testMethod)}, + {name: "proxyServer", caller: provider, request: request, key: "motan-server-agent:" + metrics.Escape(application) + ":" + metrics.Escape(testMethod)}, + {name: "Client", caller: ep, request: request2, key: "motan-client:" + metrics.Escape(application) + ":" + metrics.Escape(testMethod)}, + {name: "Server", caller: provider, request: request2, key: "motan-server:" + metrics.Escape(application) + ":" + metrics.Escape(testMethod)}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { mf.Filter(test.caller, test.request) time.Sleep(10 * time.Millisecond) // The metrics filter has do escape - assert.Equal(t, 1, int(metrics.GetStatItem(testGroup, testService).SnapshotAndClear().Count(getKeysStr(test.keys)+MetricsTotalCountSuffix)), "metric count") + assert.Equal(t, 1, int(metrics.GetStatItem(metrics.Escape(testGroup), metrics.Escape(testService)).SnapshotAndClear().Count(test.key+MetricsTotalCountSuffix)), "metric count") }) } } func TestAddMetric(t *testing.T) { - keys := []string{"motan-client-agent", "testApplication", testMethod} + key := "motan-client-agent:testApplication:" + testMethod factory := initFactory() mf := factory.GetFilter(Metrics).(motan.EndPointFilter) mf.(*MetricsFilter).SetContext(&motan.Context{Config: config.NewConfig()}) @@ -70,9 +67,7 @@ func TestAddMetric(t *testing.T) { response2 := &motan.MotanResponse{ProcessTime: 100, Exception: &motan.Exception{ErrType: motan.BizException}} response3 := &motan.MotanResponse{ProcessTime: 100, Exception: &motan.Exception{ErrType: motan.FrameworkException}} response4 := &motan.MotanResponse{ProcessTime: 1000} - var getKeysStr = func(keys []string) string { - return metrics.Escape(keys[0]) + ":" + metrics.Escape(keys[1]) + ":" + metrics.Escape(keys[2]) - } + tests := []struct { name string response motan.Response @@ -86,11 +81,11 @@ func TestAddMetric(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - addMetricWithKeys(testGroup, "", testService, keys, test.response.GetProcessTime(), test.response) + addMetric(testGroup, testService, key, test.response.GetProcessTime(), test.response) time.Sleep(10 * time.Millisecond) snap := metrics.GetStatItem(testGroup, testService).SnapshotAndClear() for _, k := range test.keys { - assert.True(t, snap.Count(getKeysStr(keys)+k) > 0, fmt.Sprintf("key '%s'", k)) + assert.True(t, snap.Count(key+k) > 0, fmt.Sprintf("key '%s'", k)) } }) } diff --git a/ha/backupRequestHA.go b/ha/backupRequestHA.go index 892e3a5f..d50c6e5b 100644 --- a/ha/backupRequestHA.go +++ b/ha/backupRequestHA.go @@ -67,7 +67,7 @@ func (br *BackupRequestHA) Call(request motan.Request, loadBalance motan.LoadBal successCh := make(chan motan.Response, retries+1) if delay <= 0 { //no delay time configuration // TODO: we should use metrics of the cluster, with traffic control the group may changed - item := metrics.GetStatItem(request.GetAttachment(protocol.MGroup), request.GetAttachment(protocol.MPath)) + item := metrics.GetStatItem(metrics.Escape(request.GetAttachment(protocol.MGroup)), metrics.Escape(request.GetAttachment(protocol.MPath))) if item == nil || item.LastSnapshot() == nil { initDelay := int(br.url.GetMethodPositiveIntValue(request.GetMethod(), request.GetMethodDesc(), "backupRequestInitDelayTime", 0)) if initDelay == 0 { diff --git a/log/bytes.go b/log/bytes.go deleted file mode 100644 index 2e6d6a7a..00000000 --- a/log/bytes.go +++ /dev/null @@ -1,82 +0,0 @@ -package vlog - -import ( - "strconv" - "sync" -) - -var ( - initSize = 256 - innerBytesBufferPool = sync.Pool{New: func() interface{} { - return &innerBytesBuffer{buf: make([]byte, 0, initSize)} - }} -) - -// innerBytesBuffer is a variable-sized buffer of bytes with Write methods. -type innerBytesBuffer struct { - buf []byte // reuse -} - -// newInnerBytesBuffer create an empty innerBytesBuffer with initial size -func newInnerBytesBuffer() *innerBytesBuffer { - return acquireBytesBuffer() -} - -func createInnerBytesBuffer(data []byte) *innerBytesBuffer { - return &innerBytesBuffer{ - buf: data, - } -} - -// WriteString write a str string append the innerBytesBuffer -func (b *innerBytesBuffer) WriteString(str string) { - b.buf = append(b.buf, str...) -} - -// WriteBoolString append the string value of v(true/false) to innerBytesBuffer -func (b *innerBytesBuffer) WriteBoolString(v bool) { - if v { - b.WriteString("true") - } else { - b.WriteString("false") - } -} - -// WriteUint64String append the string value of u to innerBytesBuffer -func (b *innerBytesBuffer) WriteUint64String(u uint64) { - b.buf = strconv.AppendUint(b.buf, u, 10) -} - -// WriteInt64String append the string value of i to innerBytesBuffer -func (b *innerBytesBuffer) WriteInt64String(i int64) { - b.buf = strconv.AppendInt(b.buf, i, 10) -} - -func (b *innerBytesBuffer) Bytes() []byte { return b.buf } - -func (b *innerBytesBuffer) String() string { - return string(b.buf) -} - -func (b *innerBytesBuffer) Reset() { - b.buf = b.buf[:0] -} - -func (b *innerBytesBuffer) Len() int { return len(b.buf) } - -func (b *innerBytesBuffer) Cap() int { return cap(b.buf) } - -func acquireBytesBuffer() *innerBytesBuffer { - b := innerBytesBufferPool.Get() - if b == nil { - return &innerBytesBuffer{buf: make([]byte, 0, 256)} - } - return b.(*innerBytesBuffer) -} - -func releaseBytesBuffer(b *innerBytesBuffer) { - if b != nil { - b.Reset() - innerBytesBufferPool.Put(b) - } -} diff --git a/log/bytes_test.go b/log/bytes_test.go deleted file mode 100644 index df2e598b..00000000 --- a/log/bytes_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package vlog - -import ( - "strconv" - "testing" -) - -func TestWrite(t *testing.T) { - // new BytesBuffer - buf := newInnerBytesBuffer() - if buf.Len() != 0 { - t.Errorf("new buf length not zero.") - } - if buf.Cap() != initSize { - t.Errorf("buf cap not correct.real:%d, expect:%d\n", buf.Cap(), initSize) - } - - // write string - buf.Reset() - buf.WriteString("string1") - buf.WriteString("string2") - tempbytes := buf.Bytes() - if "string1" != string(tempbytes[:7]) { - t.Errorf("write string not correct.buf:%+v\n", buf) - } - if "string2" != string(tempbytes[7:14]) { - t.Errorf("write string not correct.buf:%+v\n", buf) - } - - // write bool string - buf.Reset() - buf.WriteBoolString(true) - buf.WriteBoolString(false) - tempbytes = buf.Bytes() - if "true" != string(tempbytes[:4]) { - t.Errorf("write bool string not correct.buf:%+v\n", buf) - } - if "false" != string(tempbytes[4:9]) { - t.Errorf("write bool string not correct.buf:%+v\n", buf) - } - - // write uint64 string - buf.Reset() - var u1 uint64 = 11111111 - var u2 uint64 = 22222222 - buf.WriteUint64String(u1) - buf.WriteUint64String(u2) - tempbytes = buf.Bytes() - if "11111111" != string(tempbytes[:8]) { - t.Errorf("write unit64 string not correct.buf:%+v\n", buf) - } - if "22222222" != string(tempbytes[8:]) { - t.Errorf("write uint64 string not correct.buf:%+v\n", buf) - } - - // write int64 string - buf.Reset() - var i1 int64 = 11111111 - var i2 int64 = -22222222 - buf.WriteInt64String(i1) - buf.WriteInt64String(i2) - tempbytes = buf.Bytes() - if "11111111" != string(tempbytes[:8]) { - t.Errorf("write unit64 string not correct.buf:%+v\n", buf) - } - if "-22222222" != string(tempbytes[8:]) { - t.Errorf("write uint64 string not correct.buf:%+v\n", buf) - } -} - -func TestRead(t *testing.T) { - buf := newInnerBytesBuffer() - string1 := "aaaaaaaaaaaa" - buf.WriteString(string1) - buf.WriteUint64String(uint64(len(string1))) - buf.WriteBoolString(false) - buf.WriteInt64String(int64(-len(string1))) - - string2 := "bbbbbbbbbbbb" - buf.WriteString(string2) - buf.WriteUint64String(uint64(len(string2))) - buf.WriteBoolString(true) - buf.WriteInt64String(int64(-len(string2))) - - data := buf.Bytes() - buf2 := createInnerBytesBuffer(data) - rsize := len(string1) + 2 + 5 + 3 + len(string2) + 2 + 4 + 3 - if buf2.Len() != rsize { - t.Errorf("read buf len not correct. buf:%v\n", buf2) - } - - // read value - expectValue := string1 + - strconv.Itoa(len(string1)) + - "false" + - "-" + strconv.Itoa(len(string1)) + - string2 + - strconv.Itoa(len(string2)) + - "true" + - "-" + strconv.Itoa(len(string2)) - if expectValue != buf2.String() { - t.Errorf("read value not correct. buf:%v\n", buf2) - } -} diff --git a/log/log.go b/log/log.go index c675691f..d1d484bb 100644 --- a/log/log.go +++ b/log/log.go @@ -1,12 +1,14 @@ package vlog import ( + "bytes" "flag" "github.com/weibocom/motan-go/metrics/sampler" "log" "os" "path/filepath" "runtime/debug" + "strconv" "sync" "sync/atomic" "time" @@ -16,9 +18,6 @@ import ( ) var ( - accessLogEntityPool = sync.Pool{New: func() interface{} { - return new(AccessLogEntity) - }} loggerInstance Logger once sync.Once logDir = flag.String("log_dir", ".", "If non-empty, write log files in this directory") @@ -391,13 +390,12 @@ func (d *defaultLogger) doAccessLog(logObject *AccessLogEntity) { zap.String("exception", logObject.Exception), zap.String("upstreamCode", logObject.UpstreamCode)) } else { - buffer := newInnerBytesBuffer() - + var buffer bytes.Buffer buffer.WriteString(logObject.FilterName) buffer.WriteString("|") buffer.WriteString(logObject.Role) buffer.WriteString("|") - buffer.WriteUint64String(logObject.RequestID) + buffer.WriteString(strconv.FormatUint(logObject.RequestID, 10)) buffer.WriteString("|") buffer.WriteString(logObject.Service) buffer.WriteString("|") @@ -407,15 +405,15 @@ func (d *defaultLogger) doAccessLog(logObject *AccessLogEntity) { buffer.WriteString("|") buffer.WriteString(logObject.RemoteAddress) buffer.WriteString("|") - buffer.WriteInt64String(int64(logObject.ReqSize)) + buffer.WriteString(strconv.Itoa(logObject.ReqSize)) buffer.WriteString("|") - buffer.WriteInt64String(int64(logObject.ResSize)) + buffer.WriteString(strconv.Itoa(logObject.ResSize)) buffer.WriteString("|") - buffer.WriteInt64String(logObject.BizTime) + buffer.WriteString(strconv.FormatInt(logObject.BizTime, 10)) buffer.WriteString("|") - buffer.WriteInt64String(logObject.TotalTime) + buffer.WriteString(strconv.FormatInt(logObject.TotalTime, 10)) buffer.WriteString("|") - buffer.WriteBoolString(logObject.Success) + buffer.WriteString(strconv.FormatBool(logObject.Success)) buffer.WriteString("|") buffer.WriteString(logObject.ResponseCode) buffer.WriteString("|") @@ -423,10 +421,7 @@ func (d *defaultLogger) doAccessLog(logObject *AccessLogEntity) { buffer.WriteString("|") buffer.WriteString(logObject.UpstreamCode) d.accessLogger.Info(buffer.String()) - - releaseBytesBuffer(buffer) } - ReleaseAccessLogEntity(logObject) } func (d *defaultLogger) MetricsLog(msg string) { @@ -495,17 +490,3 @@ func (d *defaultLogger) SetMetricsLogAvailable(status bool) { d.metricsLevel.SetLevel(zapcore.Level(defaultLogLevel + 1)) } } - -func AcquireAccessLogEntity() *AccessLogEntity { - v := accessLogEntityPool.Get() - if v == nil { - return &AccessLogEntity{} - } - return v.(*AccessLogEntity) -} - -func ReleaseAccessLogEntity(entity *AccessLogEntity) { - if entity != nil { - accessLogEntityPool.Put(entity) - } -} diff --git a/manageHandler.go b/manageHandler.go index 991877bb..98fd4208 100644 --- a/manageHandler.go +++ b/manageHandler.go @@ -158,7 +158,7 @@ func (s *StatusHandler) getStatus() []byte { exporter := v.(motan.Exporter) group := exporter.GetURL().Group service := exporter.GetURL().Path - statItem := metrics.GetStatItem(group, service) + statItem := metrics.GetStatItem(metrics.Escape(group), metrics.Escape(service)) if statItem == nil { return true } diff --git a/metrics/graphite.go b/metrics/graphite.go index 07b4bbf3..93f80cb5 100644 --- a/metrics/graphite.go +++ b/metrics/graphite.go @@ -102,21 +102,19 @@ func GenGraphiteMessages(localIP string, snapshots []Snapshot) []string { if len(pni) < minKeyLength { return } - escapedService := snap.GetEscapedService() - escapedGroup := snap.GetEscapedGroup() if snap.IsHistogram(k) { //histogram for slaK, slaV := range sla { segment += fmt.Sprintf("%s.%s.%s.byhost.%s.%s.%s.%s:%.2f|kv\n", - pni[0], pni[1], escapedGroup, localIP, escapedService, pni[2], slaK, snap.Percentile(k, slaV)) + pni[0], pni[1], snap.GetGroup(), localIP, snap.GetService(), pni[2], slaK, snap.Percentile(k, slaV)) } segment += fmt.Sprintf("%s.%s.%s.byhost.%s.%s.%s.%s:%.2f|ms\n", - pni[0], pni[1], escapedGroup, localIP, escapedService, pni[2], "avg_time", snap.Mean(k)) + pni[0], pni[1], snap.GetGroup(), localIP, snap.GetService(), pni[2], "avg_time", snap.Mean(k)) } else if snap.IsCounter(k) { //counter segment = fmt.Sprintf("%s.%s.%s.byhost.%s.%s.%s:%d|c\n", - pni[0], pni[1], escapedGroup, localIP, escapedService, pni[2], snap.Count(k)) + pni[0], pni[1], snap.GetGroup(), localIP, snap.GetService(), pni[2], snap.Count(k)) } else { // gauge segment = fmt.Sprintf("%s.%s.%s.byhost.%s.%s.%s:%d|kv\n", - pni[0], pni[1], escapedGroup, localIP, escapedService, pni[2], snap.Value(k)) + pni[0], pni[1], snap.GetGroup(), localIP, snap.GetService(), pni[2], snap.Value(k)) } if buf.Len() > 0 && buf.Len()+len(segment) > messageMaxLen { messages = append(messages, buf.String()) diff --git a/metrics/metrics.go b/metrics/metrics.go index 6757bfc8..574917b1 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -42,7 +42,6 @@ const ( ) var ( - metricsKeyBuilderBufferSize = 64 // NewStatItem is the factory func for StatItem NewStatItem = NewDefaultStatItem items = make(map[string]StatItem, 64) @@ -53,20 +52,15 @@ var ( processor: defaultEventProcessor, //sink processor size eventBus: make(chan *event, eventBufferSize), writers: make(map[string]StatWriter), - evtBuf: &sync.Pool{New: func() interface{} { - return &event{} - }}, + evtBuf: &sync.Pool{New: func() interface{} { return new(event) }}, } - escapeCache sync.Map ) type StatItem interface { SetService(service string) GetService() string - GetEscapedService() string SetGroup(group string) GetGroup() string - GetEscapedGroup() string AddCounter(key string, value int64) AddHistograms(key string, duration int64) AddGauge(key string, value int64) @@ -104,18 +98,17 @@ type StatWriter interface { } func GetOrRegisterStatItem(group string, service string) StatItem { - k := group + service itemsLock.RLock() - item := items[k] + item := items[group+service] itemsLock.RUnlock() if item != nil { return item } itemsLock.Lock() - item = items[k] + item = items[group+service] if item == nil { item = NewStatItem(group, service) - items[k] = item + items[group+service] = item } itemsLock.Unlock() return item @@ -172,20 +165,14 @@ func StatItemSize() int { return len(items) } -// Escape the string avoid invalid graphite key func Escape(s string) string { - if v, ok := escapeCache.Load(s); ok { - return v.(string) - } - v := strings.Map(func(char rune) rune { + return strings.Map(func(char rune) rune { if (char >= 'a' && char <= 'z') || (char >= 'A' && char <= 'Z') || (char >= '0' && char <= '9') || (char == '-') { return char } else { return '_' } }, s) - escapeCache.Store(s, v) - return v } func AddCounter(group string, service string, key string, value int64) { @@ -200,27 +187,13 @@ func AddGauge(group string, service string, key string, value int64) { sendEvent(eventGauge, group, service, key, value) } -func AddCounterWithKeys(group, groupSuffix string, service string, keys []string, keySuffix string, value int64) { - sendEventWithKeys(eventCounter, group, groupSuffix, service, keys, keySuffix, value) -} - -func AddHistogramsWithKeys(group, groupSuffix string, service string, keys []string, suffix string, duration int64) { - sendEventWithKeys(eventHistograms, group, groupSuffix, service, keys, suffix, duration) -} - func sendEvent(eventType int32, group string, service string, key string, value int64) { - sendEventWithKeys(eventType, group, "", service, []string{key}, "", value) -} - -func sendEventWithKeys(eventType int32, group, groupSuffix string, service string, keys []string, suffix string, value int64) { evt := rp.evtBuf.Get().(*event) evt.event = eventType - evt.keys = keys + evt.key = key evt.group = group evt.service = service evt.value = value - evt.keySuffix = suffix - evt.groupSuffix = groupSuffix select { case rp.eventBus <- evt: default: @@ -266,46 +239,11 @@ func startSampleStatus(application string) { } type event struct { - event int32 - keys []string - keySuffix string - group string - groupSuffix string - service string - value int64 -} - -func (s *event) reset() { - s.event = 0 - s.keys = s.keys[:0] - s.keySuffix = "" - s.group = "" - s.service = "" - s.value = 0 - s.groupSuffix = "" -} - -func (s *event) getGroup() string { - if s.groupSuffix == "" { - return s.group - } - return s.group + s.groupSuffix -} - -func (s *event) getMetricKey() string { - keyBuilder := motan.NewBytesBuffer(metricsKeyBuilderBufferSize) - defer motan.ReleaseBytesBuffer(keyBuilder) - l := len(s.keys) - for idx, k := range s.keys { - keyBuilder.WriteString(Escape(k)) - if idx < l-1 { - keyBuilder.WriteString(":") - } - } - if s.keySuffix != "" { - keyBuilder.WriteString(s.keySuffix) - } - return string(keyBuilder.Bytes()) + event int32 + key string + group string + service string + value int64 } type RegistryHolder struct { @@ -332,9 +270,7 @@ func (d *DefaultStatItem) SetService(service string) { func (d *DefaultStatItem) GetService() string { return d.service } -func (d *DefaultStatItem) GetEscapedService() string { - return Escape(d.service) -} + func (d *DefaultStatItem) SetGroup(group string) { d.group = group } @@ -343,10 +279,6 @@ func (d *DefaultStatItem) GetGroup() string { return d.group } -func (d *DefaultStatItem) GetEscapedGroup() string { - return Escape(d.group) -} - func (d *DefaultStatItem) AddCounter(key string, value int64) { c := d.getRegistry().Get(key) if c == nil { @@ -589,10 +521,6 @@ func (d *ReadonlyStatItem) GetService() string { return d.service } -func (d *ReadonlyStatItem) GetEscapedService() string { - return Escape(d.service) -} - func (d *ReadonlyStatItem) SetGroup(group string) { panic("action not supported") } @@ -601,10 +529,6 @@ func (d *ReadonlyStatItem) GetGroup() string { return d.group } -func (d *ReadonlyStatItem) GetEscapedGroup() string { - return Escape(d.group) -} - func (d *ReadonlyStatItem) AddCounter(key string, value int64) { panic("action not supported") } @@ -808,9 +732,6 @@ type reporter struct { func (r *reporter) eventLoop() { for evt := range r.eventBus { r.processEvent(evt) - // clean the event object before put it back - evt.reset() - r.evtBuf.Put(evt) } } @@ -825,15 +746,14 @@ func (r *reporter) addWriter(key string, sw StatWriter) { func (r *reporter) processEvent(evt *event) { defer motan.HandlePanic(nil) - item := GetOrRegisterStatItem(evt.getGroup(), evt.service) - key := evt.getMetricKey() + item := GetOrRegisterStatItem(evt.group, evt.service) switch evt.event { case eventCounter: - item.AddCounter(key, evt.value) + item.AddCounter(evt.key, evt.value) case eventHistograms: - item.AddHistograms(key, evt.value) + item.AddHistograms(evt.key, evt.value) case eventGauge: - item.AddGauge(key, evt.value) + item.AddGauge(evt.key, evt.value) } } diff --git a/protocol/motanProtocol.go b/protocol/motanProtocol.go index 180fb021..0e1e413e 100644 --- a/protocol/motanProtocol.go +++ b/protocol/motanProtocol.go @@ -22,7 +22,7 @@ const ( DefaultMaxContentLength = 10 * 1024 * 1024 ) -// message type +//message type const ( Req = iota Res @@ -60,24 +60,6 @@ type Header struct { RequestID uint64 } -func (h *Header) Reset() { - h.Magic = 0 - h.MsgType = 0 - h.VersionStatus = 0 - h.Serialize = 0 - h.RequestID = 0 -} - -func ResetHeader(h *Header) { - if h != nil { - h.Magic = 0 - h.MsgType = 0 - h.VersionStatus = 0 - h.Serialize = 0 - h.RequestID = 0 - } -} - func (h *Header) Clone() *Header { return &Header{ Magic: h.Magic, @@ -95,7 +77,7 @@ type Message struct { Type int } -// serialize +//serialize const ( Hessian = iota GrpcPb @@ -122,9 +104,6 @@ var ( writeBufPool = &sync.Pool{New: func() interface{} { // for gzip write buffer return &bytes.Buffer{} }} - messagePool = sync.Pool{New: func() interface{} { - return &Message{Metadata: motan.NewStringMap(DefaultMetaSize), Header: &Header{}} - }} ) // errors @@ -287,9 +266,9 @@ func (msg *Message) Encode() (buf *motan.BytesBuffer) { vlog.Errorf("metadata not correct.k:%s, v:%s", k, v) return true } - metabuf.WriteString(k) + metabuf.Write([]byte(k)) metabuf.WriteByte('\n') - metabuf.WriteString(v) + metabuf.Write([]byte(v)) metabuf.WriteByte('\n') return true }) @@ -312,7 +291,6 @@ func (msg *Message) Encode() (buf *motan.BytesBuffer) { if metasize > 0 { buf.Write(metabuf.Bytes()) } - motan.ReleaseBytesBuffer(metabuf) // encode body buf.WriteUint32(uint32(bodysize)) @@ -322,13 +300,6 @@ func (msg *Message) Encode() (buf *motan.BytesBuffer) { return buf } -func (msg *Message) Reset() { - msg.Type = 0 - msg.Body = msg.Body[:0] - msg.Header.Reset() - msg.Metadata.Reset() -} - func (msg *Message) Clone() interface{} { newMessage := &Message{ Header: msg.Header.Clone(), @@ -355,105 +326,93 @@ func CheckMotanVersion(buf *bufio.Reader) (version int, err error) { return int(b[3] >> 3 & 0x1f), nil } -func Decode(buf *bufio.Reader, readSlice *[]byte) (msg *Message, err error) { - msg, _, err = DecodeWithTime(buf, readSlice, motan.DefaultMaxContentLength) +func Decode(buf *bufio.Reader) (msg *Message, err error) { + msg, _, err = DecodeWithTime(buf, motan.DefaultMaxContentLength) return msg, err } -func DecodeWithTime(buf *bufio.Reader, rs *[]byte, maxContentLength int) (msg *Message, start time.Time, err error) { - readSlice := *rs +func DecodeWithTime(buf *bufio.Reader, maxContentLength int) (msg *Message, start time.Time, err error) { + temp := make([]byte, HeaderLength, HeaderLength) + // decode header - _, err = io.ReadAtLeast(buf, readSlice[:HeaderLength], HeaderLength) + _, err = io.ReadAtLeast(buf, temp, HeaderLength) start = time.Now() // record time when starting to read data if err != nil { return nil, start, err } - mn := binary.BigEndian.Uint16(readSlice[:2]) // TODO 不再验证 + mn := binary.BigEndian.Uint16(temp[:2]) // TODO 不再验证 if mn != MotanMagic { vlog.Errorf("wrong magic num:%d, err:%v", mn, err) return nil, start, ErrMagicNum } - msg = messagePool.Get().(*Message) - msg.Header.Magic = MotanMagic - msg.Header.MsgType = readSlice[2] - msg.Header.VersionStatus = readSlice[3] - version := msg.Header.GetVersion() + + header := &Header{Magic: MotanMagic} + header.MsgType = temp[2] + header.VersionStatus = temp[3] + version := header.GetVersion() if version != Version2 { // TODO 不再验证 vlog.Errorf("unsupported protocol version number: %d", version) return nil, start, ErrVersion } - msg.Header.Serialize = readSlice[4] - msg.Header.RequestID = binary.BigEndian.Uint64(readSlice[5:HeaderLength]) + header.Serialize = temp[4] + header.RequestID = binary.BigEndian.Uint64(temp[5:]) // decode meta - _, err = io.ReadAtLeast(buf, readSlice[:4], 4) + _, err = io.ReadAtLeast(buf, temp[:4], 4) if err != nil { - PutMessageBackToPool(msg) return nil, start, err } - metasize := int(binary.BigEndian.Uint32(readSlice[:4])) + metasize := int(binary.BigEndian.Uint32(temp[:4])) if metasize > maxContentLength { vlog.Errorf("meta over size. meta size:%d, max size:%d", metasize, maxContentLength) - PutMessageBackToPool(msg) return nil, start, ErrOverSize } + metamap := motan.NewStringMap(DefaultMetaSize) if metasize > 0 { - if cap(readSlice) < metasize { - readSlice = make([]byte, metasize) - *rs = readSlice - } - err := readBytes(buf, readSlice, metasize) + metadata, err := readBytes(buf, metasize) if err != nil { - PutMessageBackToPool(msg) return nil, start, err } s, e := 0, 0 var k string for i := 0; i <= metasize; i++ { - if i == metasize || readSlice[i] == '\n' { + if i == metasize || metadata[i] == '\n' { e = i if k == "" { - k = string(readSlice[s:e]) + k = string(metadata[s:e]) } else { - msg.Metadata.Store(k, string(readSlice[s:e])) + metamap.Store(k, string(metadata[s:e])) k = "" } s = i + 1 } } if k != "" { - vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s", msg.Header, readSlice) - PutMessageBackToPool(msg) + vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s", header, metadata) return nil, start, ErrMetadata } } //decode body - _, err = io.ReadAtLeast(buf, readSlice[:4], 4) + _, err = io.ReadAtLeast(buf, temp[:4], 4) if err != nil { - PutMessageBackToPool(msg) return nil, start, err } - bodysize := int(binary.BigEndian.Uint32(readSlice[:4])) + bodysize := int(binary.BigEndian.Uint32(temp[:4])) if bodysize > maxContentLength { vlog.Errorf("body over size. body size:%d, max size:%d", bodysize, maxContentLength) - PutMessageBackToPool(msg) return nil, start, ErrOverSize } - + var body []byte if bodysize > 0 { - if cap(msg.Body) < bodysize { - msg.Body = make([]byte, bodysize) - } - msg.Body = msg.Body[:bodysize] - err = readBytes(buf, msg.Body, bodysize) + body, err = readBytes(buf, bodysize) } else { - msg.Body = make([]byte, 0) + body = make([]byte, 0) } if err != nil { - PutMessageBackToPool(msg) return nil, start, err } + msg = &Message{header, metamap, body, Req} return msg, start, err } @@ -466,14 +425,15 @@ func DecodeGzipBody(body []byte) []byte { return ret } -func readBytes(buf *bufio.Reader, readSlice []byte, size int) error { +func readBytes(buf *bufio.Reader, size int) ([]byte, error) { + tempbytes := make([]byte, size) var s, n = 0, 0 var err error for s < size && err == nil { - n, err = buf.Read(readSlice[s:size]) + n, err = buf.Read(tempbytes[s:]) s += n } - return err + return tempbytes, err } func EncodeGzip(data []byte) ([]byte, error) { @@ -563,7 +523,7 @@ func DecodeGzip(data []byte) (ret []byte, err error) { // ConvertToRequest convert motan2 protocol request message to motan Request func ConvertToRequest(request *Message, serialize motan.Serialization) (motan.Request, error) { - motanRequest := motan.GetMotanRequestFromPool() + motanRequest := &motan.MotanRequest{Arguments: make([]interface{}, 0)} motanRequest.RequestID = request.Header.RequestID if idStr, ok := request.Metadata.Load(MRequestID); !ok { if request.Header.IsProxy() { @@ -589,16 +549,12 @@ func ConvertToRequest(request *Message, serialize motan.Serialization) (motan.Re request.Header.SetGzip(false) } if !rc.Proxy && serialize == nil { - motan.PutMotanRequestBackPool(motanRequest) return nil, ErrSerializeNil } - if len(motanRequest.Arguments) <= 0 { - motanRequest.Arguments = []interface{}{&motan.DeserializableValue{Body: request.Body, Serialization: serialize}} - } else { - motanRequest.Arguments[0].(*motan.DeserializableValue).Body = request.Body - motanRequest.Arguments[0].(*motan.DeserializableValue).Serialization = serialize - } + dv := &motan.DeserializableValue{Body: request.Body, Serialization: serialize} + motanRequest.Arguments = []interface{}{dv} } + return motanRequest, nil } @@ -678,7 +634,7 @@ func ConvertToResMessage(response motan.Response, serialize motan.Serialization) } } - res := messagePool.Get().(*Message) + res := &Message{} var msgType int if response.GetException() != nil { msgType = Exception @@ -704,13 +660,11 @@ func ConvertToResMessage(response motan.Response, serialize motan.Serialization) res.Body = b } else { vlog.Warningf("convert response value fail! serialized value not []byte. res:%+v", response) - PutMessageBackToPool(res) return nil, ErrSerializedData } } else { b, err := serialize.Serialize(response.GetValue()) if err != nil { - PutMessageBackToPool(res) return nil, err } res.Body = b @@ -729,7 +683,7 @@ func ConvertToResMessage(response motan.Response, serialize motan.Serialization) // ConvertToResponse convert protocol response to motan Response func ConvertToResponse(response *Message, serialize motan.Serialization) (motan.Response, error) { - mres := motan.GetMotanResponseFromPool() + mres := &motan.MotanResponse{} rc := mres.GetRPCContext(true) rc.Proxy = response.Header.IsProxy() mres.RequestID = response.Header.RequestID @@ -745,7 +699,6 @@ func ConvertToResponse(response *Message, serialize motan.Serialization) (motan. response.Header.SetGzip(false) } if !rc.Proxy && serialize == nil { - motan.PutMotanResponseBackPool(mres) return nil, ErrSerializeNil } dv := &motan.DeserializableValue{Body: response.Body, Serialization: serialize} @@ -757,7 +710,6 @@ func ConvertToResponse(response *Message, serialize motan.Serialization) (motan. var exception *motan.Exception err := json.Unmarshal([]byte(e), &exception) if err != nil { - motan.PutMotanResponseBackPool(mres) return nil, err } mres.Exception = exception @@ -780,14 +732,3 @@ func ExceptionToJSON(e *motan.Exception) string { errmsg, _ := json.Marshal(e) return string(errmsg) } - -func PutMessageBackToPool(msg *Message) { - if msg != nil { - //msg.Reset() - msg.Type = 0 - msg.Body = msg.Body[:0] - ResetHeader(msg.Header) - msg.Metadata.Reset() - messagePool.Put(msg) - } -} diff --git a/protocol/motanProtocol_test.go b/protocol/motanProtocol_test.go index acc41513..060ae0cb 100644 --- a/protocol/motanProtocol_test.go +++ b/protocol/motanProtocol_test.go @@ -5,10 +5,8 @@ import ( "bytes" "compress/gzip" "fmt" - "github.com/stretchr/testify/assert" "github.com/weibocom/motan-go/serialize" "math/rand" - "strconv" "sync" "sync/atomic" "testing" @@ -147,11 +145,7 @@ func TestEncode(t *testing.T) { ebytes := msg.Encode() fmt.Println("len:", ebytes.Len()) - readSlice := make([]byte, 100) - //for i := 0; i < len(readSlice); i++ { - // readSlice[i] = 't' - //} - newMsg, err := Decode(bufio.NewReader(ebytes), &readSlice) + newMsg, err := Decode(bufio.NewReader(ebytes)) if newMsg == nil { t.Fatalf("encode message fail") } @@ -170,7 +164,7 @@ func TestEncode(t *testing.T) { msg.Header.SetGzip(true) msg.Body, _ = EncodeGzip([]byte("gzip encode")) b := msg.Encode() - newMsg, _ = Decode(bufio.NewReader(b), &readSlice) + newMsg, _ = Decode(bufio.NewReader(b)) // should not decode gzip if !newMsg.Header.IsGzip() { t.Fatalf("encode message fail") @@ -183,106 +177,13 @@ func TestEncode(t *testing.T) { assertTrue(string(nb) == "gzip encode", "body", t) } -func TestPool(t *testing.T) { - h := &Header{} - h.SetVersion(Version2) - h.SetStatus(6) - h.SetOneWay(true) - h.SetSerialize(5) - h.SetGzip(true) - h.SetHeartbeat(true) - h.SetProxy(true) - h.SetRequest(true) - h.Magic = MotanMagic - h.RequestID = 2349789 - meta := core.NewStringMap(0) - for mi := 0; mi < 10000; mi++ { - meta.Store(strconv.Itoa(mi), strconv.Itoa(mi)) - } - body := []byte("testbodytestbodytestbodytestbodytestbodytestbodytestbodytestbodytestbodytestbodytestbody") - msg := &Message{Header: h, Metadata: meta, Body: body} - ebytes := msg.Encode() - - fmt.Println("len:", ebytes.Len()) - readSlice := make([]byte, 100) - newMsg, err := Decode(bufio.NewReader(ebytes), &readSlice) - if newMsg == nil { - t.Fatalf("encode message fail") - } - assertTrue(newMsg.Header.IsOneWay(), "oneway", t) - assertTrue(newMsg.Header.IsGzip(), "gzip", t) - assertTrue(newMsg.Header.IsHeartbeat(), "heartbeat", t) - assertTrue(newMsg.Header.IsProxy(), "proxy", t) - assertTrue(newMsg.Header.isRequest(), "request", t) - assertTrue(newMsg.Header.GetVersion() == Version2, "version", t) - assertTrue(newMsg.Header.GetSerialize() == 5, "serialize", t) - assertTrue(newMsg.Header.GetStatus() == 6, "status", t) - assertTrue(newMsg.Metadata.LoadOrEmpty("1") == "1", "meta", t) - assertTrue(cap(readSlice) > 200, "readSlice", t) - assertTrue(len(newMsg.Body) == len(msg.Body), "body", t) - assert.Nil(t, err) - PutMessageBackToPool(newMsg) - body1 := []byte("testbody") - msg1 := &Message{Header: h, Metadata: meta, Body: body1} - ebytes1 := msg1.Encode() - newMsg, err = Decode(bufio.NewReader(ebytes1), &readSlice) - if newMsg == nil { - t.Fatalf("encode message fail") - } - assertTrue(newMsg.Header.IsOneWay(), "oneway", t) - assertTrue(newMsg.Header.IsGzip(), "gzip", t) - assertTrue(newMsg.Header.IsHeartbeat(), "heartbeat", t) - assertTrue(newMsg.Header.IsProxy(), "proxy", t) - assertTrue(newMsg.Header.isRequest(), "request", t) - assertTrue(newMsg.Header.GetVersion() == Version2, "version", t) - assertTrue(newMsg.Header.GetSerialize() == 5, "serialize", t) - assertTrue(newMsg.Header.GetStatus() == 6, "status", t) - assertTrue(newMsg.Metadata.LoadOrEmpty("1") == "1", "meta", t) - assertTrue(cap(readSlice) > 200, "readSlice", t) - assertTrue(len(newMsg.Body) == len(msg1.Body), "body", t) -} - func assertTrue(b bool, msg string, t *testing.T) { if !b { t.Fatalf("test fail, %s not correct.", msg) } } -func TestConvertToResponse(t *testing.T) { - h := &Header{} - h.SetVersion(Version2) - h.SetStatus(6) - h.SetOneWay(true) - h.SetSerialize(6) - h.SetGzip(true) - h.SetHeartbeat(true) - h.SetProxy(true) - h.SetRequest(true) - h.Magic = MotanMagic - h.RequestID = 2349789 - meta := core.NewStringMap(0) - meta.Store("k1", "v1") - meta.Store(MGroup, "group") - meta.Store(MMethod, "method") - meta.Store(MPath, "path") - body := []byte("testbody") - msg := &Message{Header: h, Metadata: meta, Body: body} - - pMap := make(map[string]string) - for i := 0; i < 10000; i++ { - resp, err := ConvertToResponse(msg, &serialize.SimpleSerialization{}) - assertTrue(err == nil, "conver to request err", t) - assertTrue(resp.GetAttachment(MGroup) == "group", "response group", t) - assertTrue(resp.GetAttachment(MMethod) == "method", "response method", t) - assertTrue(resp.GetAttachment(MPath) == "path", "response path", t) - //assertTrue(resp.GetValue().(string) == "testbody", "response body", t) - pMap[fmt.Sprintf("%p", resp)] = "1" - core.PutMotanResponseBackPool(resp.(*core.MotanResponse)) - } - assert.True(t, len(pMap) < 10000) -} - -// TODO convert +//TODO convert func TestConvertToRequest(t *testing.T) { h := &Header{} h.SetVersion(Version2) @@ -302,19 +203,12 @@ func TestConvertToRequest(t *testing.T) { meta.Store(MPath, "path") body := []byte("testbody") msg := &Message{Header: h, Metadata: meta, Body: body} - pMap := make(map[string]string) - for i := 0; i < 10000; i++ { - req, err := ConvertToRequest(msg, &serialize.SimpleSerialization{}) - assertTrue(err == nil, "conver to request err", t) - assertTrue(req.GetAttachment(MGroup) == "group", "request group", t) - assertTrue(req.GetAttachment(MMethod) == "method", "request method", t) - assertTrue(req.GetAttachment(MPath) == "path", "request path", t) - pMap[fmt.Sprintf("%p", req)] = "1" - core.PutMotanRequestBackPool(req.(*core.MotanRequest)) - } - assert.True(t, len(pMap) < 10000) - req, err := ConvertToRequest(msg, &serialize.SimpleSerialization{}) + assertTrue(err == nil, "conver to request err", t) + assertTrue(req.GetAttachment(MGroup) == "group", "request group", t) + assertTrue(req.GetAttachment(MMethod) == "method", "request method", t) + assertTrue(req.GetAttachment(MPath) == "path", "request path", t) + // test request clone cloneReq := req.Clone().(core.Request) assertTrue(err == nil, "conver to request err", t) diff --git a/provider/httpProvider.go b/provider/httpProvider.go index fdf34119..f19cfe69 100644 --- a/provider/httpProvider.go +++ b/provider/httpProvider.go @@ -512,5 +512,7 @@ func fillException(resp *motan.MotanResponse, start int64, err error) { } func updateUpstreamStatusCode(resp *motan.MotanResponse, statusCode int) { + resCtx := resp.GetRPCContext(true) resp.SetAttachment(motan.MetaUpstreamCode, strconv.Itoa(statusCode)) + resCtx.Meta.Store(motan.MetaUpstreamCode, strconv.Itoa(statusCode)) } diff --git a/server/motanserver.go b/server/motanserver.go index 5eecb3a2..29fbfc04 100644 --- a/server/motanserver.go +++ b/server/motanserver.go @@ -151,7 +151,7 @@ func (m *MotanServer) handleConn(conn net.Conn) { } else { ip = getRemoteIP(conn.RemoteAddr().String()) } - decodeBuf := make([]byte, motan.DefaultDecodeLength) + for { v, err := mpro.CheckMotanVersion(buf) if err != nil { @@ -168,7 +168,7 @@ func (m *MotanServer) handleConn(conn net.Conn) { } go m.processV1(v1Msg, t, ip, conn) } else if v == mpro.Version2 { - msg, t, err := mpro.DecodeWithTime(buf, &decodeBuf, m.maxContextLength) + msg, t, err := mpro.DecodeWithTime(buf, m.maxContextLength) if err != nil { vlog.Warningf("decode motan v2 message fail! con:%s, err:%s.", conn.RemoteAddr().String(), err.Error()) break @@ -219,7 +219,7 @@ func (m *MotanServer) processV2(msg *mpro.Message, start time.Time, ip string, c tc.PutReqSpan(&motan.Span{Name: motan.Convert, Time: time.Now()}) req.GetRPCContext(true).Tc = tc } - + callStart := time.Now() mres = m.handler.Call(req) if tc != nil { // clusterFilter end @@ -229,7 +229,7 @@ func (m *MotanServer) processV2(msg *mpro.Message, start time.Time, ip string, c resCtx := mres.GetRPCContext(true) resCtx.Proxy = m.proxy if mres.GetAttachment(mpro.MProcessTime) == "" { - mres.SetAttachment(mpro.MProcessTime, strconv.FormatInt(int64(time.Now().Sub(start)/1e6), 10)) + mres.SetAttachment(mpro.MProcessTime, strconv.FormatInt(int64(time.Now().Sub(callStart)/1e6), 10)) } res, err = mpro.ConvertToResMessage(mres, serialization) if tc != nil { @@ -247,8 +247,6 @@ func (m *MotanServer) processV2(msg *mpro.Message, start time.Time, ip string, c // recover the communication identifier res.Header.RequestID = lastRequestID resBuf := res.Encode() - // reuse BytesBuffer - defer motan.ReleaseBytesBuffer(resBuf) if tc != nil { tc.PutResSpan(&motan.Span{Name: motan.Encode, Time: time.Now()}) } @@ -270,16 +268,6 @@ func (m *MotanServer) processV2(msg *mpro.Message, start time.Time, ip string, c if tc != nil { tc.PutResSpan(&motan.Span{Name: motan.Send, Time: resSendTime}) } - // 回收message - mpro.PutMessageBackToPool(msg) - mpro.PutMessageBackToPool(res) - // 回收request - if motanReq, ok := mreq.(*motan.MotanRequest); ok { - motan.PutMotanRequestBackPool(motanReq) - } - if motanResp, ok := mres.(*motan.MotanResponse); ok { - motan.PutMotanResponseBackPool(motanResp) - } } func (m *MotanServer) processV1(msg *mpro.MotanV1Message, start time.Time, ip string, conn net.Conn) {