From 9ba2e5115a24211dc2721c3fdb1d8f4b1ff89c56 Mon Sep 17 00:00:00 2001
From: gabemontero
Date: Fri, 8 Oct 2021 19:40:06 -0400
Subject: [PATCH] add no pod events yet callback; add lock to avoid data race
 on Out stream used for logging

---
 pkg/shp/cmd/build/run.go       | 64 +++++++++++++++++++++++++++++-----
 pkg/shp/cmd/build/run_test.go  | 19 ++++++----
 pkg/shp/reactor/pod_watcher.go |  6 ++--
 3 files changed, 72 insertions(+), 17 deletions(-)

diff --git a/pkg/shp/cmd/build/run.go b/pkg/shp/cmd/build/run.go
index b4997ccaf..720f14a80 100644
--- a/pkg/shp/cmd/build/run.go
+++ b/pkg/shp/cmd/build/run.go
@@ -3,6 +3,7 @@ package build
 import (
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
@@ -31,8 +32,11 @@ type RunCommand struct {
 	logTail         *tail.Tail      // follow container logs
 	tailLogsStarted map[string]bool // controls tail instance per container
 
-	buildName    string // build name
+	logLock sync.Mutex
+
+	buildName    string
 	buildRunName string
+	namespace    string
 	buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
 	shpClientset buildclientset.Interface
 	follow       bool // flag to tail pod logs
@@ -65,6 +69,7 @@ func (r *RunCommand) Complete(params *params.Params, io *genericclioptions.IOStr
 	}
 	r.logTail = tail.NewTail(r.Cmd().Context(), clientset)
 	r.ioStreams = io
+	r.namespace = params.Namespace()
 	if r.follow {
 		if r.shpClientset, err = params.ShipwrightClientSet(); err != nil {
 			return err
@@ -85,6 +90,7 @@ func (r *RunCommand) Complete(params *params.Params, io *genericclioptions.IOStr
 
 		r.pw.WithOnPodModifiedFn(r.onEvent)
 		r.pw.WithTimeoutPodFn(r.onTimeout)
+		r.pw.WithNoPodEventsYetFn(r.onNoPodEventsYet)
 	}
 
@@ -113,9 +119,45 @@ func (r *RunCommand) tailLogs(pod *corev1.Pod) {
 	}
 }
 
+// onNoPodEventsYet reacts to the pod watcher telling us it has not received any pod events for our build run
+func (r *RunCommand) onNoPodEventsYet() {
+	r.Log(fmt.Sprintf("BuildRun %q log following has not observed any pod events yet.", r.buildRunName))
+	br, err := r.shpClientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Get(r.cmd.Context(), r.buildRunName, metav1.GetOptions{})
+	if err != nil {
+		r.Log(fmt.Sprintf("error accessing BuildRun %q: %s", r.buildRunName, err.Error()))
+		return
+	}
+
+	c := br.Status.GetCondition(buildv1alpha1.Succeeded)
+	giveUp := false
+	msg := ""
+	switch {
+	case c != nil && c.Status == corev1.ConditionTrue:
+		giveUp = true
+		msg = fmt.Sprintf("BuildRun '%s' has been marked as successful.\n", br.Name)
+	case c != nil && c.Status == corev1.ConditionFalse:
+		giveUp = true
+		msg = fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name)
+	case br.IsCanceled():
+		giveUp = true
+		msg = fmt.Sprintf("BuildRun '%s' has been canceled.\n", br.Name)
+	case br.DeletionTimestamp != nil:
+		giveUp = true
+		msg = fmt.Sprintf("BuildRun '%s' has been deleted.\n", br.Name)
+	case !br.HasStarted():
+		r.Log(fmt.Sprintf("BuildRun '%s' has not started yet.\n", br.Name))
+	}
+	if giveUp {
+		r.Log(msg)
+		r.Log(fmt.Sprintf("exiting 'shp build run --follow' for BuildRun %q", br.Name))
+		r.stop()
+	}
+}
+
 // onTimeout reacts to either the context or request timeout causing the pod watcher to exit
 func (r *RunCommand) onTimeout(msg string) {
-	fmt.Fprintf(r.ioStreams.Out, "BuildRun %q log following has stopped because: %q\n", r.buildRunName, msg)
+	r.Log(fmt.Sprintf("BuildRun %q log following has stopped because: %q\n", r.buildRunName, msg))
 }
 
 // onEvent reacts on pod state changes, to start and stop tailing container logs.
@@ -141,14 +183,14 @@ func (r *RunCommand) onEvent(pod *corev1.Pod) error {
 			err = fmt.Errorf("build pod '%s' has failed", pod.GetName())
 		}
 		// see if because of deletion or cancelation
-		fmt.Fprintf(r.ioStreams.Out, msg)
+		r.Log(msg)
 		r.stop()
 		return err
 	case corev1.PodSucceeded:
-		fmt.Fprintf(r.ioStreams.Out, "Pod '%s' has succeeded!\n", pod.GetName())
+		r.Log(fmt.Sprintf("Pod '%s' has succeeded!\n", pod.GetName()))
 		r.stop()
 	default:
-		fmt.Fprintf(r.ioStreams.Out, "Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase))
+		r.Log(fmt.Sprintf("Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase)))
 		// handle any issues with pulling images that may fail
 		for _, c := range pod.Status.Conditions {
 			if c.Type == corev1.PodInitialized || c.Type == corev1.ContainersReady {
@@ -182,7 +224,7 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
 	if err != nil {
 		return err
 	}
-	br, err = clientset.ShipwrightV1alpha1().BuildRuns(params.Namespace()).Create(r.cmd.Context(), br, metav1.CreateOptions{})
+	br, err = clientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Create(r.cmd.Context(), br, metav1.CreateOptions{})
 	if err != nil {
 		return err
 	}
@@ -202,12 +244,17 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
 		r.buildName,
 		br.GetName(),
 	)}
-	r.pw.WithOnPodModifiedFn(r.onEvent)
-	r.pw.WithTimeoutPodFn(r.onTimeout)
 
 	_, err = r.pw.Start(listOpts)
 	return err
 }
 
+func (r *RunCommand) Log(msg string) {
+	// concurrent fmt.Fprint(r.ioStreams.Out, ...) calls need locking to avoid data races, as we 'write' to the stream
+	r.logLock.Lock()
+	defer r.logLock.Unlock()
+	fmt.Fprint(r.ioStreams.Out, msg)
+}
+
 // runCmd instantiate the "build run" sub-command using common BuildRun flags.
 func runCmd() runner.SubCommand {
 	cmd := &cobra.Command{
@@ -219,6 +266,7 @@ func runCmd() runner.SubCommand {
 		cmd:             cmd,
 		buildRunSpec:    flags.BuildRunSpecFromFlags(cmd.Flags()),
 		tailLogsStarted: make(map[string]bool),
+		logLock:         sync.Mutex{},
 	}
 	cmd.Flags().BoolVarP(&runCommand.follow, "follow", "F", runCommand.follow, "Start a build and watch its log until it completes or fails.")
 	return runCommand
diff --git a/pkg/shp/cmd/build/run_test.go b/pkg/shp/cmd/build/run_test.go
index 770c6f20e..4290f3276 100644
--- a/pkg/shp/cmd/build/run_test.go
+++ b/pkg/shp/cmd/build/run_test.go
@@ -1,7 +1,9 @@
 package build
 
 import (
+	"bytes"
 	"strings"
+	"sync"
 	"testing"
 
 	buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
@@ -129,6 +131,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
 			follow:          true,
 			shpClientset:    shpclientset,
 			tailLogsStarted: make(map[string]bool),
+			logLock:         sync.Mutex{},
 		}
 
 		// set up context
@@ -153,9 +156,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
 		cmd.Complete(param, &ioStreams, []string{name})
 		if len(test.to) > 0 {
 			cmd.Run(param, &ioStreams)
-			if !strings.Contains(out.String(), test.logText) {
-				t.Errorf("test %s: unexpected output: %s", test.name, out.String())
-			}
+			checkLog(test.name, test.logText, cmd, out, t)
 			continue
 		}
 		go func() {
@@ -169,9 +170,15 @@ func TestStartBuildRunFollowLog(t *testing.T) {
 		// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
 		pod.Status.Phase = test.phase
 		cmd.onEvent(pod)
-		if !strings.Contains(out.String(), test.logText) {
-			t.Errorf("test %s: unexpected output: %s", test.name, out.String())
-		}
+		checkLog(test.name, test.logText, cmd, out, t)
+	}
+}
+
+func checkLog(name, text string, cmd *RunCommand, out *bytes.Buffer, t *testing.T) {
+	// need to employ log lock since accessing same iostream out used by Run cmd
+	cmd.logLock.Lock()
+	defer cmd.logLock.Unlock()
+	if !strings.Contains(out.String(), text) {
+		t.Errorf("test %s: unexpected output: %s", name, out.String())
 	}
 }
diff --git a/pkg/shp/reactor/pod_watcher.go b/pkg/shp/reactor/pod_watcher.go
index dae759f6c..95294436a 100644
--- a/pkg/shp/reactor/pod_watcher.go
+++ b/pkg/shp/reactor/pod_watcher.go
@@ -90,8 +90,8 @@ func (p *PodWatcher) WithNoPodEventsYetFn(fn NoPodEventsYetFn) *PodWatcher {
 
 // handleEvent applies user informed functions against informed pod and event.
 func (p *PodWatcher) handleEvent(pod *corev1.Pod, event watch.Event) error {
-	p.stopLock.Lock()
-	defer p.stopLock.Unlock()
+	//p.stopLock.Lock()
+	//defer p.stopLock.Unlock()
 	p.eventTicker.Stop()
 	switch event.Type {
 	case watch.Added:
@@ -185,9 +185,9 @@ func (p *PodWatcher) Stop() {
 	// along with canceling of builds
 	p.stopLock.Lock()
 	defer p.stopLock.Unlock()
+	p.eventTicker.Stop()
 	if !p.stopped {
 		close(p.stopCh)
-		p.eventTicker.Stop()
 		p.stopped = true
 	}
 }
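
Note on the mechanism: the patch registers WithNoPodEventsYetFn and guards writes to the shared Out stream, but it does not show how PodWatcher arms its eventTicker to decide that "no pod events yet" has occurred. The following is a rough, self-contained Go sketch of the two patterns the change relies on, a ticker-driven "no events yet" callback and a mutex-guarded writer shared across goroutines; every identifier in it (safeLogger, watch, onNoEventsYet) is hypothetical and is not code from shipwright-io/cli.

package main

import (
	"fmt"
	"io"
	"os"
	"sync"
	"time"
)

// safeLogger serializes writes to a shared output stream, mirroring the
// logLock/Log pattern this patch adds to RunCommand.
type safeLogger struct {
	mu  sync.Mutex
	out io.Writer
}

func (l *safeLogger) Log(msg string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	fmt.Fprint(l.out, msg)
}

// watch drains events from ch; if the quiet-period ticker fires before any
// event arrives, onNoEventsYet runs, analogous to the WithNoPodEventsYetFn
// hook. Once an event shows up the ticker is stopped, just as handleEvent
// calls p.eventTicker.Stop().
func watch(ch <-chan string, quiet time.Duration, onEvent func(string), onNoEventsYet func()) {
	ticker := time.NewTicker(quiet)
	defer ticker.Stop()
	for {
		select {
		case ev, ok := <-ch:
			if !ok {
				return // channel closed, watcher is done
			}
			ticker.Stop() // an event arrived, no further "no events yet" warnings
			onEvent(ev)
		case <-ticker.C:
			onNoEventsYet()
		}
	}
}

func main() {
	logger := &safeLogger{out: os.Stdout}
	events := make(chan string)

	go func() {
		time.Sleep(150 * time.Millisecond) // first event arrives after the quiet period
		events <- "Pod 'example' is in state \"Running\"...\n"
		close(events)
	}()

	watch(events, 50*time.Millisecond,
		func(ev string) { logger.Log(ev) },
		func() { logger.Log("no pod events observed yet, checking BuildRun status...\n") })
}

The test-side checkLog helper follows the same reasoning as the sketch's lock: Run and onEvent may write to the shared bytes.Buffer from another goroutine, so the test takes cmd.logLock before reading out.String().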