Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make job deployment asynchronous #296

Merged
merged 32 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
54eff27
feat: add deploy manager
arinda-arif Apr 11, 2022
3bf8033
feat: add job deployment repository
arinda-arif Apr 13, 2022
c796260
feat: add job_deployment_repo test
arinda-arif Apr 13, 2022
adaddd5
feat: modify job refresh to async
arinda-arif Apr 14, 2022
c317f10
refactor: use status in GetDeployJobsStatusResponse and simplify refr…
arinda-arif Apr 18, 2022
76b0ddf
Merge branch 'main' into refresh-async
arinda-arif Apr 18, 2022
a2b8386
fix: rebase proto and fix lint issue
arinda-arif Apr 18, 2022
83d4ed5
test: add unit tests on job spec handler and airflow2 DeployJobsVerbose
arinda-arif Apr 18, 2022
d2c3aff
test: add unit tests in deploy_manager, airflow2, and job service
arinda-arif Apr 19, 2022
24c7329
test: fix deploy function call assert that is run asynchronously
arinda-arif Apr 19, 2022
3ec91e1
Merge branch 'main' into refresh-async
arinda-arif Apr 20, 2022
7ed89b4
refactor: add failureCount to GetDeploymentStatusResponse and refacto…
arinda-arif Apr 20, 2022
04af1bd
refactor: remove DeployManagerConfig and reuse config.Deployer
arinda-arif Apr 20, 2022
d280e0c
refactor: use deploy Assign to assign request to deployers
arinda-arif Apr 22, 2022
2fbc374
fix: tidy up to fix lint
arinda-arif Apr 22, 2022
b405337
fix: change uuid import, reword test case title, add default value to…
arinda-arif Apr 22, 2022
e39b560
fix: add return if status cancelled found when polling job deployment
arinda-arif Apr 22, 2022
4e7c8b7
refactor: rename JobDeploymentRepository UpdateByID to Update
arinda-arif Apr 22, 2022
139dda0
refactor: use waitgroup when initializing deployers
arinda-arif Apr 22, 2022
f0797fb
fix: add lock to GetFirstExecutableRequest
arinda-arif Apr 22, 2022
852dfb6
fix: make updated_at in job_deployment table as not null
arinda-arif Apr 22, 2022
5a45d20
refactor: add more logs in deployer and deploy_manager
arinda-arif Apr 22, 2022
4c2164c
fix: add missing log on constructor and fix lint
arinda-arif Apr 23, 2022
296d34f
fix: change namespace var in job refresh cli, fix color in log, fix a…
arinda-arif Apr 25, 2022
1e9c080
Merge branch 'main' into refresh-async
arinda-arif Apr 26, 2022
f677381
fix: add missing parameter in scheduler prime due to rebase and fix lint
arinda-arif Apr 26, 2022
6fd0383
fix: add deployer ID and project name in deploy_manager and deployer …
arinda-arif Apr 26, 2022
797aee4
fix: remove unused waitgroup in deploy_manager and use enum on job de…
arinda-arif May 9, 2022
034ab88
fix: convert job deployment status to string before being used in Get…
arinda-arif May 9, 2022
ccf7d6a
fix: revert waitgroup and change job deployment status from enum to c…
arinda-arif May 9, 2022
be858b4
Merge branch 'main' into refresh-async
arinda-arif May 9, 2022
c9a796d
fix: bump job deployment table migration file index
arinda-arif May 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "da1f6413e587ac1b13ad3429a133526514e2f5fe"
PROTON_COMMIT := "4cd69522575a01a496bc5babb95d42f38cc8c2cd"

.PHONY: build test test-ci generate-proto unit-test-ci smoke-test integration-test vet coverage clean install lint

Expand Down
40 changes: 39 additions & 1 deletion api/handler/v1beta1/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/odpf/salt/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -298,14 +299,51 @@ func (sv *JobSpecServiceServer) RefreshJobs(req *pb.RefreshJobsRequest, respStre
mu: new(sync.Mutex),
})

if err := sv.jobSvc.Refresh(respStream.Context(), req.ProjectName, req.NamespaceNames, req.JobNames, observers); err != nil {
err := sv.jobSvc.Refresh(respStream.Context(), req.ProjectName, req.NamespaceNames, req.JobNames, observers)
if err != nil {
return status.Errorf(codes.Internal, "failed to refresh jobs: \n%s", err.Error())
}

sv.l.Info("finished job refresh", "time", time.Since(startTime))
return nil
}

func (sv *JobSpecServiceServer) GetDeployJobsStatus(ctx context.Context, req *pb.GetDeployJobsStatusRequest) (*pb.GetDeployJobsStatusResponse, error) {
deployID, err := uuid.Parse(req.DeployId)
if err != nil {
return nil, err
}

jobDeployment, err := sv.jobSvc.GetDeployment(ctx, models.DeploymentID(deployID))
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get job deployment: \n%s", err.Error())
}

switch jobDeployment.Status {
case models.JobDeploymentStatusSucceed:
return &pb.GetDeployJobsStatusResponse{
Status: jobDeployment.Status.String(),
SuccessCount: int32(jobDeployment.Details.SuccessCount),
}, nil
case models.JobDeploymentStatusFailed:
var deployJobFailures []*pb.DeployJobFailure
for _, failure := range jobDeployment.Details.Failures {
deployJobFailures = append(deployJobFailures, &pb.DeployJobFailure{JobName: failure.JobName, Message: failure.Message})
}

return &pb.GetDeployJobsStatusResponse{
arinda-arif marked this conversation as resolved.
Show resolved Hide resolved
Status: jobDeployment.Status.String(),
SuccessCount: int32(jobDeployment.Details.SuccessCount),
FailureCount: int32(jobDeployment.Details.FailureCount),
Failures: deployJobFailures,
}, nil
default:
arinda-arif marked this conversation as resolved.
Show resolved Hide resolved
return &pb.GetDeployJobsStatusResponse{
Status: jobDeployment.Status.String(),
}, nil
}
}

func NewJobSpecServiceServer(l log.Logger, jobService models.JobService, adapter ProtoAdapter,
projectService service.ProjectService, namespaceService service.NamespaceService, progressObserver progress.Observer) *JobSpecServiceServer {
return &JobSpecServiceServer{
Expand Down
145 changes: 145 additions & 0 deletions api/handler/v1beta1/job_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,4 +603,149 @@ func TestJobSpecificationOnServer(t *testing.T) {
assert.Contains(t, err.Error(), errorMsg)
})
})

t.Run("GetDeployJobsStatus", func(t *testing.T) {
projectName := "a-data-project"
projectSpec := models.ProjectSpec{
ID: models.ProjectID(uuid.New()),
Name: projectName,
Config: map[string]string{
"bucket": "gs://some_folder",
},
}
t.Run("should get on progress job deployment successfully", func(t *testing.T) {
jobService := new(mock.JobService)
defer jobService.AssertExpectations(t)

jobSpecServiceServer := v1.NewJobSpecServiceServer(
log,
jobService,
nil,
nil,
nil,
nil,
)
deployID := uuid.New()
jobDeployment := models.JobDeployment{
ID: models.DeploymentID(deployID),
Project: projectSpec,
Status: models.JobDeploymentStatusInProgress,
}
getDeployJobsStatusResponse := &pb.GetDeployJobsStatusResponse{
Status: jobDeployment.Status.String(),
SuccessCount: 0,
}

jobService.On("GetDeployment", ctx, models.DeploymentID(deployID)).Return(jobDeployment, nil)

getDeployJobsStatusRequest := &pb.GetDeployJobsStatusRequest{DeployId: deployID.String()}
actual, err := jobSpecServiceServer.GetDeployJobsStatus(ctx, getDeployJobsStatusRequest)

assert.Nil(t, err)
assert.Equal(t, getDeployJobsStatusResponse, actual)
})
t.Run("should get succeeded job deployment successfully", func(t *testing.T) {
jobService := new(mock.JobService)
defer jobService.AssertExpectations(t)

jobSpecServiceServer := v1.NewJobSpecServiceServer(
log,
jobService,
nil,
nil,
nil,
nil,
)
deployID := uuid.New()
jobDeployment := models.JobDeployment{
ID: models.DeploymentID(deployID),
Project: projectSpec,
Status: models.JobDeploymentStatusSucceed,
Details: models.JobDeploymentDetail{
SuccessCount: 5,
},
}
getDeployJobsStatusResponse := &pb.GetDeployJobsStatusResponse{
Status: jobDeployment.Status.String(),
SuccessCount: int32(jobDeployment.Details.SuccessCount),
}

jobService.On("GetDeployment", ctx, models.DeploymentID(deployID)).Return(jobDeployment, nil)

getDeployJobsStatusRequest := &pb.GetDeployJobsStatusRequest{DeployId: deployID.String()}
actual, err := jobSpecServiceServer.GetDeployJobsStatus(ctx, getDeployJobsStatusRequest)

assert.Nil(t, err)
assert.Equal(t, getDeployJobsStatusResponse, actual)
})
t.Run("should get failed job deployment successfully", func(t *testing.T) {
jobService := new(mock.JobService)
defer jobService.AssertExpectations(t)

jobSpecServiceServer := v1.NewJobSpecServiceServer(
log,
jobService,
nil,
nil,
nil,
nil,
)
deployID := uuid.New()
jobDeployment := models.JobDeployment{
ID: models.DeploymentID(deployID),
Project: projectSpec,
Status: models.JobDeploymentStatusFailed,
Details: models.JobDeploymentDetail{
SuccessCount: 4,
Failures: []models.JobDeploymentFailure{
{
JobName: "job-a",
Message: "internal error",
},
},
},
}
getDeployJobsStatusResponse := &pb.GetDeployJobsStatusResponse{
Status: jobDeployment.Status.String(),
SuccessCount: int32(jobDeployment.Details.SuccessCount),
Failures: []*pb.DeployJobFailure{
{
JobName: jobDeployment.Details.Failures[0].JobName,
Message: jobDeployment.Details.Failures[0].Message,
},
},
}

jobService.On("GetDeployment", ctx, models.DeploymentID(deployID)).Return(jobDeployment, nil)

getDeployJobsStatusRequest := &pb.GetDeployJobsStatusRequest{DeployId: deployID.String()}
actual, err := jobSpecServiceServer.GetDeployJobsStatus(ctx, getDeployJobsStatusRequest)

assert.Nil(t, err)
assert.Equal(t, getDeployJobsStatusResponse, actual)
})
t.Run("should failed when unable to get job deployment", func(t *testing.T) {
jobService := new(mock.JobService)
defer jobService.AssertExpectations(t)

jobSpecServiceServer := v1.NewJobSpecServiceServer(
log,
jobService,
nil,
nil,
nil,
nil,
)
deployID := uuid.New()

errorMsg := "internal error"
jobService.On("GetDeployment", ctx, models.DeploymentID(deployID)).Return(models.JobDeployment{}, errors.New(errorMsg))

getDeployJobsStatusRequest := &pb.GetDeployJobsStatusRequest{DeployId: deployID.String()}
actual, err := jobSpecServiceServer.GetDeployJobsStatus(ctx, getDeployJobsStatusRequest)

assert.Nil(t, actual)
assert.Contains(t, err.Error(), errorMsg)
})
})
}
17 changes: 13 additions & 4 deletions api/handler/v1beta1/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (obs *jobRefreshObserver) Notify(e progress.Event) {
}
if evt.Err != nil {
resp.Success = false
resp.Message = evt.Err.Error()
resp.Value = evt.Err.Error()
}

if err := obs.stream.Send(resp); err != nil {
Expand All @@ -144,7 +144,7 @@ func (obs *jobRefreshObserver) Notify(e progress.Event) {
case *models.ProgressJobSpecUnknownDependencyUsed:
resp := &pb.RefreshJobsResponse{
JobName: evt.Job,
Message: evt.String(),
Value: evt.String(),
Success: false,
Type: evt.Type(),
}
Expand All @@ -154,16 +154,25 @@ func (obs *jobRefreshObserver) Notify(e progress.Event) {
case *models.ProgressJobDependencyResolution:
resp := &pb.RefreshJobsResponse{
JobName: evt.Job,
Message: evt.String(),
Value: evt.String(),
Success: true,
Type: evt.Type(),
}
if evt.Err != nil {
resp.Success = false
resp.Message = evt.Err.Error()
resp.Value = evt.Err.Error()
}
if err := obs.stream.Send(resp); err != nil {
obs.log.Error("failed to send failed dependency resolution notification", "evt", evt.String(), "error", err)
}
case *models.ProgressJobDeploymentRequestCreated:
resp := &pb.RefreshJobsResponse{
Value: evt.ID().UUID().String(),
Success: true,
Type: evt.Type(),
}
if err := obs.stream.Send(resp); err != nil {
obs.log.Error("failed to send job deployment request created", "evt", evt.String(), "error", err)
}
}
}
Loading