Skip to content

Commit

Permalink
Profile base to dev (#352)
Browse files Browse the repository at this point in the history
profile done
  • Loading branch information
snail007 authored Dec 26, 2023
1 parent 64558b2 commit ecfa6a0
Show file tree
Hide file tree
Showing 54 changed files with 2,408 additions and 840 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
testing:
strategy:
matrix:
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x]
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
93 changes: 40 additions & 53 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (a *Agent) RegisterCommandHandler(f CommandHandler) {
a.commandHandlers = append(a.commandHandlers, f)
}

func (a *Agent) GetDynamicRegistryInfo() *registrySnapInfoStorage {
func (a *Agent) GetDynamicRegistryInfo() *RegistrySnapInfoStorage {
return a.configurer.getRegistryInfo()
}

Expand All @@ -150,7 +150,7 @@ func (a *Agent) RuntimeDir() string {
return a.runtimedir
}

// get Agent server
// GetAgentServer get Agent server
func (a *Agent) GetAgentServer() motan.Server {
return a.agentServer
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func (a *Agent) initStatus() {

func (a *Agent) saveStatus() {
statSnapFile := a.runtimedir + string(filepath.Separator) + defaultStatusSnap
err := ioutil.WriteFile(statSnapFile, []byte(strconv.Itoa(int(http.StatusOK))), 0644)
err := ioutil.WriteFile(statSnapFile, []byte(strconv.Itoa(http.StatusOK)), 0644)
if err != nil {
vlog.Errorln("Save status error: " + err.Error())
return
Expand Down Expand Up @@ -351,6 +351,14 @@ func (a *Agent) initParam() {
initLog(logDir, section)
registerSwitchers(a.Context)

processPoolSize := 0
if section != nil && section["processPoolSize"] != nil {
processPoolSize = section["processPoolSize"].(int)
}
if processPoolSize > 0 {
mserver.SetProcessPoolSize(processPoolSize)
}

port := *motan.Port
if port == 0 && section != nil && section["port"] != nil {
port = section["port"].(int)
Expand Down Expand Up @@ -474,7 +482,7 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
serviceItemKeep := make(map[string]bool)
clusterMap := make(map[interface{}]interface{})
serviceMap := make(map[interface{}]interface{})
var allRefersURLs = []*motan.URL{}
var allRefersURLs []*motan.URL
if a.configurer != nil {
//keep all dynamic refers
for _, url := range a.configurer.subscribeNodes {
Expand All @@ -490,7 +498,7 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
}

service := url.Path
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, "0.1"), url.Protocol, url.Path)
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, motan.DefaultReferVersion), url.Protocol, url.Path)

// find exists old serviceMap
var serviceMapValue serviceMapItem
Expand Down Expand Up @@ -589,7 +597,7 @@ func (a *Agent) initCluster(url *motan.URL) {
}
a.serviceMap.UnsafeStore(url.Path, serviceMapItemArr)
})
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, "0.1"), url.Protocol, url.Path)
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, motan.DefaultReferVersion), url.Protocol, url.Path)
a.clsLock.Lock() // Mutually exclusive with the reloadClusters method
defer a.clsLock.Unlock()
a.clusterMap.Store(mapKey, c)
Expand Down Expand Up @@ -748,7 +756,9 @@ func (a *agentMessageHandler) httpCall(request motan.Request, ck string, httpClu
if err != nil {
return getDefaultResponse(request.GetRequestID(), "do http request failed : "+err.Error())
}
res = &motan.MotanResponse{RequestID: request.GetRequestID()}
httpMotanResp := mhttp.AcquireHttpMotanResponse()
httpMotanResp.RequestID = request.GetRequestID()
res = httpMotanResp
mhttp.FasthttpResponseToMotanResponse(res, httpResponse)
return res
}
Expand Down Expand Up @@ -794,61 +804,38 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
}
return res
}
func (a *agentMessageHandler) matchRule(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string) (foundClusters []serviceMapItem, err error) {
if cond == "" {
err = fmt.Errorf("empty %s is not supported", typ)

func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
service := request.GetServiceName()
if service == "" {
err = fmt.Errorf("empty service is not supported. service: %s", service)
return
}
for _, item := range data {
if f(item.url) == cond {
foundClusters = append(foundClusters, item)
}
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
if !exists {
err = fmt.Errorf("cluster not found. service: %s", service)
return
}
if len(foundClusters) == 0 {
err = fmt.Errorf("cluster not found. cluster:%s", key)
clusters := serviceItemArrI.([]serviceMapItem)
if len(clusters) == 1 {
//TODO: add strict mode to avoid incorrect group call
c = clusters[0].cluster
return
}
return
}
func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
service := request.GetServiceName()
group := request.GetAttachment(mpro.MGroup)
version := request.GetAttachment(mpro.MVersion)
protocol := request.GetAttachment(mpro.MProxyProtocol)
reqInfo := fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
service, group, protocol, version)
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
if !exists {
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, reqInfo)
if group == "" {
err = fmt.Errorf("multiple clusters are matched with service: %s, but the group is empty", service)
return
}
search := []struct {
tip string
cond string
condFn func(u *motan.URL) string
}{
{"service", service, func(u *motan.URL) string { return u.Path }},
{"group", group, func(u *motan.URL) string { return u.Group }},
{"protocol", protocol, func(u *motan.URL) string { return u.Protocol }},
{"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }},
}
foundClusters := serviceItemArrI.([]serviceMapItem)
for i, rule := range search {
if i == 0 {
key = rule.cond
} else {
key += "_" + rule.cond
}
foundClusters, err = a.matchRule(rule.tip, rule.cond, key, foundClusters, rule.condFn)
if err != nil {
return
}
if len(foundClusters) == 1 {
c = foundClusters[0].cluster
version := request.GetAttachment(mpro.MVersion)
protocol := request.GetAttachment(mpro.MProxyProtocol)
for _, j := range clusters {
if j.url.IsMatch(service, group, protocol, version) {
c = j.cluster
return
}
}
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, reqInfo)
err = fmt.Errorf("no cluster matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
return
}

Expand Down Expand Up @@ -1145,7 +1132,7 @@ func (a *Agent) startMServer() {
continue
}
a.mport = port
managementListener = motan.TCPKeepAliveListener{listener.(*net.TCPListener)}
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
break
}
if managementListener == nil {
Expand All @@ -1158,7 +1145,7 @@ func (a *Agent) startMServer() {
vlog.Infof("listen manage port %d failed:%s", a.mport, err.Error())
return
}
managementListener = motan.TCPKeepAliveListener{listener.(*net.TCPListener)}
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
}

vlog.Infof("start listen manage for address: %s", managementListener.Addr().String())
Expand Down
25 changes: 15 additions & 10 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
vlog "github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/registry"
"github.com/weibocom/motan-go/serialize"
"github.com/weibocom/motan-go/server"
_ "github.com/weibocom/motan-go/server"
_ "golang.org/x/net/context"
"io/ioutil"
Expand Down Expand Up @@ -63,6 +64,7 @@ motan-agent:
log_dir: "stdout"
snapshot_dir: "./snapshot"
application: "testing"
processPoolSize: 100
motan-registry:
direct:
Expand All @@ -87,6 +89,7 @@ motan-refer:
resp := c1.BaseCall(req, nil)
assert.Nil(t, resp.GetException())
assert.Equal(t, "Hello jack from motan server", resp.GetValue())
assert.Equal(t, 100, server.GetProcessPoolSize())
}
func Test_unixClientCall2(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -387,7 +390,7 @@ func TestAgent_InitCall(t *testing.T) {
}

//test init cluster with one path and one groups in clusterMap
temp := agent.clusterMap.LoadOrNil(getClusterKey("test1", "0.1", "", ""))
temp := agent.clusterMap.LoadOrNil(getClusterKey("test1", "1.0", "", ""))
assert.NotNil(t, temp, "init cluster with one path and two groups in clusterMap fail")

//test agentHandler call with group
Expand All @@ -413,15 +416,18 @@ func TestAgent_InitCall(t *testing.T) {
version string
except string
}{
// only input service,and there is only one cluster,findCluster would return successfully
{"test0", "", "", "", "No refers for request"},
{"test-1", "111", "222", "333", "cluster not found. cluster:test-1"},
{"test3", "", "", "", "empty group is not supported"},
{"test0", "g0", "", "", "No refers for request"},
{"test0", "g0", "http", "", "No refers for request"},
{"test0", "g0", "", "1.3", "No refers for request"},
{"test-1", "111", "222", "333", "cluster not found"},
{"test", "g2", "", "", "No refers for request"},
{"test", "g1", "", "", "empty protocol is not supported"},
{"test", "g1", "motan2", "", "No refers for request"},
{"test", "g1", "motan", "", "empty version is not supported"},
{"test", "g1", "http", "1.3", "No refers for request"},
{"test", "g1", "http", "1.2", "less condition to select cluster"},
{"test", "b", "c", "d", "no cluster matches the request"},
// one service matches multiple clusters, without passing group
{"test", "", "c", "d", "multiple clusters are matched with service"},
} {
request.ServiceName = v.service
request.SetAttachment(mpro.MGroup, v.group)
Expand Down Expand Up @@ -479,10 +485,9 @@ func TestAgent_InitCall(t *testing.T) {
version string
except string
}{
{"test3", "111", "222", "333", "cluster not found. cluster:test3"},
{"test4", "", "", "", "empty group is not supported"},
{"test3", "111", "222", "333", "cluster not found. service: test3"},
{"test5", "", "", "", "No refers for request"},
{"helloService2", "", "", "", "cluster not found. cluster:helloService2"},
{"helloService2", "", "", "", "cluster not found. service: helloService2"},
} {
request = newRequest(v.service, "")
request.SetAttachment(mpro.MGroup, v.group)
Expand Down Expand Up @@ -633,7 +638,7 @@ motan-service:
c1.Initialize()
var reply []byte
req := c1.BuildRequestWithGroup("helloService", "/unixclient", []interface{}{}, "hello")
req.SetAttachment("HTTP_HOST", "test.com")
req.SetAttachment("http_Host", "test.com")
resp := c1.BaseCall(req, &reply)
assert.Nil(t, resp.GetException())
assert.Equal(t, "okay", string(reply))
Expand Down
4 changes: 3 additions & 1 deletion cluster/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ type CmdList []ClientCommand
func (c CmdList) Len() int {
return len(c)
}

func (c CmdList) Swap(i, j int) {
c[i], c[j] = c[j], c[i]
}

func (c CmdList) Less(i, j int) bool {
return c[i].Index < c[j].Index
}
Expand Down Expand Up @@ -149,7 +151,7 @@ func GetCommandRegistryWrapper(cluster *MotanCluster, registry motan.Registry) m
mixGroups := cluster.GetURL().GetParam(motan.MixGroups, "")
if mixGroups != "" {
groups := strings.Split(mixGroups, ",")
command := &ClientCommand{CommandType: CMDTrafficControl, Index: 0, Version: "1.0", MergeGroups: make([]string, 0, len(groups)+1)}
command := &ClientCommand{CommandType: CMDTrafficControl, Index: 0, Version: motan.DefaultReferVersion, MergeGroups: make([]string, 0, len(groups)+1)}
ownGroup := cluster.GetURL().Group
command.MergeGroups = append(command.MergeGroups, ownGroup)
for _, group := range groups {
Expand Down
9 changes: 9 additions & 0 deletions cluster/motanCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (m *MotanCluster) GetURL() *motan.URL {
func (m *MotanCluster) SetURL(url *motan.URL) {
m.url = url
}

func (m *MotanCluster) Call(request motan.Request) (res motan.Response) {
defer motan.HandlePanic(func() {
res = motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "cluster call panic", ErrType: motan.ServiceException})
Expand All @@ -71,6 +72,7 @@ func (m *MotanCluster) Call(request motan.Request) (res motan.Response) {
vlog.Infoln("cluster:" + m.GetIdentity() + "is not available!")
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "cluster not available, maybe caused by degrade", ErrType: motan.ServiceException})
}

func (m *MotanCluster) initCluster() bool {
m.registryRefers = make(map[string][]motan.EndPoint)
//ha
Expand Down Expand Up @@ -99,15 +101,19 @@ func (m *MotanCluster) initCluster() bool {
vlog.Infof("init MotanCluster %s", m.GetIdentity())
return true
}

func (m *MotanCluster) SetLoadBalance(loadBalance motan.LoadBalance) {
m.LoadBalance = loadBalance
}

func (m *MotanCluster) SetHaStrategy(haStrategy motan.HaStrategy) {
m.HaStrategy = haStrategy
}

func (m *MotanCluster) GetRefers() []motan.EndPoint {
return m.Refers
}

func (m *MotanCluster) refresh() {
newRefers := make([]motan.EndPoint, 0, 32)
for _, v := range m.registryRefers {
Expand All @@ -120,14 +126,17 @@ func (m *MotanCluster) refresh() {
m.Refers = newRefers
m.LoadBalance.OnRefresh(newRefers)
}

func (m *MotanCluster) ShuffleEndpoints(endpoints []motan.EndPoint) []motan.EndPoint {
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
return endpoints
}

func (m *MotanCluster) AddRegistry(registry motan.Registry) {
m.Registries = append(m.Registries, registry)
}

func (m *MotanCluster) Notify(registryURL *motan.URL, urls []*motan.URL) {
vlog.Infof("cluster %s receive notify size %d. ", m.GetIdentity(), len(urls))
m.notifyLock.Lock()
Expand Down
Loading

0 comments on commit ecfa6a0

Please sign in to comment.