Skip to content

Commit

Permalink
fix: many namespace lead to provider stuck (#1386)
Browse files Browse the repository at this point in the history
  • Loading branch information
shareinto authored Dec 12, 2022
1 parent 2ce1ed3 commit 7e8f076
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
4 changes: 4 additions & 0 deletions pkg/providers/apisix/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace"
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
Expand Down Expand Up @@ -75,6 +76,8 @@ type apisixProvider struct {
apisixConsumerInformer cache.SharedIndexInformer
apisixPluginConfigInformer cache.SharedIndexInformer
apisixTlsInformer cache.SharedIndexInformer

apisixSharedInformerFactory externalversions.SharedInformerFactory
}

func NewProvider(common *providertypes.Common, namespaceProvider namespace.WatchingNamespaceProvider,
Expand All @@ -86,6 +89,7 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
}

apisixFactory := common.KubeClient.NewAPISIXSharedIndexInformerFactory()
p.apisixSharedInformerFactory = apisixFactory

p.apisixTranslator = apisixtranslation.NewApisixTranslator(&apisixtranslation.TranslatorOptions{
Apisix: common.APISIX,
Expand Down
51 changes: 31 additions & 20 deletions pkg/providers/apisix/provider_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package apisix

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/log"
Expand Down Expand Up @@ -48,23 +49,33 @@ func (p *apisixProvider) Init(ctx context.Context) error {
pluginConfigMapA6 = make(map[string]string)
)

p.apisixSharedInformerFactory.Start(ctx.Done())
synced := p.apisixSharedInformerFactory.WaitForCacheSync(ctx.Done())
for v, ok := range synced {
if !ok {
err := fmt.Errorf("%s cache failed to sync", v.Name())
log.Error(err.Error())
return err
}
}

namespaces := p.namespaceProvider.WatchingNamespaces()

for _, key := range namespaces {
log.Debugf("start to watch namespace: %s", key)
wg.Add(1)
go func(ns string) {
defer wg.Done()
// ApisixRoute
opts := v1.ListOptions{}
switch p.common.Config.Kubernetes.APIVersion {
case config.ApisixV2beta3:
retRoutes, err := p.common.KubeClient.APISIXClient.ApisixV2beta3().ApisixRoutes(ns).List(ctx, opts)
retRoutes, err := p.apisixSharedInformerFactory.Apisix().V2beta3().ApisixRoutes().Lister().ApisixRoutes(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, r := range retRoutes.Items {
tc, err := p.apisixTranslator.TranslateRouteV2beta3NotStrictly(&r)
for _, r := range retRoutes {
tc, err := p.apisixTranslator.TranslateRouteV2beta3NotStrictly(r)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand Down Expand Up @@ -93,13 +104,13 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
}
case config.ApisixV2:
retRoutes, err := p.common.KubeClient.APISIXClient.ApisixV2().ApisixRoutes(ns).List(ctx, opts)
retRoutes, err := p.apisixSharedInformerFactory.Apisix().V2().ApisixRoutes().Lister().ApisixRoutes(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, r := range retRoutes.Items {
tc, err := p.apisixTranslator.TranslateRouteV2NotStrictly(&r)
for _, r := range retRoutes {
tc, err := p.apisixTranslator.TranslateRouteV2NotStrictly(r)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand Down Expand Up @@ -138,13 +149,13 @@ func (p *apisixProvider) Init(ctx context.Context) error {
switch p.common.Config.Kubernetes.APIVersion {
case config.ApisixV2beta3:
// ApisixConsumer
retConsumer, err := p.common.KubeClient.APISIXClient.ApisixV2beta3().ApisixConsumers(ns).List(ctx, opts)
retConsumer, err := p.apisixSharedInformerFactory.Apisix().V2beta3().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, con := range retConsumer.Items {
consumer, err := p.apisixTranslator.TranslateApisixConsumerV2beta3(&con)
for _, con := range retConsumer {
consumer, err := p.apisixTranslator.TranslateApisixConsumerV2beta3(con)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand All @@ -154,13 +165,13 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
}
// ApisixTls
retSSL, err := p.common.KubeClient.APISIXClient.ApisixV2beta3().ApisixTlses(ns).List(ctx, opts)
retSSL, err := p.apisixSharedInformerFactory.Apisix().V2beta3().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, s := range retSSL.Items {
ssl, err := p.apisixTranslator.TranslateSSLV2Beta3(&s)
for _, s := range retSSL {
ssl, err := p.apisixTranslator.TranslateSSLV2Beta3(s)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand All @@ -171,13 +182,13 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
case config.ApisixV2:
// ApisixConsumer
retConsumer, err := p.common.KubeClient.APISIXClient.ApisixV2().ApisixConsumers(ns).List(ctx, opts)
retConsumer, err := p.apisixSharedInformerFactory.Apisix().V2().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, con := range retConsumer.Items {
consumer, err := p.apisixTranslator.TranslateApisixConsumerV2(&con)
for _, con := range retConsumer {
consumer, err := p.apisixTranslator.TranslateApisixConsumerV2(con)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand All @@ -187,13 +198,13 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
}
// ApisixTls
retSSL, err := p.common.KubeClient.APISIXClient.ApisixV2().ApisixTlses(ns).List(ctx, opts)
retSSL, err := p.apisixSharedInformerFactory.Apisix().V2().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, s := range retSSL.Items {
ssl, err := p.apisixTranslator.TranslateSSLV2(&s)
for _, s := range retSSL {
ssl, err := p.apisixTranslator.TranslateSSLV2(s)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand Down

0 comments on commit 7e8f076

Please sign in to comment.