Skip to content

Commit

Permalink
fix: check for testworkflow service
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin committed Nov 27, 2024
1 parent cf2dd76 commit fff63c0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
4 changes: 2 additions & 2 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
28 changes: 22 additions & 6 deletions internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,21 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionServiceNotificationsHandler() f
return s.ClientError(c, errPrefix, err)
}

found := false
if execution.Workflow != nil && execution.Workflow.Spec != nil {
_, found = execution.Workflow.Spec.Services[serviceName]
}

if !found {
return s.ClientError(c, errPrefix, errors.New("unknown service for test workflow execution"))
}

// Check for the logs
id := fmt.Sprintf("%s-%s-%s", execution.Id, serviceName, serviceIndex)
notifications := s.ExecutionWorkerClient.Notifications(ctx, id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
// ScheduledAt: common.Ptr(execution.ScheduledAt),
// Signature: execution.Signature,
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
Expand Down Expand Up @@ -179,13 +187,21 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionServiceNotificationsWebSocketHa
return
}

found := false
if execution.Workflow != nil && execution.Workflow.Spec != nil {
_, found = execution.Workflow.Spec.Services[serviceName]
}

if !found {
return
}

// Check for the logs
id := fmt.Sprintf("%s-%s-%s", execution.Id, serviceName, serviceIndex)
notifications := s.ExecutionWorkerClient.Notifications(ctx, id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
// Signature: execution.Signature,
// ScheduledAt: common.Ptr(execution.ScheduledAt),
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
Expand Down

0 comments on commit fff63c0

Please sign in to comment.