Skip to content

Commit

Permalink
fix: support resolveGranularity of ApisixRoute (#1251)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlinsRan authored Sep 9, 2022
1 parent 5c0ea2b commit 9d663ab
Show file tree
Hide file tree
Showing 21 changed files with 569 additions and 203 deletions.
7 changes: 5 additions & 2 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,13 @@ type PluginConfig interface {
}

type UpstreamServiceRelation interface {
// Get relation based on namespace+"_"+service.name
Get(context.Context, string) (*v1.UpstreamServiceRelation, error)
List(context.Context) ([]*v1.UpstreamServiceRelation, error)
Delete(context.Context, *v1.UpstreamServiceRelation) error
Create(context.Context, *v1.UpstreamServiceRelation) error
// Delete relation based on namespace+"_"+service.name
Delete(context.Context, string) error
// Build relation based on upstream.name
Create(context.Context, string) error
}

type apisix struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/apisix/cache/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,10 @@ func TestMemDBCacheUpstreamServiceRelation(t *testing.T) {
assert.Equal(t, us2, us)

us3 := &v1.UpstreamServiceRelation{
ServiceName: "httpbin",
UpstreamName: "upstream",
ServiceName: "httpbin",
UpstreamNames: map[string]struct{}{
"upstream": {},
},
}
assert.Nil(t, c.InsertUpstreamServiceRelation(us3), "inserting upstream_service 3")

Expand Down
4 changes: 2 additions & 2 deletions pkg/apisix/nonexistentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,13 @@ type dummyUpstreamServiceRelation struct {
func (f *dummyUpstreamServiceRelation) Get(_ context.Context, _ string) (*v1.UpstreamServiceRelation, error) {
return nil, ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ *v1.UpstreamServiceRelation) error {
func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ string) error {
return ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) List(_ context.Context) ([]*v1.UpstreamServiceRelation, error) {
return nil, ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ *v1.UpstreamServiceRelation) error {
func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ string) error {
return ErrClusterNotExist
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/apisix/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ func TestItemConvertUpstream(t *testing.T) {
ups, err := ite.upstream()
assert.Nil(t, err)
assert.Len(t, ups.Nodes, 2)
assert.Equal(t, ups.Nodes[0], v1.UpstreamNode{Host: "httpbin.org", Port: 80, Weight: 1})
assert.Equal(t, ups.Nodes[1], v1.UpstreamNode{Host: "foo.com", Port: 8080, Weight: 2})
if ups.Nodes[0].Host == "foo.com" {
ups.Nodes[0], ups.Nodes[1] = ups.Nodes[1], ups.Nodes[0]
}
assert.Equal(t, v1.UpstreamNode{Host: "httpbin.org", Port: 80, Weight: 1}, ups.Nodes[0])
assert.Equal(t, v1.UpstreamNode{Host: "foo.com", Port: 8080, Weight: 2}, ups.Nodes[1])

ite = &item{
Key: "/apisix/upstreams/419655639963271872",
Expand Down
7 changes: 2 additions & 5 deletions pkg/apisix/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
zap.String("cluster", "default"),
)

if err := u.cluster.upstreamServiceRelation.Create(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil {
if err := u.cluster.upstreamServiceRelation.Create(ctx, obj.Name); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
}
if err := u.cluster.HasSynced(ctx); err != nil {
Expand Down Expand Up @@ -187,9 +187,6 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
return err
}
}
if err := u.cluster.upstreamServiceRelation.Delete(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil {
log.Errorf("failed to delete upstreamService in cache: %s", err)
}
url := u.url + "/" + obj.ID
if err := u.cluster.deleteResource(ctx, url, "upstream"); err != nil {
u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
Expand All @@ -207,7 +204,7 @@ func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upst
zap.String("url", u.url),
)

if err := u.cluster.upstreamServiceRelation.Create(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil {
if err := u.cluster.upstreamServiceRelation.Create(ctx, obj.Name); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
}
if err := u.cluster.HasSynced(ctx); err != nil {
Expand Down
101 changes: 47 additions & 54 deletions pkg/apisix/upstreamservicerelation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apisix
import (
"context"
"fmt"
"strconv"
"strings"

"go.uber.org/zap"
Expand All @@ -26,6 +27,11 @@ import (
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

// to do: Delete one of the upstreams. Currently, only service is deleted. There will be some
// redundant upstream objects, but the results will not be affected. It is hoped that the service controller
// can complete the update nodes logic to avoid the intrusion of relation modules into more code.

// Maintain relationships only when resolveGranularity is endpoint
// There is no need to ensure the consistency between the upstream to services, only need to ensure that the upstream-node can be delete after deleting the service
type upstreamService struct {
cluster *cluster
Expand All @@ -37,79 +43,78 @@ func newUpstreamServiceRelation(c *cluster) *upstreamService {
}
}

func (u *upstreamService) Get(ctx context.Context, svcId string) (*v1.UpstreamServiceRelation, error) {
func (u *upstreamService) Get(ctx context.Context, serviceName string) (*v1.UpstreamServiceRelation, error) {
log.Debugw("try to get upstreamService in cache",
zap.String("svcId", svcId),
zap.String("service_name", serviceName),
zap.String("cluster", "default"),
)
us, err := u.cluster.cache.GetUpstreamServiceRelation(svcId)
if err == nil {
return us, nil
}
if err != cache.ErrNotFound {
us, err := u.cluster.cache.GetUpstreamServiceRelation(serviceName)
if err != nil && err != cache.ErrNotFound {
log.Error("failed to find upstreamService in cache",
zap.String("svcId", svcId), zap.Error(err))
} else {
log.Debugw("failed to find upstreamService in cache",
zap.String("svcId", svcId), zap.Error(err))
zap.String("service_name", serviceName), zap.Error(err))
return nil, err
}
return nil, err
return us, err
}

func (u *upstreamService) Delete(ctx context.Context, relation *v1.UpstreamServiceRelation) error {
func (u *upstreamService) Delete(ctx context.Context, serviceName string) error {
log.Debugw("try to delete upstreamService in cache",
zap.String("cluster", "default"),
)
u.initUpstreamServiceRelation(relation)
if relation == nil || relation.ServiceName == "" && relation.UpstreamName == "" {
return fmt.Errorf("UpstreamServiceRelation is empty object")
}
if relation.UpstreamName != "" {
err := u.cluster.cache.DeleteUpstreamServiceRelation(relation)
if err != nil {
return err
}
} else {
usr, err := u.cluster.cache.GetUpstreamServiceRelation(relation.ServiceName)
if err != nil {
return err
relation, err := u.Get(ctx, serviceName)
if err != nil {
if err == cache.ErrNotFound {
return nil
}
ups, err := u.cluster.upstream.Get(ctx, usr.UpstreamName)
return err
}
_ = u.cluster.cache.DeleteUpstreamServiceRelation(relation)
for upsName := range relation.UpstreamNames {
ups, err := u.cluster.upstream.Get(ctx, upsName)
if err != nil {
return err
continue
}
ups.Nodes = make(v1.UpstreamNodes, 0)
_, err = u.cluster.upstream.Update(ctx, ups)
if err != nil {
return err
}
err = u.cluster.cache.DeleteUpstreamServiceRelation(usr)
if err != nil {
return err
continue
}
}
return nil
}

func (u *upstreamService) Create(ctx context.Context, relation *v1.UpstreamServiceRelation) error {
func (u *upstreamService) Create(ctx context.Context, upstreamName string) error {
log.Debugw("try to create upstreamService in cache",
zap.String("cluster", "default"),
)
u.initUpstreamServiceRelation(relation)
if relation == nil || relation.ServiceName == "" || relation.UpstreamName == "" {
log.Error("UpstreamServiceRelation object ")

args := strings.Split(upstreamName, "_")
if len(args) < 2 {
return fmt.Errorf("wrong upstream name %s, must contains namespace_name", upstreamName)
}
// The last part of upstreanName should be a port number.
// Please refer to apisixv1.ComposeUpstreamName to see the detailed format.
_, err := strconv.Atoi(args[len(args)-1])
if err != nil {
return nil
}
us, err := u.cluster.cache.GetUpstreamServiceRelation(relation.ServiceName)

serviceName := args[0] + "_" + args[1]
relation, err := u.Get(ctx, serviceName)
if err != nil && err != cache.ErrNotFound {
return err
}
if us != nil {
us.UpstreamName = relation.UpstreamName
if relation == nil {
relation = &v1.UpstreamServiceRelation{
ServiceName: serviceName,
UpstreamNames: map[string]struct{}{
upstreamName: {},
},
}
} else {
us = relation
relation.UpstreamNames[upstreamName] = struct{}{}
}
if err := u.cluster.cache.InsertUpstreamServiceRelation(us); err != nil {
if err := u.cluster.cache.InsertUpstreamServiceRelation(relation); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
return err
}
Expand All @@ -129,15 +134,3 @@ func (u *upstreamService) List(ctx context.Context) ([]*v1.UpstreamServiceRelati
}
return usrs, nil
}

func (u *upstreamService) initUpstreamServiceRelation(us *v1.UpstreamServiceRelation) {
if us.UpstreamName == "" || us.ServiceName != "" {
return
}
args := strings.Split(us.UpstreamName, "_")
// namespace_service_subset_port
if len(args) < 2 {
return
}
us.ServiceName = args[0] + "_" + args[1]
}
Loading

0 comments on commit 9d663ab

Please sign in to comment.