Skip to content

Commit

Permalink
Merge pull request #25423 from deads2k/etcd-retry
Browse files Browse the repository at this point in the history
bug 1870247: re-establish etcd connection if required for etcd test
  • Loading branch information
openshift-merge-robot authored Aug 19, 2020
2 parents a507fa6 + cd9d74c commit b014930
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 54 deletions.
24 changes: 20 additions & 4 deletions test/extended/etcd/etcd_storage_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (t *helperT) done() {
// It will start failing when a new type is added to ensure that all future types are added to this test.
// It will also fail when a type gets moved to a different location. Be very careful in this situation because
// it essentially means that you will be break old clusters unless you create some migration path for the old data.
func testEtcd3StoragePath(t g.GinkgoTInterface, kubeConfig *restclient.Config, etcdClient3 etcdv3.KV) {
func testEtcd3StoragePath(t g.GinkgoTInterface, kubeConfig *restclient.Config, etcdClient3Fn func() (etcdv3.KV, error)) {
defer g.GinkgoRecover()

// make Errorf fail the test as expected but continue until the end so we can see all failures
Expand Down Expand Up @@ -434,9 +434,25 @@ func testEtcd3StoragePath(t g.GinkgoTInterface, kubeConfig *restclient.Config, e
}
}

output, err := getFromEtcd(etcdClient3, testData.ExpectedEtcdPath)
if err != nil {
t.Errorf("failed to get from etcd for %v: %#v", gvk, err)
// retry a few times in case the port-forward has to get re-established.
var output *metaObject
var lastErr error
for i := 0; i < 5; i++ {
etcdClient3, err := etcdClient3Fn()
if err != nil {
lastErr = err
continue
}
output, err = getFromEtcd(etcdClient3, testData.ExpectedEtcdPath)
if err != nil {
lastErr = err
continue
}
lastErr = nil
break
}
if lastErr != nil {
t.Errorf("failed to get from etcd for %v: %#v", gvk, lastErr)
return
}

Expand Down
137 changes: 87 additions & 50 deletions test/extended/etcd/etcd_test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"
exutil "github.com/openshift/origin/test/extended/util"
"github.com/openshift/origin/test/extended/util/ibmcloud"
"go.etcd.io/etcd/clientv3"

etcdv3 "go.etcd.io/etcd/clientv3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
e2e "k8s.io/kubernetes/test/e2e/framework"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"

exutil "github.com/openshift/origin/test/extended/util"
"github.com/openshift/origin/test/extended/util/ibmcloud"
)

var _ = g.Describe("[sig-api-machinery] API data in etcd", func() {
Expand All @@ -32,51 +32,88 @@ var _ = g.Describe("[sig-api-machinery] API data in etcd", func() {
e2eskipper.Skipf("IBM ROKS clusters run etcd outside of the cluster. Etcd cannot be accessed directly from within the cluster")
}

ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(ctx, "oc", "port-forward", "service/etcd", ":2379", "-n", "openshift-etcd")

defer func() {
cancel()
_ = cmd.Wait() // wait to clean up resources but ignore returned error since cancel kills the process
}()

stdOut, err := cmd.StdoutPipe()
o.Expect(err).NotTo(o.HaveOccurred())

o.Expect(cmd.Start()).NotTo(o.HaveOccurred())

scanner := bufio.NewScanner(stdOut)
scan := scanner.Scan()
o.Expect(scanner.Err()).NotTo(o.HaveOccurred())
o.Expect(scan).To(o.BeTrue())
output := scanner.Text()

port := strings.TrimSuffix(strings.TrimPrefix(output, "Forwarding from 127.0.0.1:"), " -> 2379")
_, err = strconv.Atoi(port)
o.Expect(err).NotTo(o.HaveOccurred(), "port forward output not in expected format: %s", output)

coreV1 := oc.AdminKubeClient().CoreV1()
etcdConfigMap, err := coreV1.ConfigMaps("openshift-config").Get(context.Background(), "etcd-ca-bundle", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
etcdSecret, err := coreV1.Secrets("openshift-config").Get(context.Background(), "etcd-client", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
CertData: etcdSecret.Data[corev1.TLSCertKey],
KeyData: etcdSecret.Data[corev1.TLSPrivateKeyKey],
CAData: []byte(etcdConfigMap.Data["ca-bundle.crt"]),
},
})
o.Expect(err).NotTo(o.HaveOccurred())

etcdClient3, err := clientv3.New(clientv3.Config{
Endpoints: []string{"https://127.0.0.1:" + port},
DialTimeout: 30 * time.Second,
TLS: tlsConfig,
})
o.Expect(err).NotTo(o.HaveOccurred())

testEtcd3StoragePath(g.GinkgoT(), oc.AdminConfig(), etcdClient3.KV)
etcdClientCreater := &etcdPortForwardClient{kubeClient: oc.AdminKubeClient()}
defer etcdClientCreater.closeAll()
testEtcd3StoragePath(g.GinkgoT(), oc.AdminConfig(), etcdClientCreater.getEtcdClient)
})
})

type etcdPortForwardClient struct {
kubeClient kubernetes.Interface
currCancel context.CancelFunc
currCmd *exec.Cmd
etcdClient *clientv3.Client
}

func (e *etcdPortForwardClient) getEtcdClient() (etcdv3.KV, error) {
if e.etcdClient == nil {
return e.newEtcdClient()
}

// if the client isn't good
_, err := e.etcdClient.MemberList(context.TODO())
if err != nil {
e.closeAll()
return e.newEtcdClient()
}

return e.etcdClient, nil
}

func (e *etcdPortForwardClient) newEtcdClient() (etcdv3.KV, error) {
ctx, cancel := context.WithCancel(context.Background())
e.currCancel = cancel
e.currCmd = exec.CommandContext(ctx, "oc", "port-forward", "service/etcd", ":2379", "-n", "openshift-etcd")

stdOut, err := e.currCmd.StdoutPipe()
o.Expect(err).NotTo(o.HaveOccurred())

o.Expect(e.currCmd.Start()).NotTo(o.HaveOccurred())

scanner := bufio.NewScanner(stdOut)
scan := scanner.Scan()
o.Expect(scanner.Err()).NotTo(o.HaveOccurred())
o.Expect(scan).To(o.BeTrue())
output := scanner.Text()

port := strings.TrimSuffix(strings.TrimPrefix(output, "Forwarding from 127.0.0.1:"), " -> 2379")
_, err = strconv.Atoi(port)
o.Expect(err).NotTo(o.HaveOccurred(), "port forward output not in expected format: %s", output)

etcdConfigMap, err := e.kubeClient.CoreV1().ConfigMaps("openshift-config").Get(context.Background(), "etcd-ca-bundle", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
etcdSecret, err := e.kubeClient.CoreV1().Secrets("openshift-config").Get(context.Background(), "etcd-client", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
CertData: etcdSecret.Data[corev1.TLSCertKey],
KeyData: etcdSecret.Data[corev1.TLSPrivateKeyKey],
CAData: []byte(etcdConfigMap.Data["ca-bundle.crt"]),
},
})
o.Expect(err).NotTo(o.HaveOccurred())

etcdClient3, err := clientv3.New(clientv3.Config{
Endpoints: []string{"https://127.0.0.1:" + port},
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
return nil, err
}

e.etcdClient = etcdClient3
return e.etcdClient, nil
}

func (e *etcdPortForwardClient) closeAll() {
if e.currCancel == nil {
return
}
e.currCancel()
_ = e.currCmd.Wait() // wait to clean up resources but ignore returned error since cancel kills the process
e.currCancel = nil
e.currCmd = nil
e.etcdClient = nil
}

0 comments on commit b014930

Please sign in to comment.