Skip to content

Commit

Permalink
fix: add replay endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mauliksoneji committed Jun 22, 2021
1 parent 138e4f0 commit 31b9b43
Show file tree
Hide file tree
Showing 20 changed files with 856 additions and 443 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pack-files:
generate-proto: ## regenerate protos
@echo " > cloning protobuf from odpf/proton"
@rm -rf proton/
@git -c advice.detachedHead=false clone https://github.com/odpf/proton --depth 1 --quiet --branch main
@git -c advice.detachedHead=false clone https://github.com/odpf/proton --depth 1 --quiet --branch DBTCH-1024
@echo " > generating protobuf"
@echo " > info: make sure correct version of dependencies are installed using 'install'"
@buf generate
Expand Down
58 changes: 44 additions & 14 deletions api/handler/v1/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,44 @@ func (sv *RuntimeServiceServer) ListResourceSpecification(ctx context.Context, r
}, nil
}

func (sv *RuntimeServiceServer) ReplayDryRun(ctx context.Context, req *pb.ReplayDryRunRequest) (*pb.ReplayDryRunResponse, error) {
func (sv *RuntimeServiceServer) ReplayDryRun(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayDryRunResponse, error) {
replayRequestInput, err := sv.parseReplayRequest(req)
if err != nil {
return nil, err
}

rootNode, err := sv.jobSvc.ReplayDryRun(replayRequestInput)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("error while processing replay: %v", err))
}

node, err := sv.adapter.ToReplayExecutionTreeNode(rootNode)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("error while processing replay: %v", err))
}
return &pb.ReplayDryRunResponse{
Success: true,
Response: node,
}, nil
}

func (sv *RuntimeServiceServer) Replay(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayResponse, error) {
replayRequestInput, err := sv.parseReplayRequest(req)
if err != nil {
return nil, err
}

replayUUID, err := sv.jobSvc.Replay(replayRequestInput)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("error while processing replay: %v", err))
}

return &pb.ReplayResponse{
Id: replayUUID,
}, nil
}

func (sv *RuntimeServiceServer) parseReplayRequest(req *pb.ReplayRequest) (*models.ReplayRequestInput, error) {
projectRepo := sv.projectRepoFactory.New()
projSpec, err := projectRepo.GetByName(req.GetProjectName())
if err != nil {
Expand Down Expand Up @@ -764,20 +801,13 @@ func (sv *RuntimeServiceServer) ReplayDryRun(ctx context.Context, req *pb.Replay
if endDate.Before(startDate) {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("replay end date cannot be before start date"))
}

rootNode, err := sv.jobSvc.ReplayDryRun(namespaceSpec, jobSpec, startDate, endDate)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("error while processing replay: %v", err))
replayRequest := models.ReplayRequestInput{
Job: jobSpec,
Start: startDate,
End: endDate,
Project: projSpec,
}

node, err := sv.adapter.ToReplayExecutionTreeNode(rootNode)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("error while processing replay: %v", err))
}
return &pb.ReplayDryRunResponse{
Success: true,
Response: node,
}, nil
return &replayRequest, nil
}

func NewRuntimeServiceServer(
Expand Down
10 changes: 8 additions & 2 deletions api/handler/v1/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,11 +1479,17 @@ func TestRuntimeServiceServer(t *testing.T) {
},
}),
}
replayRequestInput := &models.ReplayRequestInput{
Job: jobSpec,
Start: startDate,
End: endDate,
Project: projectSpec,
}
dagNode := tree.NewTreeNode(jobSpec)

jobService := new(mock.JobService)
jobService.On("GetByName", jobName, namespaceSpec).Return(jobSpec, nil)
jobService.On("ReplayDryRun", namespaceSpec, jobSpec, startDate, endDate).Return(dagNode, nil)
jobService.On("ReplayDryRun", replayRequestInput).Return(dagNode, nil)
defer jobService.AssertExpectations(t)

projectRepository := new(mock.ProjectRepository)
Expand Down Expand Up @@ -1514,7 +1520,7 @@ func TestRuntimeServiceServer(t *testing.T) {
nil,
nil,
)
replayRequest := pb.ReplayDryRunRequest{
replayRequest := pb.ReplayRequest{
ProjectName: projectName,
Namespace: namespaceSpec.Name,
JobName: jobName,
Expand Down
754 changes: 413 additions & 341 deletions api/proto/odpf/optimus/runtime_service.pb.go

Large diffs are not rendered by default.

141 changes: 139 additions & 2 deletions api/proto/odpf/optimus/runtime_service.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 31b9b43

Please sign in to comment.