Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: endpointslice controller #574

Merged
merged 24 commits into from
Jul 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e25adc8
chore: revert the .gitignore change
tokers Feb 3, 2021
335338f
fix: workqueue shoud shutdown when controller exited
slene May 15, 2021
f646aa7
fix: panic of start leading. sync ingress failed when apisix not start.
slene May 15, 2021
f7c6050
fix for comment
slene May 16, 2021
27b0e4b
fix
slene May 16, 2021
6dd0469
fix: remove DefaultClusterClientTimeout config
slene May 16, 2021
79e7f83
fix for comment
slene May 16, 2021
2d12304
impl tcpSocket heath check probe
slene May 17, 2021
f820ab5
fix for comments
slene May 17, 2021
9d95a9d
fix
slene May 17, 2021
96b2315
fix for comments
slene May 18, 2021
3e93d1b
Merge remote-tracking branch 'upstream/master' into slene-master
tokers May 26, 2021
48bf2d2
Merge remote-tracking branch 'origin/master'
tokers May 26, 2021
4200a8b
feat: abstract the endpoints-related logic
tokers Jun 30, 2021
79fa610
fix: broken unit test cases
tokers Jun 30, 2021
9731cea
Merge remote-tracking branch 'origin/master' into feat/endpointslice
tokers Jul 1, 2021
1760d27
Merge remote-tracking branch 'upstream/master' into feat/endpointslice
tokers Jul 1, 2021
e05fc06
Merge remote-tracking branch 'upstream/master' into feat/endpointslice
tokers Jul 6, 2021
244727e
fix
tokers Jul 6, 2021
2513b47
fix gofmt
tokers Jul 7, 2021
a98e123
chore: add endpointslice controller
tokers Jul 7, 2021
603b52e
fix
tokers Jul 9, 2021
3367254
Merge remote-tracking branch 'upstream/master' into chore/endpointsli…
tokers Jul 9, 2021
2126fd3
add test case
tokers Jul 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ This project is currently general availability.

## Prerequisites

Apisix ingress controller requires Kubernetes version 1.14+.
Apisix ingress controller requires Kubernetes version 1.15+.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


## Apache APISIX Ingress vs. Kubernetes Ingress Nginx

Expand Down
1 change: 1 addition & 0 deletions cmd/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"")
cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
Expand Down
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ kubernetes:
ingress_version: "networking/v1" # the supported ingress api group version, can be "networking/v1beta1"
# , "networking/v1" (for Kubernetes version v1.19.0 or higher), and
# "extensions/v1beta1", default is "networking/v1".
watch_endpointslices: false # whether to watch EndpointSlices rather than Endpoints.

apisix_route_version: "apisix.apache.org/v2alpha1" # the supported apisixroute api group version, can be
# "apisix.apache.org/v1" or "apisix.apache.org/v2alpha1",
Expand Down
2 changes: 1 addition & 1 deletion docs/en/latest/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Tips: The failure caused by empty upstream nodes is a limitation of Apache APISI

6. What is the retry rule of `apisix-ingress-controller`?

If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
If an error occurs duriREADME.mdng the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misspell


The delayed retry method is adopted. After the first failure, it is retried once per second. After 5 retries are triggered, the slow retry strategy will be enabled, and the retry will be performed every 1 minute until it succeeds.

Expand Down
103 changes: 97 additions & 6 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"sync"
"time"

apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -107,10 +110,11 @@ type Controller struct {
apisixConsumerLister listersv2alpha1.ApisixConsumerLister

// resource controllers
podController *podController
endpointsController *endpointsController
ingressController *ingressController
secretController *secretController
podController *podController
endpointsController *endpointsController
endpointSliceController *endpointSliceController
ingressController *ingressController
secretController *secretController

apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
Expand Down Expand Up @@ -233,8 +237,12 @@ func (c *Controller) initWhenStartLeading() {
c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
c.apisixConsumerInformer = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer()

if c.cfg.Kubernetes.WatchEndpointSlices {
c.endpointSliceController = c.newEndpointSliceController()
} else {
c.endpointsController = c.newEndpointsController()
}
c.podController = c.newPodController()
c.endpointsController = c.newEndpointsController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
c.apisixRouteController = c.newApisixRouteController()
Expand Down Expand Up @@ -425,7 +433,11 @@ func (c *Controller) run(ctx context.Context) {
c.podController.run(ctx)
})
c.goAttach(func() {
c.endpointsController.run(ctx)
if c.cfg.Kubernetes.WatchEndpointSlices {
c.endpointSliceController.run(ctx)
} else {
c.endpointsController.run(ctx)
}
})
c.goAttach(func() {
c.apisixUpstreamController.run(ctx)
Expand Down Expand Up @@ -504,6 +516,85 @@ func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consum
}
return
}

func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why write sync in the controller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because these logics can be reused by both the endpoint controller and endpointslice controller.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean put them in another file maybe better , not in controller .

namespace := ep.Namespace()
svcName := ep.ServiceName()
svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
if err != nil {
if k8serrors.IsNotFound(err) {
log.Infof("service %s/%s not found", ep.Namespace(), svcName)
return nil
}
log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
return err
}
var subsets []configv1.ApisixUpstreamSubset
subsets = append(subsets, configv1.ApisixUpstreamSubset{})
au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
return err
}
} else if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}

clusters := c.apisix.ListClusters()
for _, port := range svc.Spec.Ports {
for _, subset := range subsets {
nodes, err := c.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
if err != nil {
log.Errorw("failed to translate upstream nodes",
zap.Error(err),
zap.Any("endpoints", ep),
zap.Int32("port", port.Port),
)
}
name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
for _, cluster := range clusters {
if err := c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
return err
}
}
}
}
return nil
}

func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
upstream, err := cluster.Upstream().Get(ctx, upsName)
if err != nil {
if err == apisixcache.ErrNotFound {
log.Warnw("upstream is not referenced",
zap.String("cluster", cluster.String()),
zap.String("upstream", upsName),
)
return nil
} else {
log.Errorw("failed to get upstream",
zap.String("upstream", upsName),
zap.String("cluster", cluster.String()),
zap.Error(err),
)
return err
}
}

upstream.Nodes = nodes

log.Debugw("upstream binds new nodes",
zap.Any("upstream", upstream),
zap.String("cluster", cluster.String()),
)

updated := &manifest{
upstreams: []*apisixv1.Upstream{upstream},
}
return c.syncManifests(ctx, nil, updated, nil)
}

func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
defer cancelFunc()
for {
Expand Down
84 changes: 2 additions & 82 deletions pkg/ingress/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@ package ingress

import (
"context"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/apache/apisix-ingress-controller/pkg/apisix"
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

type endpointsController struct {
Expand Down Expand Up @@ -89,82 +84,7 @@ func (c *endpointsController) run(ctx context.Context) {

func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
ep := ev.Object.(kube.Endpoint)
namespace := ep.Namespace()
svcName := ep.ServiceName()
svc, err := c.controller.svcLister.Services(ep.Namespace()).Get(svcName)
if err != nil {
if k8serrors.IsNotFound(err) {
log.Infof("service %s/%s not found", ep.Namespace(), svcName)
return nil
}
log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
return err
}
var subsets []configv1.ApisixUpstreamSubset
subsets = append(subsets, configv1.ApisixUpstreamSubset{})
au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
return err
}
} else if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}

clusters := c.controller.apisix.ListClusters()
for _, port := range svc.Spec.Ports {
for _, subset := range subsets {
nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
if err != nil {
log.Errorw("failed to translate upstream nodes",
zap.Error(err),
zap.Any("endpoints", ep),
zap.Int32("port", port.Port),
)
}
name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
for _, cluster := range clusters {
if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
return err
}
}
}
}

return nil
}

func (c *endpointsController) syncToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
upstream, err := cluster.Upstream().Get(ctx, upsName)
if err != nil {
if err == apisixcache.ErrNotFound {
log.Warnw("upstream is not referenced",
zap.String("cluster", cluster.String()),
zap.String("upstream", upsName),
)
return nil
} else {
log.Errorw("failed to get upstream",
zap.String("upstream", upsName),
zap.String("cluster", cluster.String()),
zap.Error(err),
)
return err
}
}

upstream.Nodes = nodes

log.Debugw("upstream binds new nodes",
zap.Any("upstream", upstream),
zap.String("cluster", cluster.String()),
)

updated := &manifest{
upstreams: []*apisixv1.Upstream{upstream},
}
return c.controller.syncManifests(ctx, nil, updated, nil)
return c.controller.syncEndpoint(ctx, ep)
}

func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
Expand Down
Loading