Skip to content

Commit

Permalink
[jenson] Schedule procs for execution from proctord
Browse files Browse the repository at this point in the history
  • Loading branch information
olttwa committed Jan 9, 2019
1 parent 857a98d commit aa25c33
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 1 deletion.
85 changes: 85 additions & 0 deletions proctord/jobs/schedule/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package schedule

import (
"encoding/json"
"net/http"

"github.com/gojektech/proctor/proctord/jobs/metadata"
"github.com/gojektech/proctor/proctord/logger"
"github.com/gojektech/proctor/proctord/storage"
"github.com/gojektech/proctor/proctord/utility"
)

type scheduler struct {
store storage.Store
metadataStore metadata.Store
}

type Scheduler interface {
Schedule() http.HandlerFunc
}

func NewScheduler(store storage.Store, metadataStore metadata.Store) Scheduler {
return &scheduler{
metadataStore: metadataStore,
store: store,
}
}

func (scheduler *scheduler) Schedule() http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
var scheduledJob ScheduledJob
err := json.NewDecoder(req.Body).Decode(&scheduledJob)
userEmail := req.Header.Get(utility.UserEmailHeaderKey)
defer req.Body.Close()
if err != nil {
logger.Error("Error parsing request body for scheduling jobs: ", err.Error())

w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(utility.ClientError))

return
}

_, err = scheduler.metadataStore.GetJobMetadata(scheduledJob.Name)
if err != nil {
if err.Error() == "redigo: nil returned" {
logger.Error("Client provided non existent proc name: ", scheduledJob.Name)

w.WriteHeader(http.StatusNotFound)
w.Write([]byte(utility.NonExistentProcClientError))
} else {
logger.Error("Error fetching metadata for proc", err.Error())

w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(utility.ServerError))
}

return
}

scheduledJob.ID, err = scheduler.store.InsertScheduledJob(scheduledJob.Name, scheduledJob.Tags, scheduledJob.Time, scheduledJob.NotificationEmails, userEmail, scheduledJob.Args)
if err != nil {
logger.Error("Error persisting scheduled job", err.Error())

w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(utility.ServerError))

return
}

responseBody, err := json.Marshal(scheduledJob)
if err != nil {
logger.Error("Error marshaling response body", err.Error())

w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(utility.ServerError))

return
}

w.WriteHeader(http.StatusCreated)
w.Write(responseBody)
return
}
}
145 changes: 145 additions & 0 deletions proctord/jobs/schedule/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package schedule

import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/gojektech/proctor/proctord/jobs/metadata"
"github.com/gojektech/proctor/proctord/storage"
"github.com/gojektech/proctor/proctord/utility"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

type SchedulerTestSuite struct {
suite.Suite
mockStore *storage.MockStore
mockMetadataStore *metadata.MockStore

testScheduler Scheduler
}

func (suite *SchedulerTestSuite) SetupTest() {
suite.mockMetadataStore = &metadata.MockStore{}
suite.mockStore = &storage.MockStore{}

suite.testScheduler = NewScheduler(suite.mockStore, suite.mockMetadataStore)
}

func (suite *SchedulerTestSuite) TestSuccessfulJobScheduling() {
t := suite.T()

userEmail := "mrproctor@example.com"
scheduledJob := ScheduledJob{
Name: "any-job",
Args: map[string]string{},
Time: "* 2 * * *",
NotificationEmails: "foo@bar.com,bar@foo.com",
Tags: "tag-one,tag-two",
}
requestBody, err := json.Marshal(scheduledJob)
assert.NoError(t, err)

responseRecorder := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/schedule", bytes.NewReader(requestBody))
req.Header.Set(utility.UserEmailHeaderKey, userEmail)

suite.mockMetadataStore.On("GetJobMetadata", scheduledJob.Name).Return(&metadata.Metadata{}, nil)
insertedScheduledJobID := "123"
suite.mockStore.On("InsertScheduledJob", scheduledJob.Name, scheduledJob.Tags, scheduledJob.Time, scheduledJob.NotificationEmails, userEmail, scheduledJob.Args).Return(insertedScheduledJobID, nil)

suite.testScheduler.Schedule()(responseRecorder, req)

assert.Equal(t, http.StatusCreated, responseRecorder.Code)

expectedResponse := ScheduledJob{}
err = json.NewDecoder(responseRecorder.Body).Decode(&expectedResponse)
assert.NoError(t, err)
assert.Equal(t, insertedScheduledJobID, expectedResponse.ID)
}

func (suite *SchedulerTestSuite) TestBadRequestWhenRequestBodyIsIncorrectForJobScheduling() {
t := suite.T()

req := httptest.NewRequest("POST", "/schedule", bytes.NewBuffer([]byte("invalid json")))
responseRecorder := httptest.NewRecorder()

suite.testScheduler.Schedule()(responseRecorder, req)

assert.Equal(t, http.StatusBadRequest, responseRecorder.Code)
responseBody, _ := ioutil.ReadAll(responseRecorder.Body)
assert.Equal(t, utility.ClientError, string(responseBody))
}

func (suite *SchedulerTestSuite) TestNonExistentJobScheduling() {
t := suite.T()

scheduledJob := ScheduledJob{
Name: "non-existent",
}
requestBody, err := json.Marshal(scheduledJob)
assert.NoError(t, err)

responseRecorder := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/schedule", bytes.NewReader(requestBody))

suite.mockMetadataStore.On("GetJobMetadata", scheduledJob.Name).Return(&metadata.Metadata{}, errors.New("redigo: nil returned"))

suite.testScheduler.Schedule()(responseRecorder, req)

assert.Equal(t, http.StatusNotFound, responseRecorder.Code)
responseBody, _ := ioutil.ReadAll(responseRecorder.Body)
assert.Equal(t, utility.NonExistentProcClientError, string(responseBody))
}

func (suite *SchedulerTestSuite) TestErrorFetchingJobMetadata() {
t := suite.T()

scheduledJob := ScheduledJob{
Name: "non-existent",
}
requestBody, err := json.Marshal(scheduledJob)
assert.NoError(t, err)

responseRecorder := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/schedule", bytes.NewReader(requestBody))

suite.mockMetadataStore.On("GetJobMetadata", scheduledJob.Name).Return(&metadata.Metadata{}, errors.New("any error"))

suite.testScheduler.Schedule()(responseRecorder, req)

assert.Equal(t, http.StatusInternalServerError, responseRecorder.Code)
responseBody, _ := ioutil.ReadAll(responseRecorder.Body)
assert.Equal(t, utility.ServerError, string(responseBody))
}

func (suite *SchedulerTestSuite) TestErrorPersistingScheduledJob() {
t := suite.T()

scheduledJob := ScheduledJob{
Name: "non-existent",
}
requestBody, err := json.Marshal(scheduledJob)
assert.NoError(t, err)

responseRecorder := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/schedule", bytes.NewReader(requestBody))

suite.mockMetadataStore.On("GetJobMetadata", scheduledJob.Name).Return(&metadata.Metadata{}, nil)
suite.mockStore.On("InsertScheduledJob", scheduledJob.Name, scheduledJob.Tags, scheduledJob.Time, scheduledJob.NotificationEmails, "", scheduledJob.Args).Return("", errors.New("any-error"))

suite.testScheduler.Schedule()(responseRecorder, req)

assert.Equal(t, http.StatusInternalServerError, responseRecorder.Code)
responseBody, _ := ioutil.ReadAll(responseRecorder.Body)
assert.Equal(t, utility.ServerError, string(responseBody))
}

func TestScheduleTestSuite(t *testing.T) {
suite.Run(t, new(SchedulerTestSuite))
}
10 changes: 10 additions & 0 deletions proctord/jobs/schedule/scheduled_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package schedule

type ScheduledJob struct {
ID string `json:"id"`
Name string `json:"name"`
Args map[string]string `json:"args"`
NotificationEmails string `json:"notification_emails"`
Time string `json:"time"`
Tags string `json:"tags"`
}
1 change: 1 addition & 0 deletions proctord/migrations/7_CreateJobsScheduleTable.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS jobs_schedule;
11 changes: 11 additions & 0 deletions proctord/migrations/7_CreateJobsScheduleTable.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE jobs_schedule (
id uuid not null primary key,
name text,
args text,
tags text,
notification_emails text,
time text,
user_email text,
created_at timestamp default now(),
updated_at timestamp default now()
);
7 changes: 6 additions & 1 deletion proctord/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package server

import (
"fmt"
"net/http"

"github.com/gojektech/proctor/proctord/audit"
http_client "github.com/gojektech/proctor/proctord/http"
"github.com/gojektech/proctor/proctord/jobs/execution"
"github.com/gojektech/proctor/proctord/jobs/logs"
"github.com/gojektech/proctor/proctord/jobs/metadata"
"github.com/gojektech/proctor/proctord/jobs/schedule"
"github.com/gojektech/proctor/proctord/jobs/secrets"
"github.com/gojektech/proctor/proctord/kubernetes"
"github.com/gojektech/proctor/proctord/middleware"
"github.com/gojektech/proctor/proctord/redis"
"github.com/gojektech/proctor/proctord/storage"
"github.com/gojektech/proctor/proctord/storage/postgres"
"net/http"

"github.com/gojektech/proctor/proctord/instrumentation"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -44,6 +46,8 @@ func NewRouter() (*mux.Router, error) {
jobMetadataHandler := metadata.NewHandler(metadataStore)
jobSecretsHandler := secrets.NewHandler(secretsStore)

procScheduleHandler := schedule.NewScheduler(store, metadataStore)

router.HandleFunc("/ping", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, "pong")
})
Expand All @@ -54,6 +58,7 @@ func NewRouter() (*mux.Router, error) {
router.HandleFunc(instrumentation.Wrap("/jobs/metadata", middleware.ValidateClientVersion(jobMetadataHandler.HandleSubmission()))).Methods("POST")
router.HandleFunc(instrumentation.Wrap("/jobs/metadata", middleware.ValidateClientVersion(jobMetadataHandler.HandleBulkDisplay()))).Methods("GET")
router.HandleFunc(instrumentation.Wrap("/jobs/secrets", middleware.ValidateClientVersion(jobSecretsHandler.HandleSubmission()))).Methods("POST")
router.HandleFunc(instrumentation.Wrap("/jobs/schedule", middleware.ValidateClientVersion(procScheduleHandler.Schedule()))).Methods("POST")

return router, nil
}
12 changes: 12 additions & 0 deletions proctord/storage/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@ type JobsExecutionAuditLog struct {
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}

type JobsSchedule struct {
ID string `db:"id"`
Name string `db:"name"`
Args string `db:"args"`
Tags string `db:"tags"`
Time string `db:"time"`
NotificationEmails string `db:"notification_emails"`
UserEmail string `db:"user_email"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
21 changes: 21 additions & 0 deletions proctord/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"bytes"
"encoding/base64"
"encoding/gob"
"encoding/json"
"time"

"github.com/gojektech/proctor/proctord/storage/postgres"
uuid "github.com/satori/go.uuid"
)

type Store interface {
JobsExecutionAuditLog(string, string, string, string, string, string, map[string]string) error
UpdateJobsExecutionAuditLog(string, string) error
GetJobExecutionStatus(string) (string, error)
InsertScheduledJob(string, string, string, string, string, map[string]string) (string, error)
}

type store struct {
Expand Down Expand Up @@ -70,3 +73,21 @@ func (store *store) GetJobExecutionStatus(JobNameSubmittedForExecution string) (

return jobsExecutionAuditLogResult[0].JobExecutionStatus, nil
}

func (store *store) InsertScheduledJob(name, tags, time, notificationEmails, userEmail string, args map[string]string) (string, error) {
jsonEncodedArgs, err := json.Marshal(args)
if err != nil {
return "", err
}

jobsSchedule := postgres.JobsSchedule{
ID: uuid.NewV4().String(),
Name: name,
Args: base64.StdEncoding.EncodeToString(jsonEncodedArgs),
Tags: tags,
Time: time,
NotificationEmails: notificationEmails,
UserEmail: userEmail,
}
return jobsSchedule.ID, store.postgresClient.NamedExec("INSERT INTO jobs_schedule (id, name, tags, time, notification_emails, user_email, args) VALUES (:id, :name, :tags, :time, :notification_emails, :user_email, :args)", &jobsSchedule)
}
5 changes: 5 additions & 0 deletions proctord/storage/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ func (m *MockStore) GetJobExecutionStatus(jobName string) (string, error) {
args := m.Called(jobName)
return args.String(0), args.Error(1)
}

func (m *MockStore) InsertScheduledJob(jobName, tags, time, notificationEmails, userEmail string, jobArgs map[string]string) (string, error) {
args := m.Called(jobName, tags, time, notificationEmails, userEmail, jobArgs)
return args.String(0), args.Error(1)
}
Loading

0 comments on commit aa25c33

Please sign in to comment.