Skip to content

Commit

Permalink
fix: delete the cluster object when give up the leadership (#774)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers authored Dec 24, 2021
1 parent 970df2b commit 819b003
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 4 deletions.
11 changes: 11 additions & 0 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type APISIX interface {
UpdateCluster(context.Context, *ClusterOptions) error
// ListClusters lists all APISIX clusters.
ListClusters() []Cluster
// DeleteCluster deletes the target APISIX cluster by its name.
DeleteCluster(string)
}

// Cluster defines specific operations that can be applied in an APISIX
Expand Down Expand Up @@ -214,3 +216,12 @@ func (c *apisix) UpdateCluster(ctx context.Context, co *ClusterOptions) error {
c.clusters[co.Name] = cluster
return nil
}

func (c *apisix) DeleteCluster(name string) {
c.mu.Lock()
defer c.mu.Unlock()

// Don't have to close or free some resources in that cluster, so
// just delete its index.
delete(c.clusters, name)
}
12 changes: 9 additions & 3 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,16 @@ func (c *cluster) syncCache(ctx context.Context) {
Steps: 5,
}
var lastSyncErr error
err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// impossibly return: false, nil
// so can safe used
done, lastSyncErr = c.syncCacheOnce(ctx)
select {
case <-ctx.Done():
err = context.Canceled
default:
break
}
return
})
if err != nil {
Expand All @@ -199,7 +205,7 @@ func (c *cluster) syncCache(ctx context.Context) {
func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
routes, err := c.route.List(ctx)
if err != nil {
log.Errorf("failed to list route in APISIX: %s", err)
log.Errorf("failed to list routes in APISIX: %s", err)
return false, err
}
upstreams, err := c.upstream.List(ctx)
Expand Down Expand Up @@ -329,7 +335,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {

for {
if err := c.syncSchemaOnce(ctx); err != nil {
log.Warnf("failed to sync schema: %s", err)
log.Errorf("failed to sync schema: %s", err)
c.metricsCollector.IncrSyncOperation("schema", "failure")
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
}
},
OnStoppedLeading: func() {
Expand All @@ -361,6 +366,10 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
},
},
// Set it to false as current leaderelection implementation will report
Expand Down
85 changes: 85 additions & 0 deletions test/e2e/chaos/chaos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chaos

import (
"fmt"
"net/http"

"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

var _ = ginkgo.Describe("Chaos Testing", func() {
opts := &scaffold.Options{
Name: "default",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
IngressAPISIXReplicas: 1,
HTTPBinServicePort: 80,
APISIXRouteVersion: "apisix.apache.org/v2beta2",
}
s := scaffold.NewScaffold(opts)
ginkgo.Context("simulate apisix deployment restart", func() {
ginkgo.Specify("ingress controller can synchronize rules normally after apisix recovery", func() {
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(0), "checking number of upstreams")
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
route1 := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta2
kind: ApisixRoute
metadata:
name: httpbin-route1
spec:
http:
- name: route1
match:
hosts:
- httpbin.org
paths:
- /ip
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route1))
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
s.RestartAPISIXDeploy()
route2 := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta2
kind: ApisixRoute
metadata:
name: httpbin-route2
spec:
http:
- name: route2
match:
hosts:
- httpbin.org
paths:
- /get
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route2))
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(2), "checking number of routes")
s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
s.NewAPISIXClient().GET("/get").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
})
})

})
1 change: 1 addition & 0 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package e2e

import (
_ "github.com/apache/apisix-ingress-controller/test/e2e/annotations"
_ "github.com/apache/apisix-ingress-controller/test/e2e/chaos"
_ "github.com/apache/apisix-ingress-controller/test/e2e/config"
_ "github.com/apache/apisix-ingress-controller/test/e2e/endpoints"
_ "github.com/apache/apisix-ingress-controller/test/e2e/features"
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/scaffold/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,15 @@ func (s *Scaffold) newAPISIXTunnels() error {
return nil
}

func (s *Scaffold) shutdownApisixTunnel() {
s.apisixAdminTunnel.Close()
s.apisixHttpTunnel.Close()
s.apisixHttpsTunnel.Close()
s.apisixTCPTunnel.Close()
s.apisixUDPTunnel.Close()
s.apisixControlTunnel.Close()
}

// Namespace returns the current working namespace.
func (s *Scaffold) Namespace() string {
return s.kubectlOptions.Namespace
Expand Down
37 changes: 36 additions & 1 deletion test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,23 @@ func (s *Scaffold) APISIXGatewayServiceEndpoint() string {
return s.apisixHttpTunnel.Endpoint()
}

// RestartAPISIXDeploy delete apisix pod and wait new pod be ready
func (s *Scaffold) RestartAPISIXDeploy() {
s.shutdownApisixTunnel()
pods, err := k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=apisix-deployment-e2e-test",
})
assert.NoError(s.t, err, "list apisix pod")
for _, pod := range pods {
err = s.KillPod(pod.Name)
assert.NoError(s.t, err, "killing apisix pod")
}
err = s.waitAllAPISIXPodsAvailable()
assert.NoError(s.t, err, "waiting for new apisix instance ready")
err = s.newAPISIXTunnels()
assert.NoError(s.t, err, "renew apisix tunnels")
}

func (s *Scaffold) beforeEach() {
var err error
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.opts.Name, time.Now().Nanosecond())
Expand Down Expand Up @@ -371,14 +388,32 @@ func (s *Scaffold) afterEach() {
assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)

for _, f := range s.finializers {
f()
runWithRecover(f)
}

// Wait for a while to prevent the worker node being overwhelming
// (new cases will be run).
time.Sleep(3 * time.Second)
}

func runWithRecover(f func()) {
defer func() {
r := recover()
if r == nil {
return
}
err, ok := r.(error)
if ok {
// just ignore already closed channel
if strings.Contains(err.Error(), "close of closed channel") {
return
}
}
panic(r)
}()
f()
}

func (s *Scaffold) GetDeploymentLogs(name string) string {
cli, err := k8s.GetKubernetesClientE(s.t)
if err != nil {
Expand Down

0 comments on commit 819b003

Please sign in to comment.