From 9d663abec02f09ece4af9aa99713de3e878146f6 Mon Sep 17 00:00:00 2001 From: Xin Rong Date: Fri, 9 Sep 2022 22:43:46 +0800 Subject: [PATCH] fix: support resolveGranularity of ApisixRoute (#1251) --- pkg/apisix/apisix.go | 7 +- pkg/apisix/cache/memdb_test.go | 6 +- pkg/apisix/nonexistentclient.go | 4 +- pkg/apisix/resource_test.go | 7 +- pkg/apisix/upstream.go | 7 +- pkg/apisix/upstreamservicerelation.go | 101 +++++---- pkg/apisix/upstreamservicerelation_test.go | 197 ++++++++++++++++++ pkg/providers/apisix/apisix_upstream.go | 89 ++++---- .../apisix/translation/apisix_plugin_test.go | 4 +- .../apisix/translation/apisix_route.go | 24 +-- .../apisix/translation/apisix_upstream.go | 9 +- .../gateway/translation/gateway_httproute.go | 3 +- .../gateway/translation/gateway_tlsroute.go | 3 +- .../ingress/translation/translator.go | 9 +- pkg/providers/k8s/endpoint/base.go | 28 ++- pkg/providers/k8s/endpoint/endpoint.go | 17 +- pkg/providers/k8s/endpoint/endpointslice.go | 72 +++---- pkg/types/apisix/v1/types.go | 26 ++- pkg/types/apisix/v1/zz_generated.deepcopy.go | 7 + pkg/types/service.go | 25 +++ test/e2e/suite-chore/resolvegranularity.go | 127 +++++++++++ 21 files changed, 569 insertions(+), 203 deletions(-) create mode 100644 pkg/apisix/upstreamservicerelation_test.go create mode 100644 pkg/types/service.go create mode 100644 test/e2e/suite-chore/resolvegranularity.go diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go index 23f3e1b2ba..e4ca42e059 100644 --- a/pkg/apisix/apisix.go +++ b/pkg/apisix/apisix.go @@ -153,10 +153,13 @@ type PluginConfig interface { } type UpstreamServiceRelation interface { + // Get relation based on namespace+"_"+service.name Get(context.Context, string) (*v1.UpstreamServiceRelation, error) List(context.Context) ([]*v1.UpstreamServiceRelation, error) - Delete(context.Context, *v1.UpstreamServiceRelation) error - Create(context.Context, *v1.UpstreamServiceRelation) error + // Delete relation based on namespace+"_"+service.name + Delete(context.Context, string) error + // Build relation based on upstream.name + Create(context.Context, string) error } type apisix struct { diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go index 79d9cbd3dc..8181f75839 100644 --- a/pkg/apisix/cache/memdb_test.go +++ b/pkg/apisix/cache/memdb_test.go @@ -486,8 +486,10 @@ func TestMemDBCacheUpstreamServiceRelation(t *testing.T) { assert.Equal(t, us2, us) us3 := &v1.UpstreamServiceRelation{ - ServiceName: "httpbin", - UpstreamName: "upstream", + ServiceName: "httpbin", + UpstreamNames: map[string]struct{}{ + "upstream": {}, + }, } assert.Nil(t, c.InsertUpstreamServiceRelation(us3), "inserting upstream_service 3") diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go index 7ca3da46f8..37c9b90da7 100644 --- a/pkg/apisix/nonexistentclient.go +++ b/pkg/apisix/nonexistentclient.go @@ -248,13 +248,13 @@ type dummyUpstreamServiceRelation struct { func (f *dummyUpstreamServiceRelation) Get(_ context.Context, _ string) (*v1.UpstreamServiceRelation, error) { return nil, ErrClusterNotExist } -func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ *v1.UpstreamServiceRelation) error { +func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ string) error { return ErrClusterNotExist } func (f *dummyUpstreamServiceRelation) List(_ context.Context) ([]*v1.UpstreamServiceRelation, error) { return nil, ErrClusterNotExist } -func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ *v1.UpstreamServiceRelation) error { +func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ string) error { return ErrClusterNotExist } diff --git a/pkg/apisix/resource_test.go b/pkg/apisix/resource_test.go index 528f801dae..5b4c6c18f0 100644 --- a/pkg/apisix/resource_test.go +++ b/pkg/apisix/resource_test.go @@ -86,8 +86,11 @@ func TestItemConvertUpstream(t *testing.T) { ups, err := ite.upstream() assert.Nil(t, err) assert.Len(t, ups.Nodes, 2) - assert.Equal(t, ups.Nodes[0], v1.UpstreamNode{Host: "httpbin.org", Port: 80, Weight: 1}) - assert.Equal(t, ups.Nodes[1], v1.UpstreamNode{Host: "foo.com", Port: 8080, Weight: 2}) + if ups.Nodes[0].Host == "foo.com" { + ups.Nodes[0], ups.Nodes[1] = ups.Nodes[1], ups.Nodes[0] + } + assert.Equal(t, v1.UpstreamNode{Host: "httpbin.org", Port: 80, Weight: 1}, ups.Nodes[0]) + assert.Equal(t, v1.UpstreamNode{Host: "foo.com", Port: 8080, Weight: 2}, ups.Nodes[1]) ite = &item{ Key: "/apisix/upstreams/419655639963271872", diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go index d50110fe36..4405fbd463 100644 --- a/pkg/apisix/upstream.go +++ b/pkg/apisix/upstream.go @@ -139,7 +139,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst zap.String("cluster", "default"), ) - if err := u.cluster.upstreamServiceRelation.Create(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil { + if err := u.cluster.upstreamServiceRelation.Create(ctx, obj.Name); err != nil { log.Errorf("failed to reflect upstreamService create to cache: %s", err) } if err := u.cluster.HasSynced(ctx); err != nil { @@ -187,9 +187,6 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error { return err } } - if err := u.cluster.upstreamServiceRelation.Delete(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil { - log.Errorf("failed to delete upstreamService in cache: %s", err) - } url := u.url + "/" + obj.ID if err := u.cluster.deleteResource(ctx, url, "upstream"); err != nil { u.cluster.metricsCollector.IncrAPISIXRequest("upstream") @@ -207,7 +204,7 @@ func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upst zap.String("url", u.url), ) - if err := u.cluster.upstreamServiceRelation.Create(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil { + if err := u.cluster.upstreamServiceRelation.Create(ctx, obj.Name); err != nil { log.Errorf("failed to reflect upstreamService create to cache: %s", err) } if err := u.cluster.HasSynced(ctx); err != nil { diff --git a/pkg/apisix/upstreamservicerelation.go b/pkg/apisix/upstreamservicerelation.go index 3ebe1d0cfb..9b026c7f4d 100644 --- a/pkg/apisix/upstreamservicerelation.go +++ b/pkg/apisix/upstreamservicerelation.go @@ -17,6 +17,7 @@ package apisix import ( "context" "fmt" + "strconv" "strings" "go.uber.org/zap" @@ -26,6 +27,11 @@ import ( v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) +// to do: Delete one of the upstreams. Currently, only service is deleted. There will be some +// redundant upstream objects, but the results will not be affected. It is hoped that the service controller +// can complete the update nodes logic to avoid the intrusion of relation modules into more code. + +// Maintain relationships only when resolveGranularity is endpoint // There is no need to ensure the consistency between the upstream to services, only need to ensure that the upstream-node can be delete after deleting the service type upstreamService struct { cluster *cluster @@ -37,79 +43,78 @@ func newUpstreamServiceRelation(c *cluster) *upstreamService { } } -func (u *upstreamService) Get(ctx context.Context, svcId string) (*v1.UpstreamServiceRelation, error) { +func (u *upstreamService) Get(ctx context.Context, serviceName string) (*v1.UpstreamServiceRelation, error) { log.Debugw("try to get upstreamService in cache", - zap.String("svcId", svcId), + zap.String("service_name", serviceName), zap.String("cluster", "default"), ) - us, err := u.cluster.cache.GetUpstreamServiceRelation(svcId) - if err == nil { - return us, nil - } - if err != cache.ErrNotFound { + us, err := u.cluster.cache.GetUpstreamServiceRelation(serviceName) + if err != nil && err != cache.ErrNotFound { log.Error("failed to find upstreamService in cache", - zap.String("svcId", svcId), zap.Error(err)) - } else { - log.Debugw("failed to find upstreamService in cache", - zap.String("svcId", svcId), zap.Error(err)) + zap.String("service_name", serviceName), zap.Error(err)) + return nil, err } - return nil, err + return us, err } -func (u *upstreamService) Delete(ctx context.Context, relation *v1.UpstreamServiceRelation) error { +func (u *upstreamService) Delete(ctx context.Context, serviceName string) error { log.Debugw("try to delete upstreamService in cache", zap.String("cluster", "default"), ) - u.initUpstreamServiceRelation(relation) - if relation == nil || relation.ServiceName == "" && relation.UpstreamName == "" { - return fmt.Errorf("UpstreamServiceRelation is empty object") - } - if relation.UpstreamName != "" { - err := u.cluster.cache.DeleteUpstreamServiceRelation(relation) - if err != nil { - return err - } - } else { - usr, err := u.cluster.cache.GetUpstreamServiceRelation(relation.ServiceName) - if err != nil { - return err + relation, err := u.Get(ctx, serviceName) + if err != nil { + if err == cache.ErrNotFound { + return nil } - ups, err := u.cluster.upstream.Get(ctx, usr.UpstreamName) + return err + } + _ = u.cluster.cache.DeleteUpstreamServiceRelation(relation) + for upsName := range relation.UpstreamNames { + ups, err := u.cluster.upstream.Get(ctx, upsName) if err != nil { - return err + continue } ups.Nodes = make(v1.UpstreamNodes, 0) _, err = u.cluster.upstream.Update(ctx, ups) if err != nil { - return err - } - err = u.cluster.cache.DeleteUpstreamServiceRelation(usr) - if err != nil { - return err + continue } } return nil } -func (u *upstreamService) Create(ctx context.Context, relation *v1.UpstreamServiceRelation) error { +func (u *upstreamService) Create(ctx context.Context, upstreamName string) error { log.Debugw("try to create upstreamService in cache", zap.String("cluster", "default"), ) - u.initUpstreamServiceRelation(relation) - if relation == nil || relation.ServiceName == "" || relation.UpstreamName == "" { - log.Error("UpstreamServiceRelation object ") + + args := strings.Split(upstreamName, "_") + if len(args) < 2 { + return fmt.Errorf("wrong upstream name %s, must contains namespace_name", upstreamName) + } + // The last part of upstreanName should be a port number. + // Please refer to apisixv1.ComposeUpstreamName to see the detailed format. + _, err := strconv.Atoi(args[len(args)-1]) + if err != nil { return nil } - us, err := u.cluster.cache.GetUpstreamServiceRelation(relation.ServiceName) + + serviceName := args[0] + "_" + args[1] + relation, err := u.Get(ctx, serviceName) if err != nil && err != cache.ErrNotFound { return err } - if us != nil { - us.UpstreamName = relation.UpstreamName + if relation == nil { + relation = &v1.UpstreamServiceRelation{ + ServiceName: serviceName, + UpstreamNames: map[string]struct{}{ + upstreamName: {}, + }, + } } else { - us = relation + relation.UpstreamNames[upstreamName] = struct{}{} } - if err := u.cluster.cache.InsertUpstreamServiceRelation(us); err != nil { + if err := u.cluster.cache.InsertUpstreamServiceRelation(relation); err != nil { log.Errorf("failed to reflect upstreamService create to cache: %s", err) return err } @@ -129,15 +134,3 @@ func (u *upstreamService) List(ctx context.Context) ([]*v1.UpstreamServiceRelati } return usrs, nil } - -func (u *upstreamService) initUpstreamServiceRelation(us *v1.UpstreamServiceRelation) { - if us.UpstreamName == "" || us.ServiceName != "" { - return - } - args := strings.Split(us.UpstreamName, "_") - // namespace_service_subset_port - if len(args) < 2 { - return - } - us.ServiceName = args[0] + "_" + args[1] -} diff --git a/pkg/apisix/upstreamservicerelation_test.go b/pkg/apisix/upstreamservicerelation_test.go new file mode 100644 index 0000000000..dce71f1b99 --- /dev/null +++ b/pkg/apisix/upstreamservicerelation_test.go @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package apisix + +import ( + "context" + "net/http" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" + "github.com/apache/apisix-ingress-controller/pkg/metrics" + v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" +) + +func TestUpstreamServiceRelation(t *testing.T) { + u := url.URL{} + closedCh := make(chan struct{}) + close(closedCh) + cache, err := cache.NewMemDBCache() + assert.Nil(t, err) + cli := newUpstreamServiceRelation(&cluster{ + baseURL: u.String(), + cli: http.DefaultClient, + cache: cache, + cacheSynced: closedCh, + metricsCollector: metrics.NewPrometheusCollector(), + upstream: &dummyUpstream{}, + }) + + upsName := "default_httpbin_80" + upsName2 := "default_httpbin_8080" + svcName := "default_httpbin" + + err = cli.Create(context.TODO(), upsName) + assert.Nil(t, err) + + relation, err := cli.Get(context.TODO(), svcName) + assert.Nil(t, err) + assert.NotNil(t, relation) + assert.Equal(t, &v1.UpstreamServiceRelation{ + ServiceName: svcName, + UpstreamNames: map[string]struct{}{ + upsName: {}, + }, + }, relation) + + err = cli.Create(context.TODO(), upsName2) + assert.Nil(t, err) + + relation, err = cli.Get(context.TODO(), svcName) + assert.Nil(t, err) + assert.NotNil(t, relation) + assert.Equal(t, &v1.UpstreamServiceRelation{ + ServiceName: svcName, + UpstreamNames: map[string]struct{}{ + upsName: {}, + upsName2: {}, + }, + }, relation) + + relations, err := cli.List(context.TODO()) + assert.Nil(t, err) + assert.Len(t, relations, 1) + assert.Equal(t, &v1.UpstreamServiceRelation{ + ServiceName: svcName, + UpstreamNames: map[string]struct{}{ + upsName: {}, + upsName2: {}, + }, + }, relations[0]) + + err = cli.Delete(context.TODO(), svcName) + assert.Nil(t, err) + relations, err = cli.List(context.TODO()) + assert.Nil(t, err) + assert.Len(t, relations, 0) +} + +func TestUpstreamRelatoinClient(t *testing.T) { + srv := runFakeUpstreamSrv(t) + defer func() { + assert.Nil(t, srv.Shutdown(context.Background())) + }() + + cache, err := cache.NewMemDBCache() + assert.Nil(t, err) + u := url.URL{ + Scheme: "http", + Host: srv.Addr, + Path: "/apisix/admin", + } + closedCh := make(chan struct{}) + clu := &cluster{ + baseURL: u.String(), + cli: http.DefaultClient, + cache: cache, + cacheSynced: closedCh, + metricsCollector: metrics.NewPrometheusCollector(), + } + close(closedCh) + relationCli := newUpstreamServiceRelation(clu) + clu.upstreamServiceRelation = relationCli + cli := newUpstreamClient(clu) + clu.upstream = cli + relationCli.cluster = clu + + // Create + key := "upstreams/abc" + lbType := "roundrobin" + upsName := "default_httpbin_80" + upsName2 := "default_httpbin_8080" + svcName := "default_httpbin" + ip := "10.0.11.153" + port := 15006 + weight := 100 + nodes := v1.UpstreamNodes{ + { + Host: ip, + Port: port, + Weight: weight, + }, + } + + obj, err := cli.Create(context.TODO(), &v1.Upstream{ + Metadata: v1.Metadata{ + ID: "1", + Name: upsName, + }, + Type: lbType, + Key: key, + Nodes: nodes, + }) + assert.Nil(t, err) + assert.Equal(t, "1", obj.ID) + relations, err := relationCli.List(context.TODO()) + assert.Nil(t, err) + assert.Len(t, relations, 1) + assert.Equal(t, &v1.UpstreamServiceRelation{ + ServiceName: svcName, + UpstreamNames: map[string]struct{}{ + upsName: {}, + }, + }, relations[0]) + + id2 := "2" + obj, err = cli.Create(context.TODO(), &v1.Upstream{ + Metadata: v1.Metadata{ + ID: id2, + Name: upsName2, + }, + Type: lbType, + Key: key, + Nodes: nodes, + }) + assert.Nil(t, err) + assert.Equal(t, "2", obj.ID) + + // List + objs, err := cli.List(context.Background()) + assert.Nil(t, err) + assert.Len(t, objs, 2) + assert.Equal(t, "1", objs[0].ID) + assert.Equal(t, "2", objs[1].ID) + relations, err = relationCli.List(context.Background()) + assert.Nil(t, err) + assert.Len(t, relations, 1) + assert.Equal(t, &v1.UpstreamServiceRelation{ + ServiceName: svcName, + UpstreamNames: map[string]struct{}{ + upsName: {}, + upsName2: {}, + }, + }, relations[0]) + + err = relationCli.Delete(context.Background(), svcName) + assert.Nil(t, err) + objs, err = clu.Upstream().List(context.Background()) + assert.Nil(t, err) + assert.Len(t, objs, 2) + assert.Equal(t, "1", objs[0].ID) + assert.Equal(t, "2", objs[1].ID) +} diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go index 92b091ec87..b30c3430e0 100644 --- a/pkg/providers/apisix/apisix_upstream.go +++ b/pkg/providers/apisix/apisix_upstream.go @@ -182,7 +182,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er clusterName := c.Config.APISIX.DefaultClusterName for _, port := range svc.Spec.Ports { for _, subset := range subsets { - upsName := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port) + upsName := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, "") // TODO: multiple cluster ups, err := c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName) if err != nil { @@ -266,55 +266,66 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er clusterName := c.Config.APISIX.DefaultClusterName for _, port := range svc.Spec.Ports { for _, subset := range subsets { - upsName := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port) // TODO: multiple cluster - ups, err := c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName) - if err != nil { - if err == apisixcache.ErrNotFound { - continue + update := func(upsName string) error { + ups, err := c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName) + if err != nil { + if err == apisixcache.ErrNotFound { + return nil + } + log.Errorf("failed to get upstream %s: %s", upsName, err) + c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) + c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + return err } - log.Errorf("failed to get upstream %s: %s", upsName, err) - c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) - c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) - return err - } - var newUps *apisixv1.Upstream - if au.Spec != nil && ev.Type != types.EventDelete { - cfg, ok := portLevelSettings[port.Port] - if !ok { - cfg = &au.Spec.ApisixUpstreamConfig + var newUps *apisixv1.Upstream + if au.Spec != nil && ev.Type != types.EventDelete { + cfg, ok := portLevelSettings[port.Port] + if !ok { + cfg = &au.Spec.ApisixUpstreamConfig + } + // FIXME Same ApisixUpstreamConfig might be translated multiple times. + newUps, err = c.translator.TranslateUpstreamConfigV2(cfg) + if err != nil { + log.Errorw("ApisixUpstream conversion cannot be completed, or the format is incorrect", + zap.Any("object", au), + zap.Error(err), + ) + c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) + c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + return err + } + } else { + newUps = apisixv1.NewDefaultUpstream() } - // FIXME Same ApisixUpstreamConfig might be translated multiple times. - newUps, err = c.translator.TranslateUpstreamConfigV2(cfg) - if err != nil { - log.Errorw("found malformed ApisixUpstream", - zap.Any("object", au), + + newUps.Metadata = ups.Metadata + newUps.Nodes = ups.Nodes + log.Debugw("updating upstream since ApisixUpstream changed", + zap.String("event", ev.Type.String()), + zap.Any("upstream", newUps), + zap.Any("ApisixUpstream", au), + ) + if _, err := c.APISIX.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil { + log.Errorw("failed to update upstream", zap.Error(err), + zap.Any("upstream", newUps), + zap.Any("ApisixUpstream", au), + zap.String("cluster", clusterName), ) c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) return err } - } else { - newUps = apisixv1.NewDefaultUpstream() + return nil } - newUps.Metadata = ups.Metadata - newUps.Nodes = ups.Nodes - log.Debugw("updating upstream since ApisixUpstream changed", - zap.String("event", ev.Type.String()), - zap.Any("upstream", newUps), - zap.Any("ApisixUpstream", au), - ) - if _, err := c.APISIX.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil { - log.Errorw("failed to update upstream", - zap.Error(err), - zap.Any("upstream", newUps), - zap.Any("ApisixUpstream", au), - zap.String("cluster", clusterName), - ) - c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) - c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + err := update(apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Endpoint)) + if err != nil { + return err + } + err = update(apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Service)) + if err != nil { return err } } diff --git a/pkg/providers/apisix/translation/apisix_plugin_test.go b/pkg/providers/apisix/translation/apisix_plugin_test.go index 2c0b224cf6..907c72f98a 100644 --- a/pkg/providers/apisix/translation/apisix_plugin_test.go +++ b/pkg/providers/apisix/translation/apisix_plugin_test.go @@ -198,7 +198,7 @@ func TestTranslateTrafficSplitPlugin(t *testing.T) { assert.Equal(t, "192.168.1.2", ctx.Upstreams[0].Nodes[1].Host) assert.Equal(t, 9080, ctx.Upstreams[0].Nodes[1].Port) - assert.Equal(t, "test_svc-1_443", ctx.Upstreams[1].Name) + assert.Equal(t, "test_svc-1_443_service", ctx.Upstreams[1].Name) assert.Len(t, ctx.Upstreams[1].Nodes, 1) assert.Equal(t, "10.0.5.3", ctx.Upstreams[1].Nodes[0].Host) assert.Equal(t, 443, ctx.Upstreams[1].Nodes[0].Port) @@ -207,7 +207,7 @@ func TestTranslateTrafficSplitPlugin(t *testing.T) { assert.Len(t, cfg.Rules[0].WeightedUpstreams, 3) assert.Equal(t, id.GenID("test_svc-1_80"), cfg.Rules[0].WeightedUpstreams[0].UpstreamID) assert.Equal(t, 10, cfg.Rules[0].WeightedUpstreams[0].Weight) - assert.Equal(t, id.GenID("test_svc-1_443"), cfg.Rules[0].WeightedUpstreams[1].UpstreamID) + assert.Equal(t, id.GenID("test_svc-1_443_service"), cfg.Rules[0].WeightedUpstreams[1].UpstreamID) assert.Equal(t, 20, cfg.Rules[0].WeightedUpstreams[1].Weight) assert.Equal(t, "", cfg.Rules[0].WeightedUpstreams[2].UpstreamID) assert.Equal(t, 30, cfg.Rules[0].WeightedUpstreams[2].Weight) diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go index f91ea1289e..40531158b4 100644 --- a/pkg/providers/apisix/translation/apisix_route.go +++ b/pkg/providers/apisix/translation/apisix_route.go @@ -175,7 +175,7 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -309,7 +309,7 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -445,7 +445,7 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -597,13 +597,13 @@ func (t *translator) translateHTTPRouteV2beta2NotStrictly(ctx *translation.Trans // Use the first backend as the default backend in Route, // others will be configured in traffic-split plugin. backend := backends[0] - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) ctx.AddRoute(route) if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) if err != nil { return err } @@ -652,7 +652,7 @@ func (t *translator) translateHTTPRouteV2beta3NotStrictly(ctx *translation.Trans } } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -662,7 +662,7 @@ func (t *translator) translateHTTPRouteV2beta3NotStrictly(ctx *translation.Trans ctx.AddRoute(route) if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) if err != nil { return err } @@ -711,7 +711,7 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateC } } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -721,7 +721,7 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateC ctx.AddRoute(route) if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) if err != nil { return err } @@ -859,7 +859,7 @@ func (t *translator) translateStreamRouteNotStrictlyV2beta2(ctx *translation.Tra name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) if err != nil { return err } @@ -880,7 +880,7 @@ func (t *translator) translateStreamRouteNotStrictlyV2beta3(ctx *translation.Tra name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) if err != nil { return err } @@ -901,7 +901,7 @@ func (t *translator) translateStreamRouteNotStrictlyV2(ctx *translation.Translat name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) if err != nil { return err } diff --git a/pkg/providers/apisix/translation/apisix_upstream.go b/pkg/providers/apisix/translation/apisix_upstream.go index ac1e901b5d..d0052c5d14 100644 --- a/pkg/providers/apisix/translation/apisix_upstream.go +++ b/pkg/providers/apisix/translation/apisix_upstream.go @@ -17,13 +17,14 @@ package translation import ( "github.com/apache/apisix-ingress-controller/pkg/id" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) // translateUpstreamNotStrictly translates Upstream nodes with a loose way, only generate ID and Name for delete Event. -func (t *translator) translateUpstreamNotStrictly(namespace, svcName, subset string, svcPort int32) (*apisixv1.Upstream, error) { +func (t *translator) translateUpstreamNotStrictly(namespace, svcName, subset string, svcPort int32, resolveGranularity string) (*apisixv1.Upstream, error) { ups := &apisixv1.Upstream{} - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort) + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort, resolveGranularity) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -33,7 +34,7 @@ func (t *translator) translateService(namespace, svcName, subset, svcResolveGran if err != nil { return nil, err } - if svcResolveGranularity == "service" { + if svcResolveGranularity == types.ResolveGranularity.Service { ups.Nodes = apisixv1.UpstreamNodes{ { Host: svcClusterIP, @@ -42,7 +43,7 @@ func (t *translator) translateService(namespace, svcName, subset, svcResolveGran }, } } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort) + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort, svcResolveGranularity) ups.ID = id.GenID(ups.Name) return ups, nil } diff --git a/pkg/providers/gateway/translation/gateway_httproute.go b/pkg/providers/gateway/translation/gateway_httproute.go index e1eaa657cf..68006f60a3 100644 --- a/pkg/providers/gateway/translation/gateway_httproute.go +++ b/pkg/providers/gateway/translation/gateway_httproute.go @@ -29,6 +29,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" "github.com/apache/apisix-ingress-controller/pkg/providers/utils" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -96,7 +97,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } - name := apisixv1.ComposeUpstreamName(ns, string(backend.Name), "", int32(*backend.Port)) + name := apisixv1.ComposeUpstreamName(ns, string(backend.Name), "", int32(*backend.Port), types.ResolveGranularity.Endpoint) // APISIX limits max length of label value // https://github.com/apache/apisix/blob/5b95b85faea3094d5e466ee2d39a52f1f805abbb/apisix/schema_def.lua#L85 diff --git a/pkg/providers/gateway/translation/gateway_tlsroute.go b/pkg/providers/gateway/translation/gateway_tlsroute.go index 138cc40c2e..4af2c6a29e 100644 --- a/pkg/providers/gateway/translation/gateway_tlsroute.go +++ b/pkg/providers/gateway/translation/gateway_tlsroute.go @@ -29,6 +29,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" "github.com/apache/apisix-ingress-controller/pkg/providers/utils" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -90,7 +91,7 @@ func (t *translator) TranslateGatewayTLSRouteV1Alpha2(tlsRoute *gatewayv1alpha2. if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } - name := apisixv1.ComposeUpstreamName(ns, string(backend.Name), "", int32(*backend.Port)) + name := apisixv1.ComposeUpstreamName(ns, string(backend.Name), "", int32(*backend.Port), types.ResolveGranularity.Endpoint) ups.Labels["meta_namespace"] = utils.TruncateString(ns, 64) ups.Labels["meta_backend"] = utils.TruncateString(string(backend.Name), 64) diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index 5a7712a991..025df60722 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -41,6 +41,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -350,7 +351,7 @@ func (t *translator) translateDefaultUpstreamFromIngressV1(namespace string, bac portNumber = backend.Port.Number } ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", portNumber) + ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", portNumber, types.ResolveGranularity.Endpoint) ups.ID = id.GenID(ups.Name) return ups } @@ -380,7 +381,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n if err != nil { return nil, err } - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort) + ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort, types.ResolveGranularity.Endpoint) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -491,7 +492,7 @@ func (t *translator) translateDefaultUpstreamFromIngressV1beta1(namespace string portNumber = svcPort.IntVal } ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber) + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber, types.ResolveGranularity.Endpoint) ups.ID = id.GenID(ups.Name) return ups } @@ -522,7 +523,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa if err != nil { return nil, err } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber) + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber, types.ResolveGranularity.Endpoint) ups.ID = id.GenID(ups.Name) return ups, nil } diff --git a/pkg/providers/k8s/endpoint/base.go b/pkg/providers/k8s/endpoint/base.go index ac67aa870a..7f6af9a648 100644 --- a/pkg/providers/k8s/endpoint/base.go +++ b/pkg/providers/k8s/endpoint/base.go @@ -32,6 +32,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -56,8 +57,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo svc, err := c.svcLister.Services(namespace).Get(svcName) if err != nil { if k8serrors.IsNotFound(err) { - log.Infof("service %s/%s not found", namespace, svcName) - return nil + return c.syncEmptyEndpoint(ctx, ep) } log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err) return err @@ -87,7 +87,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo zap.Int32("port", port.Port), ) } - name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port) + name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint) for _, cluster := range clusters { if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil { return err @@ -118,7 +118,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo zap.Int32("port", port.Port), ) } - name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port) + name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint) for _, cluster := range clusters { if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil { return err @@ -131,3 +131,23 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo } return nil } + +func (c *baseEndpointController) syncEmptyEndpoint(ctx context.Context, ep kube.Endpoint) error { + namespace, err := ep.Namespace() + if err != nil { + return err + } + svcName := ep.ServiceName() + log.Debugw("The service has been deleted, try to delete upstream relation", + zap.String("namespace", namespace), + zap.String("service_name", svcName), + ) + clusterName := c.Config.APISIX.DefaultClusterName + err = c.APISIX.Cluster(clusterName).UpstreamServiceRelation().Delete(ctx, namespace+"_"+svcName) + if err != nil { + log.Errorw("delete upstream relation failed", + zap.Error(err), + ) + } + return nil +} diff --git a/pkg/providers/k8s/endpoint/endpoint.go b/pkg/providers/k8s/endpoint/endpoint.go index 9b14bf1206..8b817dea78 100644 --- a/pkg/providers/k8s/endpoint/endpoint.go +++ b/pkg/providers/k8s/endpoint/endpoint.go @@ -28,7 +28,6 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace" "github.com/apache/apisix-ingress-controller/pkg/types" - v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) type endpointsController struct { @@ -103,22 +102,12 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error { if err != nil { return err } - if ev.Type == types.EventDelete { - clusterName := c.Config.APISIX.DefaultClusterName - err = c.APISIX.Cluster(clusterName).UpstreamServiceRelation().Delete(ctx, - &v1.UpstreamServiceRelation{ - ServiceName: ns + "_" + ep.ServiceName(), - }) - if err != nil { - return err - } - } newestEp, err := c.epLister.GetEndpoint(ns, ep.ServiceName()) if err != nil { - if !errors.IsNotFound(err) { - return err + if errors.IsNotFound(err) { + return c.syncEmptyEndpoint(ctx, ep) } - newestEp = ep + return err } return c.syncEndpoint(ctx, newestEp) } diff --git a/pkg/providers/k8s/endpoint/endpointslice.go b/pkg/providers/k8s/endpoint/endpointslice.go index 972ecb5a01..01a94af310 100644 --- a/pkg/providers/k8s/endpoint/endpointslice.go +++ b/pkg/providers/k8s/endpoint/endpointslice.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -28,18 +29,12 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace" "github.com/apache/apisix-ingress-controller/pkg/types" - v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) const ( _endpointSlicesManagedBy = "endpointslice-controller.k8s.io" ) -type endpointSliceEvent struct { - Key string - ServiceName string -} - type endpointSliceController struct { *baseEndpointController @@ -107,34 +102,23 @@ func (c *endpointSliceController) run(ctx context.Context) { } func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) error { - log.Debugw("process endpoint slice event", + log.Debugw("process endpoint slice sync event", zap.Any("event", ev), ) - epEvent := ev.Object.(endpointSliceEvent) - namespace, _, err := cache.SplitMetaNamespaceKey(epEvent.Key) + ep := ev.Object.(kube.Endpoint) + ns, err := ep.Namespace() if err != nil { - log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", epEvent.Key) - return nil - } - if ev.Type == types.EventDelete { - log.Debugw("endpointsplice upstream serviece sync", - zap.String("service_name", epEvent.ServiceName)) - clusterName := c.Config.APISIX.DefaultClusterName - err = c.APISIX.Cluster(clusterName).UpstreamServiceRelation().Delete(ctx, - &v1.UpstreamServiceRelation{ - ServiceName: namespace + "_" + epEvent.ServiceName, - }) - if err != nil { - return err - } + return err } - ep, err := c.epLister.GetEndpoint(namespace, epEvent.ServiceName) + + newestEp, err := c.epLister.GetEndpoint(ns, ep.ServiceName()) if err != nil { - log.Errorf("failed to get all endpointSlices for service %s: %s", - epEvent.ServiceName, err) + if errors.IsNotFound(err) { + return c.syncEmptyEndpoint(ctx, ep) + } return err } - return c.syncEndpoint(ctx, ep) + return c.syncEndpoint(ctx, newestEp) } func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) { @@ -147,7 +131,7 @@ func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) { if k8serrors.IsNotFound(err) && event.Type != types.EventDelete { log.Infow("sync endpointSlice but not found, ignore", zap.String("event_type", event.Type.String()), - zap.Any("endpointSlice", event.Object.(endpointSliceEvent)), + zap.Any("endpointSlice", event.Object.(kube.Endpoint)), ) c.workqueue.Forget(event) return @@ -183,11 +167,8 @@ func (c *endpointSliceController) onAdd(obj interface{}) { ) c.workqueue.Add(&types.Event{ - Type: types.EventAdd, - Object: endpointSliceEvent{ - Key: key, - ServiceName: svcName, - }, + Type: types.EventAdd, + Object: kube.NewEndpointWithSlice(ep), }) c.MetricsCollector.IncrEvents("endpointSlice", "add") @@ -208,15 +189,15 @@ func (c *endpointSliceController) onUpdate(prev, curr interface{}) { if !c.namespaceProvider.IsWatchingNamespace(key) { return } + svcName := currEp.Labels[discoveryv1.LabelServiceName] + if svcName == "" { + return + } if currEp.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy { // We only care about endpointSlice objects managed by the EndpointSlices // controller. return } - svcName := currEp.Labels[discoveryv1.LabelServiceName] - if svcName == "" { - return - } log.Debugw("endpointSlice update event arrived", zap.Any("new object", currEp), @@ -225,10 +206,7 @@ func (c *endpointSliceController) onUpdate(prev, curr interface{}) { c.workqueue.Add(&types.Event{ Type: types.EventUpdate, // TODO pass key. - Object: endpointSliceEvent{ - Key: key, - ServiceName: svcName, - }, + Object: kube.NewEndpointWithSlice(currEp), }) c.MetricsCollector.IncrEvents("endpointSlice", "update") @@ -252,21 +230,21 @@ func (c *endpointSliceController) onDelete(obj interface{}) { if !c.namespaceProvider.IsWatchingNamespace(key) { return } + svcName := ep.Labels[discoveryv1.LabelServiceName] + if svcName == "" { + return + } if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy { // We only care about endpointSlice objects managed by the EndpointSlices // controller. return } - svcName := ep.Labels[discoveryv1.LabelServiceName] log.Debugw("endpoints delete event arrived", zap.Any("object-key", key), ) c.workqueue.Add(&types.Event{ - Type: types.EventDelete, - Object: endpointSliceEvent{ - Key: key, - ServiceName: svcName, - }, + Type: types.EventDelete, + Object: kube.NewEndpointWithSlice(ep), }) c.MetricsCollector.IncrEvents("endpointSlice", "delete") diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go index ea326adf34..b119c62483 100644 --- a/pkg/types/apisix/v1/types.go +++ b/pkg/types/apisix/v1/types.go @@ -22,6 +22,8 @@ import ( "strconv" "strings" "time" + + "github.com/apache/apisix-ingress-controller/pkg/types" ) const ( @@ -401,8 +403,8 @@ type PluginConfig struct { // UpstreamServiceRelation Upstream association object // +k8s:deepcopy-gen=true type UpstreamServiceRelation struct { - ServiceName string `json:"service_name" yaml:"service_name"` - UpstreamName string `json:"upstream_name,omitempty" yaml:"upstream_name,omitempty"` + ServiceName string `json:"service_name" yaml:"service_name"` + UpstreamNames map[string]struct{} `json:"upstream_name,omitempty" yaml:"upstream_name,omitempty"` } // NewDefaultUpstream returns an empty Upstream with default values. @@ -466,19 +468,23 @@ func NewDefaultPluginConfig() *PluginConfig { } } -// ComposeUpstreamName uses namespace, name, subset (optional) and port info to compose +// ComposeUpstreamName uses namespace, name, subset (optional), port, resolveGranularity info to compose // the upstream name. -func ComposeUpstreamName(namespace, name, subset string, port int32) string { +// the resolveGranularity is not composited in the upstream name when it is endpoint. +func ComposeUpstreamName(namespace, name, subset string, port int32, resolveGranularity string) string { pstr := strconv.Itoa(int(port)) // FIXME Use sync.Pool to reuse this buffer if the upstream // name composing code path is hot. var p []byte - if subset == "" { - p = make([]byte, 0, len(namespace)+len(name)+len(pstr)+2) - } else { - p = make([]byte, 0, len(namespace)+len(name)+len(subset)+len(pstr)+3) + plen := len(namespace) + len(name) + len(pstr) + 2 + if subset != "" { + plen = plen + len(subset) + 1 + } + if resolveGranularity == types.ResolveGranularity.Service { + plen = plen + len(resolveGranularity) + 1 } + p = make([]byte, 0, plen) buf := bytes.NewBuffer(p) buf.WriteString(namespace) buf.WriteByte('_') @@ -489,6 +495,10 @@ func ComposeUpstreamName(namespace, name, subset string, port int32) string { buf.WriteByte('_') } buf.WriteString(pstr) + if resolveGranularity == types.ResolveGranularity.Service { + buf.WriteByte('_') + buf.WriteString(resolveGranularity) + } return buf.String() } diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go index a353718b64..e4eeadd9d3 100644 --- a/pkg/types/apisix/v1/zz_generated.deepcopy.go +++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go @@ -764,6 +764,13 @@ func (in *UpstreamPassiveHealthCheckUnhealthy) DeepCopy() *UpstreamPassiveHealth // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UpstreamServiceRelation) DeepCopyInto(out *UpstreamServiceRelation) { *out = *in + if in.UpstreamNames != nil { + in, out := &in.UpstreamNames, &out.UpstreamNames + *out = make(map[string]struct{}, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } diff --git a/pkg/types/service.go b/pkg/types/service.go new file mode 100644 index 0000000000..418f4a3b19 --- /dev/null +++ b/pkg/types/service.go @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package types + +var ( + ResolveGranularity = struct { + Endpoint string + Service string + }{ + Endpoint: "endpoint", + Service: "service", + } +) diff --git a/test/e2e/suite-chore/resolvegranularity.go b/test/e2e/suite-chore/resolvegranularity.go new file mode 100644 index 0000000000..e7b036bc27 --- /dev/null +++ b/test/e2e/suite-chore/resolvegranularity.go @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package chore + +import ( + "fmt" + "net/http" + "time" + + ginkgo "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = ginkgo.Describe("suite-chore: ApisixRoute resolvegranularity Testing", func() { + s := scaffold.NewDefaultScaffold() + ginkgo.It("service and upstream [1:m]", func() { + assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2)) + time.Sleep(5 * time.Second) + + backendSvc, backendSvcPort := s.DefaultHTTPBackend() + route1 := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: httpbin-route1 +spec: + http: + - name: route1 + match: + hosts: + - httpbin.org + paths: + - /ip + backends: + - serviceName: %s + servicePort: %d + resolveGranularity: service +`, backendSvc, backendSvcPort[0]) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route1)) + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes") + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 1) + assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 1) + _ = s.NewAPISIXClient().GET("/ip"). + WithHeader("Host", "httpbin.org"). + Expect(). + Status(http.StatusOK) + + route2 := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: httpbin-route2 +spec: + http: + - name: route2 + match: + hosts: + - httpbin.com + paths: + - /get + backends: + - serviceName: %s + servicePort: %d + resolveGranularity: endpoint +`, backendSvc, backendSvcPort[0]) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(route2)) + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(2), "checking number of routes") + ups, err = s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 2) + if len(ups[0].Nodes) == 1 { + assert.Len(ginkgo.GinkgoT(), ups[1].Nodes, 2) + } else { + assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2) + assert.Len(ginkgo.GinkgoT(), ups[1].Nodes, 1) + } + _ = s.NewAPISIXClient().GET("/get"). + WithHeader("Host", "httpbin.com"). + Expect(). + Status(http.StatusOK) + // Verify consistency after apisix-ingress-controller restart + verify := func() { + s.RestartIngressControllerDeploy() + time.Sleep(15 * time.Second) + + ups, err = s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 2) + if len(ups[0].Nodes) == 1 { + assert.Len(ginkgo.GinkgoT(), ups[1].Nodes, 2) + } else { + assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2) + assert.Len(ginkgo.GinkgoT(), ups[1].Nodes, 1) + } + + _ = s.NewAPISIXClient().GET("/ip"). + WithHeader("Host", "httpbin.org"). + Expect(). + Status(http.StatusOK) + + _ = s.NewAPISIXClient().GET("/get"). + WithHeader("Host", "httpbin.com"). + Expect(). + Status(http.StatusOK) + } + + for i := 0; i < 5; i++ { + verify() + } + }) +})