diff --git a/pkg/portForward/portForward.go b/pkg/portForward/portForward.go index 7101fc2fb0f..e8826c513b4 100644 --- a/pkg/portForward/portForward.go +++ b/pkg/portForward/portForward.go @@ -1,19 +1,23 @@ package portForward import ( + "errors" "fmt" "io" "reflect" + "time" "github.com/devfile/api/v2/pkg/apis/workspaces/v1alpha2" "github.com/devfile/library/pkg/devfile/parser" parsercommon "github.com/devfile/library/pkg/devfile/parser/data/v2/common" + "k8s.io/apimachinery/pkg/util/runtime" "github.com/redhat-developer/odo/pkg/kclient" "github.com/redhat-developer/odo/pkg/libdevfile" "github.com/redhat-developer/odo/pkg/log" "github.com/redhat-developer/odo/pkg/state" "github.com/redhat-developer/odo/pkg/util" + "github.com/redhat-developer/odo/pkg/watch" ) var _ Client = (*PFClient)(nil) @@ -28,6 +32,11 @@ type PFClient struct { stopChan chan struct{} // finishedChan is written when the port forwarding is finished finishedChan chan struct{} + + originalErrorHandlers []func(error) + + // indicates that the port forwarding is started, and not stopped + isRunning bool } func NewPFClient(kubernetesClient kclient.ClientInterface, stateClient state.Client) *PFClient { @@ -58,7 +67,6 @@ func (o *PFClient) StartPortForwarding( o.StopPortForwarding() o.stopChan = make(chan struct{}, 1) - o.finishedChan = make(chan struct{}, 1) var portPairs map[string][]string if randomPorts { @@ -75,32 +83,79 @@ func (o *PFClient) StartPortForwarding( return err } - portsBuf := NewPortWriter(log.GetStdout(), len(portPairsSlice), ceMapping) + o.originalErrorHandlers = append([]func(error){}, runtime.ErrorHandlers...) + + runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) { + if err.Error() == "lost connection to pod" { + // Stop the low-level port forwarding + // the infinite loop will restart it + if o.stopChan == nil { + return + } + o.stopChan <- struct{}{} + o.stopChan = make(chan struct{}, 1) + } + }) + + o.isRunning = true + + devstateChan := make(chan error) go func() { - err = o.kubernetesClient.SetupPortForwarding(pod, portPairsSlice, portsBuf, errOut, o.stopChan) - if err != nil { - fmt.Printf("failed to setup port-forwarding: %v\n", err) + backo := watch.NewExpBackoff() + for { + o.finishedChan = make(chan struct{}, 1) + portsBuf := NewPortWriter(log.GetStdout(), len(portPairsSlice), ceMapping) + + go func() { + portsBuf.Wait() + err = o.stateClient.SetForwardedPorts(portsBuf.GetForwardedPorts()) + if err != nil { + err = fmt.Errorf("unable to save forwarded ports to state file: %v", err) + } + devstateChan <- err + }() + + err = o.kubernetesClient.SetupPortForwarding(pod, portPairsSlice, portsBuf, errOut, o.stopChan) + if err != nil { + fmt.Fprintf(errOut, "Failed to setup port-forwarding: %v\n", err) + d := backo.Delay() + time.Sleep(d) + } else { + backo.Reset() + } + if !o.isRunning { + break + } } o.finishedChan <- struct{}{} }() - portsBuf.Wait() - err = o.stateClient.SetForwardedPorts(portsBuf.GetForwardedPorts()) - if err != nil { - return fmt.Errorf("unable to save forwarded ports to state file: %v", err) + // Wait the first time the devstate file is written + timeout := 1 * time.Minute + select { + case err = <-devstateChan: + return err + case <-time.After(timeout): + return errors.New("unable to setup port forwarding") } - - return nil } func (o *PFClient) StopPortForwarding() { if o.stopChan == nil { return } + // Ask the low-level port forward to stop o.stopChan <- struct{}{} o.stopChan = nil + + // Ask the infinite loop to stop + o.isRunning = false + + // Wait for low level port forward to be finished + // and the infinite loop to exit <-o.finishedChan o.finishedChan = nil + runtime.ErrorHandlers = o.originalErrorHandlers } func (o *PFClient) GetForwardedPorts() map[string][]int { diff --git a/tests/examples/source/devfiles/nodejs/devfile-composite-apply-commands.yaml b/tests/examples/source/devfiles/nodejs/devfile-composite-apply-commands.yaml index fd3c4f2ade2..42f1396d433 100644 --- a/tests/examples/source/devfiles/nodejs/devfile-composite-apply-commands.yaml +++ b/tests/examples/source/devfiles/nodejs/devfile-composite-apply-commands.yaml @@ -71,12 +71,20 @@ commands: commandLine: npm run debug component: runtime + - id: build + exec: + commandLine: npm install + component: runtime + workingDir: ${PROJECT_SOURCE} + group: + isDefault: true + kind: build + - id: run composite: commands: - build-image - create-k8s-resource - - install - start group: isDefault: true diff --git a/tests/helper/helper_cli.go b/tests/helper/helper_cli.go index 0f66e538ce4..7992802470a 100644 --- a/tests/helper/helper_cli.go +++ b/tests/helper/helper_cli.go @@ -50,4 +50,5 @@ type CliRunner interface { EnsureOperatorIsInstalled(partialOperatorName string) GetBindableKinds() (string, string) GetServiceBinding(name, projectName string) (string, string) + GetLogs(podName string) string } diff --git a/tests/helper/helper_dev.go b/tests/helper/helper_dev.go index 1709fbe51ed..8fadc31d6ce 100644 --- a/tests/helper/helper_dev.go +++ b/tests/helper/helper_dev.go @@ -158,10 +158,18 @@ func (o DevSession) WaitEnd() { // WaitSync waits for the synchronization of files to be finished // It returns the contents of the standard and error outputs -// since the end of the dev mode started or previous sync, and until the end of the synchronization. +// and the list of forwarded ports +// since the end of the dev mode or the last time WaitSync/GetInfo has been called func (o DevSession) WaitSync() ([]byte, []byte, map[string]string, error) { WaitForOutputToContainOne([]string{"Pushing files...", "Updating Component..."}, 180, 10, o.session) WaitForOutputToContain("Watching for changes in the current directory", 240, 10, o.session) + return o.GetInfo() +} + +// GetInfo returns the contents of the standard and error outputs +// and the list of forwarded ports +// since the end of the dev mode or the last time WaitSync/GetInfo has been called +func (o DevSession) GetInfo() ([]byte, []byte, map[string]string, error) { outContents := o.session.Out.Contents() errContents := o.session.Err.Contents() err := o.session.Out.Clear() diff --git a/tests/helper/helper_kubectl.go b/tests/helper/helper_kubectl.go index 0f7c7ab2855..d6bef4f1405 100644 --- a/tests/helper/helper_kubectl.go +++ b/tests/helper/helper_kubectl.go @@ -402,3 +402,8 @@ func (kubectl KubectlRunner) GetAllNamespaceProjects() []string { Expect(err).ShouldNot(HaveOccurred()) return result } + +func (kubectl KubectlRunner) GetLogs(podName string) string { + output := Cmd(kubectl.path, "logs", podName).ShouldPass().Out() + return output +} diff --git a/tests/helper/helper_oc.go b/tests/helper/helper_oc.go index b05911f70fb..9f3329c96ba 100644 --- a/tests/helper/helper_oc.go +++ b/tests/helper/helper_oc.go @@ -594,3 +594,8 @@ func (oc OcRunner) GetAllNamespaceProjects() []string { Expect(err).ShouldNot(HaveOccurred()) return result } + +func (oc OcRunner) GetLogs(podName string) string { + output := Cmd(oc.path, "logs", podName).ShouldPass().Out() + return output +} diff --git a/tests/integration/cmd_dev_test.go b/tests/integration/cmd_dev_test.go index 90e661b3225..c8580cda59c 100644 --- a/tests/integration/cmd_dev_test.go +++ b/tests/integration/cmd_dev_test.go @@ -388,6 +388,47 @@ var _ = Describe("odo dev command tests", func() { }) }) }) + + When("a delay is necessary for the component to start and running odo dev", func() { + + var devSession helper.DevSession + var ports map[string]string + + BeforeEach(func() { + helper.ReplaceString(filepath.Join(commonVar.Context, "devfile.yaml"), "npm start", "sleep 20 ; npm start") + + var err error + devSession, _, _, ports, err = helper.StartDevMode(nil) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + devSession.Kill() + devSession.WaitEnd() + }) + + It("should first fail then succeed querying endpoint", func() { + url := fmt.Sprintf("http://%s", ports["3000"]) + _, err := http.Get(url) + Expect(err).To(HaveOccurred()) + + podName := commonVar.CliRunner.GetRunningPodNameByComponent(cmpName, commonVar.Project) + Eventually(func() bool { + logs := helper.GetCliRunner().GetLogs(podName) + return strings.Contains(logs, "App started on PORT") + }, 180, 10).Should(Equal(true)) + + // Get new random port after restart + _, _, ports, err = devSession.GetInfo() + Expect(err).ToNot(HaveOccurred()) + url = fmt.Sprintf("http://%s", ports["3000"]) + + resp, err := http.Get(url) + Expect(err).ToNot(HaveOccurred()) + body, _ := io.ReadAll(resp.Body) + helper.MatchAllInOutput(string(body), []string{"Hello from Node.js Starter Application!"}) + }) + }) }) Context("port-forwarding for the component", func() {