Skip to content

Commit

Permalink
Restart port forwarding on failure (redhat-developer#6013)
Browse files Browse the repository at this point in the history
* Restart port forwarding on failure

* Save ports again when port forward is restarted

* Integration test

* Update pkg/portForward/portForward.go

Co-authored-by: Dharmit Shah <shahdharmit@gmail.com>

* Fix rebase

* Fix integration test with run composite command

* Copy errorhandlers

* Add timeout for first-time port forwarding

Co-authored-by: Dharmit Shah <shahdharmit@gmail.com>
  • Loading branch information
2 people authored and cdrage committed Aug 31, 2022
1 parent 1b99d62 commit 9d236d9
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 13 deletions.
77 changes: 66 additions & 11 deletions pkg/portForward/portForward.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package portForward

import (
"errors"
"fmt"
"io"
"reflect"
"time"

"github.com/devfile/api/v2/pkg/apis/workspaces/v1alpha2"
"github.com/devfile/library/pkg/devfile/parser"
parsercommon "github.com/devfile/library/pkg/devfile/parser/data/v2/common"
"k8s.io/apimachinery/pkg/util/runtime"

"github.com/redhat-developer/odo/pkg/kclient"
"github.com/redhat-developer/odo/pkg/libdevfile"
"github.com/redhat-developer/odo/pkg/log"
"github.com/redhat-developer/odo/pkg/state"
"github.com/redhat-developer/odo/pkg/util"
"github.com/redhat-developer/odo/pkg/watch"
)

var _ Client = (*PFClient)(nil)
Expand All @@ -28,6 +32,11 @@ type PFClient struct {
stopChan chan struct{}
// finishedChan is written when the port forwarding is finished
finishedChan chan struct{}

originalErrorHandlers []func(error)

// indicates that the port forwarding is started, and not stopped
isRunning bool
}

func NewPFClient(kubernetesClient kclient.ClientInterface, stateClient state.Client) *PFClient {
Expand Down Expand Up @@ -58,7 +67,6 @@ func (o *PFClient) StartPortForwarding(
o.StopPortForwarding()

o.stopChan = make(chan struct{}, 1)
o.finishedChan = make(chan struct{}, 1)

var portPairs map[string][]string
if randomPorts {
Expand All @@ -75,32 +83,79 @@ func (o *PFClient) StartPortForwarding(
return err
}

portsBuf := NewPortWriter(log.GetStdout(), len(portPairsSlice), ceMapping)
o.originalErrorHandlers = append([]func(error){}, runtime.ErrorHandlers...)

runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) {
if err.Error() == "lost connection to pod" {
// Stop the low-level port forwarding
// the infinite loop will restart it
if o.stopChan == nil {
return
}
o.stopChan <- struct{}{}
o.stopChan = make(chan struct{}, 1)
}
})

o.isRunning = true

devstateChan := make(chan error)
go func() {
err = o.kubernetesClient.SetupPortForwarding(pod, portPairsSlice, portsBuf, errOut, o.stopChan)
if err != nil {
fmt.Printf("failed to setup port-forwarding: %v\n", err)
backo := watch.NewExpBackoff()
for {
o.finishedChan = make(chan struct{}, 1)
portsBuf := NewPortWriter(log.GetStdout(), len(portPairsSlice), ceMapping)

go func() {
portsBuf.Wait()
err = o.stateClient.SetForwardedPorts(portsBuf.GetForwardedPorts())
if err != nil {
err = fmt.Errorf("unable to save forwarded ports to state file: %v", err)
}
devstateChan <- err
}()

err = o.kubernetesClient.SetupPortForwarding(pod, portPairsSlice, portsBuf, errOut, o.stopChan)
if err != nil {
fmt.Fprintf(errOut, "Failed to setup port-forwarding: %v\n", err)
d := backo.Delay()
time.Sleep(d)
} else {
backo.Reset()
}
if !o.isRunning {
break
}
}
o.finishedChan <- struct{}{}
}()

portsBuf.Wait()
err = o.stateClient.SetForwardedPorts(portsBuf.GetForwardedPorts())
if err != nil {
return fmt.Errorf("unable to save forwarded ports to state file: %v", err)
// Wait the first time the devstate file is written
timeout := 1 * time.Minute
select {
case err = <-devstateChan:
return err
case <-time.After(timeout):
return errors.New("unable to setup port forwarding")
}

return nil
}

func (o *PFClient) StopPortForwarding() {
if o.stopChan == nil {
return
}
// Ask the low-level port forward to stop
o.stopChan <- struct{}{}
o.stopChan = nil

// Ask the infinite loop to stop
o.isRunning = false

// Wait for low level port forward to be finished
// and the infinite loop to exit
<-o.finishedChan
o.finishedChan = nil
runtime.ErrorHandlers = o.originalErrorHandlers
}

func (o *PFClient) GetForwardedPorts() map[string][]int {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,20 @@ commands:
commandLine: npm run debug
component: runtime

- id: build
exec:
commandLine: npm install
component: runtime
workingDir: ${PROJECT_SOURCE}
group:
isDefault: true
kind: build

- id: run
composite:
commands:
- build-image
- create-k8s-resource
- install
- start
group:
isDefault: true
Expand Down
1 change: 1 addition & 0 deletions tests/helper/helper_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ type CliRunner interface {
EnsureOperatorIsInstalled(partialOperatorName string)
GetBindableKinds() (string, string)
GetServiceBinding(name, projectName string) (string, string)
GetLogs(podName string) string
}
10 changes: 9 additions & 1 deletion tests/helper/helper_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,18 @@ func (o DevSession) WaitEnd() {

// WaitSync waits for the synchronization of files to be finished
// It returns the contents of the standard and error outputs
// since the end of the dev mode started or previous sync, and until the end of the synchronization.
// and the list of forwarded ports
// since the end of the dev mode or the last time WaitSync/GetInfo has been called
func (o DevSession) WaitSync() ([]byte, []byte, map[string]string, error) {
WaitForOutputToContainOne([]string{"Pushing files...", "Updating Component..."}, 180, 10, o.session)
WaitForOutputToContain("Watching for changes in the current directory", 240, 10, o.session)
return o.GetInfo()
}

// GetInfo returns the contents of the standard and error outputs
// and the list of forwarded ports
// since the end of the dev mode or the last time WaitSync/GetInfo has been called
func (o DevSession) GetInfo() ([]byte, []byte, map[string]string, error) {
outContents := o.session.Out.Contents()
errContents := o.session.Err.Contents()
err := o.session.Out.Clear()
Expand Down
5 changes: 5 additions & 0 deletions tests/helper/helper_kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,8 @@ func (kubectl KubectlRunner) GetAllNamespaceProjects() []string {
Expect(err).ShouldNot(HaveOccurred())
return result
}

func (kubectl KubectlRunner) GetLogs(podName string) string {
output := Cmd(kubectl.path, "logs", podName).ShouldPass().Out()
return output
}
5 changes: 5 additions & 0 deletions tests/helper/helper_oc.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,8 @@ func (oc OcRunner) GetAllNamespaceProjects() []string {
Expect(err).ShouldNot(HaveOccurred())
return result
}

func (oc OcRunner) GetLogs(podName string) string {
output := Cmd(oc.path, "logs", podName).ShouldPass().Out()
return output
}
41 changes: 41 additions & 0 deletions tests/integration/cmd_dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,47 @@ var _ = Describe("odo dev command tests", func() {
})
})
})

When("a delay is necessary for the component to start and running odo dev", func() {

var devSession helper.DevSession
var ports map[string]string

BeforeEach(func() {
helper.ReplaceString(filepath.Join(commonVar.Context, "devfile.yaml"), "npm start", "sleep 20 ; npm start")

var err error
devSession, _, _, ports, err = helper.StartDevMode(nil)
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
devSession.Kill()
devSession.WaitEnd()
})

It("should first fail then succeed querying endpoint", func() {
url := fmt.Sprintf("http://%s", ports["3000"])
_, err := http.Get(url)
Expect(err).To(HaveOccurred())

podName := commonVar.CliRunner.GetRunningPodNameByComponent(cmpName, commonVar.Project)
Eventually(func() bool {
logs := helper.GetCliRunner().GetLogs(podName)
return strings.Contains(logs, "App started on PORT")
}, 180, 10).Should(Equal(true))

// Get new random port after restart
_, _, ports, err = devSession.GetInfo()
Expect(err).ToNot(HaveOccurred())
url = fmt.Sprintf("http://%s", ports["3000"])

resp, err := http.Get(url)
Expect(err).ToNot(HaveOccurred())
body, _ := io.ReadAll(resp.Body)
helper.MatchAllInOutput(string(body), []string{"Hello from Node.js Starter Application!"})
})
})
})

Context("port-forwarding for the component", func() {
Expand Down

0 comments on commit 9d236d9

Please sign in to comment.