Skip to content

Commit

Permalink
Revert "Profile base to dev (#350)" (#351)
Browse files Browse the repository at this point in the history
This reverts commit 3cf5d10.
  • Loading branch information
snail007 authored Dec 6, 2023
1 parent 3cf5d10 commit 64558b2
Show file tree
Hide file tree
Showing 23 changed files with 252 additions and 1,027 deletions.
51 changes: 16 additions & 35 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ var (
setAgentLock sync.Mutex
notFoundProviderCount int64 = 0
defaultInitClusterTimeout int64 = 10000 //ms
clusterSlicePool = sync.Pool{
New: func() interface{} {
return make([]serviceMapItem, 0, 5)
},
}
)

type Agent struct {
Expand Down Expand Up @@ -799,28 +794,32 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
}
return res
}
func (a *agentMessageHandler) fillMatch(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string, match *[]serviceMapItem) error {
func (a *agentMessageHandler) matchRule(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string) (foundClusters []serviceMapItem, err error) {
if cond == "" {
return fmt.Errorf("empty %s is not supported", typ)
err = fmt.Errorf("empty %s is not supported", typ)
return
}
for _, item := range data {
if f(item.url) == cond {
*match = append(*match, item)
foundClusters = append(foundClusters, item)
}
}
if len(*match) == 0 {
return fmt.Errorf("cluster not found. cluster:%s", key)
if len(foundClusters) == 0 {
err = fmt.Errorf("cluster not found. cluster:%s", key)
return
}
return nil
return
}
func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
service := request.GetServiceName()
group := request.GetAttachment(mpro.MGroup)
version := request.GetAttachment(mpro.MVersion)
protocol := request.GetAttachment(mpro.MProxyProtocol)
reqInfo := fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
service, group, protocol, version)
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
if !exists {
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, getReqInfo(service, group, protocol, version))
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, reqInfo)
return
}
search := []struct {
Expand All @@ -833,31 +832,23 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot
{"protocol", protocol, func(u *motan.URL) string { return u.Protocol }},
{"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }},
}
clusters := serviceItemArrI.([]serviceMapItem)
matched := clusterSlicePool.Get().([]serviceMapItem)
if cap(matched) < len(clusters) {
matched = make([]serviceMapItem, 0, len(clusters))
}
foundClusters := serviceItemArrI.([]serviceMapItem)
for i, rule := range search {
if i == 0 {
key = rule.cond
} else {
key += "_" + rule.cond
}
err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched)
foundClusters, err = a.matchRule(rule.tip, rule.cond, key, foundClusters, rule.condFn)
if err != nil {
putBackClusterSlice(matched)
return
}
if len(matched) == 1 {
c = matched[0].cluster
putBackClusterSlice(matched)
if len(foundClusters) == 1 {
c = foundClusters[0].cluster
return
}
matched = matched[:0]
}
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version))
putBackClusterSlice(matched)
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, reqInfo)
return
}

Expand Down Expand Up @@ -1231,11 +1222,6 @@ func urlExist(url *motan.URL, urls map[string]*motan.URL) bool {
return false
}

func getReqInfo(service, group, protocol, version string) string {
return fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
service, group, protocol, version)
}

func (a *Agent) SubscribeService(url *motan.URL) error {
if urlExist(url, a.Context.RefersURLs) {
return fmt.Errorf("url exist, ignore subscribe, url: %s", url.GetIdentity())
Expand Down Expand Up @@ -1265,8 +1251,3 @@ func (a *Agent) UnexportService(url *motan.URL) error {
}
return nil
}

func putBackClusterSlice(s []serviceMapItem) {
s = s[:0]
clusterSlicePool.Put(s)
}
59 changes: 3 additions & 56 deletions core/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,6 @@ import (
"encoding/binary"
"errors"
"io"
"math/rand"
"sync"
"time"
)

var (
maxReuseBufSize = 204800
discardRatio = 0.1
bytesBufferPool = sync.Pool{New: func() interface{} {
return new(BytesBuffer)
}}
)

// BytesBuffer is a variable-sized buffer of bytes with Read and Write methods.
Expand All @@ -37,16 +26,10 @@ func NewBytesBuffer(initsize int) *BytesBuffer {

// NewBytesBufferWithOrder create a empty BytesBuffer with initial size and byte order
func NewBytesBufferWithOrder(initsize int, order binary.ByteOrder) *BytesBuffer {
bb := AcquireBytesBuffer()
if bb.buf == nil {
bb.buf = make([]byte, initsize)
}
if bb.temp == nil {
bb.temp = make([]byte, 8)
return &BytesBuffer{buf: make([]byte, initsize),
order: order,
temp: make([]byte, 8),
}
bb.order = order

return bb
}

// CreateBytesBuffer create a BytesBuffer from data bytes
Expand Down Expand Up @@ -95,16 +78,6 @@ func (b *BytesBuffer) WriteByte(c byte) {
b.wpos++
}

// WriteString write a str string append the BytesBuffer, and the wpos will increase len(str)
func (b *BytesBuffer) WriteString(str string) {
l := len(str)
if len(b.buf) < b.wpos+l {
b.grow(l)
}
copy(b.buf[b.wpos:], str)
b.wpos += l
}

// Write write a byte array append the BytesBuffer, and the wpos will increase len(bytes)
func (b *BytesBuffer) Write(bytes []byte) {
l := len(bytes)
Expand Down Expand Up @@ -289,29 +262,3 @@ func (b *BytesBuffer) Remain() int { return b.wpos - b.rpos }
func (b *BytesBuffer) Len() int { return b.wpos - 0 }

func (b *BytesBuffer) Cap() int { return cap(b.buf) }

func hitDiscard() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(100)
if float64(r)/100 < discardRatio {
return true
}
return false
}

func AcquireBytesBuffer() *BytesBuffer {
b := bytesBufferPool.Get()
if b == nil {
return &BytesBuffer{}
}
return b.(*BytesBuffer)
}

func ReleaseBytesBuffer(b *BytesBuffer) {
if b != nil {
//if cap(b.buf) > maxReuseBufSize && hitDiscard() {
// return
//}
b.Reset()
bytesBufferPool.Put(b)
}
}
4 changes: 0 additions & 4 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,3 @@ const (
EUnkonwnMsg = 1003
EConvertMsg = 1004
)

const (
DefaultDecodeLength = 100
)
22 changes: 11 additions & 11 deletions core/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ func (m *StringMap) Store(key, value string) {
m.mu.Unlock()
}

func (m *StringMap) Reset() {
//TODO: 这个地方是否应该加锁呢?
m.mu.Lock()
for k := range m.innerMap {
delete(m.innerMap, k)
}
m.mu.Unlock()
}

func (m *StringMap) Delete(key string) {
m.mu.Lock()
delete(m.innerMap, key)
Expand All @@ -57,8 +48,17 @@ func (m *StringMap) LoadOrEmpty(key string) string {
// If f returns false, range stops the iteration
func (m *StringMap) Range(f func(k, v string) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.innerMap {
keys := make([]string, 0, len(m.innerMap))
for k := range m.innerMap {
keys = append(keys, k)
}
m.mu.RUnlock()

for _, k := range keys {
v, ok := m.Load(k)
if !ok {
continue
}
if !f(k, v) {
break
}
Expand Down
Loading

0 comments on commit 64558b2

Please sign in to comment.