Skip to content

Commit

Permalink
fix: namespace_selector invalid when restarting (#1238) (#1291)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlinsRan authored Sep 2, 2022
1 parent 3b56ee3 commit 8241b15
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 109 deletions.
11 changes: 0 additions & 11 deletions pkg/api/validation/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,3 @@ func validateSchema(schemaLoader *gojsonschema.JSONLoader, obj interface{}) (boo

return false, resultErr
}

func HasValueInSyncMap(m *sync.Map) bool {
hasValue := false
if m != nil {
m.Range(func(k, v interface{}) bool {
hasValue = true
return false
})
}
return hasValue
}
9 changes: 0 additions & 9 deletions pkg/api/validation/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
package validation

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/xeipuuv/gojsonschema"

v2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
Expand Down Expand Up @@ -48,10 +46,3 @@ func Test_validateSchema(t *testing.T) {
})
}
}

func TestHasValueInSyncMap(t *testing.T) {
m := new(sync.Map)
assert.False(t, HasValueInSyncMap(m), "sync.Map should be empty")
m.Store("hello", "test")
assert.True(t, HasValueInSyncMap(m), "sync.Map should not be empty")
}
84 changes: 42 additions & 42 deletions pkg/ingress/namespace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package namespace

import (
"context"
"fmt"
"strings"
"sync"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/apache/apisix-ingress-controller/pkg/api/validation"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/ingress/utils"
"github.com/apache/apisix-ingress-controller/pkg/kube"
Expand All @@ -44,44 +43,28 @@ type WatchingProvider interface {
}

func NewWatchingProvider(ctx context.Context, kube *kube.KubeClient, cfg *config.Config) (WatchingProvider, error) {
var (
watchingNamespaces = new(sync.Map)
watchingLabels = make(map[string]string)
)
if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
for _, ns := range cfg.Kubernetes.AppNamespaces {
watchingNamespaces.Store(ns, struct{}{})
}
c := &watchingProvider{
kube: kube,
cfg: cfg,

watchingNamespaces: new(sync.Map),
watchingLabels: make(map[string]string),

enableLabelsWatching: false,
}

if len(cfg.Kubernetes.NamespaceSelector) == 0 {
return c, nil
}

// support namespace label-selector
c.enableLabelsWatching = true
for _, selector := range cfg.Kubernetes.NamespaceSelector {
labelSlice := strings.Split(selector, "=")
watchingLabels[labelSlice[0]] = labelSlice[1]
}

// watchingNamespaces and watchingLabels are empty means to monitor all namespaces.
if !validation.HasValueInSyncMap(watchingNamespaces) && len(watchingLabels) == 0 {
opts := metav1.ListOptions{}
// list all namespaces
nsList, err := kube.Client.CoreV1().Namespaces().List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
wns := new(sync.Map)
for _, v := range nsList.Items {
wns.Store(v.Name, struct{}{})
}
watchingNamespaces = wns
if len(labelSlice) != 2 {
return nil, fmt.Errorf("Bad namespace-selector format: %s, expected namespace-selector format: xxx=xxx", selector)
}
}

c := &watchingProvider{
kube: kube,
cfg: cfg,

watchingNamespaces: watchingNamespaces,
watchingLabels: watchingLabels,
c.watchingLabels[labelSlice[0]] = labelSlice[1]
}

kubeFactory := kube.NewSharedIndexInformerFactory()
Expand All @@ -108,6 +91,8 @@ type watchingProvider struct {
namespaceLister listerscorev1.NamespaceLister

controller *namespaceController

enableLabelsWatching bool
}

func (c *watchingProvider) initWatchingNamespacesByLabels(ctx context.Context) error {
Expand All @@ -130,12 +115,14 @@ func (c *watchingProvider) initWatchingNamespacesByLabels(ctx context.Context) e
}

func (c *watchingProvider) Run(ctx context.Context) {
e := utils.ParallelExecutor{}
if !c.enableLabelsWatching {
return
}

e := utils.ParallelExecutor{}
e.Add(func() {
c.namespaceInformer.Run(ctx.Done())
})

e.Add(func() {
c.controller.run(ctx)
})
Expand All @@ -145,17 +132,30 @@ func (c *watchingProvider) Run(ctx context.Context) {

func (c *watchingProvider) WatchingNamespaces() []string {
var keys []string
c.watchingNamespaces.Range(func(key, _ interface{}) bool {
keys = append(keys, key.(string))
return true
})
if c.enableLabelsWatching {
c.watchingNamespaces.Range(func(key, _ interface{}) bool {
keys = append(keys, key.(string))
return true
})
} else {
namespaces, err := c.kube.Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Warnw("Namespace list get failed",
zap.Error(err),
)
return nil
}
for _, ns := range namespaces.Items {
keys = append(keys, ns.Name)
}
}
return keys
}

// IsWatchingNamespace accepts a resource key, getting the namespace part
// and checking whether the namespace is being watched.
func (c *watchingProvider) IsWatchingNamespace(key string) (ok bool) {
if !validation.HasValueInSyncMap(c.watchingNamespaces) {
if !c.enableLabelsWatching {
ok = true
return
}
Expand Down
15 changes: 11 additions & 4 deletions test/e2e/scaffold/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,17 @@ func (s *Scaffold) newIngressAPISIXController() error {
})

var ingressAPISIXDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
label := `""`
if labels := s.NamespaceSelectorLabelStrings(); labels != nil && !s.opts.DisableNamespaceSelector {
label = labels[0]
}

if s.opts.EnableWebhooks {
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
s.FormatNamespaceLabel(label), s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
label, s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
} else {
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
s.FormatNamespaceLabel(label), s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, "", _webhookCertSecret)
label, s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, "", _webhookCertSecret)
}

err = k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressAPISIXDeployment)
Expand Down Expand Up @@ -530,7 +534,10 @@ func (s *Scaffold) GetIngressPodDetails() ([]corev1.Pod, error) {
// ScaleIngressController scales the number of Ingress Controller pods to desired.
func (s *Scaffold) ScaleIngressController(desired int) error {
var ingressDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
var label string
if labels := s.NamespaceSelectorLabelStrings(); labels != nil {
label = labels[0]
}
if s.opts.EnableWebhooks {
ingressDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, s.namespace, s.opts.ApisixResourceSyncInterval, label, s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
} else {
Expand Down
16 changes: 10 additions & 6 deletions test/e2e/scaffold/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,20 @@ func (s *Scaffold) CreateResourceFromStringWithNamespace(yaml, namespace string)
s.kubectlOptions.Namespace = originalNamespace
}()
s.addFinalizers(func() {
originalNamespace := s.kubectlOptions.Namespace
s.kubectlOptions.Namespace = namespace
defer func() {
s.kubectlOptions.Namespace = originalNamespace
}()
assert.Nil(s.t, k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml))
_ = s.DeleteResourceFromStringWithNamespace(yaml, namespace)
})
return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
}

func (s *Scaffold) DeleteResourceFromStringWithNamespace(yaml, namespace string) error {
originalNamespace := s.kubectlOptions.Namespace
s.kubectlOptions.Namespace = namespace
defer func() {
s.kubectlOptions.Namespace = originalNamespace
}()
return k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml)
}

func (s *Scaffold) ensureNumApisixCRDsCreated(url string, desired int) error {
condFunc := func() (bool, error) {
req, err := http.NewRequest("GET", url, nil)
Expand Down
66 changes: 45 additions & 21 deletions test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ type Options struct {
APISIXAdminAPIKey string
EnableWebhooks bool
APISIXPublishAddress string
disableNamespaceSelector bool
ApisixResourceSyncInterval string
ApisixResourceVersion string

NamespaceSelectorLabel map[string]string
DisableNamespaceSelector bool
DisableNamespaceLabel bool
}

type Scaffold struct {
Expand Down Expand Up @@ -96,10 +99,10 @@ var (
}

createVersionedApisixResourceMap = map[string]struct{}{
"ApisixRoute": struct{}{},
"ApisixConsumer": struct{}{},
"ApisixPluginConfig": struct{}{},
"ApisixUpstream": struct{}{},
"ApisixRoute": {},
"ApisixConsumer": {},
"ApisixPluginConfig": {},
"ApisixUpstream": {},
}
)

Expand Down Expand Up @@ -368,18 +371,39 @@ func (s *Scaffold) RestartAPISIXDeploy() {
assert.NoError(s.t, err, "renew apisix tunnels")
}

func (s *Scaffold) RestartIngressControllerDeploy() {
pods, err := k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
assert.NoError(s.t, err, "list ingress-controller pod")
for _, pod := range pods {
err = s.KillPod(pod.Name)
assert.NoError(s.t, err, "killing ingress-controller pod")
}
err = s.WaitAllIngressControllerPodsAvailable()
assert.NoError(s.t, err, "waiting for new ingress-controller instance ready")
}

func (s *Scaffold) beforeEach() {
var err error
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.opts.Name, time.Now().Nanosecond())
s.kubectlOptions = &k8s.KubectlOptions{
ConfigPath: s.opts.Kubeconfig,
Namespace: s.namespace,
}

s.finializers = nil
labels := make(map[string]string)
labels["apisix.ingress.watch"] = s.namespace
k8s.CreateNamespaceWithMetadata(s.t, s.kubectlOptions, metav1.ObjectMeta{Name: s.namespace, Labels: labels})

label := map[string]string{}
if !s.opts.DisableNamespaceLabel {
if s.opts.NamespaceSelectorLabel == nil {
label["apisix.ingress.watch"] = s.namespace
s.opts.NamespaceSelectorLabel = label
} else {
label = s.opts.NamespaceSelectorLabel
}
}

k8s.CreateNamespaceWithMetadata(s.t, s.kubectlOptions, metav1.ObjectMeta{Name: s.namespace, Labels: label})

s.nodes, err = k8s.GetReadyNodesE(s.t, s.kubectlOptions)
assert.Nil(s.t, err, "querying ready nodes")
Expand Down Expand Up @@ -541,14 +565,6 @@ func (s *Scaffold) FormatRegistry(workloadTemplate string) string {
}
}

// FormatNamespaceLabel set label to be empty if s.opts.disableNamespaceSelector is true.
func (s *Scaffold) FormatNamespaceLabel(label string) string {
if s.opts.disableNamespaceSelector {
return "\"\""
}
return label
}

var (
versionRegex = regexp.MustCompile(`apiVersion: apisix.apache.org/v.*?\n`)
kindRegex = regexp.MustCompile(`kind: (.*?)\n`)
Expand All @@ -566,10 +582,6 @@ func (s *Scaffold) getKindValue(yml string) string {
return subStr[1]
}

func (s *Scaffold) DisableNamespaceSelector() {
s.opts.disableNamespaceSelector = true
}

func waitExponentialBackoff(condFunc func() (bool, error)) error {
backoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Expand Down Expand Up @@ -614,3 +626,15 @@ func (s *Scaffold) CreateVersionedApisixResourceWithNamespace(yml, namespace str
func ApisixResourceVersion() *apisixResourceVersionInfo {
return apisixResourceVersion
}

func (s *Scaffold) NamespaceSelectorLabelStrings() []string {
var labels []string
for k, v := range s.opts.NamespaceSelectorLabel {
labels = append(labels, fmt.Sprintf("%s=%s", k, v))
}
return labels
}

func (s *Scaffold) NamespaceSelectorLabel() map[string]string {
return s.opts.NamespaceSelectorLabel
}
Loading

0 comments on commit 8241b15

Please sign in to comment.