Skip to content

Commit

Permalink
meshregistry: remove arg AggregateDubboMethods and add methods label …
Browse files Browse the repository at this point in the history
…to zk/cacheJson api
  • Loading branch information
YonkaFang committed May 23, 2023
1 parent d8ba1d6 commit 5065529
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ type ZookeeperSourceArgs struct {
TrimDubboRemoveDepInterval util.Duration `json:"TrimDubboRemoveDepInterval,omitempty"`
// specify how to map `app` to label key:value pair
DubboWorkloadAppLabel string `json:"DubboWorkloadAppLabel,omitempty"`
AggregateDubboMethods bool `json:"AggregateDubboMethods,omitempty"` // XXX totally remove this feature?

// mcp configs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,23 @@ type convertedServiceEntry struct {
}

func convertServiceEntry(
providers, consumers []string, service string, svcPort uint32, instancePortAsSvcPort, patchLabel,
aggregateDubboMethods bool, ignoreLabels map[string]string, gatewayMode bool) map[string]*convertedServiceEntry {
providers, consumers []string, service string, svcPort uint32, instancePortAsSvcPort, patchLabel bool,
ignoreLabels map[string]string, gatewayMode bool) map[string]*convertedServiceEntry {
serviceEntryByServiceKey := make(map[string]*convertedServiceEntry)
methodsByServiceKey := make(map[string]map[string]struct{})

defer func() {
for k, cse := range serviceEntryByServiceKey {
cse.methodsEqual = trimSameDubboMethodsLabel(cse.se)

if aggregateDubboMethods {
if v := methodsByServiceKey[k]; len(v) > 0 {
methods := make([]string, 0, len(v))
for method := range v {
methods = append(methods, method)
}
sort.Strings(methods)

cse.methodsLabel = text.EscapeLabelValues(methods)
if v := methodsByServiceKey[k]; len(v) > 0 {
methods := make([]string, 0, len(v))
for method := range v {
methods = append(methods, method)
}
sort.Strings(methods)

cse.methodsLabel = text.EscapeLabelValues(methods)
}
}
}()
Expand Down Expand Up @@ -189,21 +187,17 @@ func convertServiceEntry(

var (
methods = map[string]struct{}{}
methodApplier func(method string)
)
if aggregateDubboMethods {
methods = map[string]struct{}{}
methodApplier = func(method string) {
methods[method] = struct{}{}
}
}
)

meta, ok := verifyMeta(providerParts[len(providerParts)-1], addr, patchLabel, ignoreLabels, methodApplier)
if !ok {
continue
}

serviceKey := buildServiceKey(service, meta)
serviceKey := buildServiceKey(service, meta) // istio service host

if len(methods) > 0 {
serviceMethods := methodsByServiceKey[serviceKey]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type Source struct {
ignoreLabelsMap map[string]string
watchingRoot bool // TODO useless?

serviceCache map[string]*ServiceEntryWithMeta
cache cmap.ConcurrentMap
pollingCache cmap.ConcurrentMap
serviceMethods map[string]string
cache cmap.ConcurrentMap // string-interface: cmap(string-host: *ServiceEntryWithMeta)
pollingCache cmap.ConcurrentMap // string-interface: cmap(string-host: *ServiceEntryWithMeta)
sidecarCache map[resource.FullName]SidecarWithMeta
dubboCallModels map[string]DubboCallModel // can only be replaced rather than being modified
seDubboCallModels map[resource.FullName]map[string]DubboCallModel
Expand Down Expand Up @@ -108,6 +108,7 @@ func New(args *bootstrap.ZookeeperSourceArgs, exceptedResources []collection.Sch

initedCallback: readyCallback,

serviceMethods: map[string]string{},
cache: cmap.New(),
pollingCache: cmap.New(),
seDubboCallModels: map[resource.FullName]map[string]DubboCallModel{},
Expand Down Expand Up @@ -232,16 +233,61 @@ func (s *Source) simpleCacheJson(w http.ResponseWriter, r *http.Request) {

func (s *Source) cacheJson(w http.ResponseWriter, req *http.Request) {
temp := s.cacheInUse()
all := make(map[string]interface{}, 0)
if interfaceName := req.URL.Query().Get("interfaceName"); interfaceName != "" {
cacheData := map[string]map[string]*ServiceEntryWithMeta{}
var result interface{}

interfaceName := req.URL.Query().Get("interfaceName")
if interfaceName != "" {
newTemp := cmap.New()
if value, exist := temp.Get(interfaceName); exist {
all["cache"] = value
newTemp.Set(interfaceName, value)
}
temp = newTemp
}

temp.IterCb(func(dubboInterface string, v interface{}) {
if v == nil {
return
}

inner := v.(cmap.ConcurrentMap)
if inner == nil {
return
}

interfaceCacheData := cacheData[dubboInterface]
if interfaceCacheData == nil {
interfaceCacheData = map[string]*ServiceEntryWithMeta{}
cacheData[dubboInterface] = interfaceCacheData
}
inner.IterCb(func(host string, v interface{}) {
sem := v.(*ServiceEntryWithMeta)
s.mut.RLock()
methods, ok := s.serviceMethods[host]
s.mut.RUnlock()

if ok && sem.Meta.Labels[dubboParamMethods] != methods {
semCopy := *sem
labelCopy := make(map[string]string, len(sem.Meta.Labels))
for k, v := range sem.Meta.Labels {
labelCopy[k] = v
}
labelCopy[dubboParamMethods] = methods
semCopy.Meta.Labels = labelCopy
sem = &semCopy
}

interfaceCacheData[host] = sem
})
})

if interfaceName != "" {
result = cacheData[interfaceName]
} else {
all["cache"] = temp
all["serviceCache"] = s.serviceCache
result = cacheData
}
b, err := json.MarshalIndent(all, "", " ")

b, err := json.MarshalIndent(map[string]interface{}{"cache": result}, "", " ")
if err != nil {
_, _ = fmt.Fprintf(w, "unable to marshal zk se cache: %v", err)
return
Expand Down Expand Up @@ -435,14 +481,14 @@ func (s *Source) markServiceEntryInitDone() {
}
}

func (s *Source) handleServiceData(cacheInUse cmap.ConcurrentMap, provider, consumer []string, service string) {
if _, ok := cacheInUse.Get(service); !ok {
cacheInUse.Set(service, cmap.New())
func (s *Source) handleServiceData(cacheInUse cmap.ConcurrentMap, provider, consumer []string, dubboInterface string) {
if _, ok := cacheInUse.Get(dubboInterface); !ok {
cacheInUse.Set(dubboInterface, cmap.New())
}

freshSeMap := convertServiceEntry(
provider, consumer, service, s.args.SvcPort, s.args.InstancePortAsSvcPort, s.args.LabelPatch,
s.args.AggregateDubboMethods, s.ignoreLabelsMap, s.args.GatewayModel)
provider, consumer, dubboInterface, s.args.SvcPort, s.args.InstancePortAsSvcPort, s.args.LabelPatch,
s.ignoreLabelsMap, s.args.GatewayModel)
for serviceKey, convertedSe := range freshSeMap {
se := convertedSe.se
now := time.Now()
Expand All @@ -453,7 +499,7 @@ func (s *Source) handleServiceData(cacheInUse cmap.ConcurrentMap, provider, cons
CreateTime: now,
Version: resource.Version(now.String()),
Labels: map[string]string{
"path": service,
"path": dubboInterface,
"registry": "zookeeper",
},
Annotations: map[string]string{},
Expand All @@ -464,11 +510,12 @@ func (s *Source) handleServiceData(cacheInUse cmap.ConcurrentMap, provider, cons
// to trigger svc change/full push in istio sidecar when eq -> uneq or uneq -> eq
newSeWithMeta.Meta.Labels[DubboSvcMethodEqLabel] = strconv.FormatBool(convertedSe.methodsEqual)
}
if s.args.AggregateDubboMethods {
newSeWithMeta.Meta.Labels[dubboParamMethods] = convertedSe.methodsLabel
}

v, ok := cacheInUse.Get(service)
s.mut.Lock()
s.serviceMethods[serviceKey] = convertedSe.methodsLabel
s.mut.Unlock()

v, ok := cacheInUse.Get(dubboInterface)
if !ok {
continue
}
Expand Down Expand Up @@ -503,7 +550,7 @@ func (s *Source) handleServiceData(cacheInUse cmap.ConcurrentMap, provider, cons

// check if svc deleted
deleteKey := make([]string, 0)
v, ok := cacheInUse.Get(service)
v, ok := cacheInUse.Get(dubboInterface)
if !ok {
return
}
Expand Down

0 comments on commit 5065529

Please sign in to comment.