Skip to content

Commit

Permalink
[Tasdik] Update proc execution status to FAILED on receiving relevant…
Browse files Browse the repository at this point in the history
… message

- Add updated at column in database
- Close channel when watching for job events

Closes #58
  • Loading branch information
olttwa committed Dec 18, 2018
1 parent 842e624 commit 69623b2
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 39 deletions.
1 change: 1 addition & 0 deletions proctord/jobs/execution/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (executioner *executioner) Status() http.HandlerFunc {
fmt.Fprintf(w, jobExecutionStatus)
}
}

func (executioner *executioner) Handle() http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
Expand Down
8 changes: 6 additions & 2 deletions proctord/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,24 @@ func (client *client) JobExecutionStatus(JobNameSubmittedForExecution string) (s
}

resultChan := watchJob.ResultChan()
defer watchJob.Stop()
var event watch.Event
var jobEvent *batch_v1.Job

for event = range resultChan {
if event.Type == watch.Error {
return utility.JobFailed, nil
return utility.JobExecutionStatusFetchError, nil
}

jobEvent = event.Object.(*batch_v1.Job)
if jobEvent.Status.Succeeded >= int32(1) {
return utility.JobSucceeded, nil
} else if jobEvent.Status.Failed >= int32(1) {
return utility.JobFailed, nil
}
}

return utility.JobFailed, nil
return utility.NoDefinitiveJobExecutionStatusFound, nil
}

func (client *client) getLogsStreamReaderFor(podName string) (io.ReadCloser, error) {
Expand Down
60 changes: 49 additions & 11 deletions proctord/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/gojektech/proctor/proctord/config"
"github.com/gojektech/proctor/proctord/utility"
"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -148,18 +149,24 @@ func (suite *ClientTestSuite) TestShouldReturnSuccessJobExecutionStatus() {
watcher := watch.NewFake()
suite.fakeClientSet.PrependWatchReactor("jobs", testing_kubernetes.DefaultWatchReactor(watcher, nil))

var testJob batchV1.Job
var activeJob batchV1.Job
var succeededJob batchV1.Job
uniqueJobName := "proctor-job-2"
label := jobLabel(uniqueJobName)
objectMeta := meta_v1.ObjectMeta{
Name: uniqueJobName,
Labels: label,
}
testJob.ObjectMeta = objectMeta
activeJob.ObjectMeta = objectMeta
succeededJob.ObjectMeta = objectMeta

go func() {
testJob.Status.Succeeded = 1
watcher.Modify(&testJob)
activeJob.Status.Active = 1
watcher.Modify(&activeJob)

succeededJob.Status.Active = 0
succeededJob.Status.Succeeded = 1
watcher.Modify(&succeededJob)

time.Sleep(time.Second * 1)
watcher.Stop()
Expand All @@ -168,7 +175,7 @@ func (suite *ClientTestSuite) TestShouldReturnSuccessJobExecutionStatus() {
jobExecutionStatus, err := suite.testClient.JobExecutionStatus(uniqueJobName)
assert.NoError(t, err)

assert.Equal(t, "SUCCEEDED", jobExecutionStatus, "Should return true for job success")
assert.Equal(t, utility.JobSucceeded, jobExecutionStatus, "Should return SUCCEEDED")
}

func (suite *ClientTestSuite) TestShouldReturnFailedJobExecutionStatus() {
Expand All @@ -177,6 +184,40 @@ func (suite *ClientTestSuite) TestShouldReturnFailedJobExecutionStatus() {
watcher := watch.NewFake()
suite.fakeClientSet.PrependWatchReactor("jobs", testing_kubernetes.DefaultWatchReactor(watcher, nil))

var activeJob batchV1.Job
var failedJob batchV1.Job
uniqueJobName := "proctor-job-1"
label := jobLabel(uniqueJobName)
objectMeta := meta_v1.ObjectMeta{
Name: uniqueJobName,
Labels: label,
}
activeJob.ObjectMeta = objectMeta
failedJob.ObjectMeta = objectMeta

go func() {
activeJob.Status.Active = 1
watcher.Modify(&activeJob)
failedJob.Status.Active = 0
failedJob.Status.Failed = 1
watcher.Modify(&failedJob)

time.Sleep(time.Second * 1)
watcher.Stop()
}()

jobExecutionStatus, err := suite.testClient.JobExecutionStatus(uniqueJobName)
assert.NoError(t, err)

assert.Equal(t, utility.JobFailed, jobExecutionStatus, "Should return FAILED")
}

func (suite *ClientTestSuite) TestJobExecutionStatusForNonDefinitiveStatus() {
t := suite.T()

watcher := watch.NewFake()
suite.fakeClientSet.PrependWatchReactor("jobs", testing_kubernetes.DefaultWatchReactor(watcher, nil))

var testJob batchV1.Job
uniqueJobName := "proctor-job-1"
label := jobLabel(uniqueJobName)
Expand All @@ -189,9 +230,6 @@ func (suite *ClientTestSuite) TestShouldReturnFailedJobExecutionStatus() {
go func() {
testJob.Status.Active = 1
watcher.Modify(&testJob)
testJob.Status.Active = 0
testJob.Status.Failed = 1
watcher.Modify(&testJob)

time.Sleep(time.Second * 1)
watcher.Stop()
Expand All @@ -200,10 +238,10 @@ func (suite *ClientTestSuite) TestShouldReturnFailedJobExecutionStatus() {
jobExecutionStatus, err := suite.testClient.JobExecutionStatus(uniqueJobName)
assert.NoError(t, err)

assert.Equal(t, "FAILED", jobExecutionStatus, "Should return true for job success")
assert.Equal(t, utility.NoDefinitiveJobExecutionStatusFound, jobExecutionStatus, "Should return NO_DEFINITIVE_JOB_EXECUTION_STATUS_FOUND")
}

func (suite *ClientTestSuite) TestShouldReturnErrorJobExecutionStatus() {
func (suite *ClientTestSuite) TestShouldReturnJobExecutionStatusFetchError() {
t := suite.T()

watcher := watch.NewFake()
Expand All @@ -228,7 +266,7 @@ func (suite *ClientTestSuite) TestShouldReturnErrorJobExecutionStatus() {
jobExecutionStatus, err := suite.testClient.JobExecutionStatus(uniqueJobName)
assert.NoError(t, err)

assert.Equal(t, "FAILED", jobExecutionStatus, "Should return true for job success")
assert.Equal(t, utility.JobExecutionStatusFetchError, jobExecutionStatus, "Should return JOB_EXECUTION_STATUS_FETCH_ERROR")
}

func TestClientTestSuite(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions proctord/migrations/6_AddColumnUpdatedAt.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table jobs_execution_audit_log drop column if exists updated_at;
1 change: 1 addition & 0 deletions proctord/migrations/6_AddColumnUpdatedAt.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table jobs_execution_audit_log add column updated_at timestamp default now();
7 changes: 6 additions & 1 deletion proctord/storage/postgres/schema.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package postgres

import "database/sql"
import (
"database/sql"
"time"
)

type JobsExecutionAuditLog struct {
JobName string `db:"job_name"`
Expand All @@ -10,4 +13,6 @@ type JobsExecutionAuditLog struct {
JobArgs string `db:"job_args"`
JobSubmissionStatus string `db:"job_submission_status"`
JobExecutionStatus string `db:"job_execution_status"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
12 changes: 8 additions & 4 deletions proctord/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/base64"
"encoding/gob"
"time"

"github.com/gojektech/proctor/proctord/storage/postgres"
)
Expand Down Expand Up @@ -40,17 +41,20 @@ func (store *store) JobsExecutionAuditLog(jobSubmissionStatus, jobExecutionStatu
JobArgs: base64.StdEncoding.EncodeToString(encodedJobArgs.Bytes()),
JobSubmissionStatus: jobSubmissionStatus,
JobExecutionStatus: jobExecutionStatus,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
return store.postgresClient.NamedExec("INSERT INTO jobs_execution_audit_log (job_name, user_email, image_name, job_name_submitted_for_execution, job_args, job_submission_status, job_execution_status) VALUES (:job_name, :user_email, :image_name, :job_name_submitted_for_execution, :job_args, :job_submission_status, :job_execution_status)", &jobsExecutionAuditLog)
return store.postgresClient.NamedExec("INSERT INTO jobs_execution_audit_log (job_name, user_email, image_name, job_name_submitted_for_execution, job_args, job_submission_status, job_execution_status, created_at, updated_at) VALUES (:job_name, :user_email, :image_name, :job_name_submitted_for_execution, :job_args, :job_submission_status, :job_execution_status, :created_at, :updated_at)", &jobsExecutionAuditLog)
}

func (store *store) UpdateJobsExecutionAuditLog(JobNameSubmittedForExecution, status string) error {
func (store *store) UpdateJobsExecutionAuditLog(JobNameSubmittedForExecution, jobExecutionStatus string) error {
jobsExecutionAuditLog := postgres.JobsExecutionAuditLog{
JobExecutionStatus: status,
JobExecutionStatus: jobExecutionStatus,
JobNameSubmittedForExecution: postgres.StringToSQLString(JobNameSubmittedForExecution),
UpdatedAt: time.Now(),
}

return store.postgresClient.NamedExec("UPDATE jobs_execution_audit_log SET job_execution_status = :job_execution_status where job_name_submitted_for_execution = :job_name_submitted_for_execution", &jobsExecutionAuditLog)
return store.postgresClient.NamedExec("UPDATE jobs_execution_audit_log SET job_execution_status = :job_execution_status, updated_at = :updated_at where job_name_submitted_for_execution = :job_name_submitted_for_execution", &jobsExecutionAuditLog)
}

func (store *store) GetJobExecutionStatus(JobNameSubmittedForExecution string) (string, error) {
Expand Down
62 changes: 42 additions & 20 deletions proctord/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestJobsExecutionAuditLog(t *testing.T) {
jobName := "any-job"
imageName := "any-image"
userEmail := "mrproctor@example.com"
JobNameSubmittedForExecution := "any-submission"
jobNameSubmittedForExecution := "any-submission"
jobArgs := map[string]string{"key": "value"}
jobSubmissionStatus := "any-status"
jobExecutionStatus := "any-execution-status"
Expand All @@ -30,23 +30,24 @@ func TestJobsExecutionAuditLog(t *testing.T) {
err := enc.Encode(jobArgs)
assert.NoError(t, err)

data := postgres.JobsExecutionAuditLog{
JobName: jobName,
UserEmail: userEmail,
ImageName: imageName,
JobNameSubmittedForExecution: postgres.StringToSQLString(JobNameSubmittedForExecution),
JobArgs: base64.StdEncoding.EncodeToString(encodedJobArgs.Bytes()),
JobSubmissionStatus: jobSubmissionStatus,
JobExecutionStatus: jobExecutionStatus,
}

mockPostgresClient.On("NamedExec",
"INSERT INTO jobs_execution_audit_log (job_name, user_email, image_name, job_name_submitted_for_execution, job_args, job_submission_status, job_execution_status) VALUES (:job_name, :user_email, :image_name, :job_name_submitted_for_execution, :job_args, :job_submission_status, :job_execution_status)",
&data).
"INSERT INTO jobs_execution_audit_log (job_name, user_email, image_name, job_name_submitted_for_execution, job_args, job_submission_status, job_execution_status, created_at, updated_at) VALUES (:job_name, :user_email, :image_name, :job_name_submitted_for_execution, :job_args, :job_submission_status, :job_execution_status, :created_at, :updated_at)",
mock.Anything).
Run(func(args mock.Arguments) {
data := args.Get(1).(*postgres.JobsExecutionAuditLog)

assert.Equal(t, jobName, data.JobName)
assert.Equal(t, userEmail, data.UserEmail)
assert.Equal(t, imageName, data.ImageName)
assert.Equal(t, postgres.StringToSQLString(jobNameSubmittedForExecution), data.JobNameSubmittedForExecution)
assert.Equal(t, base64.StdEncoding.EncodeToString(encodedJobArgs.Bytes()), data.JobArgs)
assert.Equal(t, jobSubmissionStatus, data.JobSubmissionStatus)
assert.Equal(t, jobExecutionStatus, data.JobExecutionStatus)
}).
Return(nil).
Once()

err = testStore.JobsExecutionAuditLog(jobSubmissionStatus, jobExecutionStatus, jobName, userEmail, JobNameSubmittedForExecution, imageName, jobArgs)
err = testStore.JobsExecutionAuditLog(jobSubmissionStatus, jobExecutionStatus, jobName, userEmail, jobNameSubmittedForExecution, imageName, jobArgs)

assert.NoError(t, err)
mockPostgresClient.AssertExpectations(t)
Expand All @@ -61,13 +62,9 @@ func TestJobsExecutionAuditLogPostgresClientFailure(t *testing.T) {
err := enc.Encode(map[string]string{})
assert.NoError(t, err)

data := postgres.JobsExecutionAuditLog{
JobArgs: base64.StdEncoding.EncodeToString(encodedJobArgs.Bytes()),
}

mockPostgresClient.On("NamedExec",
"INSERT INTO jobs_execution_audit_log (job_name, user_email, image_name, job_name_submitted_for_execution, job_args, job_submission_status, job_execution_status) VALUES (:job_name, :user_email, :image_name, :job_name_submitted_for_execution, :job_args, :job_submission_status, :job_execution_status)",
&data).
"INSERT INTO jobs_execution_audit_log (job_name, user_email, image_name, job_name_submitted_for_execution, job_args, job_submission_status, job_execution_status, created_at, updated_at) VALUES (:job_name, :user_email, :image_name, :job_name_submitted_for_execution, :job_args, :job_submission_status, :job_execution_status, :created_at, :updated_at)",
mock.Anything).
Return(errors.New("error")).
Once()

Expand All @@ -77,6 +74,31 @@ func TestJobsExecutionAuditLogPostgresClientFailure(t *testing.T) {
mockPostgresClient.AssertExpectations(t)
}

func TestUpdateJobsExecutionAuditLog(t *testing.T) {
mockPostgresClient := &postgres.ClientMock{}
testStore := New(mockPostgresClient)

jobNameSubmittedForExecution := "any-submission"
jobExecutionStatus := "updated-status"

mockPostgresClient.On("NamedExec",
"UPDATE jobs_execution_audit_log SET job_execution_status = :job_execution_status, updated_at = :updated_at where job_name_submitted_for_execution = :job_name_submitted_for_execution",
mock.Anything).
Run(func(args mock.Arguments) {
data := args.Get(1).(*postgres.JobsExecutionAuditLog)

assert.Equal(t, postgres.StringToSQLString(jobNameSubmittedForExecution), data.JobNameSubmittedForExecution)
assert.Equal(t, jobExecutionStatus, data.JobExecutionStatus)
}).
Return(nil).
Once()

err := testStore.UpdateJobsExecutionAuditLog(jobNameSubmittedForExecution, jobExecutionStatus)

assert.NoError(t, err)
mockPostgresClient.AssertExpectations(t)
}

func TestGetJobsStatusWhenJobIsPresent(t *testing.T) {
mockPostgresClient := &postgres.ClientMock{}
testStore := New(mockPostgresClient)
Expand Down
4 changes: 3 additions & 1 deletion proctord/utility/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const JobSubmissionServerError = "server_error"
const JobSucceeded = "SUCCEEDED"
const JobFailed = "FAILED"
const JobWaiting = "WAITING"
const JobExecutionStatusFetchError = "JOB_EXECUTION_STATUS_FETCH_ERROR"
const NoDefinitiveJobExecutionStatusFound = "NO_DEFINITIVE_JOB_EXECUTION_STATUS_FOUND"

const JobNameContextKey = "job_name"
const UserEmailContextKey = "user_email"
Expand All @@ -35,7 +37,7 @@ const JobSubmissionStatusContextKey = "job_sumission_status"

const UserEmailHeaderKey = "Email-Id"
const AccessTokenHeaderKey = "Access-Token"
const ClientVersionHeaderKey = "Client-Version"
const ClientVersionHeaderKey = "Client-Version"

func MergeMaps(mapOne, mapTwo map[string]string) map[string]string {
result := make(map[string]string)
Expand Down

0 comments on commit 69623b2

Please sign in to comment.