Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/profile_base' into profile_base
Browse files Browse the repository at this point in the history
  • Loading branch information
snail007 committed Dec 13, 2023
2 parents 131f4c9 + 5d9a877 commit 0456692
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 219 deletions.
65 changes: 19 additions & 46 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ var (
setAgentLock sync.Mutex
notFoundProviderCount int64 = 0
defaultInitClusterTimeout int64 = 10000 //ms
clusterSlicePool = sync.Pool{
New: func() interface{} {
return make([]serviceMapItem, 0, 5)
},
}
)

type Agent struct {
Expand Down Expand Up @@ -442,7 +437,7 @@ func (a *Agent) initHTTPClusters() {
}
httpCluster := cluster.NewHTTPCluster(url, true, a.Context, a.extFactory)
if httpCluster == nil {
vlog.Errorf("Create http cluster %s failed", id)
vlog.Errorf("Create http cluster %s failed", id)
continue
}
// here the domain has value
Expand Down Expand Up @@ -820,44 +815,32 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot
group := request.GetAttachment(mpro.MGroup)
version := request.GetAttachment(mpro.MVersion)
protocol := request.GetAttachment(mpro.MProxyProtocol)
if service == "" {
err = fmt.Errorf("empty service is not supported. info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
return
}
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
if !exists {
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, getReqInfo(service, group, protocol, version))
err = fmt.Errorf("cluster not found. info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
return
}
search := []struct {
tip string
cond string
condFn func(u *motan.URL) string
}{
{"service", service, func(u *motan.URL) string { return u.Path }},
{"group", group, func(u *motan.URL) string { return u.Group }},
{"protocol", protocol, func(u *motan.URL) string { return u.Protocol }},
{"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }},
}
clusters := serviceItemArrI.([]serviceMapItem)
matched := clusterSlicePool.Get().([]serviceMapItem)
if cap(matched) < len(clusters) {
matched = make([]serviceMapItem, 0, len(clusters))
}
releaseClusterSlice(matched)
for i, rule := range search {
if i == 0 {
key = rule.cond
} else {
key += "_" + rule.cond
}
err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched)
if err != nil {
return
}
if len(matched) == 1 {
c = matched[0].cluster
if len(clusters) == 1 {
//TODO: add strict mode to avoid incorrect group call
c = clusters[0].cluster
return
}
if group == "" {
err = fmt.Errorf("multiple clusters are matched with service: %s, but the group is empty", service)
return
}
for _, j := range clusters {
if j.url.IsMatch(service, group, protocol, version) {
c = j.cluster
return
}
matched = matched[:0]
}
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version))
err = fmt.Errorf("no cluster matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
return
}

Expand Down Expand Up @@ -1231,11 +1214,6 @@ func urlExist(url *motan.URL, urls map[string]*motan.URL) bool {
return false
}

func getReqInfo(service, group, protocol, version string) string {
return fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
service, group, protocol, version)
}

func (a *Agent) SubscribeService(url *motan.URL) error {
if urlExist(url, a.Context.RefersURLs) {
return fmt.Errorf("url exist, ignore subscribe, url: %s", url.GetIdentity())
Expand Down Expand Up @@ -1265,8 +1243,3 @@ func (a *Agent) UnexportService(url *motan.URL) error {
}
return nil
}

func releaseClusterSlice(s []serviceMapItem) {
s = s[:0]
clusterSlicePool.Put(s)
}
18 changes: 10 additions & 8 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,18 @@ func TestAgent_InitCall(t *testing.T) {
version string
except string
}{
// 只传service,且只有一个cluster,findcCluster 会正常返回
{"test0", "", "", "", "No refers for request"},
{"test-1", "111", "222", "333", "cluster not found. cluster:test-1"},
{"test3", "", "", "", "empty group is not supported"},
{"test0", "g0", "", "", "No refers for request"},
{"test0", "g0", "http", "", "No refers for request"},
{"test0", "g0", "", "1.3", "No refers for request"},
{"test-1", "111", "222", "333", "cluster not found"},
{"test", "g2", "", "", "No refers for request"},
{"test", "g1", "", "", "empty protocol is not supported"},
{"test", "g1", "motan2", "", "No refers for request"},
{"test", "g1", "motan", "", "empty version is not supported"},
{"test", "g1", "http", "1.3", "No refers for request"},
{"test", "g1", "http", "1.2", "less condition to select cluster"},
{"test", "b", "c", "d", "no cluster matches the request"},
// 同一个service有多个cluster可以匹配,但是group没有传
{"test", "", "c", "d", "multiple clusters are matched with service"},
} {
request.ServiceName = v.service
request.SetAttachment(mpro.MGroup, v.group)
Expand Down Expand Up @@ -479,10 +482,9 @@ func TestAgent_InitCall(t *testing.T) {
version string
except string
}{
{"test3", "111", "222", "333", "cluster not found. cluster:test3"},
{"test4", "", "", "", "empty group is not supported"},
{"test3", "111", "222", "333", "cluster not found. info: {service: test3"},
{"test5", "", "", "", "No refers for request"},
{"helloService2", "", "", "", "cluster not found. cluster:helloService2"},
{"helloService2", "", "", "", "cluster not found. info: {service: helloService2"},
} {
request = newRequest(v.service, "")
request.SetAttachment(mpro.MGroup, v.group)
Expand Down
1 change: 0 additions & 1 deletion core/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (m *StringMap) Store(key, value string) {
}

func (m *StringMap) Reset() {
//TODO: 这个地方是否应该加锁呢?
m.mu.Lock()
for k := range m.innerMap {
delete(m.innerMap, k)
Expand Down
Loading

0 comments on commit 0456692

Please sign in to comment.