add no pod events yet callback
add lock to avoid data race on Out stream used for logging
gabemontero committed Oct 9, 2021
1 parent ecc1872 commit 65ae1c4
Showing 3 changed files with 70 additions and 17 deletions.
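The data race mentioned in the commit description comes from several goroutines (the pod event handler, the timeout callback, and the new no-pod-events-yet callback) writing to the same ioStreams.Out writer. As a minimal, self-contained sketch of the mutex-guarded write pattern the commit adopts (simplified names, not the shipwright code itself):

```go
package main

import (
	"fmt"
	"io"
	"os"
	"sync"
)

// logger serializes writes to a shared output stream so that concurrent
// callbacks neither interleave nor race on the underlying writer.
type logger struct {
	mu  sync.Mutex
	out io.Writer
}

func (l *logger) Log(msg string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	fmt.Fprint(l.out, msg)
}

func main() {
	l := &logger{out: os.Stdout}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			l.Log(fmt.Sprintf("event from goroutine %d\n", n))
		}(i)
	}
	wg.Wait()
}
```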
62 changes: 54 additions & 8 deletions pkg/shp/cmd/build/run.go
@@ -3,6 +3,7 @@ package build
import (
"errors"
"fmt"
"sync"
"time"

buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
@@ -31,8 +32,11 @@ type RunCommand struct {
logTail *tail.Tail // follow container logs
tailLogsStarted map[string]bool // controls tail instance per container

buildName string // build name
logLock sync.Mutex

buildName string
buildRunName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
shpClientset buildclientset.Interface
follow bool // flag to tail pod logs
@@ -65,6 +69,7 @@ func (r *RunCommand) Complete(params *params.Params, io *genericclioptions.IOStr
}
r.logTail = tail.NewTail(r.Cmd().Context(), clientset)
r.ioStreams = io
r.namespace = params.Namespace()
if r.follow {
if r.shpClientset, err = params.ShipwrightClientSet(); err != nil {
return err
@@ -85,6 +90,7 @@ func (r *RunCommand) Complete(params *params.Params, io *genericclioptions.IOStr

r.pw.WithOnPodModifiedFn(r.onEvent)
r.pw.WithTimeoutPodFn(r.onTimeout)
r.pw.WithNoPodEventsYetFn(r.onNoPodEventsYet)

}

@@ -113,9 +119,43 @@ func (r *RunCommand) tailLogs(pod *corev1.Pod) {
}
}

// onNoPodEventsYet reacts to the pod watcher telling us it has not received any pod events for our build run
func (r *RunCommand) onNoPodEventsYet() {
r.Log(fmt.Sprintf("BuildRun %q log following has not observed any pod events yet.", r.buildRunName))
br, err := r.shpClientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Get(r.cmd.Context(), r.buildRunName, metav1.GetOptions{})
if err != nil {
r.Log(fmt.Sprintf("error accessing BuildRun %q: %s", r.buildRunName, err.Error()))
} else {
c := br.Status.GetCondition(buildv1alpha1.Succeeded)
giveUp := false
msg := ""
switch {
case c != nil && c.Status == corev1.ConditionTrue:
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been marked as successful.\n", br.Name)
case c != nil && c.Status == corev1.ConditionFalse:
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name)
case br.IsCanceled():
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been canceled.\n", br.Name)
case br.DeletionTimestamp != nil:
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been deleted.\n", br.Name)
case !br.HasStarted():
r.Log(fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name))
}
if giveUp {
r.Log(msg)
r.Log(fmt.Sprintf("exting 'ship build run --follow' for BuildRun %q", br.Name))
r.stop()
}
}
}

// onTimeout reacts to either the context or request timeout causing the pod watcher to exit
func (r *RunCommand) onTimeout(msg string) {
fmt.Fprintf(r.ioStreams.Out, "BuildRun %q log following has stopped because: %q\n", r.buildRunName, msg)
r.Log(fmt.Sprintf("BuildRun %q log following has stopped because: %q\n", r.buildRunName, msg))
}

// onEvent reacts on pod state changes, to start and stop tailing container logs.
@@ -141,14 +181,14 @@ func (r *RunCommand) onEvent(pod *corev1.Pod) error {
err = fmt.Errorf("build pod '%s' has failed", pod.GetName())
}
// see if because of deletion or cancelation
fmt.Fprintf(r.ioStreams.Out, msg)
r.Log(msg)
r.stop()
return err
case corev1.PodSucceeded:
fmt.Fprintf(r.ioStreams.Out, "Pod '%s' has succeeded!\n", pod.GetName())
r.Log(fmt.Sprintf("Pod '%s' has succeeded!\n", pod.GetName()))
r.stop()
default:
fmt.Fprintf(r.ioStreams.Out, "Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase))
r.Log(fmt.Sprintf("Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase)))
// handle any issues with pulling images that may fail
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodInitialized || c.Type == corev1.ContainersReady {
@@ -182,7 +222,7 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
if err != nil {
return err
}
br, err = clientset.ShipwrightV1alpha1().BuildRuns(params.Namespace()).Create(r.cmd.Context(), br, metav1.CreateOptions{})
br, err = clientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Create(r.cmd.Context(), br, metav1.CreateOptions{})
if err != nil {
return err
}
@@ -202,12 +242,17 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
r.buildName,
br.GetName(),
)}
r.pw.WithOnPodModifiedFn(r.onEvent)
r.pw.WithTimeoutPodFn(r.onTimeout)
_, err = r.pw.Start(listOpts)
return err
}

func (r *RunCommand) Log(msg string) {
// concurrent writes to r.ioStreams.Out need locking to avoid data races on the shared stream
r.logLock.Lock()
defer r.logLock.Unlock()
fmt.Fprint(r.ioStreams.Out, msg)
}

// runCmd instantiates the "build run" sub-command using common BuildRun flags.
func runCmd() runner.SubCommand {
cmd := &cobra.Command{
@@ -219,6 +264,7 @@ func runCmd() runner.SubCommand {
cmd: cmd,
buildRunSpec: flags.BuildRunSpecFromFlags(cmd.Flags()),
tailLogsStarted: make(map[string]bool),
logLock: sync.Mutex{},
}
cmd.Flags().BoolVarP(&runCommand.follow, "follow", "F", runCommand.follow, "Start a build and watch its log until it completes or fails.")
return runCommand
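A side note on why the lock matters for the tests below: the test's ioStreams.Out is a *bytes.Buffer, which is not safe for concurrent use. A standalone illustration (not part of the commit) of the kind of access that `go run -race` or `go test -race` flags when the writes are not serialized:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

func main() {
	var buf bytes.Buffer // bytes.Buffer is not safe for concurrent use
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Without external locking these two writes race on the
			// buffer's internal state and the race detector reports it.
			fmt.Fprintf(&buf, "message %d\n", n)
		}(i)
	}
	wg.Wait()
	fmt.Print(buf.String())
}
```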
19 changes: 13 additions & 6 deletions pkg/shp/cmd/build/run_test.go
@@ -1,7 +1,9 @@
package build

import (
"bytes"
"strings"
"sync"
"testing"

buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
@@ -129,6 +131,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
follow: true,
shpClientset: shpclientset,
tailLogsStarted: make(map[string]bool),
logLock: sync.Mutex{},
}

// set up context
@@ -153,9 +156,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
cmd.Complete(param, &ioStreams, []string{name})
if len(test.to) > 0 {
cmd.Run(param, &ioStreams)
if !strings.Contains(out.String(), test.logText) {
t.Errorf("test %s: unexpected output: %s", test.name, out.String())
}
checkLog(test.name, test.logText, cmd, out, t)
continue
}
go func() {
@@ -169,9 +170,15 @@ func TestStartBuildRunFollowLog(t *testing.T) {
// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
cmd.onEvent(pod)
if !strings.Contains(out.String(), test.logText) {
t.Errorf("test %s: unexpected output: %s", test.name, out.String())
}
checkLog(test.name, test.logText, cmd, out, t)
}
}

func checkLog(name, text string, cmd *RunCommand, out *bytes.Buffer, t *testing.T) {
// take the log lock since this reads the same ioStreams.Out buffer the Run cmd goroutine writes to
cmd.logLock.Lock()
defer cmd.logLock.Unlock()
if !strings.Contains(out.String(), text) {
t.Errorf("test %s: unexpected output: %s", name, out.String())
}
}
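With both the Log helper and the checkLog helper taking logLock, running the package tests under the race detector (for example `go test -race ./pkg/shp/cmd/build/...`) should no longer report concurrent access to the shared output buffer.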
6 changes: 3 additions & 3 deletions pkg/shp/reactor/pod_watcher.go
@@ -90,8 +90,8 @@ func (p *PodWatcher) WithNoPodEventsYetFn(fn NoPodEventsYetFn) *PodWatcher {

// handleEvent applies user informed functions against informed pod and event.
func (p *PodWatcher) handleEvent(pod *corev1.Pod, event watch.Event) error {
p.stopLock.Lock()
defer p.stopLock.Unlock()
//p.stopLock.Lock()
//defer p.stopLock.Unlock()
p.eventTicker.Stop()
switch event.Type {
case watch.Added:
@@ -185,9 +185,9 @@ func (p *PodWatcher) Stop() {
// along with canceling of builds
p.stopLock.Lock()
defer p.stopLock.Unlock()
p.eventTicker.Stop()
if !p.stopped {
close(p.stopCh)
p.eventTicker.Stop()
p.stopped = true
}
}
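The `WithNoPodEventsYetFn` hook together with the `eventTicker.Stop()` calls in `handleEvent` and `Stop` suggests a ticker-driven design: if the ticker fires before any watch event has been handled, the no-pod-events-yet callback runs; as soon as a real event arrives, the ticker is stopped. A rough sketch of that shape, under that assumption (illustrative names, not the actual PodWatcher implementation):

```go
package main

import (
	"fmt"
	"time"
)

// watcher is an illustrative stand-in for PodWatcher: it loops over a
// select, handling events, a "no events yet" ticker, and a stop channel.
type watcher struct {
	events        chan string // stands in for the Kubernetes watch result channel
	ticker        *time.Ticker
	noEventsYetFn func()
	stopCh        chan struct{}
}

func (w *watcher) run() {
	for {
		select {
		case ev := <-w.events:
			// A real event arrived: stop the "no events yet" ticker
			// (mirrors p.eventTicker.Stop() in handleEvent) and handle it.
			w.ticker.Stop()
			fmt.Println("handling event:", ev)
		case <-w.ticker.C:
			// No pod events observed before the ticker fired.
			if w.noEventsYetFn != nil {
				w.noEventsYetFn()
			}
		case <-w.stopCh:
			w.ticker.Stop()
			return
		}
	}
}

func main() {
	w := &watcher{
		events:        make(chan string),
		ticker:        time.NewTicker(100 * time.Millisecond),
		noEventsYetFn: func() { fmt.Println("no pod events yet, checking BuildRun status...") },
		stopCh:        make(chan struct{}),
	}
	go w.run()
	time.Sleep(250 * time.Millisecond) // let the ticker fire a couple of times
	close(w.stopCh)
	time.Sleep(50 * time.Millisecond) // give the loop time to exit
}
```

The real PodWatcher also has to coordinate with the Kubernetes watch interface, context cancellation, and its stop lock, so treat this purely as a sketch of the callback's timing, not of the concrete implementation.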
