Skip to content

Commit

Permalink
fix_rpc_async_call
Browse files Browse the repository at this point in the history
fix_rpc_async_call

remove AsyncResult.StartTime

runtime and endpoint circuit breaker

bugfix: motanEndpoint_test

unit test

code review

unit test case

unit test case

update defaultFailbackInterval
  • Loading branch information
wuhua3 committed Mar 28, 2024
1 parent af7c115 commit 1d079d1
Show file tree
Hide file tree
Showing 61 changed files with 1,470 additions and 378 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ zj*
# gdb
*.gdb_history
__*
*.log

# motan-go
*.pid
Expand All @@ -40,4 +41,4 @@ main/magent*
log/log.test*
go.sum
agent_runtime
test/
test/
86 changes: 61 additions & 25 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {

func (a *Agent) startRegistryFailback() {
vlog.Infoln("start agent failback")
ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond)
ticker := time.NewTicker(time.Duration(registry.GetFailbackInterval()) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
a.registryLock.Lock()
Expand Down Expand Up @@ -265,22 +265,8 @@ func (a *Agent) GetRegistryStatus() []map[string]*motan.RegistryStatus {
}

func (a *Agent) registerStatusSampler() {
metrics.RegisterStatusSampleFunc("memory", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
memInfo, err := p.MemoryInfo()
if err != nil {
return 0
}
return int64(memInfo.RSS >> 20)
})
metrics.RegisterStatusSampleFunc("cpu", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
cpuPercent, err := p.CPUPercent()
if err != nil {
return 0
}
return int64(cpuPercent)
})
metrics.RegisterStatusSampleFunc("memory", GetRssMemory)
metrics.RegisterStatusSampleFunc("cpu", GetCpuPercent)
metrics.RegisterStatusSampleFunc("goroutine_count", func() int64 {
return int64(runtime.NumGoroutine())
})
Expand All @@ -289,6 +275,24 @@ func (a *Agent) registerStatusSampler() {
})
}

func GetRssMemory() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
memInfo, err := p.MemoryInfo()
if err != nil {
return 0
}
return int64(memInfo.RSS >> 20)
}

func GetCpuPercent() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
cpuPercent, err := p.CPUPercent()
if err != nil {
return 0
}
return int64(cpuPercent)
}

func (a *Agent) initStatus() {
if a.recover {
a.recoverStatus()
Expand Down Expand Up @@ -371,7 +375,7 @@ func (a *Agent) initParam() {
port = defaultPort
}

mPort := *motan.Mport
mPort := motan.GetMport()
if mPort == 0 {
if envMPort, ok := os.LookupEnv("mport"); ok {
if envMPortInt, err := strconv.Atoi(envMPort); err == nil {
Expand Down Expand Up @@ -629,6 +633,7 @@ func (a *Agent) initAgentURL() {
} else {
agentURL.Parameters[motan.ApplicationKey] = agentURL.Group
}
motan.SetApplication(agentURL.Parameters[motan.ApplicationKey])
if agentURL.Group == "" {
agentURL.Group = defaultAgentGroup
agentURL.Parameters[motan.ApplicationKey] = defaultAgentGroup
Expand All @@ -653,7 +658,7 @@ func (a *Agent) startAgent() {
url := a.agentURL.Copy()
url.Port = a.port
handler := &agentMessageHandler{agent: a}
server := &mserver.MotanServer{URL: url}
server := defaultExtFactory.GetServer(url)
server.SetMessageHandler(handler)
vlog.Infof("Motan agent is started. port:%d", a.port)
fmt.Println("Motan agent start.")
Expand All @@ -673,7 +678,7 @@ func (a *Agent) registerAgent() {
if agentURL.Host == "" {
agentURL.Host = motan.GetLocalIP()
}
if registryURL, regexit := a.Context.RegistryURLs[reg]; regexit {
if registryURL, regExist := a.Context.RegistryURLs[reg]; regExist {
registry := a.extFactory.GetRegistry(registryURL)
if registry != nil {
vlog.Infof("agent register in registry:%s, agent url:%s", registry.GetURL().GetIdentity(), agentURL.GetIdentity())
Expand All @@ -697,6 +702,16 @@ type agentMessageHandler struct {
agent *Agent
}

func (a *agentMessageHandler) GetName() string {
return "agentMessageHandler"
}

func (a *agentMessageHandler) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeMessageHandlerTypeKey] = a.GetName()
return info
}

func (a *agentMessageHandler) clusterCall(request motan.Request, ck string, motanCluster *cluster.MotanCluster) (res motan.Response) {
// fill default request info
fillDefaultReqInfo(request, motanCluster.GetURL())
Expand Down Expand Up @@ -935,6 +950,26 @@ type serverAgentMessageHandler struct {
providers *motan.CopyOnWriteMap
}

func (sa *serverAgentMessageHandler) GetName() string {
return "serverAgentMessageHandler"
}

func (sa *serverAgentMessageHandler) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeMessageHandlerTypeKey] = sa.GetName()
providersInfo := map[string]interface{}{}
sa.providers.Range(func(k, v interface{}) bool {
provider, ok := v.(motan.Provider)
if !ok {
return true
}
providersInfo[k.(string)] = provider.GetRuntimeInfo()
return true
})
info[motan.RuntimeProvidersKey] = providersInfo
return info
}

func (sa *serverAgentMessageHandler) Initialize() {
sa.providers = motan.NewCopyOnWriteMap()
}
Expand All @@ -960,9 +995,9 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp
res.GetRPCContext(true).GzipSize = int(p.GetURL().GetIntValue(motan.GzipSizeKey, 0))
return res
}
vlog.Errorf("not found provider for %s", motan.GetReqInfo(request))
vlog.Errorf("%s%s%s", motan.ProviderNotExistPrefix, serviceKey, motan.GetReqInfo(request))
atomic.AddInt64(&notFoundProviderCount, 1)
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "not found provider for " + serviceKey, ErrType: motan.ServiceException})
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: motan.EProviderNotExist, ErrMsg: motan.ProviderNotExistPrefix + serviceKey, ErrType: motan.ServiceException})
}

func (sa *serverAgentMessageHandler) AddProvider(p motan.Provider) error {
Expand Down Expand Up @@ -1119,13 +1154,12 @@ func (a *Agent) startMServer() {
for kk, vv := range v {
handlers[kk] = vv
}

}
}
for k, v := range handlers {
a.mhandle(k, v)
}

var mPort int
var managementListener net.Listener
if managementUnixSockAddr := a.agentURL.GetParam(motan.ManagementUnixSockKey, ""); managementUnixSockAddr != "" {
listener, err := motan.ListenUnixSock(managementUnixSockAddr)
Expand Down Expand Up @@ -1159,19 +1193,21 @@ func (a *Agent) startMServer() {
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
break
}
mPort = a.mport
if managementListener == nil {
vlog.Warningf("start management server failed for port range %s", startAndPortStr)
return
}
} else {
listener, err := net.Listen("tcp", ":"+strconv.Itoa(a.mport))
mPort = a.mport
if err != nil {
vlog.Infof("listen manage port %d failed:%s", a.mport, err.Error())
return
}
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
}

motan.SetMport(mPort)
vlog.Infof("start listen manage for address: %s", managementListener.Addr().String())
err := http.Serve(managementListener, nil)
if err != nil {
Expand Down
Loading

0 comments on commit 1d079d1

Please sign in to comment.