Skip to content

Commit

Permalink
fix: client for get service logs
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin committed Nov 25, 2024
1 parent bd39964 commit 2c77962
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 6 deletions.
47 changes: 44 additions & 3 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func NewRunTestWorkflowCmd() *cobra.Command {
masks []string
tags map[string]string
selectors []string
serviceName string
serviceIndex int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -146,7 +148,7 @@ func NewRunTestWorkflowCmd() *cobra.Command {
ui.NL()
if !execution.FailedToInitialize() {
if watchEnabled && len(args) > 0 {
exitCode = uiWatch(execution, client)
exitCode = uiWatch(execution, serviceName, serviceIndex, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
Expand Down Expand Up @@ -181,12 +183,21 @@ func NewRunTestWorkflowCmd() *cobra.Command {
cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$")
cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor")
cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression")
cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index")

return cmd
}

func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int {
result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client)
func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int, client apiclientv1.Client) int {
var result *testkube.TestWorkflowResult
var err error

if serviceName == "" {
result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client)
} else {
result, err = watchTestWorkflowServiceLogs(execution.Id, serviceName, serviceIndex, execution.Signature, client)
}
ui.ExitOnError("reading test workflow execution logs", err)

// Apply the result in the execution
Expand Down Expand Up @@ -313,6 +324,36 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature
return result, err
}

func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow service pod", fmt.Sprintf("%s-%s-%d", id, serviceName, serviceIndex))

notifications, err := client.GetTestWorkflowExecutionServiceNotifications(id, serviceName, serviceIndex)
ui.ExitOnError("getting logs from executor", err)

steps := flattenSignatures(signature)

var result *testkube.TestWorkflowResult
var isLineBeginning = true
for l := range notifications {
if l.Output != nil {
continue
}
if l.Result != nil {
if printResultDifference(result, l.Result, steps) {
isLineBeginning = true
}
result = l.Result
continue
}

printStructuredLogLines(l.Log, &isLineBeginning)
}

ui.NL()

return result, err
}

func printStatusHeader(i, n int, name string) {
if i == -1 {
fmt.Println("\n" + ui.LightCyan(fmt.Sprintf("• %s", name)))
Expand Down
10 changes: 9 additions & 1 deletion cmd/kubectl-testkube/commands/testworkflows/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (
)

func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
var (
serviceName string
serviceIndex int
)

cmd := &cobra.Command{
Use: "testworkflowexecution <executionName>",
Aliases: []string{"testworkflowexecutions", "twe", "tw"},
Expand All @@ -31,7 +36,7 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
ui.ExitOnError("render test workflow execution", err)

ui.NL()
exitCode := uiWatch(execution, client)
exitCode := uiWatch(execution, serviceName, serviceIndex, client)
ui.NL()

execution, err = client.GetTestWorkflowExecution(execution.Id)
Expand All @@ -43,5 +48,8 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
},
}

cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index")

return cmd
}
4 changes: 2 additions & 2 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex<int\\>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream:serviceName/:serviceIndex<int\\>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type TestWorkflowAPI interface {
ExecuteTestWorkflows(selector string, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error)
GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error)
GetTestWorkflowExecutionLogs(id string) ([]byte, error)
GetTestWorkflowExecutionServiceNotifications(id, serviceName string, serviceIndex int) (chan testkube.TestWorkflowExecutionNotification, error)
}

// TestWorkflowExecutionAPI describes test workflow api methods
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/v1/client/testworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ func (c TestWorkflowClient) GetTestWorkflowExecutionNotifications(id string) (no
return notifications, err
}

// GetTestWorkflowExecutionServiceNotifications returns events stream from job pods, based on job pods logs
func (c TestWorkflowClient) GetTestWorkflowExecutionServiceNotifications(id, serviceName string, serviceIndex int) (notifications chan testkube.TestWorkflowExecutionNotification, err error) {
notifications = make(chan testkube.TestWorkflowExecutionNotification)
uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications/%s/%d", id, serviceName, serviceIndex)
err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications)
return notifications, err
}

// GetTestWorkflowExecution returns single test workflow execution by id
func (c TestWorkflowClient) GetTestWorkflowExecution(id string) (testkube.TestWorkflowExecution, error) {
uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions/%s", id)
Expand Down

0 comments on commit 2c77962

Please sign in to comment.