Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meshregistry: nacos source support filtering instance #295

Merged
merged 1 commit into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type SourceArgs struct {
// if empty, those endpoints with ns attr will be aggregated into a no-ns service like "foo"
DefaultServiceNs string
ResourceNs string
// A list of selectors that specify the set of service instances to be processed,
// configured in the same way as the k8s label selector.
EndpointSelectors []*metav1.LabelSelector
}

type K8SSourceArgs struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

networking "istio.io/api/networking/v1alpha3"

"slime.io/slime/modules/meshregistry/pkg/source"
"slime.io/slime/modules/meshregistry/pkg/util"
)

Expand Down Expand Up @@ -72,8 +73,11 @@ func convertEndpoints(instances []*instance, patchLabel bool) ([]*networking.Wor
Name: "http",
}
ports = append(ports, port)

filter, enableInstanceFilter := instanceFilter.Load().(source.SelectHook)
for _, ins := range instances {
if enableInstanceFilter && !filter(ins.Metadata) {
continue
}
if !ins.Healthy {
continue
}
Expand Down Expand Up @@ -157,7 +161,11 @@ func convertEndpointsWithNs(instances []*instance, defaultNs string, svcPort uin
sort.Slice(instances, func(i, j int) bool {
return instances[i].InstanceId < instances[j].InstanceId
})
filter, enableInstanceFilter := instanceFilter.Load().(source.SelectHook)
for _, ins := range instances {
if enableInstanceFilter && !filter(ins.Metadata) {
continue
}
if !ins.Healthy {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"os"
"strings"
"sync/atomic"
"time"

"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
Expand Down Expand Up @@ -58,7 +59,14 @@ type Source struct {
initedCallback func(string)
}

var Scope = log.RegisterScope("nacos", "nacos debugging", 0)
var (
instanceFilter = new(atomic.Value) // source.SelectHook
ApplyInstanceFilter source.ApplyHook = func(sh source.SelectHook) {
instanceFilter.Store(sh)
}

Scope = log.RegisterScope("nacos", "nacos debugging", 0)
)

const (
SourceName = "nacos"
Expand All @@ -69,7 +77,7 @@ const (
)

func New(nacoesArgs bootstrap.NacosSourceArgs, nsHost bool, k8sDomainSuffix bool, delay time.Duration, readyCallback func(string)) (event.Source, func(http.ResponseWriter, *http.Request), error) {
source := &Source{
s := &Source{
namespace: nacoesArgs.Namespace,
group: nacoesArgs.Group,
delay: delay,
Expand Down Expand Up @@ -102,17 +110,18 @@ func New(nacoesArgs bootstrap.NacosSourceArgs, nsHost bool, k8sDomainSuffix bool
}
}
if nacoesArgs.Mode == POLLING {
source.client = NewClient(nacoesArgs.Address, nacoesArgs.Username, nacoesArgs.Password, headers)
s.client = NewClient(nacoesArgs.Address, nacoesArgs.Username, nacoesArgs.Password, headers)
} else {
namingClient, err := newNamingClient(nacoesArgs.Address, nacoesArgs.Namespace, headers)
if err != nil {
return nil, nil, Error{
msg: fmt.Sprintf("init nacos client failed: %s", err.Error()),
}
}
source.namingClient = namingClient
s.namingClient = namingClient
}
return source, source.cacheJson, nil
source.UpdateSelector(nacoesArgs.EndpointSelectors, ApplyInstanceFilter)
return s, s.cacheJson, nil
}

func (s *Source) cacheJson(w http.ResponseWriter, _ *http.Request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package source

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

type SelectHook func(map[string]string) bool

type ApplyHook func(SelectHook)

// NewSelectHook build a SelectHook by the input LabelSelectors.
// If the input LabelSelectors is nil, the returned hook always returns TRUE.
func NewSelectHook(labelSelectors []*metav1.LabelSelector) SelectHook {
if len(labelSelectors) == 0 {
return func(_ map[string]string) bool { return true }
}
var selectors []labels.Selector
for _, selector := range labelSelectors {
ls, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
// ignore invalid LabelSelector
continue
}
selectors = append(selectors, ls)
}
return func(m map[string]string) bool {
if len(selectors) == 0 {
return true
}
for _, selector := range selectors {
if selector.Matches(labels.Set(m)) {
return true
}
}
return false
}
}

// UpdateSelector updates the given selector with the input LabelSelectors.
func UpdateSelector(labelSelectors []*metav1.LabelSelector, apply ApplyHook) {
apply(NewSelectHook(labelSelectors))
}