Skip to content

Commit

Permalink
[Akshat] Poll proctord for proc execution status
Browse files Browse the repository at this point in the history
- Post streaming logs, poll proctord for status
- CLI exits with non-zero error on proc execution failure
  • Loading branch information
Tasdik Rahman committed Dec 17, 2018
1 parent 62d2ab6 commit ca6947b
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 30 deletions.
22 changes: 19 additions & 3 deletions cmd/execution/executioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"github.com/fatih/color"
"github.com/gojektech/proctor/daemon"
"github.com/gojektech/proctor/io"
proctord_utility "github.com/gojektech/proctor/proctord/utility"
"github.com/spf13/cobra"
)

func NewCmd(printer io.Printer, proctorEngineClient daemon.Client) *cobra.Command {
func NewCmd(printer io.Printer, proctorEngineClient daemon.Client, osExitFunc func(int)) *cobra.Command {
return &cobra.Command{
Use: "execute",
Short: "Execute a proc with given arguments",
Expand Down Expand Up @@ -44,18 +45,33 @@ func NewCmd(printer io.Printer, proctorEngineClient daemon.Client) *cobra.Comman

executedProcName, err := proctorEngineClient.ExecuteProc(procName, procArgs)
if err != nil {
printer.Println(err.Error(), color.FgRed)
printer.Println("Error submitting proc for execution", color.FgRed)
return
}

printer.Println("Proc execution successful. \nStreaming logs:", color.FgGreen)
printer.Println("Proc submitted for execution. \nStreaming logs:", color.FgGreen)
err = proctorEngineClient.StreamProcLogs(executedProcName)
if err != nil {
printer.Println("Error Streaming Logs", color.FgRed)
return
}

printer.Println("Log stream of proc completed.", color.FgGreen)

procExecutionStatus, err := proctorEngineClient.GetDefinitiveProcExecutionStatus(executedProcName)
if err != nil {
printer.Println("Error Fetching Proc execution status", color.FgRed)
osExitFunc(1)
return
}

if procExecutionStatus != proctord_utility.JobSucceeded {
printer.Println("Proc execution failed", color.FgRed)
osExitFunc(1)
return
}

printer.Println("Proc execution successful", color.FgGreen)
},
}
}
76 changes: 70 additions & 6 deletions cmd/execution/executioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/fatih/color"
"github.com/gojektech/proctor/daemon"
"github.com/gojektech/proctor/io"
"github.com/gojektech/proctor/proctord/utility"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
Expand All @@ -23,7 +24,7 @@ type ExecutionCmdTestSuite struct {
func (s *ExecutionCmdTestSuite) SetupTest() {
s.mockPrinter = &io.MockPrinter{}
s.mockProctorEngineClient = &daemon.MockClient{}
s.testExecutionCmd = NewCmd(s.mockPrinter, s.mockProctorEngineClient)
s.testExecutionCmd = NewCmd(s.mockPrinter, s.mockProctorEngineClient, func(exitCode int) {})
}

func (s *ExecutionCmdTestSuite) TestExecutionCmdUsage() {
Expand All @@ -49,11 +50,14 @@ func (s *ExecutionCmdTestSuite) TestExecutionCmd() {

s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("executed-proc-name", nil).Once()

s.mockPrinter.On("Println", "Proc execution successful. \nStreaming logs:", color.FgGreen).Once()
s.mockPrinter.On("Println", "Proc submitted for execution. \nStreaming logs:", color.FgGreen).Once()

s.mockProctorEngineClient.On("StreamProcLogs", "executed-proc-name").Return(nil).Once()
s.mockPrinter.On("Println", "Log stream of proc completed.", color.FgGreen).Once()

s.mockProctorEngineClient.On("GetDefinitiveProcExecutionStatus", "executed-proc-name").Return(utility.JobSucceeded, nil).Once()
s.mockPrinter.On("Println", "Proc execution successful", color.FgGreen).Once()

s.testExecutionCmd.Run(&cobra.Command{}, args)

s.mockProctorEngineClient.AssertExpectations(s.T())
Expand All @@ -69,11 +73,14 @@ func (s *ExecutionCmdTestSuite) TestExecutionCmdForNoProcVariables() {
procArgs := make(map[string]string)
s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("executed-proc-name", nil).Once()

s.mockPrinter.On("Println", "Proc execution successful. \nStreaming logs:", color.FgGreen).Once()
s.mockPrinter.On("Println", "Proc submitted for execution. \nStreaming logs:", color.FgGreen).Once()

s.mockProctorEngineClient.On("StreamProcLogs", "executed-proc-name").Return(nil).Once()
s.mockPrinter.On("Println", "Log stream of proc completed.", color.FgGreen).Once()

s.mockProctorEngineClient.On("GetDefinitiveProcExecutionStatus", "executed-proc-name").Return(utility.JobSucceeded, nil).Once()
s.mockPrinter.On("Println", "Proc execution successful", color.FgGreen).Once()

s.testExecutionCmd.Run(&cobra.Command{}, args)

s.mockProctorEngineClient.AssertExpectations(s.T())
Expand All @@ -90,11 +97,14 @@ func (s *ExecutionCmdTestSuite) TestExecutionCmdForIncorrectVariableFormat() {
procArgs := make(map[string]string)
s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("executed-proc-name", nil).Once()

s.mockPrinter.On("Println", "Proc execution successful. \nStreaming logs:", color.FgGreen).Once()
s.mockPrinter.On("Println", "Proc submitted for execution. \nStreaming logs:", color.FgGreen).Once()

s.mockProctorEngineClient.On("StreamProcLogs", "executed-proc-name").Return(nil).Once()
s.mockPrinter.On("Println", "Log stream of proc completed.", color.FgGreen).Once()

s.mockProctorEngineClient.On("GetDefinitiveProcExecutionStatus", "executed-proc-name").Return(utility.JobSucceeded, nil).Once()
s.mockPrinter.On("Println", "Proc execution successful", color.FgGreen).Once()

s.testExecutionCmd.Run(&cobra.Command{}, args)

s.mockProctorEngineClient.AssertExpectations(s.T())
Expand All @@ -110,7 +120,7 @@ func (s *ExecutionCmdTestSuite) TestExecutionCmdForProctorEngineExecutionFailure
procArgs := make(map[string]string)
s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("", errors.New("test error")).Once()

s.mockPrinter.On("Println", "test error", color.FgRed).Once()
s.mockPrinter.On("Println", "Error submitting proc for execution", color.FgRed).Once()

s.testExecutionCmd.Run(&cobra.Command{}, args)

Expand All @@ -127,7 +137,7 @@ func (s *ExecutionCmdTestSuite) TestExecutionCmdForProctorEngineLogStreamingFail
procArgs := make(map[string]string)
s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("executed-proc-name", nil).Once()

s.mockPrinter.On("Println", "Proc execution successful. \nStreaming logs:", color.FgGreen).Once()
s.mockPrinter.On("Println", "Proc submitted for execution. \nStreaming logs:", color.FgGreen).Once()

s.mockProctorEngineClient.On("StreamProcLogs", "executed-proc-name").Return(errors.New("error")).Once()
s.mockPrinter.On("Println", "Error Streaming Logs", color.FgRed).Once()
Expand All @@ -138,6 +148,60 @@ func (s *ExecutionCmdTestSuite) TestExecutionCmdForProctorEngineLogStreamingFail
s.mockPrinter.AssertExpectations(s.T())
}

func (s *ExecutionCmdTestSuite) TestExecutionCmdForProctorEngineGetDefinitiveProcExecutionStatusError() {
args := []string{"say-hello-world"}

s.mockPrinter.On("Println", fmt.Sprintf("%-40s %-100s", "Executing Proc", "say-hello-world"), color.Reset).Once()
s.mockPrinter.On("Println", "With No Variables", color.FgRed).Once()

procArgs := make(map[string]string)
s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("executed-proc-name", nil).Once()

s.mockPrinter.On("Println", "Proc submitted for execution. \nStreaming logs:", color.FgGreen).Once()

s.mockProctorEngineClient.On("StreamProcLogs", "executed-proc-name").Return(nil).Once()
s.mockPrinter.On("Println", "Log stream of proc completed.", color.FgGreen).Once()

s.mockProctorEngineClient.On("GetDefinitiveProcExecutionStatus", "executed-proc-name").Return("", errors.New("some error")).Once()
s.mockPrinter.On("Println", "Error Fetching Proc execution status", color.FgRed).Once()

osExitFunc := func(exitCode int) {
assert.Equal(s.T(), 1, exitCode)
}
testExecutionCmdOSExit := NewCmd(s.mockPrinter, s.mockProctorEngineClient, osExitFunc)
testExecutionCmdOSExit.Run(&cobra.Command{}, args)

s.mockProctorEngineClient.AssertExpectations(s.T())
s.mockPrinter.AssertExpectations(s.T())
}

func (s *ExecutionCmdTestSuite) TestExecutionCmdForProctorEngineGetDefinitiveProcExecutionStatusFailure() {
args := []string{"say-hello-world"}

s.mockPrinter.On("Println", fmt.Sprintf("%-40s %-100s", "Executing Proc", "say-hello-world"), color.Reset).Once()
s.mockPrinter.On("Println", "With No Variables", color.FgRed).Once()

procArgs := make(map[string]string)
s.mockProctorEngineClient.On("ExecuteProc", "say-hello-world", procArgs).Return("executed-proc-name", nil).Once()

s.mockPrinter.On("Println", "Proc submitted for execution. \nStreaming logs:", color.FgGreen).Once()

s.mockProctorEngineClient.On("StreamProcLogs", "executed-proc-name").Return(nil).Once()
s.mockPrinter.On("Println", "Log stream of proc completed.", color.FgGreen).Once()

s.mockProctorEngineClient.On("GetDefinitiveProcExecutionStatus", "executed-proc-name").Return(utility.JobFailed, nil).Once()
s.mockPrinter.On("Println", "Proc execution failed", color.FgRed).Once()

osExitFunc := func(exitCode int) {
assert.Equal(s.T(), 1, exitCode)
}
testExecutionCmdOSExit := NewCmd(s.mockPrinter, s.mockProctorEngineClient, osExitFunc)
testExecutionCmdOSExit.Run(&cobra.Command{}, args)

s.mockProctorEngineClient.AssertExpectations(s.T())
s.mockPrinter.AssertExpectations(s.T())
}

func TestExecutionCmdTestSuite(t *testing.T) {
suite.Run(t, new(ExecutionCmdTestSuite))
}
3 changes: 2 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func Execute(printer io.Printer, proctorEngineClient daemon.Client) {
descriptionCmd := description.NewCmd(printer, proctorEngineClient)
rootCmd.AddCommand(descriptionCmd)

executionCmd := execution.NewCmd(printer, proctorEngineClient)
//TODO: Test execution.NewCmd is given os.Exit function as params
executionCmd := execution.NewCmd(printer, proctorEngineClient, os.Exit)
rootCmd.AddCommand(executionCmd)

listCmd := list.NewCmd(printer, proctorEngineClient)
Expand Down
25 changes: 15 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ import (
)

const (
Environment = "ENVIRONMENT"
ProctorHost = "PROCTOR_HOST"
EmailId = "EMAIL_ID"
AccessToken = "ACCESS_TOKEN"
ConnectionTimeoutSecs = "CONNECTION_TIMEOUT_SECS"
Environment = "ENVIRONMENT"
ProctorHost = "PROCTOR_HOST"
EmailId = "EMAIL_ID"
AccessToken = "ACCESS_TOKEN"
ConnectionTimeoutSecs = "CONNECTION_TIMEOUT_SECS"
ProcExecutionStatusPollCount = "PROC_EXECUTION_STATUS_POLL_COUNT"
)

type ProctorConfig struct {
Host string
Email string
AccessToken string
ConnectionTimeoutSecs time.Duration
Host string
Email string
AccessToken string
ConnectionTimeoutSecs time.Duration
ProcExecutionStatusPollCount int
}

type ConfigError struct {
Expand All @@ -47,6 +49,7 @@ func NewLoader() Loader {

func (loader *loader) Load() (ProctorConfig, ConfigError) {
viper.SetDefault(ConnectionTimeoutSecs, 10)
viper.SetDefault(ProcExecutionStatusPollCount, 30)
viper.AutomaticEnv()

viper.AddConfigPath(ConfigFileDir())
Expand Down Expand Up @@ -75,7 +78,9 @@ func (loader *loader) Load() (ProctorConfig, ConfigError) {
emailId := viper.GetString(EmailId)
accessToken := viper.GetString(AccessToken)
connectionTimeout := time.Duration(viper.GetInt(ConnectionTimeoutSecs)) * time.Second
return ProctorConfig{Host: proctorHost, Email: emailId, AccessToken: accessToken, ConnectionTimeoutSecs: connectionTimeout}, ConfigError{}
procExecutionStatusPollCount := viper.GetInt(ProcExecutionStatusPollCount)

return ProctorConfig{Host: proctorHost, Email: emailId, AccessToken: accessToken, ConnectionTimeoutSecs: connectionTimeout, ProcExecutionStatusPollCount: procExecutionStatusPollCount}, ConfigError{}
}

// Returns Config file directory
Expand Down
9 changes: 6 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *ConfigTestSuite) TearDownTest() {
os.Unsetenv(EmailId)
os.Unsetenv(AccessToken)
os.Unsetenv(ConnectionTimeoutSecs)
os.Unsetenv(ProcExecutionStatusPollCount)
os.Remove(s.configFilePath)
}

Expand All @@ -64,23 +65,24 @@ func (s *ConfigTestSuite) TestLoadConfigsFromEnvironmentVariables() {
os.Setenv(EmailId, email)
os.Setenv(AccessToken, accessToken)
os.Setenv(ConnectionTimeoutSecs, "20")
os.Setenv(ProcExecutionStatusPollCount, "10")
s.createProctorConfigFile("")

proctorConfig, err := s.configLoader.Load()

assert.Empty(t, err)
assert.Equal(t, ProctorConfig{Host: proctorHost, Email: email, AccessToken: accessToken, ConnectionTimeoutSecs: time.Duration(20 * time.Second)}, proctorConfig)
assert.Equal(t, ProctorConfig{Host: proctorHost, Email: email, AccessToken: accessToken, ConnectionTimeoutSecs: time.Duration(20 * time.Second), ProcExecutionStatusPollCount: 10}, proctorConfig)
}

func (s *ConfigTestSuite) TestLoadConfigFromFile() {
t := s.T()

s.createProctorConfigFile("PROCTOR_HOST: file.example.com\nEMAIL_ID: file@example.com\nACCESS_TOKEN: file-token\nCONNECTION_TIMEOUT_SECS: 30")
s.createProctorConfigFile("PROCTOR_HOST: file.example.com\nEMAIL_ID: file@example.com\nACCESS_TOKEN: file-token\nCONNECTION_TIMEOUT_SECS: 30\nPROC_EXECUTION_STATUS_POLL_COUNT: 15")

proctorConfig, err := s.configLoader.Load()

assert.Empty(t, err)
assert.Equal(t, ProctorConfig{Host: "file.example.com", Email: "file@example.com", AccessToken: "file-token", ConnectionTimeoutSecs: time.Duration(30 * time.Second)}, proctorConfig)
assert.Equal(t, ProctorConfig{Host: "file.example.com", Email: "file@example.com", AccessToken: "file-token", ConnectionTimeoutSecs: time.Duration(30 * time.Second), ProcExecutionStatusPollCount: 15}, proctorConfig)
}

func (s *ConfigTestSuite) TestCheckForMandatoryConfig() {
Expand All @@ -101,6 +103,7 @@ func (s *ConfigTestSuite) TestTakesDefaultValueForConfigs() {

assert.Empty(t, err)
assert.Equal(t, time.Duration(10*time.Second), proctorConfig.ConnectionTimeoutSecs)
assert.Equal(t, 30, proctorConfig.ProcExecutionStatusPollCount)
}

func (s *ConfigTestSuite) TestShouldPrintInstructionsForConfigFileIfFileNotFound() {
Expand Down
58 changes: 51 additions & 7 deletions daemon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ type Client interface {
ListProcs() ([]proc.Metadata, error)
ExecuteProc(string, map[string]string) (string, error)
StreamProcLogs(string) error
GetDefinitiveProcExecutionStatus(string) (string, error)
}

type client struct {
printer io.Printer
proctorConfigLoader config.Loader
proctordHost string
emailId string
accessToken string
clientVersion string
connectionTimeoutSecs time.Duration
printer io.Printer
proctorConfigLoader config.Loader
proctordHost string
emailId string
accessToken string
clientVersion string
connectionTimeoutSecs time.Duration
procExecutionStatusPollCount int
}

type ProcToExecute struct {
Expand Down Expand Up @@ -65,6 +67,7 @@ func (c *client) loadProctorConfig() error {
c.emailId = proctorConfig.Email
c.accessToken = proctorConfig.AccessToken
c.connectionTimeoutSecs = proctorConfig.ConnectionTimeoutSecs
c.procExecutionStatusPollCount = proctorConfig.ProcExecutionStatusPollCount

return nil
}
Expand Down Expand Up @@ -200,6 +203,47 @@ func (c *client) StreamProcLogs(name string) error {
}
}

func (c *client) GetDefinitiveProcExecutionStatus(procName string) (string, error) {
err := c.loadProctorConfig()
if err != nil {
return "", err
}

for count := 0; count < c.procExecutionStatusPollCount; count += 1 {
httpClient := &http.Client{
Timeout: c.connectionTimeoutSecs,
}

req, err := http.NewRequest("GET", "http://"+c.proctordHost+"/jobs/execute/"+procName+"/status", nil)
req.Header.Add(utility.UserEmailHeaderKey, c.emailId)
req.Header.Add(utility.AccessTokenHeaderKey, c.accessToken)
req.Header.Add(utility.ClientVersionHeaderKey, c.clientVersion)

resp, err := httpClient.Do(req)
if err != nil {
return "", buildNetworkError(err)
}

if resp.StatusCode != http.StatusOK {
return "", buildHTTPError(c, resp)
}

body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return "", err
}

procExecutionStatus := string(body)
if procExecutionStatus == utility.JobSucceeded || procExecutionStatus == utility.JobFailed {
return procExecutionStatus, nil
}

time.Sleep(time.Duration(count) * 100 * time.Millisecond)
}
return "", errors.New(fmt.Sprintf("No definitive status received for proc name %s from proctord", procName))
}

func buildNetworkError(err error) error {
if netError, ok := err.(net.Error); ok && netError.Timeout() {
return fmt.Errorf("%s\n%s\n%s", utility.GenericTimeoutErrorHeader, netError.Error(), utility.GenericTimeoutErrorBody)
Expand Down
5 changes: 5 additions & 0 deletions daemon/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ func (m *MockClient) StreamProcLogs(name string) error {
args := m.Called(name)
return args.Error(0)
}

func (m *MockClient) GetDefinitiveProcExecutionStatus(name string) (string, error) {
args := m.Called(name)
return args.Get(0).(string), args.Error(1)
}
Loading

0 comments on commit ca6947b

Please sign in to comment.