various run build -F/--follow fixes around timeout, lack of pods, and data races #56

Merged
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/create.go
@@ -35,7 +35,7 @@ func (c *CreateCommand) Cmd() *cobra.Command {
}

// Complete fills internal subcommand structure for future work with user input
func (c *CreateCommand) Complete(params *params.Params, args []string) error {
func (c *CreateCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
switch len(args) {
case 1:
c.name = args[0]
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/delete.go
@@ -41,7 +41,7 @@ func (c *DeleteCommand) Cmd() *cobra.Command {
}

// Complete fills DeleteSubCommand structure with data obtained from cobra command
func (c *DeleteCommand) Complete(params *params.Params, args []string) error {
func (c *DeleteCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
c.name = args[0]

return nil
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/list.go
@@ -39,7 +39,7 @@ func (c *ListCommand) Cmd() *cobra.Command {
}

// Complete fills object with user input data
func (c *ListCommand) Complete(params *params.Params, args []string) error {
func (c *ListCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
return nil
}

118 changes: 85 additions & 33 deletions pkg/shp/cmd/build/run.go
@@ -32,12 +32,14 @@ type RunCommand struct {
logTail *tail.Tail // follow container logs
tailLogsStarted map[string]bool // controls tail instance per container

buildName string // build name
logLock sync.Mutex

buildName string
buildRunName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
shpClientset buildclientset.Interface
follow bool // flag to tail pod logs
watchLock sync.Mutex
}

const buildRunLongDesc = `
@@ -53,7 +55,7 @@ func (r *RunCommand) Cmd() *cobra.Command {
}

// Complete picks the build resource name from arguments, and instantiate additional components.
func (r *RunCommand) Complete(params *params.Params, args []string) error {
func (r *RunCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
switch len(args) {
case 1:
r.buildName = args[0]
@@ -66,6 +68,31 @@ func (r *RunCommand) Complete(params *params.Params, args []string) error {
return err
}
r.logTail = tail.NewTail(r.Cmd().Context(), clientset)
r.ioStreams = io
r.namespace = params.Namespace()
if r.follow {
if r.shpClientset, err = params.ShipwrightClientSet(); err != nil {
return err
}

kclientset, err := params.ClientSet()
if err != nil {
return err
}
to, err := params.RequestTimeout()
if err != nil {
return err
}
r.pw, err = reactor.NewPodWatcher(r.Cmd().Context(), to, kclientset, params.Namespace())
if err != nil {
return err
}

r.pw.WithOnPodModifiedFn(r.onEvent)
r.pw.WithTimeoutPodFn(r.onTimeout)
r.pw.WithNoPodEventsYetFn(r.onNoPodEventsYet)

}

// overwriting build-ref name to use what's on arguments
return r.Cmd().Flags().Set(flags.BuildrefNameFlag, r.buildName)
@@ -92,11 +119,49 @@ func (r *RunCommand) tailLogs(pod *corev1.Pod) {
}
}

// onNoPodEventsYet reacts to the pod watcher telling us it has not received any pod events for our build run
func (r *RunCommand) onNoPodEventsYet() {
r.Log(fmt.Sprintf("BuildRun %q log following has not observed any pod events yet.", r.buildRunName))
br, err := r.shpClientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Get(r.cmd.Context(), r.buildRunName, metav1.GetOptions{})
if err != nil {
r.Log(fmt.Sprintf("error accessing BuildRun %q: %s", r.buildRunName, err.Error()))
return
}

c := br.Status.GetCondition(buildv1alpha1.Succeeded)
giveUp := false
msg := ""
switch {
case c != nil && c.Status == corev1.ConditionTrue:
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been marked as successful.\n", br.Name)
case c != nil && c.Status == corev1.ConditionFalse:
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name)
case br.IsCanceled():
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been canceled.\n", br.Name)
case br.DeletionTimestamp != nil:
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been deleted.\n", br.Name)
case !br.HasStarted():
r.Log(fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name))
}
if giveUp {
r.Log(msg)
r.Log(fmt.Sprintf("exiting 'ship build run --follow' for BuildRun %q", br.Name))
r.stop()
}

}

// onTimeout reacts to either the context or request timeout causing the pod watcher to exit
func (r *RunCommand) onTimeout(msg string) {
r.Log(fmt.Sprintf("BuildRun %q log following has stopped because: %q\n", r.buildRunName, msg))
}

// onEvent reacts on pod state changes, to start and stop tailing container logs.
func (r *RunCommand) onEvent(pod *corev1.Pod) error {
// found more data races during unit testing with concurrent events coming in
r.watchLock.Lock()
defer r.watchLock.Unlock()
switch pod.Status.Phase {
case corev1.PodRunning:
// graceful time to wait for container start
@@ -118,14 +183,14 @@ func (r *RunCommand) onEvent(pod *corev1.Pod) error {
err = fmt.Errorf("build pod '%s' has failed", pod.GetName())
}
// see if because of deletion or cancelation
fmt.Fprintf(r.ioStreams.Out, msg)
r.Log(msg)
r.stop()
return err
case corev1.PodSucceeded:
fmt.Fprintf(r.ioStreams.Out, "Pod '%s' has succeeded!\n", pod.GetName())
r.Log(fmt.Sprintf("Pod '%s' has succeeded!\n", pod.GetName()))
r.stop()
default:
fmt.Fprintf(r.ioStreams.Out, "Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase))
r.Log(fmt.Sprintf("Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase)))
// handle any issues with pulling images that may fail
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodInitialized || c.Type == corev1.ContainersReady {
@@ -146,10 +211,7 @@ func (r *RunCommand) stop() {

// Run creates a BuildRun resource based on Build's name informed on arguments.
func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
// ran into some data race conditions during unit test with this starting up, but pod events
// coming in before we completed initialization below
r.watchLock.Lock()
// resource using GenerateName, which will provice a unique instance
// resource using GenerateName, which will provide a unique instance
br := &buildv1alpha1.BuildRun{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", r.buildName),
@@ -162,7 +224,7 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
if err != nil {
return err
}
br, err = clientset.ShipwrightV1alpha1().BuildRuns(params.Namespace()).Create(r.cmd.Context(), br, metav1.CreateOptions{})
br, err = clientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Create(r.cmd.Context(), br, metav1.CreateOptions{})
if err != nil {
return err
}
@@ -172,15 +234,7 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
return nil
}

r.ioStreams = ioStreams
kclientset, err := params.ClientSet()
if err != nil {
return err
}
r.buildRunName = br.Name
if r.shpClientset, err = params.ShipwrightClientSet(); err != nil {
return err
}

// instantiating a pod watcher with a specific label-selector to find the indented pod where the
// actual build started by this subcommand is being executed, including the randomized buildrun
@@ -190,19 +244,17 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
r.buildName,
br.GetName(),
)}
r.pw, err = reactor.NewPodWatcher(r.Cmd().Context(), kclientset, listOpts, params.Namespace())
if err != nil {
return err
}

r.pw.WithOnPodModifiedFn(r.onEvent)
// cannot defer with unlock up top because r.pw.Start() blocks; but the erroring out above kills the
// cli invocation, so it does not matter
r.watchLock.Unlock()
_, err = r.pw.Start()
_, err = r.pw.Start(listOpts)
return err
}

func (r *RunCommand) Log(msg string) {
// concurrent fmt.Fprintf(r.ioStream.Out...) calls need locking to avoid data races, as we 'write' to the stream
r.logLock.Lock()
defer r.logLock.Unlock()
fmt.Fprintf(r.ioStreams.Out, msg)
}

Comment on lines +251 to +257

Contributor:

Have you considered using a channel for this? The Log function could write to the channel, and another goroutine could read from it and write the messages to standard out. I don't think the sync.Mutex here will cause any blocking issues, but the channel might be even less likely to. Just a thought I had while looking through this.

Member Author:
Yeah, I'm positive the mutex here is not a problem with the runtime code, because this method is the only one that uses it.

So we prevent the concurrent writes that race detection flagged.

It was only concurrent writes in runtime code that we needed to be wary of here, then.

Also, I say runtime code because I do leverage this in the unit tests in one and only one spot, to verify the contents of the out buffer.

But still, given all these qualifiers, especially as there is no read/write interaction we are coordinating in the non-test code, I think this simpler solution is better.

But good consideration to sort out - thanks.
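
For reference, here is a minimal sketch of the channel-based design discussed above. It is a hypothetical alternative, not what this PR merged (the PR keeps the sync.Mutex): a single goroutine owns the output writer, so concurrent Log callers never touch the stream directly. The chanLogger and newChanLogger names are illustrative only.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// chanLogger serializes output by funneling every message through a channel
// to one writer goroutine, instead of guarding the writer with a mutex.
type chanLogger struct {
	msgs chan string
	done chan struct{}
}

func newChanLogger(out io.Writer) *chanLogger {
	l := &chanLogger{msgs: make(chan string, 16), done: make(chan struct{})}
	go func() {
		for m := range l.msgs {
			fmt.Fprint(out, m) // only this goroutine ever writes to out
		}
		close(l.done)
	}()
	return l
}

// Log is safe to call from any goroutine.
func (l *chanLogger) Log(msg string) { l.msgs <- msg }

// Close flushes queued messages and stops the writer goroutine.
func (l *chanLogger) Close() {
	close(l.msgs)
	<-l.done
}

func main() {
	l := newChanLogger(os.Stdout)
	l.Log("Pod 'build-pod' is in state \"Pending\"...\n")
	l.Log("Pod 'build-pod' has succeeded!\n")
	l.Close()
}
```

As the thread concludes, the channel removes the lock but adds shutdown coordination (Close must run before the buffer is inspected or the process exits), which is part of why the simpler mutex was kept here.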

// runCmd instantiate the "build run" sub-command using common BuildRun flags.
func runCmd() runner.SubCommand {
cmd := &cobra.Command{
@@ -214,7 +266,7 @@ func runCmd() runner.SubCommand {
cmd: cmd,
buildRunSpec: flags.BuildRunSpecFromFlags(cmd.Flags()),
tailLogsStarted: make(map[string]bool),
watchLock: sync.Mutex{},
logLock: sync.Mutex{},
}
cmd.Flags().BoolVarP(&runCommand.follow, "follow", "F", runCommand.follow, "Start a build and watch its log until it completes or fails.")
return runCommand
83 changes: 48 additions & 35 deletions pkg/shp/cmd/build/run_test.go
@@ -1,21 +1,21 @@
package build

import (
"runtime"
"bytes"
"strings"
"sync"
"testing"
"time"

buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
shpfake "github.com/shipwright-io/build/pkg/client/clientset/versioned/fake"
"github.com/shipwright-io/cli/pkg/shp/flags"
"github.com/shipwright-io/cli/pkg/shp/params"
"github.com/shipwright-io/cli/pkg/shp/reactor"
"github.com/spf13/cobra"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes/fake"
fakekubetesting "k8s.io/client-go/testing"
@@ -26,6 +26,8 @@ func TestStartBuildRunFollowLog(t *testing.T) {
name string
phase corev1.PodPhase
logText string
to string
noPodYet bool
cancelled bool
brDeleted bool
podDeleted bool
@@ -76,6 +78,16 @@
// k8s folks to "be careful" with it; fortunately, what we do for tail and pod_watcher so far is within
// the realm of reliable.
},
{
name: "timeout",
to: "1s",
logText: reactor.RequestTimeoutMessage,
},
{
name: "no pod yet",
noPodYet: true,
logText: "has not observed any pod events yet",
},
}

for _, test := range tests {
@@ -103,6 +115,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
},
}
shpclientset := shpfake.NewSimpleClientset()

// need this reactor since the Run method uses the ObjectMeta.GenerateName k8s feature to generate the random
// name for the BuildRun. However, for our purposes with unit testing, we want to control the name of the BuildRun
// to facilitate the list/selector via labels that is also employed by the Run method.
@@ -116,7 +129,10 @@ func TestStartBuildRunFollowLog(t *testing.T) {
return true, br, nil
}
shpclientset.PrependReactor("get", "buildruns", getReactorFunc)
kclientset := fake.NewSimpleClientset(pod)
kclientset := fake.NewSimpleClientset()
if !test.noPodYet {
kclientset = fake.NewSimpleClientset(pod)
}
ccmd := &cobra.Command{}
cmd := &RunCommand{
cmd: ccmd,
@@ -125,12 +141,16 @@ func TestStartBuildRunFollowLog(t *testing.T) {
follow: true,
shpClientset: shpclientset,
tailLogsStarted: make(map[string]bool),
watchLock: sync.Mutex{},
logLock: sync.Mutex{},
}

// set up context
cmd.Cmd().ExecuteC()
param := params.NewParamsForTest(kclientset, shpclientset, nil, metav1.NamespaceDefault)
pm := genericclioptions.NewConfigFlags(true)
if len(test.to) > 0 {
pm.Timeout = &test.to
}
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault)

ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()

@@ -143,7 +163,13 @@ func TestStartBuildRunFollowLog(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{}
}

cmd.Complete(param, []string{name})
cmd.Complete(param, &ioStreams, []string{name})
if len(test.to) > 0 {
cmd.Run(param, &ioStreams)
checkLog(test.name, test.logText, cmd, out, t)
continue
}

go func() {
err := cmd.Run(param, &ioStreams)
if err != nil {
@@ -152,35 +178,22 @@ func TestStartBuildRunFollowLog(t *testing.T) {

}()

// yield the processor, so the initialization in Run can occur; afterward, the watchLock should allow
// coordination between Run and onEvent
runtime.Gosched()

// even with our release of the context above with Gosched(), repeated runs in CI have surfaced occasional timing issues between
// cmd.Run() finishing initialization and cmd.onEvent trying to used struct variables, resulting in panics; so we employ the lock here
// to insure the required initializations have run; this is still better than a generic "sleep log enough for
// the init to occur.
cmd.watchLock.Lock()
err := wait.PollImmediate(1*time.Second, 3*time.Second, func() (done bool, err error) {
// check any of the vars on RunCommand that are used in onEvent and make sure they are set;
// we are verifying the initialization done in Run() on RunCommand is complete
if cmd.pw != nil && cmd.ioStreams != nil && cmd.shpClientset != nil {
cmd.watchLock.Unlock()
return true, nil
}
return false, nil
})
if err != nil {
cmd.watchLock.Unlock()
t.Errorf("Run initialization did not complete in time")
}

// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
cmd.onEvent(pod)
if !strings.Contains(out.String(), test.logText) {
t.Errorf("test %s: unexpected output: %s", test.name, out.String())
if !test.noPodYet {
// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
cmd.onEvent(pod)
} else {
cmd.onNoPodEventsYet()
}
checkLog(test.name, test.logText, cmd, out, t)
}
}

func checkLog(name, text string, cmd *RunCommand, out *bytes.Buffer, t *testing.T) {
// need to employ log lock since accessing same iostream out used by Run cmd
cmd.logLock.Lock()
defer cmd.logLock.Unlock()
if !strings.Contains(out.String(), text) {
t.Errorf("test %s: unexpected output: %s", name, out.String())
}
}
2 changes: 1 addition & 1 deletion pkg/shp/cmd/buildrun/cancel.go
@@ -38,7 +38,7 @@ func (c *CancelCommand) Cmd() *cobra.Command {
}

// Complete fills in data provided by user
func (c *CancelCommand) Complete(params *params.Params, args []string) error {
func (c *CancelCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
c.name = args[0]

return nil