From cd9d74c49baa120f119313813fb5ceeffec69db4 Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 19 Aug 2020 11:29:07 -0400 Subject: [PATCH] re-establish etcd connection if required for etcd test --- test/extended/etcd/etcd_storage_path.go | 24 ++++- test/extended/etcd/etcd_test_runner.go | 137 +++++++++++++++--------- 2 files changed, 107 insertions(+), 54 deletions(-) diff --git a/test/extended/etcd/etcd_storage_path.go b/test/extended/etcd/etcd_storage_path.go index d8d6ba9f4601..e938cccc2e83 100644 --- a/test/extended/etcd/etcd_storage_path.go +++ b/test/extended/etcd/etcd_storage_path.go @@ -218,7 +218,7 @@ func (t *helperT) done() { // It will start failing when a new type is added to ensure that all future types are added to this test. // It will also fail when a type gets moved to a different location. Be very careful in this situation because // it essentially means that you will be break old clusters unless you create some migration path for the old data. -func testEtcd3StoragePath(t g.GinkgoTInterface, kubeConfig *restclient.Config, etcdClient3 etcdv3.KV) { +func testEtcd3StoragePath(t g.GinkgoTInterface, kubeConfig *restclient.Config, etcdClient3Fn func() (etcdv3.KV, error)) { defer g.GinkgoRecover() // make Errorf fail the test as expected but continue until the end so we can see all failures @@ -434,9 +434,25 @@ func testEtcd3StoragePath(t g.GinkgoTInterface, kubeConfig *restclient.Config, e } } - output, err := getFromEtcd(etcdClient3, testData.ExpectedEtcdPath) - if err != nil { - t.Errorf("failed to get from etcd for %v: %#v", gvk, err) + // retry a few times in case the port-forward has to get re-established. + var output *metaObject + var lastErr error + for i := 0; i < 5; i++ { + etcdClient3, err := etcdClient3Fn() + if err != nil { + lastErr = err + continue + } + output, err = getFromEtcd(etcdClient3, testData.ExpectedEtcdPath) + if err != nil { + lastErr = err + continue + } + lastErr = nil + break + } + if lastErr != nil { + t.Errorf("failed to get from etcd for %v: %#v", gvk, lastErr) return } diff --git a/test/extended/etcd/etcd_test_runner.go b/test/extended/etcd/etcd_test_runner.go index d26f8dda4c9b..684a7cd53db1 100644 --- a/test/extended/etcd/etcd_test_runner.go +++ b/test/extended/etcd/etcd_test_runner.go @@ -10,16 +10,16 @@ import ( g "github.com/onsi/ginkgo" o "github.com/onsi/gomega" + exutil "github.com/openshift/origin/test/extended/util" + "github.com/openshift/origin/test/extended/util/ibmcloud" "go.etcd.io/etcd/clientv3" - + etcdv3 "go.etcd.io/etcd/clientv3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" e2e "k8s.io/kubernetes/test/e2e/framework" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" - - exutil "github.com/openshift/origin/test/extended/util" - "github.com/openshift/origin/test/extended/util/ibmcloud" ) var _ = g.Describe("[sig-api-machinery] API data in etcd", func() { @@ -32,51 +32,88 @@ var _ = g.Describe("[sig-api-machinery] API data in etcd", func() { e2eskipper.Skipf("IBM ROKS clusters run etcd outside of the cluster. Etcd cannot be accessed directly from within the cluster") } - ctx, cancel := context.WithCancel(context.Background()) - cmd := exec.CommandContext(ctx, "oc", "port-forward", "service/etcd", ":2379", "-n", "openshift-etcd") - - defer func() { - cancel() - _ = cmd.Wait() // wait to clean up resources but ignore returned error since cancel kills the process - }() - - stdOut, err := cmd.StdoutPipe() - o.Expect(err).NotTo(o.HaveOccurred()) - - o.Expect(cmd.Start()).NotTo(o.HaveOccurred()) - - scanner := bufio.NewScanner(stdOut) - scan := scanner.Scan() - o.Expect(scanner.Err()).NotTo(o.HaveOccurred()) - o.Expect(scan).To(o.BeTrue()) - output := scanner.Text() - - port := strings.TrimSuffix(strings.TrimPrefix(output, "Forwarding from 127.0.0.1:"), " -> 2379") - _, err = strconv.Atoi(port) - o.Expect(err).NotTo(o.HaveOccurred(), "port forward output not in expected format: %s", output) - - coreV1 := oc.AdminKubeClient().CoreV1() - etcdConfigMap, err := coreV1.ConfigMaps("openshift-config").Get(context.Background(), "etcd-ca-bundle", metav1.GetOptions{}) - o.Expect(err).NotTo(o.HaveOccurred()) - etcdSecret, err := coreV1.Secrets("openshift-config").Get(context.Background(), "etcd-client", metav1.GetOptions{}) - o.Expect(err).NotTo(o.HaveOccurred()) - - tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{ - TLSClientConfig: restclient.TLSClientConfig{ - CertData: etcdSecret.Data[corev1.TLSCertKey], - KeyData: etcdSecret.Data[corev1.TLSPrivateKeyKey], - CAData: []byte(etcdConfigMap.Data["ca-bundle.crt"]), - }, - }) - o.Expect(err).NotTo(o.HaveOccurred()) - - etcdClient3, err := clientv3.New(clientv3.Config{ - Endpoints: []string{"https://127.0.0.1:" + port}, - DialTimeout: 30 * time.Second, - TLS: tlsConfig, - }) - o.Expect(err).NotTo(o.HaveOccurred()) - - testEtcd3StoragePath(g.GinkgoT(), oc.AdminConfig(), etcdClient3.KV) + etcdClientCreater := &etcdPortForwardClient{kubeClient: oc.AdminKubeClient()} + defer etcdClientCreater.closeAll() + testEtcd3StoragePath(g.GinkgoT(), oc.AdminConfig(), etcdClientCreater.getEtcdClient) }) }) + +type etcdPortForwardClient struct { + kubeClient kubernetes.Interface + currCancel context.CancelFunc + currCmd *exec.Cmd + etcdClient *clientv3.Client +} + +func (e *etcdPortForwardClient) getEtcdClient() (etcdv3.KV, error) { + if e.etcdClient == nil { + return e.newEtcdClient() + } + + // if the client isn't good + _, err := e.etcdClient.MemberList(context.TODO()) + if err != nil { + e.closeAll() + return e.newEtcdClient() + } + + return e.etcdClient, nil +} + +func (e *etcdPortForwardClient) newEtcdClient() (etcdv3.KV, error) { + ctx, cancel := context.WithCancel(context.Background()) + e.currCancel = cancel + e.currCmd = exec.CommandContext(ctx, "oc", "port-forward", "service/etcd", ":2379", "-n", "openshift-etcd") + + stdOut, err := e.currCmd.StdoutPipe() + o.Expect(err).NotTo(o.HaveOccurred()) + + o.Expect(e.currCmd.Start()).NotTo(o.HaveOccurred()) + + scanner := bufio.NewScanner(stdOut) + scan := scanner.Scan() + o.Expect(scanner.Err()).NotTo(o.HaveOccurred()) + o.Expect(scan).To(o.BeTrue()) + output := scanner.Text() + + port := strings.TrimSuffix(strings.TrimPrefix(output, "Forwarding from 127.0.0.1:"), " -> 2379") + _, err = strconv.Atoi(port) + o.Expect(err).NotTo(o.HaveOccurred(), "port forward output not in expected format: %s", output) + + etcdConfigMap, err := e.kubeClient.CoreV1().ConfigMaps("openshift-config").Get(context.Background(), "etcd-ca-bundle", metav1.GetOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + etcdSecret, err := e.kubeClient.CoreV1().Secrets("openshift-config").Get(context.Background(), "etcd-client", metav1.GetOptions{}) + o.Expect(err).NotTo(o.HaveOccurred()) + + tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{ + TLSClientConfig: restclient.TLSClientConfig{ + CertData: etcdSecret.Data[corev1.TLSCertKey], + KeyData: etcdSecret.Data[corev1.TLSPrivateKeyKey], + CAData: []byte(etcdConfigMap.Data["ca-bundle.crt"]), + }, + }) + o.Expect(err).NotTo(o.HaveOccurred()) + + etcdClient3, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"https://127.0.0.1:" + port}, + DialTimeout: 5 * time.Second, + TLS: tlsConfig, + }) + if err != nil { + return nil, err + } + + e.etcdClient = etcdClient3 + return e.etcdClient, nil +} + +func (e *etcdPortForwardClient) closeAll() { + if e.currCancel == nil { + return + } + e.currCancel() + _ = e.currCmd.Wait() // wait to clean up resources but ignore returned error since cancel kills the process + e.currCancel = nil + e.currCmd = nil + e.etcdClient = nil +}