From 1bf1bb07c4e06d3520f9179423374432fc19b9bd Mon Sep 17 00:00:00 2001 From: believening Date: Thu, 16 Feb 2023 19:57:42 +0800 Subject: [PATCH] meshregistry: nacos source support fetching instances from all namespaces --- .../meshregistry/pkg/bootstrap/args.go | 2 + .../pkg/source/nacos/httpclient.go | 348 +++++++++++++++--- .../meshregistry/pkg/source/nacos/source.go | 12 +- 3 files changed, 301 insertions(+), 61 deletions(-) diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/bootstrap/args.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/bootstrap/args.go index dc6635e0..300b33db 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/bootstrap/args.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/bootstrap/args.go @@ -169,6 +169,8 @@ type NacosSourceArgs struct { // username and password for nacos auth Username string Password string + // fetch services from all namespaces + AllNamespaces bool } type McpArgs struct { diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/httpclient.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/httpclient.go index 3fd67d88..aa003020 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/httpclient.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/httpclient.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "net/url" "strings" @@ -11,13 +12,40 @@ import ( "time" ) -const defaultNacosTokenTTL = 5 +const ( + defaultNacosTokenTTL = 5 +) + +var ( + serviceListPageSize = 1000 +) + +type nacosNamespace struct { + Namespace string `json:"namespace,omitempty"` + NamespaceShowName string `json:"namespaceShowName,omitempty"` +} + +type namespaceResp struct { + Data []*nacosNamespace `json:"data,omitempty"` +} type serviceResp struct { - Doms []string `json:"doms"` - Count int `json:"count"` + Doms []string `json:"doms,omitempty"` + Count int `json:"count,omitempty"` +} + +type catalogServiceInfo struct { + Name string `json:"name,omitempty"` + GroupName string `json:"groupName,omitempty"` +} + +type catalogServiceListResp struct { + ServiceList []*catalogServiceInfo `json:"serviceList,omitempty"` + Count int `json:"count,omitempty"` } +type nacosMetadata map[string]string + type instance struct { // nolint: maligned Ip string `json:"ip"` Port int `json:"port"` @@ -31,16 +59,10 @@ type instance struct { // nolint: maligned } type instanceResp struct { - Hosts []*instance `json:"hosts"` - Dom string `json:"dom"` - Name string `json:"name"` - Env string `json:"env"` - Clusters string `json:"clusters"` - LastRefTime int64 `json:"lastRefTime"` + Hosts []*instance `json:"hosts"` + Dom string `json:"dom"` } -type nacosMetadata map[string]string - // Client for Nacos type Client interface { // Instances registered on the Nacos server @@ -51,7 +73,10 @@ type client struct { client http.Client urls []string headers map[string]string - index int + + // fetch instances from a specific namespace and group, or from all of the namespaces. + namespaceId, group string // if not set which means public and DEFAULT_GROUP + fetchAllNamespaces bool // security login username string @@ -60,21 +85,27 @@ type client struct { tokenTTL int64 } -func NewClient(urls []string, username, password string, headers map[string]string) Client { +func NewClient(urls []string, + username, password string, + namespaceId, group string, allNamespaces bool, + headers map[string]string) Client { c := &client{ - client: http.Client{Timeout: 30 * time.Second}, - headers: headers, - urls: urls, - index: 0, - username: username, - password: password, - tokenTTL: defaultNacosTokenTTL, // defaulr TokenTTL as 5 second, if first login failed - token: &atomic.Value{}, + client: http.Client{Timeout: 30 * time.Second}, + headers: headers, + urls: urls, + namespaceId: namespaceId, + group: group, + fetchAllNamespaces: allNamespaces, + username: username, + password: password, + tokenTTL: defaultNacosTokenTTL, // defaulr TokenTTL as 5 second, if first login failed + token: &atomic.Value{}, } if c.headers == nil { c.headers = make(map[string]string) } c.headers["Content-Type"] = "application/x-www-form-urlencoded" + c.headers["Accept"] = "application/json" if c.username != "" && c.password != "" { c.login() c.autoRefresh() @@ -83,31 +114,65 @@ func NewClient(urls []string, username, password string, headers map[string]stri } const ( - servicePath = "/nacos/v1/ns/service/list?pageNo=1&pageSize=100000" - intancesPath = "/nacos/v1/ns/instance/list?serviceName=" - loginPath = "/nacos/v1/auth/login" + namespaceListAPI = "/nacos/v1/console/namespaces" + serviceListAPI = "/nacos/v1/ns/service/list" + catalogServiceListAPI = "/nacos/v1/ns/catalog/services" + intancesListAPI = "/nacos/v1/ns/instance/list" + loginAPI = "/nacos/v1/auth/login" ) -func (c *client) chooseURL() string { - if c.index >= len(c.urls) { - c.index = 0 +func encodeQuery(param map[string]string) string { + if len(param) == 0 { + return "" + } + enc := url.Values{} + for k, v := range param { + enc.Add(k, v) } - url := c.urls[c.index] - c.index++ + return enc.Encode() +} - return url +func (c *client) call(api string, method string, header map[string]string, queryParam map[string]string, body io.Reader) ([]byte, error) { + query := encodeQuery(queryParam) + appendUrl := func(url string) string { + if query == "" { + return url + } + return url + "?" + query + } + var lastErr error + var bodyContent string + if body != nil { + rawContent, _ := io.ReadAll(body) + bodyContent = string(rawContent) + } + l := len(c.urls) + shift := rand.Intn(l) + for i := 0; i < l; i++ { + var curBody io.Reader + if bodyContent != "" { + curBody = strings.NewReader(bodyContent) + } + url := c.urls[(i+shift)%l] + api + resp, err := c.doCall(appendUrl(url), method, header, curBody) + if err == nil { + return resp, nil + } + //Scope.Debugf("call nacos api %s failed: %s", url, err) + Scope.Warnf("call nacos api %s failed: %s", url, err) + lastErr = err + } + return nil, lastErr } -func (c *client) call(method string, url string, body io.Reader) ([]byte, error) { +func (c *client) doCall(url string, method string, header map[string]string, body io.Reader) ([]byte, error) { req, err := http.NewRequest(method, url, body) if err != nil { return nil, err } - req.Header.Set("Accept", "application/json") - for k, v := range c.headers { + for k, v := range header { req.Header.Set(k, v) } - resp, err := c.client.Do(req) if err != nil { return nil, err @@ -121,37 +186,198 @@ func (c *client) call(method string, url string, body io.Reader) ([]byte, error) } func (c *client) Instances() ([]*instanceResp, error) { - url := c.chooseURL() - Scope.Debug("nacos url:" + url) + var fetcher func() (map[string][]*instance, error) + if c.fetchAllNamespaces { + fetcher = c.allNamespacesInstances + } else { + fetcher = func() (map[string][]*instance, error) { + return c.namespacedGroupedInstances(c.namespaceId, c.group) + } + } + m, err := fetcher() + if err != nil { + Scope.Errorf("do get instances failed: %s", err) + return nil, err + } + resp := make([]*instanceResp, 0, len(m)) + for svc, instances := range m { + resp = append(resp, &instanceResp{ + Dom: svc, + Hosts: instances, + }) + } + return resp, nil +} + +func (c *client) pagingListServices(namespaceId, groupName string, pageNo int) (*serviceResp, error) { + var sr serviceResp + param := map[string]string{ + "namespaceId": namespaceId, + "groupName": groupName, + "pageSize": fmt.Sprintf("%d", serviceListPageSize), + "pageNo": fmt.Sprintf("%d", pageNo), + } + c.injectAuthParam(param) + resp, err := c.call(serviceListAPI, http.MethodGet, c.headers, param, nil) + if err != nil { + return nil, err + } + if err := json.Unmarshal(resp, &sr); err != nil { + return nil, err + } + return &sr, nil +} + +func (c *client) listServices(namespaceId, groupName string) ([]string, error) { + probeServiceResp, err := c.pagingListServices(namespaceId, groupName, 1) + if err != nil { + return nil, err + } + doms := probeServiceResp.Doms + if probeServiceResp.Count > serviceListPageSize { + pageCount := probeServiceResp.Count/serviceListPageSize + 1 + for page := 2; page <= pageCount; page++ { + sr, err := c.pagingListServices(namespaceId, groupName, page) + if err != nil { + return nil, err + } + doms = append(doms, sr.Doms...) + } + } + return doms, nil +} + +func (c *client) pagingListCatalogServices(namespaceId string, pageNo int) (*catalogServiceListResp, error) { + var csr catalogServiceListResp + param := map[string]string{ + "namespaceId": namespaceId, + "haseIpCount": "false", + "withIntances": "false", + "pageSize": fmt.Sprintf("%d", serviceListPageSize), + "pageNo": fmt.Sprintf("%d", pageNo), + } + c.injectAuthParam(param) + resp, err := c.call(catalogServiceListAPI, http.MethodGet, c.headers, param, nil) + if err != nil { + return nil, err + } + if err := json.Unmarshal(resp, &csr); err != nil { + return nil, err + } + return &csr, nil +} - getUrl := func(base string) string { - token := c.token.Load() - if token == nil { - return base +func (c *client) listCatalogServices(namespaceId string) ([]*catalogServiceInfo, error) { + probeServiceResp, err := c.pagingListCatalogServices(namespaceId, 1) + if err != nil { + return nil, err + } + groupedServices := probeServiceResp.ServiceList + if probeServiceResp.Count > serviceListPageSize { + pageCount := probeServiceResp.Count/serviceListPageSize + 1 + for page := 2; page <= pageCount; page++ { + csr, err := c.pagingListCatalogServices(namespaceId, page) + if err != nil { + return nil, err + } + groupedServices = append(groupedServices, csr.ServiceList...) } - return base + "&accessToken=" + token.(string) } + return groupedServices, nil +} + +func (c *client) listInstances(namespaceId, groupName, serviceName string) ([]*instance, error) { + var ir instanceResp + param := map[string]string{ + "namespaceId": namespaceId, + "groupName": groupName, + "serviceName": serviceName, + } + c.injectAuthParam(param) + resp, err := c.call(intancesListAPI, http.MethodGet, c.headers, param, nil) + if err != nil { + return nil, err + } + if err := json.Unmarshal(resp, &ir); err != nil { + return nil, err + } + + return ir.Hosts, nil +} + +func (c *client) listNamespaces() ([]*nacosNamespace, error) { + var nr namespaceResp + param := map[string]string{} + c.injectAuthParam(param) + resp, err := c.call(namespaceListAPI, http.MethodGet, c.headers, param, nil) + if err != nil { + return nil, err + } + if err := json.Unmarshal(resp, &nr); err != nil { + return nil, err + } + return nr.Data, nil + +} - serviceData, err := c.call(http.MethodGet, getUrl(url+servicePath), nil) +func (c *client) namespacedGroupedInstances(namespaceId, groupName string) (map[string][]*instance, error) { + svcs, err := c.listServices(namespaceId, groupName) if err != nil { + Scope.Errorf("list services in namespace %q group %q failed: %s", namespaceId, groupName, err) return nil, err } - var services serviceResp - if err = json.Unmarshal(serviceData, &services); err != nil { + svcInstances := make(map[string][]*instance, len(svcs)) + for _, svc := range svcs { + instances, err := c.listInstances(namespaceId, groupName, svc) + if err != nil { + Scope.Warnf("list instances of service %q in namespace %q group %q failed: %s", namespaceId, groupName, svc, err) + // try best + continue + } + svcInstances[svc] = instances + } + return svcInstances, nil +} + +func (c *client) namespacedInstances(namespaceId string) (map[string][]*instance, error) { + svcs, err := c.listCatalogServices(namespaceId) + if err != nil { + Scope.Errorf("list services using catalog api in namespace %q failed: %s", namespaceId, err) return nil, err } + svcInstances := make(map[string][]*instance, len(svcs)) + for _, svc := range svcs { + instances, err := c.listInstances(namespaceId, svc.GroupName, svc.Name) + if err != nil { + Scope.Warnf("list instances of service %q in namespace %q group %q failed: %s", namespaceId, svc.GroupName, svc.Name, err) + // try best + continue + } + + svcInstances[svc.Name] = append(svcInstances[svc.Name], instances...) + } + return svcInstances, nil +} - instanceAll := make([]*instanceResp, 0) - for _, serviceName := range services.Doms { - var instance instanceResp - instanceData, err := c.call(http.MethodGet, getUrl(url+intancesPath+"DEFAULT_GROUP@@"+serviceName), nil) - if err = json.Unmarshal(instanceData, &instance); err != nil { - return nil, err +func (c *client) allNamespacesInstances() (map[string][]*instance, error) { + nsList, err := c.listNamespaces() + if err != nil { + Scope.Errorf("list namespaces failed: %s", err) + return nil, err + } + svcInstances := make(map[string][]*instance) + for _, ns := range nsList { + instances, err := c.namespacedInstances(ns.Namespace) + if err != nil { + Scope.Warnf("get all instances in namespace %q failed: %s", ns.Namespace, err) + // try best + continue + } + for k, v := range instances { + svcInstances[k] = append(svcInstances[k], v...) } - instance.Dom = serviceName - instanceAll = append(instanceAll, &instance) } - return instanceAll, nil + return svcInstances, nil } func (c *client) login() { @@ -164,19 +390,15 @@ func (c *client) login() { c.tokenTTL = defaultNacosTokenTTL } }() - loginUrl := c.chooseURL() + loginPath - enc := url.Values{} - enc.Add("username", c.username) - enc.Add("password", c.password) body := func() io.Reader { enc := url.Values{} enc.Add("username", c.username) enc.Add("password", c.password) return strings.NewReader(enc.Encode()) }() - resp, err := c.call(http.MethodPost, loginUrl, body) + resp, err := c.call(loginAPI, http.MethodPost, c.headers, nil, body) if err != nil { - Scope.Warnf("login %s with user %s failed: %s", loginUrl, c.username, err) + Scope.Warnf("login with user %s failed: %s", c.username, err) needResetTTL = true return } @@ -214,3 +436,11 @@ func (c *client) autoRefresh() { } }() } + +func (c *client) injectAuthParam(param map[string]string) { + v := c.token.Load() + token, ok := v.(string) + if ok { + param["accessToken"] = token + } +} diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go index c6069415..d7454cdd 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go @@ -26,7 +26,6 @@ type Source struct { // nacos client client Client namingClient naming_client.INamingClient - namespace string group string // common configs @@ -34,6 +33,9 @@ type Source struct { gatewayModel bool nsHost bool k8sDomainSuffix bool + allNamespaces bool + namespace string + namespaces []string svcPort uint32 mode string delay time.Duration @@ -110,7 +112,13 @@ func New(nacoesArgs bootstrap.NacosSourceArgs, nsHost bool, k8sDomainSuffix bool } } if nacoesArgs.Mode == POLLING { - s.client = NewClient(nacoesArgs.Address, nacoesArgs.Username, nacoesArgs.Password, headers) + s.client = NewClient(nacoesArgs.Address, + nacoesArgs.Username, + nacoesArgs.Password, + nacoesArgs.Namespace, + nacoesArgs.Group, + nacoesArgs.AllNamespaces, + headers) } else { namingClient, err := newNamingClient(nacoesArgs.Address, nacoesArgs.Namespace, headers) if err != nil {