Skip to content

Commit

Permalink
Merge pull request #393 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
rayzhang0603 authored Apr 2, 2024
2 parents db996c8 + 4f95c41 commit a6f5d1c
Show file tree
Hide file tree
Showing 20 changed files with 716 additions and 45 deletions.
29 changes: 26 additions & 3 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -75,6 +76,7 @@ type Agent struct {
httpProxyServer *mserver.HTTPProxyServer

manageHandlers map[string]http.Handler
envHandlers map[string]map[string]http.Handler

svcLock sync.Mutex
clsLock sync.Mutex
Expand Down Expand Up @@ -109,6 +111,7 @@ func NewAgent(extfactory motan.ExtensionFactory) *Agent {
agent.agentPortServer = make(map[int]motan.Server)
agent.serviceRegistries = motan.NewCopyOnWriteMap()
agent.manageHandlers = make(map[string]http.Handler)
agent.envHandlers = make(map[string]map[string]http.Handler)
agent.serviceMap = motan.NewCopyOnWriteMap()
return agent
}
Expand Down Expand Up @@ -778,9 +781,13 @@ func fillDefaultReqInfo(r motan.Request, url *motan.URL) {
}
} else {
if r.GetAttachment(mpro.MSource) == "" {
application := url.GetParam(motan.ApplicationKey, "")
if application != "" {
r.SetAttachment(mpro.MSource, application)
if app := r.GetAttachment(motan.ApplicationKey); app != "" {
r.SetAttachment(mpro.MSource, app)
} else {
application := url.GetParam(motan.ApplicationKey, "")
if application != "" {
r.SetAttachment(mpro.MSource, application)
}
}
}
if r.GetAttachment(mpro.MGroup) == "" {
Expand Down Expand Up @@ -1091,6 +1098,12 @@ func (a *Agent) RegisterManageHandler(path string, handler http.Handler) {
}
}

func (a *Agent) RegisterEnvHandlers(envStr string, handlers map[string]http.Handler) {
if envStr != "" && handlers != nil {
a.envHandlers[envStr] = handlers // override
}
}

func (a *Agent) startMServer() {
handlers := make(map[string]http.Handler, 16)
for k, v := range GetDefaultManageHandlers() {
Expand All @@ -1099,6 +1112,16 @@ func (a *Agent) startMServer() {
for k, v := range a.manageHandlers {
handlers[k] = v
}
// register env handlers
extHandelrs := os.Getenv(motan.HandlerEnvironmentName)
for _, k := range strings.Split(extHandelrs, ",") {
if v, ok := a.envHandlers[strings.TrimSpace(k)]; ok {
for kk, vv := range v {
handlers[kk] = vv
}

}
}
for k, v := range handlers {
a.mhandle(k, v)
}
Expand Down
52 changes: 52 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,58 @@ motan-refer:
assert.Equal(t, "Hello jack from motan server", resp.GetValue())
assert.Equal(t, 100, server.GetProcessPoolSize())
}

func Test_envHandler(t *testing.T) {
t.Parallel()
time.Sleep(time.Second * 3)
// start client mesh
ext := GetDefaultExtFactory()
os.Remove("agent.sock")
config, _ := config.NewConfigFromReader(bytes.NewReader([]byte(`
motan-agent:
mport: 13500
port: 14821
eport: 14281
htport: 25282
motan-registry:
direct:
protocol: direct
address: 127.0.0.1:22991
motan-refer:
recom-engine-refer:
group: hello
path: helloService
protocol: motan2
registry: direct
asyncInitConnection: false
serialization: breeze`)))
agent := NewAgent(ext)
agent.RegisterEnvHandlers("testHandler", map[string]http.Handler{
"/test/test": testHandler(),
})
os.Setenv(core.HandlerEnvironmentName, "testHandler")
go agent.StartMotanAgentFromConfig(config)
time.Sleep(time.Second * 3)
client := http.Client{
Timeout: time.Second,
}
resp, err := client.Get("http://127.0.0.1:13500/test/test")
assert.Nil(t, err)
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, string(b), "OK")
os.Unsetenv(core.HandlerEnvironmentName)
}

func testHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}
}

func Test_unixClientCall2(t *testing.T) {
t.Parallel()
startServer(t, "helloService", 22992)
Expand Down
8 changes: 8 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ const (
UnixSockProtocolFlag = "unix://"
)

// metrics request application
const (
MetricsReqApplication = "metricsReqApp"
)

// attachment keys
const (
XForwardedForLower = "x-forwarded-for" // used as motan default proxy key
Expand Down Expand Up @@ -116,6 +121,9 @@ const (
GroupEnvironmentName = "MESH_SERVICE_ADDITIONAL_GROUP"
DirectRPCEnvironmentName = "MESH_DIRECT_RPC"
FilterEnvironmentName = "MESH_FILTERS"
HandlerEnvironmentName = "MESH_ADMIN_EXT_HANDLERS"
RegGroupSuffix = "RPC_REG_GROUP_SUFFIX"
SubGroupSuffix = "MESH_MULTI_SUB_GROUP_SUFFIX"
)

// meta keys
Expand Down
75 changes: 69 additions & 6 deletions core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"errors"
"flag"
"fmt"
cfg "github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/log"
"os"
"reflect"
"strings"

cfg "github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/log"
)

const (
Expand Down Expand Up @@ -69,7 +68,8 @@ var (
defaultConfigPath = "./"
defaultFileSuffix = ".yaml"

urlFields = map[string]bool{"protocol": true, "host": true, "port": true, "path": true, "group": true}
urlFields = map[string]bool{"protocol": true, "host": true, "port": true, "path": true, "group": true}
extFilters = make(map[string]bool)
)

// all env flag in motan-go
Expand All @@ -87,6 +87,17 @@ var (
Recover = flag.Bool("recover", false, "recover from accidental exit")
)

func AddRelevantFilter(filterStr string) {
k := strings.TrimSpace(filterStr)
if k != "" {
extFilters[k] = true
}
}

func GetRelevantFilters() map[string]bool {
return extFilters
}

func (c *Context) confToURLs(section string) map[string]*URL {
urls := map[string]*URL{}
sectionConf, _ := c.Config.GetSection(section)
Expand Down Expand Up @@ -238,6 +249,7 @@ func (c *Context) Initialize() {
c.parserBasicServices()
c.parseServices()
c.parseHTTPClients()
initSwitcher(c)
}

func (c *Context) parseSingleConfiguration() (*cfg.Config, error) {
Expand Down Expand Up @@ -397,11 +409,12 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL {
newURL = url
}

//final filters: defaultFilter + globalFilter + filters + envFilter
//final filters: defaultFilter + globalFilter + filters + envFilter + relevantFilters
finalFilters := c.MergeFilterSet(
c.GetDefaultFilterSet(newURL),
c.GetGlobalFilterSet(newURL),
c.GetEnvGlobalFilterSet(),
GetRelevantFilters(),
c.GetFilterSet(newURL.GetStringParamsWithDefault(FilterKey, ""), ""),
)
if len(finalFilters) > 0 {
Expand Down Expand Up @@ -518,8 +531,57 @@ func (c *Context) parseMultipleServiceGroup(motanServiceMap map[string]*URL) {
}
}

func (c *Context) parseRegGroupSuffix(urlMap map[string]*URL) {
regGroupSuffix := os.Getenv(RegGroupSuffix)
if regGroupSuffix == "" {
return
}
filterMap := make(map[string]struct{}, len(urlMap))
for _, url := range urlMap {
filterMap[url.GetIdentityWithRegistry()] = struct{}{}
}
for k, url := range urlMap {
if strings.HasSuffix(url.Group, regGroupSuffix) {
continue
}
newUrl := url.Copy()
newUrl.Group += regGroupSuffix
if _, ok := filterMap[newUrl.GetIdentityWithRegistry()]; ok {
continue
}
filterMap[newUrl.GetIdentityWithRegistry()] = struct{}{}
urlMap[k] = newUrl
}
}

func (c *Context) parseSubGroupSuffix(urlMap map[string]*URL) {
subGroupSuffix := os.Getenv(SubGroupSuffix)
if subGroupSuffix == "" || c.AgentURL == nil {
return
}
filterMap := make(map[string]struct{}, len(urlMap)*2)
for _, url := range urlMap {
filterMap[url.GetIdentity()] = struct{}{}
}
for k, url := range urlMap {
if strings.HasSuffix(url.Group, subGroupSuffix) {
continue
}
groupWithSuffix := url.Group + subGroupSuffix
newUrl := url.Copy()
newUrl.Group += subGroupSuffix
if _, ok := filterMap[newUrl.GetIdentity()]; ok {
continue
}
filterMap[newUrl.GetIdentity()] = struct{}{}
urlMap["auto_"+k+groupWithSuffix] = newUrl
}
}

func (c *Context) parseRefers() {
c.RefersURLs = c.basicConfToURLs(refersSection)
referUrls := c.basicConfToURLs(refersSection)
c.parseSubGroupSuffix(referUrls)
c.RefersURLs = referUrls
}

func (c *Context) parseBasicRefers() {
Expand All @@ -529,6 +591,7 @@ func (c *Context) parseBasicRefers() {
func (c *Context) parseServices() {
urlsMap := c.basicConfToURLs(servicesSection)
c.parseMultipleServiceGroup(urlsMap)
c.parseRegGroupSuffix(urlsMap)
c.ServiceURLs = urlsMap
}

Expand Down
Loading

0 comments on commit a6f5d1c

Please sign in to comment.