Skip to content

Commit

Permalink
fix: upstream nodes filed IP occupation. (#1064)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlinsRan authored Jun 27, 2022
1 parent d46b8e0 commit 9e0c658
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 65 deletions.
9 changes: 9 additions & 0 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Cluster interface {
PluginConfig() PluginConfig
// Schema returns a Schema interface that can fetch schema of APISIX objects.
Schema() Schema
// UpstreamServiceRelation returns a UpstreamServiceRelation interface that can fetch UpstreamServiceRelation of APISIX objects.
UpstreamServiceRelation() UpstreamServiceRelation
}

// Route is the specific client interface to take over the create, update,
Expand Down Expand Up @@ -150,6 +152,13 @@ type PluginConfig interface {
Update(context.Context, *v1.PluginConfig) (*v1.PluginConfig, error)
}

type UpstreamServiceRelation interface {
Get(context.Context, string) (*v1.UpstreamServiceRelation, error)
List(context.Context) ([]*v1.UpstreamServiceRelation, error)
Delete(context.Context, *v1.UpstreamServiceRelation) error
Create(context.Context, *v1.UpstreamServiceRelation) error
}

type apisix struct {
mu sync.RWMutex
nonExistentCluster Cluster
Expand Down
8 changes: 8 additions & 0 deletions pkg/apisix/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Cache interface {
// InsertPluginConfig adds or updates plugin_config to cache.
InsertPluginConfig(*v1.PluginConfig) error

InsertUpstreamServiceRelation(*v1.UpstreamServiceRelation) error

// GetRoute finds the route from cache according to the primary index (id).
GetRoute(string) (*v1.Route, error)
// GetSSL finds the ssl from cache according to the primary index (id).
Expand All @@ -57,6 +59,8 @@ type Cache interface {
// GetPluginConfig finds the plugin_config from cache according to the primary index (id).
GetPluginConfig(string) (*v1.PluginConfig, error)

GetUpstreamServiceRelation(string) (*v1.UpstreamServiceRelation, error)

// ListRoutes lists all routes in cache.
ListRoutes() ([]*v1.Route, error)
// ListSSL lists all ssl objects in cache.
Expand All @@ -74,6 +78,8 @@ type Cache interface {
// ListPluginConfigs lists all plugin_config in cache.
ListPluginConfigs() ([]*v1.PluginConfig, error)

ListUpstreamServiceRelation() ([]*v1.UpstreamServiceRelation, error)

// DeleteRoute deletes the specified route in cache.
DeleteRoute(*v1.Route) error
// DeleteSSL deletes the specified ssl in cache.
Expand All @@ -90,4 +96,6 @@ type Cache interface {
DeleteSchema(*v1.Schema) error
// DeletePluginConfig deletes the specified plugin_config in cache.
DeletePluginConfig(*v1.PluginConfig) error

DeleteUpstreamServiceRelation(*v1.UpstreamServiceRelation) error
}
28 changes: 28 additions & 0 deletions pkg/apisix/cache/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (c *dbCache) InsertPluginConfig(pc *v1.PluginConfig) error {
return c.insert("plugin_config", pc.DeepCopy())
}

func (c *dbCache) InsertUpstreamServiceRelation(us *v1.UpstreamServiceRelation) error {
return c.insert("upstream_service", us.DeepCopy())
}

func (c *dbCache) insert(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
Expand Down Expand Up @@ -152,6 +156,14 @@ func (c *dbCache) GetPluginConfig(name string) (*v1.PluginConfig, error) {
return obj.(*v1.PluginConfig).DeepCopy(), nil
}

func (c *dbCache) GetUpstreamServiceRelation(serviceName string) (*v1.UpstreamServiceRelation, error) {
obj, err := c.get("upstream_service", serviceName)
if err != nil {
return nil, err
}
return obj.(*v1.UpstreamServiceRelation).DeepCopy(), nil
}

func (c *dbCache) get(table, id string) (interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -264,6 +276,18 @@ func (c *dbCache) ListPluginConfigs() ([]*v1.PluginConfig, error) {
return pluginConfigs, nil
}

func (c *dbCache) ListUpstreamServiceRelation() ([]*v1.UpstreamServiceRelation, error) {
raws, err := c.list("upstream_service")
if err != nil {
return nil, err
}
upstreamServices := make([]*v1.UpstreamServiceRelation, 0, len(raws))
for _, raw := range raws {
upstreamServices = append(upstreamServices, raw.(*v1.UpstreamServiceRelation).DeepCopy())
}
return upstreamServices, nil
}

func (c *dbCache) list(table string) ([]interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -316,6 +340,10 @@ func (c *dbCache) DeletePluginConfig(pc *v1.PluginConfig) error {
return c.delete("plugin_config", pc)
}

func (c *dbCache) DeleteUpstreamServiceRelation(us *v1.UpstreamServiceRelation) error {
return c.delete("upstream_service", us)
}

func (c *dbCache) delete(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
Expand Down
43 changes: 43 additions & 0 deletions pkg/apisix/cache/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,46 @@ func TestMemDBCachePluginConfig(t *testing.T) {
}
assert.Error(t, ErrNotFound, c.DeletePluginConfig(pc4))
}

func TestMemDBCacheUpstreamServiceRelation(t *testing.T) {
c, err := NewMemDBCache()
assert.Nil(t, err, "NewMemDBCache")

us1 := &v1.UpstreamServiceRelation{
ServiceName: "1",
}
assert.Nil(t, c.InsertUpstreamServiceRelation(us1), "inserting upstream_service 1")

us, err := c.GetUpstreamServiceRelation("1")
assert.Nil(t, err)
assert.Equal(t, us1, us)

us2 := &v1.UpstreamServiceRelation{
ServiceName: "2",
}
assert.Nil(t, c.InsertUpstreamServiceRelation(us2), "inserting upstream_service 2")

us, err = c.GetUpstreamServiceRelation("2")
assert.Nil(t, err)
assert.Equal(t, us2, us)

us3 := &v1.UpstreamServiceRelation{
ServiceName: "httpbin",
UpstreamName: "upstream",
}
assert.Nil(t, c.InsertUpstreamServiceRelation(us3), "inserting upstream_service 3")

us, err = c.GetUpstreamServiceRelation("httpbin")
assert.Nil(t, err)
assert.Equal(t, us3, us)

uss, err := c.ListUpstreamServiceRelation()
assert.Nil(t, err)
assert.Len(t, uss, 3)

err = c.DeleteUpstreamServiceRelation(us)
assert.Nil(t, err)
uss, err = c.ListUpstreamServiceRelation()
assert.Nil(t, err)
assert.Len(t, uss, 2)
}
10 changes: 10 additions & 0 deletions pkg/apisix/cache/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ var (
},
},
},
"upstream_service": {
Name: "upstream_service",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "ServiceName"},
},
},
},
},
}
)
44 changes: 25 additions & 19 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,26 @@ type ClusterOptions struct {
}

type cluster struct {
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
route Route
upstream Upstream
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
schema Schema
pluginConfig PluginConfig
metricsCollector metrics.Collector
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
route Route
upstream Upstream
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
schema Schema
pluginConfig PluginConfig
metricsCollector metrics.Collector
upstreamServiceRelation UpstreamServiceRelation
}

func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
Expand Down Expand Up @@ -144,6 +145,7 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
c.plugin = newPluginClient(c)
c.schema = newSchemaClient(c)
c.pluginConfig = newPluginConfigClient(c)
c.upstreamServiceRelation = newUpstreamServiceRelation(c)

c.cache, err = cache.NewMemDBCache()
if err != nil {
Expand Down Expand Up @@ -460,6 +462,10 @@ func (c *cluster) Schema() Schema {
return c.schema
}

func (c *cluster) UpstreamServiceRelation() UpstreamServiceRelation {
return c.upstreamServiceRelation
}

// HealthCheck implements Cluster.HealthCheck method.
func (c *cluster) HealthCheck(ctx context.Context) (err error) {
if c.cacheSyncErr != nil {
Expand Down
112 changes: 71 additions & 41 deletions pkg/apisix/nonexistentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,31 @@ type nonExistentCluster struct {
func newNonExistentCluster() *nonExistentCluster {
return &nonExistentCluster{
embedDummyResourceImplementer{
route: &dummyRoute{},
ssl: &dummySSL{},
upstream: &dummyUpstream{},
streamRoute: &dummyStreamRoute{},
globalRule: &dummyGlobalRule{},
consumer: &dummyConsumer{},
plugin: &dummyPlugin{},
schema: &dummySchema{},
pluginConfig: &dummyPluginConfig{},
route: &dummyRoute{},
ssl: &dummySSL{},
upstream: &dummyUpstream{},
streamRoute: &dummyStreamRoute{},
globalRule: &dummyGlobalRule{},
consumer: &dummyConsumer{},
plugin: &dummyPlugin{},
schema: &dummySchema{},
pluginConfig: &dummyPluginConfig{},
upstreamServiceRelation: &dummyUpstreamServiceRelation{},
},
}
}

type embedDummyResourceImplementer struct {
route Route
ssl SSL
upstream Upstream
streamRoute StreamRoute
globalRule GlobalRule
consumer Consumer
plugin Plugin
schema Schema
pluginConfig PluginConfig
route Route
ssl SSL
upstream Upstream
streamRoute StreamRoute
globalRule GlobalRule
consumer Consumer
plugin Plugin
schema Schema
pluginConfig PluginConfig
upstreamServiceRelation UpstreamServiceRelation
}

type dummyRoute struct{}
Expand Down Expand Up @@ -240,6 +242,22 @@ func (f *dummyPluginConfig) Update(_ context.Context, _ *v1.PluginConfig) (*v1.P
return nil, ErrClusterNotExist
}

type dummyUpstreamServiceRelation struct {
}

func (f *dummyUpstreamServiceRelation) Get(_ context.Context, _ string) (*v1.UpstreamServiceRelation, error) {
return nil, ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ *v1.UpstreamServiceRelation) error {
return ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) List(_ context.Context) ([]*v1.UpstreamServiceRelation, error) {
return nil, ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ *v1.UpstreamServiceRelation) error {
return ErrClusterNotExist
}

func (nc *nonExistentCluster) Route() Route {
return nc.route
}
Expand Down Expand Up @@ -276,6 +294,10 @@ func (nc *nonExistentCluster) Schema() Schema {
return nc.schema
}

func (nc *nonExistentCluster) UpstreamServiceRelation() UpstreamServiceRelation {
return nc.upstreamServiceRelation
}

func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}
Expand All @@ -292,24 +314,28 @@ type dummyCache struct{}

var _ cache.Cache = &dummyCache{}

func (c *dummyCache) InsertRoute(_ *v1.Route) error { return nil }
func (c *dummyCache) InsertSSL(_ *v1.Ssl) error { return nil }
func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return nil }
func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error { return nil }
func (c *dummyCache) InsertSchema(_ *v1.Schema) error { return nil }
func (c *dummyCache) InsertPluginConfig(_ *v1.PluginConfig) error { return nil }
func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSchema(_ string) (*v1.Schema, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) InsertRoute(_ *v1.Route) error { return nil }
func (c *dummyCache) InsertSSL(_ *v1.Ssl) error { return nil }
func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return nil }
func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error { return nil }
func (c *dummyCache) InsertSchema(_ *v1.Schema) error { return nil }
func (c *dummyCache) InsertPluginConfig(_ *v1.PluginConfig) error { return nil }
func (c *dummyCache) InsertUpstreamServiceRelation(_ *v1.UpstreamServiceRelation) error { return nil }
func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSchema(_ string) (*v1.Schema, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetPluginConfig(_ string) (*v1.PluginConfig, error) {
return nil, cache.ErrNotFound
}
func (c *dummyCache) GetUpstreamServiceRelation(_ string) (*v1.UpstreamServiceRelation, error) {
return nil, cache.ErrNotFound
}
func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil }
func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil }
func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil }
Expand All @@ -318,11 +344,15 @@ func (c *dummyCache) ListGlobalRules() ([]*v1.GlobalRule, error) { return ni
func (c *dummyCache) ListConsumers() ([]*v1.Consumer, error) { return nil, nil }
func (c *dummyCache) ListSchema() ([]*v1.Schema, error) { return nil, nil }
func (c *dummyCache) ListPluginConfigs() ([]*v1.PluginConfig, error) { return nil, nil }
func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil }
func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil }
func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil }
func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil }
func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil }
func (c *dummyCache) DeletePluginConfig(_ *v1.PluginConfig) error { return nil }
func (c *dummyCache) ListUpstreamServiceRelation() ([]*v1.UpstreamServiceRelation, error) {
return nil, nil
}
func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil }
func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil }
func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil }
func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil }
func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil }
func (c *dummyCache) DeletePluginConfig(_ *v1.PluginConfig) error { return nil }
func (c *dummyCache) DeleteUpstreamServiceRelation(_ *v1.UpstreamServiceRelation) error { return nil }
Loading

0 comments on commit 9e0c658

Please sign in to comment.