diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 9fe93fa5129..f3bcd976f29 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -15,6 +15,7 @@ import ( "github.com/kubeshop/testkube/cmd/api-server/commons" "github.com/kubeshop/testkube/cmd/api-server/services" "github.com/kubeshop/testkube/internal/app/api/debug" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" cloudartifacts "github.com/kubeshop/testkube/pkg/cloud/data/artifact" cloudtestworkflow "github.com/kubeshop/testkube/pkg/cloud/data/testworkflow" "github.com/kubeshop/testkube/pkg/event/kind/cdevent" @@ -134,7 +135,7 @@ func main() { if strings.HasPrefix(controlPlaneUrl, fmt.Sprintf("%s:%d", cfg.APIServerFullname, cfg.GRPCServerPort)) { controlPlaneUrl = fmt.Sprintf("127.0.0.1:%d", cfg.GRPCServerPort) } - grpcConn, err = agent.NewGRPCConnection( + grpcConn, err = agentclient.NewGRPCConnection( ctx, cfg.TestkubeProTLSInsecure, cfg.TestkubeProSkipVerify, diff --git a/cmd/logs-server/main.go b/cmd/logs-server/main.go index c3d5f5c0af8..d64ce5aa4c2 100644 --- a/cmd/logs-server/main.go +++ b/cmd/logs-server/main.go @@ -3,7 +3,6 @@ package main import ( "context" "errors" - "os" "os/signal" "syscall" @@ -14,7 +13,7 @@ import ( "google.golang.org/grpc/credentials" "github.com/kubeshop/testkube/internal/common" - "github.com/kubeshop/testkube/pkg/agent" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/event/bus" "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/logs" @@ -106,7 +105,7 @@ func main() { switch mode { case common.ModeAgent: - grpcConn, err := agent.NewGRPCConnection( + grpcConn, err := agentclient.NewGRPCConnection( ctx, cfg.TestkubeProTLSInsecure, cfg.TestkubeProSkipVerify, diff --git a/cmd/testworkflow-init/commands/run.go b/cmd/testworkflow-init/commands/run.go index 9afd0ca66d4..c17505b0d60 100644 --- a/cmd/testworkflow-init/commands/run.go +++ b/cmd/testworkflow-init/commands/run.go @@ -7,12 +7,13 @@ import ( "github.com/kubeshop/testkube/cmd/testworkflow-init/data" "github.com/kubeshop/testkube/cmd/testworkflow-init/orchestration" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" + "github.com/kubeshop/testkube/cmd/testworkflow-init/runtime" "github.com/kubeshop/testkube/pkg/expressions" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite" ) func Run(run lite.ActionExecute, container lite.LiteActionContainer) { - machine := data.GetInternalTestWorkflowMachine() + machine := runtime.GetInternalTestWorkflowMachine() state := data.GetState() step := state.GetStep(run.Ref) @@ -32,14 +33,14 @@ func Run(run lite.ActionExecute, container lite.LiteActionContainer) { // Ensure the command is not empty if len(command) == 0 { - output.ExitErrorf(data.CodeInputError, "command is required") + output.ExitErrorf(constants.CodeInputError, "command is required") } // Resolve the command to run for i := range command { value, err := expressions.CompileAndResolveTemplate(command[i], machine, expressions.FinalizerFail) if err != nil { - output.ExitErrorf(data.CodeInternal, "failed to compute argument '%d': %s", i, err.Error()) + output.ExitErrorf(constants.CodeInternal, "failed to compute argument '%d': %s", i, err.Error()) } command[i], _ = value.Static().StringValue() } @@ -48,11 +49,11 @@ func Run(run lite.ActionExecute, container lite.LiteActionContainer) { execution := orchestration.Executions.Create(command[0], command[1:]) result, err := execution.Run() if err != nil { - output.ExitErrorf(data.CodeInternal, "failed to execute: %v", err) + output.ExitErrorf(constants.CodeInternal, "failed to execute: %v", err) } // Initialize local state - var status data.StepStatus + var status constants.StepStatus success := result.ExitCode == 0 @@ -61,11 +62,11 @@ func Run(run lite.ActionExecute, container lite.LiteActionContainer) { success = !success } if result.Aborted { - status = data.StepStatusAborted + status = constants.StepStatusAborted } else if success { - status = data.StepStatusPassed + status = constants.StepStatusPassed } else { - status = data.StepStatusFailed + status = constants.StepStatusFailed } // Abandon saving execution data if the step has been finished before diff --git a/cmd/testworkflow-init/commands/setup.go b/cmd/testworkflow-init/commands/setup.go index 004b2eda415..e17243bbe22 100644 --- a/cmd/testworkflow-init/commands/setup.go +++ b/cmd/testworkflow-init/commands/setup.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite" "github.com/kubeshop/testkube/pkg/version" @@ -24,7 +23,7 @@ func Setup(config lite.ActionSetup) error { // Copy the init process stdoutUnsafe.Print("Configuring init process...") if config.CopyInit { - err := exec.Command("cp", "/init", data.InitPath).Run() + err := exec.Command("cp", "/init", constants.InitPath).Run() if err != nil { stdoutUnsafe.Error(" error\n") stdoutUnsafe.Errorf(" failed to copy the /init process: %s\n", err.Error()) @@ -38,7 +37,7 @@ func Setup(config lite.ActionSetup) error { // Copy the toolkit stdoutUnsafe.Print("Configuring toolkit...") if config.CopyToolkit { - err := exec.Command("cp", "/toolkit", data.ToolkitPath).Run() + err := exec.Command("cp", "/toolkit", constants.ToolkitPath).Run() if err != nil { stdoutUnsafe.Error(" error\n") stdoutUnsafe.Errorf(" failed to copy the /toolkit utilities: %s\n", err.Error()) @@ -54,7 +53,7 @@ func Setup(config lite.ActionSetup) error { if config.CopyBinaries { // Use `cp` on the whole directory, as it has plenty of files, which lead to the same FS block. // Copying individual files will lead to high FS usage - err := exec.Command("cp", "-rf", defaultInitImageBusyboxBinaryPath, data.InternalBinPath).Run() + err := exec.Command("cp", "-rf", defaultInitImageBusyboxBinaryPath, constants.InternalBinPath).Run() if err != nil { stdoutUnsafe.Error(" error\n") stdoutUnsafe.Errorf(" failed to copy the binaries: %s\n", err.Error()) @@ -66,7 +65,7 @@ func Setup(config lite.ActionSetup) error { } // Expose debugging Pod information - stdoutUnsafe.Output(data.InitStepName, "pod", map[string]string{ + stdoutUnsafe.Output(constants.InitStepName, "pod", map[string]string{ "name": os.Getenv(constants.EnvPodName), "nodeName": os.Getenv(constants.EnvNodeName), "namespace": os.Getenv(constants.EnvNamespaceName), diff --git a/cmd/testworkflow-init/constants/codes.go b/cmd/testworkflow-init/constants/codes.go new file mode 100644 index 00000000000..9742950ed9c --- /dev/null +++ b/cmd/testworkflow-init/constants/codes.go @@ -0,0 +1,7 @@ +package constants + +const ( + CodeAborted uint8 = 137 + CodeInputError uint8 = 155 + CodeInternal uint8 = 190 +) diff --git a/cmd/testworkflow-init/constants/names.go b/cmd/testworkflow-init/constants/names.go new file mode 100644 index 00000000000..467c71f26cb --- /dev/null +++ b/cmd/testworkflow-init/constants/names.go @@ -0,0 +1,5 @@ +package constants + +const ( + InitStepName = "tktw-init" +) diff --git a/cmd/testworkflow-init/constants/paths.go b/cmd/testworkflow-init/constants/paths.go new file mode 100644 index 00000000000..ac8714c8e83 --- /dev/null +++ b/cmd/testworkflow-init/constants/paths.go @@ -0,0 +1,15 @@ +package constants + +import "path/filepath" + +const ( + InternalPath = "/.tktw" + TerminationLogPath = "/dev/termination-log" +) + +var ( + InternalBinPath = filepath.Join(InternalPath, "bin") + InitPath = filepath.Join(InternalPath, "init") + ToolkitPath = filepath.Join(InternalPath, "toolkit") + StatePath = filepath.Join(InternalPath, "state") +) diff --git a/cmd/testworkflow-init/data/constants.go b/cmd/testworkflow-init/constants/statuses.go similarity index 59% rename from cmd/testworkflow-init/data/constants.go rename to cmd/testworkflow-init/constants/statuses.go index fff1490b399..60c77dbd63d 100644 --- a/cmd/testworkflow-init/data/constants.go +++ b/cmd/testworkflow-init/constants/statuses.go @@ -1,19 +1,4 @@ -package data - -import "path/filepath" - -const ( - InitStepName = "tktw-init" - InternalPath = "/.tktw" - TerminationLogPath = "/dev/termination-log" -) - -var ( - InternalBinPath = filepath.Join(InternalPath, "bin") - InitPath = filepath.Join(InternalPath, "init") - ToolkitPath = filepath.Join(InternalPath, "toolkit") - StatePath = filepath.Join(InternalPath, "state") -) +package constants type StepStatus string @@ -47,9 +32,3 @@ func StepStatusFromCode(code string) StepStatus { } return StepStatusAborted } - -const ( - CodeAborted uint8 = 137 - CodeInputError uint8 = 155 - CodeInternal uint8 = 190 -) diff --git a/cmd/testworkflow-init/data/client.go b/cmd/testworkflow-init/data/client.go new file mode 100644 index 00000000000..733b7666d1a --- /dev/null +++ b/cmd/testworkflow-init/data/client.go @@ -0,0 +1,39 @@ +package data + +import ( + "context" + "sync" + + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" + "github.com/kubeshop/testkube/cmd/testworkflow-init/output" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" + "github.com/kubeshop/testkube/pkg/cloud" + "github.com/kubeshop/testkube/pkg/credentials" + "github.com/kubeshop/testkube/pkg/log" +) + +var ( + cloudMu sync.Mutex + cloudClient cloud.TestKubeCloudAPIClient +) + +func CloudClient() cloud.TestKubeCloudAPIClient { + cloudMu.Lock() + defer cloudMu.Unlock() + + if cloudClient == nil { + cfg := GetState().InternalConfig.Worker.Connection + logger := log.NewSilent() + grpcConn, err := agentclient.NewGRPCConnection(context.Background(), cfg.TlsInsecure, cfg.SkipVerify, cfg.Url, "", "", "", logger) + if err != nil { + output.ExitErrorf(constants.CodeInternal, "failed to connect with the Control Plane: %s", err.Error()) + } + cloudClient = cloud.NewTestKubeCloudAPIClient(grpcConn) + } + return cloudClient +} + +func Credentials() credentials.CredentialRepository { + cfg := GetState().InternalConfig + return credentials.NewCredentialRepository(CloudClient(), cfg.Worker.Connection.ApiKey, cfg.Execution.Id) +} diff --git a/cmd/testworkflow-init/data/expressions.go b/cmd/testworkflow-init/data/expressions.go index 5f584fc0a4c..4ead38d421a 100644 --- a/cmd/testworkflow-init/data/expressions.go +++ b/cmd/testworkflow-init/data/expressions.go @@ -5,6 +5,7 @@ import ( "os" "strings" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" "github.com/kubeshop/testkube/pkg/expressions" ) @@ -68,12 +69,12 @@ var StateMachine = expressions.NewMachine(). currentStatus := GetState().CurrentStatus expr, err := expressions.EvalExpression(currentStatus, RefNotFailedMachine, AliasMachine) if err != nil { - output.ExitErrorf(CodeInternal, "current status is invalid: %s: %v\n", currentStatus, err.Error()) + output.ExitErrorf(constants.CodeInternal, "current status is invalid: %s: %v\n", currentStatus, err.Error()) } if passed, _ := expr.BoolValue(); passed { - return string(StepStatusPassed), true + return string(constants.StepStatusPassed), true } - return string(StepStatusFailed), true + return string(constants.StepStatusFailed), true } else if name == "self.status" { state := GetState() step := state.GetStep(state.CurrentRef) @@ -123,7 +124,7 @@ var RefSuccessMachine = expressions.NewMachine(). if s.Status == nil { return nil, false } - return *s.Status == StepStatusPassed || *s.Status == StepStatusSkipped, true + return *s.Status == constants.StepStatusPassed || *s.Status == constants.StepStatusSkipped, true }) var RefNotFailedMachine = expressions.NewMachine(). @@ -135,7 +136,7 @@ var RefNotFailedMachine = expressions.NewMachine(). return exp, true } } - return s.Status == nil || *s.Status == StepStatusPassed || *s.Status == StepStatusSkipped, true + return s.Status == nil || *s.Status == constants.StepStatusPassed || *s.Status == constants.StepStatusSkipped, true }) func Expression(expr string, m ...expressions.Machine) (expressions.StaticValue, error) { diff --git a/cmd/testworkflow-init/data/global.go b/cmd/testworkflow-init/data/global.go index 8c31ac8e5e0..d7a3f06fea2 100644 --- a/cmd/testworkflow-init/data/global.go +++ b/cmd/testworkflow-init/data/global.go @@ -18,7 +18,3 @@ func GetBaseTestWorkflowMachine() expressions.Machine { GetState() // load state return expressions.CombinedMachines(EnvMachine, StateMachine, fileMachine) } - -func GetInternalTestWorkflowMachine() expressions.Machine { - return expressions.CombinedMachines(RefSuccessMachine, AliasMachine, GetBaseTestWorkflowMachine()) -} diff --git a/cmd/testworkflow-init/data/state.go b/cmd/testworkflow-init/data/state.go index 1bf5b5fa706..8d960c514ec 100644 --- a/cmd/testworkflow-init/data/state.go +++ b/cmd/testworkflow-init/data/state.go @@ -9,6 +9,7 @@ import ( "strings" "sync" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" "github.com/kubeshop/testkube/pkg/expressions" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig" @@ -148,14 +149,14 @@ func persistTerminationLog() { ref = *actions[i].End } if actions[i].Type() == lite.ActionTypeSetup { - ref = InitStepName + ref = constants.InitStepName } if ref == "" { continue } step := s.GetStep(ref) if step.Status == nil { - statuses = append(statuses, fmt.Sprintf("%s,%d", StepStatusAborted, CodeAborted)) + statuses = append(statuses, fmt.Sprintf("%s,%d", constants.StepStatusAborted, constants.CodeAborted)) } else { statuses = append(statuses, fmt.Sprintf("%s,%d", (*step.Status).Code(), step.ExitCode)) } @@ -168,9 +169,9 @@ func persistTerminationLog() { prevTerminationLog = statuses // Write the termination log - err := os.WriteFile(TerminationLogPath, []byte(strings.Join(statuses, "/")), 0) + err := os.WriteFile(constants.TerminationLogPath, []byte(strings.Join(statuses, "/")), 0) if err != nil { - output.UnsafeExitErrorf(CodeInternal, "failed to save the termination log: %s", err.Error()) + output.UnsafeExitErrorf(constants.CodeInternal, "failed to save the termination log: %s", err.Error()) } } @@ -181,7 +182,7 @@ func GetState() *state { defer loadStateMu.Unlock() loadStateMu.Lock() if !loadedState { - readState(StatePath) + readState(constants.StatePath) loadedState = true } return currentState @@ -192,6 +193,6 @@ func SaveTerminationLog() { } func SaveState() { - persistState(StatePath) + persistState(constants.StatePath) persistTerminationLog() } diff --git a/cmd/testworkflow-init/data/stepData.go b/cmd/testworkflow-init/data/stepData.go index 7b7371b0558..8171d841039 100644 --- a/cmd/testworkflow-init/data/stepData.go +++ b/cmd/testworkflow-init/data/stepData.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" ) @@ -15,17 +16,17 @@ type RetryPolicy struct { } type StepData struct { - Ref string `json:"_,omitempty"` - ExitCode uint8 `json:"e,omitempty"` - Status *StepStatus `json:"s,omitempty"` - StartedAt *time.Time `json:"S,omitempty"` - Condition string `json:"c,omitempty"` - Parents []string `json:"p,omitempty"` - Timeout *time.Duration `json:"t,omitempty"` - PausedOnStart bool `json:"P,omitempty"` - Retry RetryPolicy `json:"r,omitempty"` - Result string `json:"R,omitempty"` - Iteration int32 `json:"i,omitempty"` + Ref string `json:"_,omitempty"` + ExitCode uint8 `json:"e,omitempty"` + Status *constants.StepStatus `json:"s,omitempty"` + StartedAt *time.Time `json:"S,omitempty"` + Condition string `json:"c,omitempty"` + Parents []string `json:"p,omitempty"` + Timeout *time.Duration `json:"t,omitempty"` + PausedOnStart bool `json:"P,omitempty"` + Retry RetryPolicy `json:"r,omitempty"` + Result string `json:"R,omitempty"` + Iteration int32 `json:"i,omitempty"` // Pausing PausedNs int64 `json:"n,omitempty"` @@ -53,22 +54,22 @@ func (s *StepData) ResolveCondition() (bool, error) { return expr.Static().BoolValue() } -func (s *StepData) ResolveResult() (StepStatus, error) { +func (s *StepData) ResolveResult() (constants.StepStatus, error) { if s.Result == "" { - return StepStatusAborted, errors.New("missing result expression") + return constants.StepStatusAborted, errors.New("missing result expression") } expr, err := Expression(s.Result, RefSuccessMachine) if err != nil { - return StepStatusAborted, err + return constants.StepStatusAborted, err } success, err := expr.Static().BoolValue() if err != nil { - return StepStatusAborted, err + return constants.StepStatusAborted, err } if success { - return StepStatusPassed, nil + return constants.StepStatusPassed, nil } - return StepStatusFailed, nil + return constants.StepStatusFailed, nil } func (s *StepData) SetExitCode(exitCode uint8) *StepData { @@ -99,7 +100,7 @@ func (s *StepData) SetTimeout(timeout string) *StepData { } duration, err := time.ParseDuration(timeout) if err != nil { - output.ExitErrorf(CodeInputError, "invalid timeout duration: %s: %s", timeout, err.Error()) + output.ExitErrorf(constants.CodeInputError, "invalid timeout duration: %s: %s", timeout, err.Error()) } s.Timeout = &duration return s @@ -115,7 +116,7 @@ func (s *StepData) SetRetryPolicy(policy RetryPolicy) *StepData { return s } -func (s *StepData) SetStatus(status StepStatus) *StepData { +func (s *StepData) SetStatus(status constants.StepStatus) *StepData { s.Status = &status return s } diff --git a/cmd/testworkflow-init/main.go b/cmd/testworkflow-init/main.go index 75a7384e64d..7ed32ecb502 100644 --- a/cmd/testworkflow-init/main.go +++ b/cmd/testworkflow-init/main.go @@ -19,6 +19,7 @@ import ( "github.com/kubeshop/testkube/cmd/testworkflow-init/obfuscator" "github.com/kubeshop/testkube/cmd/testworkflow-init/orchestration" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" + "github.com/kubeshop/testkube/cmd/testworkflow-init/runtime" "github.com/kubeshop/testkube/pkg/expressions" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite" ) @@ -42,22 +43,22 @@ func main() { orchestration.Setup.SetSensitiveWordMinimumLength(SensitiveMinimumLength) // Prepare empty state file if it doesn't exist - _, err := os.Stat(data.StatePath) + _, err := os.Stat(constants.StatePath) if errors.Is(err, os.ErrNotExist) { - stdout.Hint(data.InitStepName, constants.InstructionStart) + stdout.Hint(constants.InitStepName, constants.InstructionStart) stdoutUnsafe.Print("Creating state...") - err := os.WriteFile(data.StatePath, nil, 0777) + err := os.WriteFile(constants.StatePath, nil, 0777) if err != nil { stdoutUnsafe.Error(" error\n") - output.ExitErrorf(data.CodeInternal, "failed to create state file: %s", err.Error()) + output.ExitErrorf(constants.CodeInternal, "failed to create state file: %s", err.Error()) } - os.Chmod(data.StatePath, 0777) + os.Chmod(constants.StatePath, 0777) stdoutUnsafe.Print(" done\n") } else if err != nil { - stdout.Hint(data.InitStepName, constants.InstructionStart) + stdout.Hint(constants.InitStepName, constants.InstructionStart) stdoutUnsafe.Print("Accessing state...") stdoutUnsafe.Error(" error\n") - output.ExitErrorf(data.CodeInternal, "cannot access state file: %s", err.Error()) + output.ExitErrorf(constants.CodeInternal, "cannot access state file: %s", err.Error()) } // Store the instructions in the state if they are provided @@ -80,13 +81,13 @@ func main() { // Ensure there is a group index provided if len(os.Args) != 2 { - output.ExitErrorf(data.CodeInternal, "invalid arguments provided - expected only one") + output.ExitErrorf(constants.CodeInternal, "invalid arguments provided - expected only one") } // Determine group index to run groupIndex, err := strconv.ParseInt(os.Args[1], 10, 32) if err != nil { - output.ExitErrorf(data.CodeInputError, "invalid run group passed: %s", err.Error()) + output.ExitErrorf(constants.CodeInputError, "invalid run group passed: %s", err.Error()) } // Handle aborting @@ -143,7 +144,7 @@ func main() { }) _, err = controlSrv.Listen() if err != nil { - output.ExitErrorf(data.CodeInternal, "Failed to start control server at port %d: %s\n", constants.ControlServerPort, err.Error()) + output.ExitErrorf(constants.CodeInternal, "Failed to start control server at port %d: %s\n", constants.ControlServerPort, err.Error()) } // Keep a list of paused steps for execution @@ -187,7 +188,7 @@ func main() { orchestration.Start(step) break } - output.ExitErrorf(data.CodeInputError, err.Error()) + output.ExitErrorf(constants.CodeInputError, err.Error()) } stdout.SetSensitiveWords(orchestration.Setup.GetSensitiveWords()) currentContainer = *action.Container @@ -205,15 +206,15 @@ func main() { // Determine if the step should be skipped executable, err := step.ResolveCondition() if err != nil { - output.ExitErrorf(data.CodeInternal, "failed to determine condition of '%s' step: %s: %v", *action.Start, step.Condition, err.Error()) + output.ExitErrorf(constants.CodeInternal, "failed to determine condition of '%s' step: %s: %v", *action.Start, step.Condition, err.Error()) } if !executable { - step.SetStatus(data.StepStatusSkipped) + step.SetStatus(constants.StepStatusSkipped) // Skip all the children for _, v := range state.Steps { if slices.Contains(v.Parents, step.Ref) { - v.SetStatus(data.StepStatusSkipped) + v.SetStatus(constants.StepStatusSkipped) } } } @@ -231,7 +232,7 @@ func main() { if step.Status == nil { status, err := step.ResolveResult() if err != nil { - output.ExitErrorf(data.CodeInternal, "failed to determine result of '%s' step: %s: %v", *action.End, step.Result, err.Error()) + output.ExitErrorf(constants.CodeInternal, "failed to determine result of '%s' step: %s: %v", *action.End, step.Result, err.Error()) } step.SetStatus(status) } @@ -240,15 +241,15 @@ func main() { case lite.ActionTypeSetup: err := orchestration.Setup.UseEnv(constants.EnvGroupDebug) if err != nil { - output.ExitErrorf(data.CodeInputError, err.Error()) + output.ExitErrorf(constants.CodeInputError, err.Error()) } stdout.SetSensitiveWords(orchestration.Setup.GetSensitiveWords()) - step := state.GetStep(data.InitStepName) + step := state.GetStep(constants.InitStepName) err = commands.Setup(*action.Setup) if err == nil { - step.SetStatus(data.StepStatusPassed) + step.SetStatus(constants.StepStatusPassed) } else { - step.SetStatus(data.StepStatusFailed) + step.SetStatus(constants.StepStatusFailed) } orchestration.End(step) if err != nil { @@ -268,14 +269,14 @@ func main() { // Ignore when it is aborted if orchestration.Executions.IsAborted() { - step.SetStatus(data.StepStatusAborted) + step.SetStatus(constants.StepStatusAborted) continue } // Configure the environment err := orchestration.Setup.UseCurrentEnv() if err != nil { - output.ExitErrorf(data.CodeInputError, err.Error()) + output.ExitErrorf(constants.CodeInputError, err.Error()) } if action.Execute.Toolkit { serialized, _ := json.Marshal(state.InternalConfig) @@ -318,16 +319,16 @@ func main() { // Iterate over timed out step for _, r := range timedOut { - r.SetStatus(data.StepStatusTimeout) + r.SetStatus(constants.StepStatusTimeout) sub := state.GetSubSteps(r.Ref) for i := range sub { if sub[i].IsFinished() { continue } if sub[i].IsStarted() { - sub[i].SetStatus(data.StepStatusTimeout) + sub[i].SetStatus(constants.StepStatusTimeout) } else { - sub[i].SetStatus(data.StepStatusSkipped) + sub[i].SetStatus(constants.StepStatusSkipped) } } stdoutUnsafe.Println("Timed out.") @@ -352,7 +353,7 @@ func main() { // Ignore when it is aborted if orchestration.Executions.IsAborted() { - step.SetStatus(data.StepStatusAborted) + step.SetStatus(constants.StepStatusAborted) break } @@ -379,7 +380,7 @@ func main() { if until == "" { until = "passed" } - expr, err := expressions.CompileAndResolve(until, data.LocalMachine, data.GetInternalTestWorkflowMachine(), expressions.FinalizerFail) + expr, err := expressions.CompileAndResolve(until, data.LocalMachine, runtime.GetInternalTestWorkflowMachine(), expressions.FinalizerFail) if err != nil { stdout.Printf("failed to execute retry condition: %s: %s\n", until, err.Error()) break @@ -406,7 +407,7 @@ func main() { // Stop the container after all the instructions are interpret _ = orchestration.Executions.Kill() if orchestration.Executions.IsAborted() { - os.Exit(int(data.CodeAborted)) + os.Exit(int(constants.CodeAborted)) } else { os.Exit(0) } diff --git a/cmd/testworkflow-init/orchestration/control.go b/cmd/testworkflow-init/orchestration/control.go index a2c5e84dda1..1701364350f 100644 --- a/cmd/testworkflow-init/orchestration/control.go +++ b/cmd/testworkflow-init/orchestration/control.go @@ -35,7 +35,7 @@ func FinishExecution(step *data.StepData, result constants.ExecutionResult) { func End(step *data.StepData) { if !step.IsFinished() { v, e := json.Marshal(step) - output.ExitErrorf(data.CodeInternal, "cannot mark unfinished step as finished: %s, %v", string(v), e) + output.ExitErrorf(constants.CodeInternal, "cannot mark unfinished step as finished: %s, %v", string(v), e) } instructions.PrintHintDetails(step.Ref, constants.InstructionEnd, *step.Status) } diff --git a/cmd/testworkflow-init/orchestration/executions.go b/cmd/testworkflow-init/orchestration/executions.go index 2697ab1bc10..5bf0eadd57a 100644 --- a/cmd/testworkflow-init/orchestration/executions.go +++ b/cmd/testworkflow-init/orchestration/executions.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/cmd/testworkflow-init/data" "github.com/kubeshop/testkube/cmd/testworkflow-init/output" ) @@ -150,7 +151,7 @@ type execution struct { func (e *execution) Run() (*executionResult, error) { // Immediately fail when aborted if e.group.aborted.Load() { - return &executionResult{Aborted: true, ExitCode: data.CodeAborted}, nil + return &executionResult{Aborted: true, ExitCode: constants.CodeAborted}, nil } // Ensure it's not paused @@ -163,7 +164,7 @@ func (e *execution) Run() (*executionResult, error) { if e.group.aborted.Load() { e.group.pauseMu.Unlock() e.cmdMu.Unlock() - return &executionResult{Aborted: true, ExitCode: data.CodeAborted}, nil + return &executionResult{Aborted: true, ExitCode: constants.CodeAborted}, nil } // Initialize local state @@ -208,7 +209,7 @@ func (e *execution) Run() (*executionResult, error) { // Fail when aborted if e.group.aborted.Load() { - return &executionResult{Aborted: true, ExitCode: data.CodeAborted}, nil + return &executionResult{Aborted: true, ExitCode: constants.CodeAborted}, nil } return &executionResult{ExitCode: uint8(exitCode)}, nil diff --git a/cmd/testworkflow-init/orchestration/setup.go b/cmd/testworkflow-init/orchestration/setup.go index 09ea0b497c5..ed5e438ff26 100644 --- a/cmd/testworkflow-init/orchestration/setup.go +++ b/cmd/testworkflow-init/orchestration/setup.go @@ -47,6 +47,7 @@ type setup struct { envGroups map[string]map[string]string envGroupsComputed map[string]map[string]struct{} envGroupsSensitive map[string]map[string]struct{} + envAdditionalSensitive map[string]struct{} envCurrentGroup int envSelectedGroup string minSensitiveWordLength int @@ -58,6 +59,7 @@ func newSetup() *setup { envGroups: map[string]map[string]string{}, envGroupsComputed: map[string]map[string]struct{}{}, envGroupsSensitive: map[string]map[string]struct{}{}, + envAdditionalSensitive: map[string]struct{}{}, envCurrentGroup: -1, minSensitiveWordLength: 1, } @@ -106,8 +108,17 @@ func (c *setup) SetSensitiveWordMinimumLength(length int) { } } +func (c *setup) AddSensitiveWords(words ...string) { + for i := range words { + c.envAdditionalSensitive[words[i]] = struct{}{} + } +} + func (c *setup) GetSensitiveWords() []string { - words := make([]string, 0) + words := make([]string, 0, len(c.envAdditionalSensitive)) + for value := range c.envAdditionalSensitive { + words = append(words, value) + } for _, name := range commonSensitiveVariables { value := os.Getenv(name) if len(value) < c.minSensitiveWordLength { @@ -201,9 +212,9 @@ func (c *setup) UseEnv(group string) error { // Ensure the built-in binaries are available if os.Getenv("PATH") == "" { - os.Setenv("PATH", data.InternalBinPath) + os.Setenv("PATH", constants.InternalBinPath) } else { - os.Setenv("PATH", fmt.Sprintf("%s:%s", os.Getenv("PATH"), data.InternalBinPath)) + os.Setenv("PATH", fmt.Sprintf("%s:%s", os.Getenv("PATH"), constants.InternalBinPath)) } // Compute dynamic environment variables diff --git a/cmd/testworkflow-init/runtime/machine.go b/cmd/testworkflow-init/runtime/machine.go new file mode 100644 index 00000000000..81269571f52 --- /dev/null +++ b/cmd/testworkflow-init/runtime/machine.go @@ -0,0 +1,18 @@ +package runtime + +import ( + "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/testworkflow-init/orchestration" + "github.com/kubeshop/testkube/cmd/testworkflow-init/output" + "github.com/kubeshop/testkube/pkg/credentials" + "github.com/kubeshop/testkube/pkg/expressions" +) + +func GetInternalTestWorkflowMachine() expressions.Machine { + return expressions.CombinedMachines(data.RefSuccessMachine, data.AliasMachine, + data.GetBaseTestWorkflowMachine(), + credentials.NewCredentialMachine(data.Credentials(), func(_ string, value string) { + orchestration.Setup.AddSensitiveWords(value) + output.Std.SetSensitiveWords(orchestration.Setup.GetSensitiveWords()) + })) +} diff --git a/cmd/testworkflow-toolkit/commands/artifacts.go b/cmd/testworkflow-toolkit/commands/artifacts.go index d9f9e4926c6..8ecd33c4c45 100644 --- a/cmd/testworkflow-toolkit/commands/artifacts.go +++ b/cmd/testworkflow-toolkit/commands/artifacts.go @@ -13,7 +13,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env/config" - "github.com/kubeshop/testkube/pkg/agent" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/capabilities" "github.com/kubeshop/testkube/pkg/cloud" @@ -84,7 +84,7 @@ func NewArtifactsCmd() *cobra.Command { // Archive ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second) defer cancel() - ctx = agent.AddAPIKeyMeta(ctx, config.Config().Worker.Connection.ApiKey) + ctx = agentclient.AddAPIKeyMeta(ctx, config.Config().Worker.Connection.ApiKey) executor, client := env.Cloud(ctx) proContext, err := client.GetProContext(ctx, &emptypb.Empty{}) var supported []*cloud.Capability diff --git a/cmd/testworkflow-toolkit/env/client.go b/cmd/testworkflow-toolkit/env/client.go index 1031cb444ce..debdb35b296 100644 --- a/cmd/testworkflow-toolkit/env/client.go +++ b/cmd/testworkflow-toolkit/env/client.go @@ -6,6 +6,7 @@ import ( "math" "net/url" "strconv" + "sync" corev1 "k8s.io/api/core/v1" @@ -13,10 +14,10 @@ import ( "k8s.io/client-go/rest" config2 "github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env/config" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/cache" "github.com/kubeshop/testkube/cmd/kubectl-testkube/config" - "github.com/kubeshop/testkube/pkg/agent" "github.com/kubeshop/testkube/pkg/api/v1/client" "github.com/kubeshop/testkube/pkg/cloud" cloudexecutor "github.com/kubeshop/testkube/pkg/cloud/data/executor" @@ -86,12 +87,26 @@ func Testkube() client.Client { return client.NewDirectAPIClient(httpClient, sseClient, fmt.Sprintf("http://%s:%d", host, port), "") } +var ( + cloudMu sync.Mutex + cloudExecutor cloudexecutor.Executor + cloudClient cloud.TestKubeCloudAPIClient +) + func Cloud(ctx context.Context) (cloudexecutor.Executor, cloud.TestKubeCloudAPIClient) { - cfg := config2.Config().Worker.Connection - grpcConn, err := agent.NewGRPCConnection(ctx, cfg.TlsInsecure, cfg.SkipVerify, cfg.Url, "", "", "", log.DefaultLogger) - if err != nil { - ui.Fail(fmt.Errorf("failed to connect with Cloud: %w", err)) + cloudMu.Lock() + defer cloudMu.Unlock() + + if cloudExecutor == nil { + cfg := config2.Config().Worker.Connection + logger := log.NewSilent() + grpcConn, err := agentclient.NewGRPCConnection(ctx, cfg.TlsInsecure, cfg.SkipVerify, cfg.Url, "", "", "", logger) + if err != nil { + ui.Fail(fmt.Errorf("failed to connect with Cloud: %w", err)) + } + cloudClient = cloud.NewTestKubeCloudAPIClient(grpcConn) + cloudExecutor = cloudexecutor.NewCloudGRPCExecutor(cloudClient, grpcConn, cfg.ApiKey) } - grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn) - return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey), grpcClient + + return cloudExecutor, cloudClient } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 116c7198127..b612770d261 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -2,28 +2,21 @@ package agent import ( "context" - "crypto/tls" - "crypto/x509" "fmt" "math" - "os" "time" - "github.com/kubeshop/testkube/pkg/executor/output" - "github.com/kubeshop/testkube/pkg/version" - - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/keepalive" - "github.com/pkg/errors" "github.com/valyala/fasthttp" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" + "github.com/kubeshop/testkube/pkg/executor/output" + "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/cloud" @@ -31,8 +24,6 @@ import ( ) const ( - timeout = 10 * time.Second - apiKeyMeta = "api-key" clusterIDMeta = "cluster-id" cloudMigrateMeta = "migrate" orgIdMeta = "environment-id" @@ -44,88 +35,6 @@ const ( // buffer up to five messages per worker const bufferSizePerWorker = 5 -func NewGRPCConnection( - ctx context.Context, - isInsecure bool, - skipVerify bool, - server string, - certFile, keyFile, caFile string, - logger *zap.SugaredLogger, -) (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} - if skipVerify { - tlsConfig = &tls.Config{InsecureSkipVerify: true} - } else { - if certFile != "" && keyFile != "" { - if err := clientCert(tlsConfig, certFile, keyFile); err != nil { - return nil, err - } - } - if caFile != "" { - if err := rootCAs(tlsConfig, caFile); err != nil { - return nil, err - } - } - } - - creds := credentials.NewTLS(tlsConfig) - if isInsecure { - creds = insecure.NewCredentials() - } - - kacp := keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 5 * time.Second, - PermitWithoutStream: true, - } - - userAgent := version.Version + "/" + version.Commit - logger.Infow("initiating connection with control plane", "userAgent", userAgent, "server", server, "insecure", isInsecure, "skipVerify", skipVerify, "certFile", certFile, "keyFile", keyFile, "caFile", caFile) - // WithBlock, WithReturnConnectionError and FailOnNonTempDialError are recommended not to be used by gRPC go docs - // but given that Agent will not work if gRPC connection cannot be established, it is ok to use them and assert issues at dial time - return grpc.DialContext( - ctx, - server, - grpc.WithBlock(), - grpc.WithReturnConnectionError(), - grpc.FailOnNonTempDialError(true), - grpc.WithUserAgent(userAgent), - grpc.WithTransportCredentials(creds), - grpc.WithKeepaliveParams(kacp), - ) -} - -func rootCAs(tlsConfig *tls.Config, file ...string) error { - pool := x509.NewCertPool() - for _, f := range file { - rootPEM, err := os.ReadFile(f) - if err != nil || rootPEM == nil { - return fmt.Errorf("agent: error loading or parsing rootCA file: %v", err) - } - ok := pool.AppendCertsFromPEM(rootPEM) - if !ok { - return fmt.Errorf("agent: failed to parse root certificate from %q", f) - } - } - tlsConfig.RootCAs = pool - return nil -} - -func clientCert(tlsConfig *tls.Config, certFile, keyFile string) error { - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return fmt.Errorf("agent: error loading client certificate: %v", err) - } - cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) - if err != nil { - return fmt.Errorf("agent: error parsing client certificate: %v", err) - } - tlsConfig.Certificates = []tls.Certificate{cert} - return nil -} - type Agent struct { client cloud.TestKubeCloudAPIClient handler fasthttp.RequestHandler @@ -309,7 +218,7 @@ func (ag *Agent) receiveCommand(ctx context.Context, stream cloud.TestKubeCloudA func (ag *Agent) runCommandLoop(ctx context.Context) error { if ag.proContext.APIKey != "" { - ctx = AddAPIKeyMeta(ctx, ag.proContext.APIKey) + ctx = agentclient.AddAPIKeyMeta(ctx, ag.proContext.APIKey) } ctx = metadata.AppendToOutgoingContext(ctx, clusterIDMeta, ag.clusterID) @@ -429,11 +338,6 @@ func (ag *Agent) executeCommand(_ context.Context, cmd *cloud.ExecuteRequest) *c } } -func AddAPIKeyMeta(ctx context.Context, apiKey string) context.Context { - md := metadata.Pairs(apiKeyMeta, apiKey) - return metadata.NewOutgoingContext(ctx, md) -} - type cloudResponse struct { resp *cloud.ExecuteRequest err error diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 8cf5c15688b..fb6c6d17418 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/executor/output" "github.com/kubeshop/testkube/pkg/log" @@ -50,7 +51,7 @@ func TestCommandExecution(t *testing.T) { atomic.AddInt32(&msgCnt, 1) } - grpcConn, err := agent.NewGRPCConnection(context.Background(), true, false, url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(context.Background(), true, false, url, "", "", "", log.DefaultLogger) ui.ExitOnError("error creating gRPC connection", err) defer grpcConn.Close() diff --git a/pkg/agent/client/client.go b/pkg/agent/client/client.go new file mode 100644 index 00000000000..eb863f2cd36 --- /dev/null +++ b/pkg/agent/client/client.go @@ -0,0 +1,111 @@ +package client + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "time" + + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" + + "github.com/kubeshop/testkube/pkg/version" +) + +const ( + timeout = 10 * time.Second + apiKeyMeta = "api-key" +) + +func NewGRPCConnection( + ctx context.Context, + isInsecure bool, + skipVerify bool, + server string, + certFile, keyFile, caFile string, + logger *zap.SugaredLogger, +) (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} + if skipVerify { + tlsConfig = &tls.Config{InsecureSkipVerify: true} + } else { + if certFile != "" && keyFile != "" { + if err := clientCert(tlsConfig, certFile, keyFile); err != nil { + return nil, err + } + } + if caFile != "" { + if err := rootCAs(tlsConfig, caFile); err != nil { + return nil, err + } + } + } + + creds := credentials.NewTLS(tlsConfig) + if isInsecure { + creds = insecure.NewCredentials() + } + + kacp := keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + } + + userAgent := version.Version + "/" + version.Commit + logger.Infow("initiating connection with control plane", "userAgent", userAgent, "server", server, "insecure", isInsecure, "skipVerify", skipVerify, "certFile", certFile, "keyFile", keyFile, "caFile", caFile) + // WithBlock, WithReturnConnectionError and FailOnNonTempDialError are recommended not to be used by gRPC go docs + // but given that Agent will not work if gRPC connection cannot be established, it is ok to use them and assert issues at dial time + return grpc.DialContext( + ctx, + server, + grpc.WithBlock(), + grpc.WithReturnConnectionError(), + grpc.FailOnNonTempDialError(true), + grpc.WithUserAgent(userAgent), + grpc.WithTransportCredentials(creds), + grpc.WithKeepaliveParams(kacp), + ) +} + +func rootCAs(tlsConfig *tls.Config, file ...string) error { + pool := x509.NewCertPool() + for _, f := range file { + rootPEM, err := os.ReadFile(f) + if err != nil || rootPEM == nil { + return fmt.Errorf("agent: error loading or parsing rootCA file: %v", err) + } + ok := pool.AppendCertsFromPEM(rootPEM) + if !ok { + return fmt.Errorf("agent: failed to parse root certificate from %q", f) + } + } + tlsConfig.RootCAs = pool + return nil +} + +func clientCert(tlsConfig *tls.Config, certFile, keyFile string) error { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return fmt.Errorf("agent: error loading client certificate: %v", err) + } + cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return fmt.Errorf("agent: error parsing client certificate: %v", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + return nil +} + +func AddAPIKeyMeta(ctx context.Context, apiKey string) context.Context { + md := metadata.Pairs(apiKeyMeta, apiKey) + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 957bfcdae94..c0ad490bd46 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/cloud" "github.com/kubeshop/testkube/pkg/event/kind/common" @@ -62,7 +63,7 @@ func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) { func (ag *Agent) runEventLoop(ctx context.Context) error { opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name)} if ag.apiKey != "" { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) + ctx = agentclient.AddAPIKeyMeta(ctx, ag.apiKey) } stream, err := ag.client.Send(ctx, opts...) diff --git a/pkg/agent/events_test.go b/pkg/agent/events_test.go index eeaab03be9e..663e06ac192 100644 --- a/pkg/agent/events_test.go +++ b/pkg/agent/events_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/executor/output" "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/ui" @@ -47,7 +48,7 @@ func TestEventLoop(t *testing.T) { logger, _ := zap.NewDevelopment() - grpcConn, err := agent.NewGRPCConnection(context.Background(), true, false, url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(context.Background(), true, false, url, "", "", "", log.DefaultLogger) ui.ExitOnError("error creating gRPC connection", err) defer grpcConn.Close() diff --git a/pkg/agent/logs.go b/pkg/agent/logs.go index 4040e6f6f68..6fdc81e2aef 100644 --- a/pkg/agent/logs.go +++ b/pkg/agent/logs.go @@ -6,6 +6,7 @@ import ( "math" "time" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/cloud" "github.com/kubeshop/testkube/pkg/log" @@ -18,7 +19,7 @@ import ( const logStreamRetryCount = 10 func (ag *Agent) runLogStreamLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) + ctx = agentclient.AddAPIKeyMeta(ctx, ag.apiKey) ag.logger.Infow("initiating log streaming connection with control plane") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. diff --git a/pkg/agent/logs_test.go b/pkg/agent/logs_test.go index 11fc9bfc797..0491467415c 100644 --- a/pkg/agent/logs_test.go +++ b/pkg/agent/logs_test.go @@ -9,6 +9,7 @@ import ( "github.com/kubeshop/testkube/internal/config" "github.com/kubeshop/testkube/pkg/agent" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/cloud" "github.com/kubeshop/testkube/pkg/executor/output" @@ -46,7 +47,7 @@ func TestLogStream(t *testing.T) { fmt.Fprintf(ctx, "Hi there! RequestURI is %q", ctx.RequestURI()) } - grpcConn, err := agent.NewGRPCConnection(context.Background(), true, false, url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(context.Background(), true, false, url, "", "", "", log.DefaultLogger) ui.ExitOnError("error creating gRPC connection", err) defer grpcConn.Close() diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index 510a627a47c..2001fa9debd 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/cloud" "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller" @@ -29,7 +30,7 @@ func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotificatio } func (ag *Agent) runTestWorkflowNotificationsLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) + ctx = agentclient.AddAPIKeyMeta(ctx, ag.apiKey) ag.logger.Infow("initiating workflow notifications streaming connection with Cloud API") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. diff --git a/pkg/api/v1/testkube/model_test_workflow_result_extended.go b/pkg/api/v1/testkube/model_test_workflow_result_extended.go index ff56259d0d9..0738c637ce9 100644 --- a/pkg/api/v1/testkube/model_test_workflow_result_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_result_extended.go @@ -9,7 +9,7 @@ import ( "github.com/gookit/color" - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/internal/common" ) @@ -152,7 +152,7 @@ func (r *TestWorkflowResult) IsAnyStepPaused() bool { } func (r *TestWorkflowResult) IsKnownStep(ref string) bool { - if ref == data.InitStepName { + if ref == constants.InitStepName { return true } _, ok := r.Steps[ref] @@ -485,7 +485,7 @@ func (r *TestWorkflowResult) HealAborted(sigSequence []TestWorkflowSignature, er // Check all the executable steps in the sequence for i := range sigSequence { ref := sigSequence[i].Ref - if ref == data.InitStepName || !r.IsKnownStep(ref) || len(sigSequence[i].Children) > 0 { + if ref == constants.InitStepName || !r.IsKnownStep(ref) || len(sigSequence[i].Children) > 0 { continue } step := r.Steps[ref] @@ -510,7 +510,7 @@ func (r *TestWorkflowResult) HealAborted(sigSequence []TestWorkflowSignature, er // Do it from end, so we can handle nested groups for i := len(sigSequence) - 1; i >= 0; i-- { ref := sigSequence[i].Ref - if ref == data.InitStepName || !r.IsKnownStep(ref) || len(sigSequence[i].Children) == 0 { + if ref == constants.InitStepName || !r.IsKnownStep(ref) || len(sigSequence[i].Children) == 0 { continue } step := r.Steps[ref] diff --git a/pkg/cloud/data/executor/executor.go b/pkg/cloud/data/executor/executor.go index 0ef9c5c7337..47531d443e7 100644 --- a/pkg/cloud/data/executor/executor.go +++ b/pkg/cloud/data/executor/executor.go @@ -9,7 +9,7 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/protobuf/types/known/structpb" - "github.com/kubeshop/testkube/pkg/agent" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/cloud" ) @@ -44,7 +44,7 @@ func (e *CloudGRPCExecutor) Execute(ctx context.Context, command Command, payloa Command: string(command), Payload: &s, } - ctx = agent.AddAPIKeyMeta(ctx, e.apiKey) + ctx = agentclient.AddAPIKeyMeta(ctx, e.apiKey) opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} cmdResponse, err := e.client.Call(ctx, &req, opts...) if err != nil { diff --git a/pkg/cloud/service.pb.go b/pkg/cloud/service.pb.go index 7a140af6a52..12eabcdee49 100644 --- a/pkg/cloud/service.pb.go +++ b/pkg/cloud/service.pb.go @@ -7,13 +7,12 @@ package cloud import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" emptypb "google.golang.org/protobuf/types/known/emptypb" structpb "google.golang.org/protobuf/types/known/structpb" + reflect "reflect" + sync "sync" ) const ( @@ -975,6 +974,108 @@ func (x *WebsocketData) GetBody() []byte { return nil } +type CredentialRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + ExecutionId string `protobuf:"bytes,2,opt,name=execution_id,json=executionId,proto3" json:"execution_id,omitempty"` +} + +func (x *CredentialRequest) Reset() { + *x = CredentialRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_service_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CredentialRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CredentialRequest) ProtoMessage() {} + +func (x *CredentialRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_service_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CredentialRequest.ProtoReflect.Descriptor instead. +func (*CredentialRequest) Descriptor() ([]byte, []int) { + return file_proto_service_proto_rawDescGZIP(), []int{12} +} + +func (x *CredentialRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *CredentialRequest) GetExecutionId() string { + if x != nil { + return x.ExecutionId + } + return "" +} + +type CredentialResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Content []byte `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"` +} + +func (x *CredentialResponse) Reset() { + *x = CredentialResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_service_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CredentialResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CredentialResponse) ProtoMessage() {} + +func (x *CredentialResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_service_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CredentialResponse.ProtoReflect.Descriptor instead. +func (*CredentialResponse) Descriptor() ([]byte, []int) { + return file_proto_service_proto_rawDescGZIP(), []int{13} +} + +func (x *CredentialResponse) GetContent() []byte { + if x != nil { + return x.Content + } + return nil +} + var File_proto_service_proto protoreflect.FileDescriptor var file_proto_service_proto_rawDesc = []byte{ @@ -1084,66 +1185,79 @@ var file_proto_service_proto_rawDesc = []byte{ 0x12, 0x25, 0x0a, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x2a, 0x48, 0x0a, 0x15, 0x4c, - 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, - 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, - 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, - 0x45, 0x43, 0x4b, 0x10, 0x01, 0x2a, 0x69, 0x0a, 0x24, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, - 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, - 0x1b, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, - 0x5f, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x20, - 0x0a, 0x1c, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, - 0x4d, 0x5f, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x01, - 0x2a, 0x8a, 0x01, 0x0a, 0x1c, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, - 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x19, 0x0a, 0x15, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, - 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, - 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, - 0x4c, 0x4f, 0x47, 0x10, 0x01, 0x12, 0x1a, 0x0a, 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, - 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x10, - 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, - 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4f, 0x55, 0x54, 0x50, 0x55, 0x54, 0x10, 0x03, 0x2a, 0x4c, 0x0a, - 0x06, 0x4f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x0e, 0x0a, 0x0a, 0x55, 0x4e, 0x53, 0x50, 0x45, - 0x43, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x45, 0x58, 0x54, 0x5f, - 0x46, 0x52, 0x41, 0x4d, 0x45, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, - 0x59, 0x5f, 0x46, 0x52, 0x41, 0x4d, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x48, 0x45, 0x41, - 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x03, 0x32, 0x8d, 0x04, 0x0a, 0x10, - 0x54, 0x65, 0x73, 0x74, 0x4b, 0x75, 0x62, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x41, 0x50, 0x49, - 0x12, 0x3c, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x1a, 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, - 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x36, - 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x14, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x57, - 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x28, 0x01, 0x12, 0x35, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x15, - 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, - 0x0c, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x16, 0x2e, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, - 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, - 0x12, 0x48, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x12, 0x19, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x18, 0x2e, 0x63, - 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x7b, 0x0a, 0x22, 0x47, 0x65, - 0x74, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, - 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x12, 0x28, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, - 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x27, 0x2e, 0x63, 0x6c, 0x6f, - 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, - 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x42, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x72, - 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x4a, 0x0a, 0x11, 0x43, + 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x65, 0x78, 0x65, 0x63, + 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x12, 0x43, 0x72, 0x65, 0x64, 0x65, + 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, + 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2a, 0x48, 0x0a, 0x15, 0x4c, 0x6f, 0x67, 0x73, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x16, 0x0a, 0x12, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, + 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x54, 0x52, 0x45, + 0x41, 0x4d, 0x5f, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, + 0x01, 0x2a, 0x69, 0x0a, 0x24, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x57, 0x4f, 0x52, + 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, 0x4f, 0x47, + 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x57, 0x4f, + 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x48, 0x45, + 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x01, 0x2a, 0x8a, 0x01, 0x0a, + 0x1c, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, + 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x19, 0x0a, + 0x15, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, + 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x57, 0x4f, 0x52, 0x4b, + 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, 0x4f, 0x47, 0x10, + 0x01, 0x12, 0x1a, 0x0a, 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, + 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x10, 0x02, 0x12, 0x1a, 0x0a, + 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, + 0x5f, 0x4f, 0x55, 0x54, 0x50, 0x55, 0x54, 0x10, 0x03, 0x2a, 0x4c, 0x0a, 0x06, 0x4f, 0x70, 0x63, + 0x6f, 0x64, 0x65, 0x12, 0x0e, 0x0a, 0x0a, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x46, 0x52, 0x41, 0x4d, + 0x45, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x52, + 0x41, 0x4d, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, + 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x03, 0x32, 0xd3, 0x04, 0x0a, 0x10, 0x54, 0x65, 0x73, 0x74, + 0x4b, 0x75, 0x62, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x41, 0x50, 0x49, 0x12, 0x3c, 0x0a, 0x07, + 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, + 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, + 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x04, 0x53, 0x65, + 0x6e, 0x64, 0x12, 0x14, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x57, 0x65, 0x62, 0x73, 0x6f, + 0x63, 0x6b, 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x1a, 0x19, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, - 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09, 0x70, - 0x6b, 0x67, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x01, 0x12, 0x35, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x15, 0x2e, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x0c, 0x45, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x65, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, + 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x1a, 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x48, 0x0a, 0x0d, + 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x19, 0x2e, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x18, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x7b, 0x0a, 0x22, 0x47, 0x65, 0x74, 0x54, 0x65, 0x73, + 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x28, 0x2e, 0x63, + 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x27, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, + 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x42, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, + 0x74, 0x65, 0x78, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x19, 0x2e, 0x63, + 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x43, 0x72, + 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x12, 0x18, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, + 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0b, 0x5a, + 0x09, 0x70, 0x6b, 0x67, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -1159,7 +1273,7 @@ func file_proto_service_proto_rawDescGZIP() []byte { } var file_proto_service_proto_enumTypes = make([]protoimpl.EnumInfo, 4) -var file_proto_service_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_proto_service_proto_msgTypes = make([]protoimpl.MessageInfo, 16) var file_proto_service_proto_goTypes = []interface{}{ (LogsStreamRequestType)(0), // 0: cloud.LogsStreamRequestType (TestWorkflowNotificationsRequestType)(0), // 1: cloud.TestWorkflowNotificationsRequestType @@ -1177,19 +1291,21 @@ var file_proto_service_proto_goTypes = []interface{}{ (*HeaderValue)(nil), // 13: cloud.HeaderValue (*ExecuteResponse)(nil), // 14: cloud.ExecuteResponse (*WebsocketData)(nil), // 15: cloud.WebsocketData - nil, // 16: cloud.ExecuteRequest.HeadersEntry - nil, // 17: cloud.ExecuteResponse.HeadersEntry - (*structpb.Struct)(nil), // 18: google.protobuf.Struct - (*emptypb.Empty)(nil), // 19: google.protobuf.Empty + (*CredentialRequest)(nil), // 16: cloud.CredentialRequest + (*CredentialResponse)(nil), // 17: cloud.CredentialResponse + nil, // 18: cloud.ExecuteRequest.HeadersEntry + nil, // 19: cloud.ExecuteResponse.HeadersEntry + (*structpb.Struct)(nil), // 20: google.protobuf.Struct + (*emptypb.Empty)(nil), // 21: google.protobuf.Empty } var file_proto_service_proto_depIdxs = []int32{ 0, // 0: cloud.LogsStreamRequest.request_type:type_name -> cloud.LogsStreamRequestType - 18, // 1: cloud.CommandRequest.payload:type_name -> google.protobuf.Struct - 16, // 2: cloud.ExecuteRequest.headers:type_name -> cloud.ExecuteRequest.HeadersEntry + 20, // 1: cloud.CommandRequest.payload:type_name -> google.protobuf.Struct + 18, // 2: cloud.ExecuteRequest.headers:type_name -> cloud.ExecuteRequest.HeadersEntry 1, // 3: cloud.TestWorkflowNotificationsRequest.request_type:type_name -> cloud.TestWorkflowNotificationsRequestType 2, // 4: cloud.TestWorkflowNotificationsResponse.type:type_name -> cloud.TestWorkflowNotificationType 12, // 5: cloud.ProContextResponse.capabilities:type_name -> cloud.Capability - 17, // 6: cloud.ExecuteResponse.headers:type_name -> cloud.ExecuteResponse.HeadersEntry + 19, // 6: cloud.ExecuteResponse.headers:type_name -> cloud.ExecuteResponse.HeadersEntry 3, // 7: cloud.WebsocketData.opcode:type_name -> cloud.Opcode 13, // 8: cloud.ExecuteRequest.HeadersEntry.value:type_name -> cloud.HeaderValue 13, // 9: cloud.ExecuteResponse.HeadersEntry.value:type_name -> cloud.HeaderValue @@ -1199,16 +1315,18 @@ var file_proto_service_proto_depIdxs = []int32{ 14, // 13: cloud.TestKubeCloudAPI.ExecuteAsync:input_type -> cloud.ExecuteResponse 5, // 14: cloud.TestKubeCloudAPI.GetLogsStream:input_type -> cloud.LogsStreamResponse 10, // 15: cloud.TestKubeCloudAPI.GetTestWorkflowNotificationsStream:input_type -> cloud.TestWorkflowNotificationsResponse - 19, // 16: cloud.TestKubeCloudAPI.GetProContext:input_type -> google.protobuf.Empty - 8, // 17: cloud.TestKubeCloudAPI.Execute:output_type -> cloud.ExecuteRequest - 19, // 18: cloud.TestKubeCloudAPI.Send:output_type -> google.protobuf.Empty - 7, // 19: cloud.TestKubeCloudAPI.Call:output_type -> cloud.CommandResponse - 8, // 20: cloud.TestKubeCloudAPI.ExecuteAsync:output_type -> cloud.ExecuteRequest - 4, // 21: cloud.TestKubeCloudAPI.GetLogsStream:output_type -> cloud.LogsStreamRequest - 9, // 22: cloud.TestKubeCloudAPI.GetTestWorkflowNotificationsStream:output_type -> cloud.TestWorkflowNotificationsRequest - 11, // 23: cloud.TestKubeCloudAPI.GetProContext:output_type -> cloud.ProContextResponse - 17, // [17:24] is the sub-list for method output_type - 10, // [10:17] is the sub-list for method input_type + 21, // 16: cloud.TestKubeCloudAPI.GetProContext:input_type -> google.protobuf.Empty + 16, // 17: cloud.TestKubeCloudAPI.GetCredential:input_type -> cloud.CredentialRequest + 8, // 18: cloud.TestKubeCloudAPI.Execute:output_type -> cloud.ExecuteRequest + 21, // 19: cloud.TestKubeCloudAPI.Send:output_type -> google.protobuf.Empty + 7, // 20: cloud.TestKubeCloudAPI.Call:output_type -> cloud.CommandResponse + 8, // 21: cloud.TestKubeCloudAPI.ExecuteAsync:output_type -> cloud.ExecuteRequest + 4, // 22: cloud.TestKubeCloudAPI.GetLogsStream:output_type -> cloud.LogsStreamRequest + 9, // 23: cloud.TestKubeCloudAPI.GetTestWorkflowNotificationsStream:output_type -> cloud.TestWorkflowNotificationsRequest + 11, // 24: cloud.TestKubeCloudAPI.GetProContext:output_type -> cloud.ProContextResponse + 17, // 25: cloud.TestKubeCloudAPI.GetCredential:output_type -> cloud.CredentialResponse + 18, // [18:26] is the sub-list for method output_type + 10, // [10:18] is the sub-list for method input_type 10, // [10:10] is the sub-list for extension type_name 10, // [10:10] is the sub-list for extension extendee 0, // [0:10] is the sub-list for field type_name @@ -1364,6 +1482,30 @@ func file_proto_service_proto_init() { return nil } } + file_proto_service_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CredentialRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_service_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CredentialResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1371,7 +1513,7 @@ func file_proto_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_service_proto_rawDesc, NumEnums: 4, - NumMessages: 14, + NumMessages: 16, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/cloud/service_grpc.pb.go b/pkg/cloud/service_grpc.pb.go index 8aab7a21191..5b329874fe6 100644 --- a/pkg/cloud/service_grpc.pb.go +++ b/pkg/cloud/service_grpc.pb.go @@ -8,7 +8,6 @@ package cloud import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -33,6 +32,7 @@ type TestKubeCloudAPIClient interface { GetLogsStream(ctx context.Context, opts ...grpc.CallOption) (TestKubeCloudAPI_GetLogsStreamClient, error) GetTestWorkflowNotificationsStream(ctx context.Context, opts ...grpc.CallOption) (TestKubeCloudAPI_GetTestWorkflowNotificationsStreamClient, error) GetProContext(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ProContextResponse, error) + GetCredential(ctx context.Context, in *CredentialRequest, opts ...grpc.CallOption) (*CredentialResponse, error) } type testKubeCloudAPIClient struct { @@ -219,6 +219,15 @@ func (c *testKubeCloudAPIClient) GetProContext(ctx context.Context, in *emptypb. return out, nil } +func (c *testKubeCloudAPIClient) GetCredential(ctx context.Context, in *CredentialRequest, opts ...grpc.CallOption) (*CredentialResponse, error) { + out := new(CredentialResponse) + err := c.cc.Invoke(ctx, "/cloud.TestKubeCloudAPI/GetCredential", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // TestKubeCloudAPIServer is the server API for TestKubeCloudAPI service. // All implementations must embed UnimplementedTestKubeCloudAPIServer // for forward compatibility @@ -232,6 +241,7 @@ type TestKubeCloudAPIServer interface { GetLogsStream(TestKubeCloudAPI_GetLogsStreamServer) error GetTestWorkflowNotificationsStream(TestKubeCloudAPI_GetTestWorkflowNotificationsStreamServer) error GetProContext(context.Context, *emptypb.Empty) (*ProContextResponse, error) + GetCredential(context.Context, *CredentialRequest) (*CredentialResponse, error) mustEmbedUnimplementedTestKubeCloudAPIServer() } @@ -260,6 +270,9 @@ func (UnimplementedTestKubeCloudAPIServer) GetTestWorkflowNotificationsStream(Te func (UnimplementedTestKubeCloudAPIServer) GetProContext(context.Context, *emptypb.Empty) (*ProContextResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetProContext not implemented") } +func (UnimplementedTestKubeCloudAPIServer) GetCredential(context.Context, *CredentialRequest) (*CredentialResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetCredential not implemented") +} func (UnimplementedTestKubeCloudAPIServer) mustEmbedUnimplementedTestKubeCloudAPIServer() {} // UnsafeTestKubeCloudAPIServer may be embedded to opt out of forward compatibility for this service. @@ -439,6 +452,24 @@ func _TestKubeCloudAPI_GetProContext_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _TestKubeCloudAPI_GetCredential_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CredentialRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestKubeCloudAPIServer).GetCredential(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cloud.TestKubeCloudAPI/GetCredential", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestKubeCloudAPIServer).GetCredential(ctx, req.(*CredentialRequest)) + } + return interceptor(ctx, in, info, handler) +} + // TestKubeCloudAPI_ServiceDesc is the grpc.ServiceDesc for TestKubeCloudAPI service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -454,6 +485,10 @@ var TestKubeCloudAPI_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetProContext", Handler: _TestKubeCloudAPI_GetProContext_Handler, }, + { + MethodName: "GetCredential", + Handler: _TestKubeCloudAPI_GetCredential_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/pkg/controlplane/server.go b/pkg/controlplane/server.go index 8589571a22c..a88a744c98e 100644 --- a/pkg/controlplane/server.go +++ b/pkg/controlplane/server.go @@ -62,6 +62,10 @@ func (s *Server) GetProContext(_ context.Context, _ *emptypb.Empty) (*cloud.ProC return nil, status.Error(codes.Unimplemented, "not supported in the standalone version") } +func (s *Server) GetCredential(_ context.Context, _ *cloud.CredentialRequest) (*cloud.CredentialResponse, error) { + return nil, status.Error(codes.Unimplemented, "not supported in the standalone version") +} + func (s *Server) ExecuteAsync(srv cloud.TestKubeCloudAPI_ExecuteAsyncServer) error { ctx, cancel := context.WithCancel(srv.Context()) g, _ := errgroup.WithContext(ctx) diff --git a/pkg/credentials/expressions.go b/pkg/credentials/expressions.go new file mode 100644 index 00000000000..8ca52ada84d --- /dev/null +++ b/pkg/credentials/expressions.go @@ -0,0 +1,38 @@ +package credentials + +import ( + "context" + "fmt" + + "github.com/kubeshop/testkube/pkg/expressions" +) + +func NewCredentialMachine(repository CredentialRepository, observers ...func(name string, value string)) expressions.Machine { + return expressions.NewMachine().RegisterFunction("credential", func(values ...expressions.StaticValue) (interface{}, bool, error) { + computed := false + if len(values) == 2 { + if values[1].IsBool() { + computed, _ = values[1].BoolValue() + } else { + return nil, true, fmt.Errorf(`"credential" function expects 2nd argument to be boolean, %s provided`, values[1].String()) + } + } else if len(values) != 1 { + return nil, true, fmt.Errorf(`"credential" function expects 1-2 arguments, %d provided`, len(values)) + } + + name, _ := values[0].StringValue() + value, err := repository.Get(context.Background(), name) + if err != nil { + return nil, true, err + } + if computed { + expr, err := expressions.CompileAndResolveTemplate(string(value)) + return expr, true, err + } + valueStr := string(value) + for i := range observers { + observers[i](name, valueStr) + } + return valueStr, true, nil + }) +} diff --git a/pkg/credentials/mock_repository.go b/pkg/credentials/mock_repository.go new file mode 100644 index 00000000000..ec907f03a95 --- /dev/null +++ b/pkg/credentials/mock_repository.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/kubeshop/testkube/pkg/credentials (interfaces: CredentialRepository) + +// Package credentials is a generated GoMock package. +package credentials + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockCredentialRepository is a mock of CredentialRepository interface. +type MockCredentialRepository struct { + ctrl *gomock.Controller + recorder *MockCredentialRepositoryMockRecorder +} + +// MockCredentialRepositoryMockRecorder is the mock recorder for MockCredentialRepository. +type MockCredentialRepositoryMockRecorder struct { + mock *MockCredentialRepository +} + +// NewMockCredentialRepository creates a new mock instance. +func NewMockCredentialRepository(ctrl *gomock.Controller) *MockCredentialRepository { + mock := &MockCredentialRepository{ctrl: ctrl} + mock.recorder = &MockCredentialRepositoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCredentialRepository) EXPECT() *MockCredentialRepositoryMockRecorder { + return m.recorder +} + +// Get mocks base method. +func (m *MockCredentialRepository) Get(arg0 context.Context, arg1 string) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0, arg1) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockCredentialRepositoryMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockCredentialRepository)(nil).Get), arg0, arg1) +} diff --git a/pkg/credentials/repository.go b/pkg/credentials/repository.go new file mode 100644 index 00000000000..65f568a2c34 --- /dev/null +++ b/pkg/credentials/repository.go @@ -0,0 +1,37 @@ +package credentials + +import ( + "context" + "math" + + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + + agentclient "github.com/kubeshop/testkube/pkg/agent/client" + "github.com/kubeshop/testkube/pkg/cloud" +) + +//go:generate mockgen -destination=./mock_repository.go -package=credentials "github.com/kubeshop/testkube/pkg/credentials" CredentialRepository +type CredentialRepository interface { + Get(ctx context.Context, name string) ([]byte, error) +} + +type credentialRepository struct { + client cloud.TestKubeCloudAPIClient + apiKey string + executionId string +} + +func NewCredentialRepository(client cloud.TestKubeCloudAPIClient, apiKey, executionId string) CredentialRepository { + return &credentialRepository{client: client, apiKey: apiKey, executionId: executionId} +} + +func (c *credentialRepository) Get(ctx context.Context, name string) ([]byte, error) { + ctx = agentclient.AddAPIKeyMeta(ctx, c.apiKey) + opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} + result, err := c.client.GetCredential(ctx, &cloud.CredentialRequest{Name: name, ExecutionId: c.executionId}, opts...) + if err != nil { + return nil, err + } + return result.Content, nil +} diff --git a/pkg/envs/bool.go b/pkg/envs/bool.go deleted file mode 100644 index 28ada150383..00000000000 --- a/pkg/envs/bool.go +++ /dev/null @@ -1,18 +0,0 @@ -package envs - -import ( - "os" - "strconv" -) - -func IsTrue(name string) (is bool) { - var err error - if val, ok := os.LookupEnv(name); ok { - is, err = strconv.ParseBool(val) - if err != nil { - return false - } - } - - return is -} diff --git a/pkg/executor/scraper/factory/factory.go b/pkg/executor/scraper/factory/factory.go index 1b8d89bff00..bf4b1d6d46c 100644 --- a/pkg/executor/scraper/factory/factory.go +++ b/pkg/executor/scraper/factory/factory.go @@ -8,7 +8,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/pkg/errors" - "github.com/kubeshop/testkube/pkg/agent" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/cloud" cloudscraper "github.com/kubeshop/testkube/pkg/cloud/data/artifact" cloudexecutor "github.com/kubeshop/testkube/pkg/cloud/data/executor" @@ -102,7 +102,7 @@ func getRemoteStorageUploader(ctx context.Context, params envs.Params) (uploader output.PrintLogf( "%s Uploading artifacts using Remote Storage Uploader (timeout:%ds, agentInsecure:%v, agentSkipVerify: %v, url: %s, scraperSkipVerify: %v)", ui.IconCheckMark, params.ProConnectionTimeoutSec, params.ProAPITLSInsecure, params.ProAPISkipVerify, params.ProAPIURL, params.SkipVerify) - grpcConn, err := agent.NewGRPCConnection( + grpcConn, err := agentclient.NewGRPCConnection( ctxTimeout, params.ProAPITLSInsecure, params.ProAPISkipVerify, diff --git a/pkg/log/log.go b/pkg/log/log.go index 0806edb835d..73dbd52473e 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -2,19 +2,51 @@ package log import ( "log" + "os" + "strconv" "go.uber.org/zap" "go.uber.org/zap/zapcore" - - "github.com/kubeshop/testkube/pkg/envs" ) +func IsTrue(name string) (is bool) { + var err error + if val, ok := os.LookupEnv(name); ok { + is, err = strconv.ParseBool(val) + if err != nil { + return false + } + } + + return is +} + // New returns new logger instance func New() *zap.SugaredLogger { atomicLevel := zap.NewAtomicLevel() atomicLevel.SetLevel(zap.InfoLevel) - if envs.IsTrue("DEBUG") { + if IsTrue("DEBUG") { + atomicLevel.SetLevel(zap.DebugLevel) + } + + zapCfg := zap.NewProductionConfig() + zapCfg.Level = atomicLevel + zapCfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder + + z, err := zapCfg.Build() + if err != nil { + log.Fatalf("can't initialize zap logger: %v", err) + } + logger := z.Sugar() + return logger +} + +func NewSilent() *zap.SugaredLogger { + atomicLevel := zap.NewAtomicLevel() + + atomicLevel.SetLevel(zap.WarnLevel) + if IsTrue("DEBUG") { atomicLevel.SetLevel(zap.DebugLevel) } diff --git a/pkg/logs/adapter/cloud_test.go b/pkg/logs/adapter/cloud_test.go index badd6c6caac..c087d5b05cd 100644 --- a/pkg/logs/adapter/cloud_test.go +++ b/pkg/logs/adapter/cloud_test.go @@ -19,7 +19,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "github.com/kubeshop/testkube/pkg/agent" + agentclient "github.com/kubeshop/testkube/pkg/agent/client" "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/logs/events" "github.com/kubeshop/testkube/pkg/logs/pb" @@ -38,7 +38,7 @@ func TestCloudAdapter(t *testing.T) { id := "id1" // and connection - grpcConn, err := agent.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) assert.NoError(t, err) defer grpcConn.Close() @@ -80,7 +80,7 @@ func TestCloudAdapter(t *testing.T) { id3 := "id3" // and connection - grpcConn, err := agent.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) assert.NoError(t, err) defer grpcConn.Close() grpcClient := pb.NewCloudLogsServiceClient(grpcConn) @@ -129,7 +129,7 @@ func TestCloudAdapter(t *testing.T) { id := "id1M" // and grpc connetion to the server - grpcConn, err := agent.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) assert.NoError(t, err) defer grpcConn.Close() @@ -163,7 +163,7 @@ func TestCloudAdapter(t *testing.T) { ctx := context.Background() // and grpc connetion to the server - grpcConn, err := agent.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) + grpcConn, err := agentclient.NewGRPCConnection(ctx, true, true, ts.Url, "", "", "", log.DefaultLogger) assert.NoError(t, err) defer grpcConn.Close() diff --git a/pkg/testworkflows/executionworker/controller/notifier.go b/pkg/testworkflows/executionworker/controller/notifier.go index b28a5fa225d..ea76a2f1ce6 100644 --- a/pkg/testworkflows/executionworker/controller/notifier.go +++ b/pkg/testworkflows/executionworker/controller/notifier.go @@ -7,7 +7,6 @@ import ( "time" "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" "github.com/kubeshop/testkube/cmd/testworkflow-init/instructions" "github.com/kubeshop/testkube/internal/common" "github.com/kubeshop/testkube/pkg/api/v1/testkube" @@ -75,7 +74,7 @@ func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) n.lastTs = ts } if message != "" { - if ref == data.InitStepName { + if ref == constants.InitStepName { ref = "" } n.send(Notification{ @@ -107,7 +106,7 @@ func (n *notifier) Event(ref string, ts time.Time, level, reason, message string } func (n *notifier) Output(ref string, ts time.Time, output *instructions.Instruction) { - if ref == data.InitStepName { + if ref == constants.InitStepName { ref = "" } else if ref != "" { if _, ok := n.result.Steps[ref]; !ok { @@ -169,7 +168,7 @@ func (n *notifier) Instruction(ts time.Time, hint instructions.Instruction) { ts = ts.UTC() // Load the current step information - init := hint.Ref == data.InitStepName + init := hint.Ref == constants.InitStepName step, ok := n.result.Steps[hint.Ref] if init { step = *n.result.Initialization @@ -322,7 +321,7 @@ func (n *notifier) fillGaps(force bool) { // TODO: estimate startedAt/finishedAt too? - if ref == data.InitStepName { + if ref == constants.InitStepName { n.result.Initialization.Status = common.Ptr(container.Statuses[refIndexes[ref]].Status) n.result.Initialization.ExitCode = float64(container.Statuses[refIndexes[ref]].ExitCode) } else { diff --git a/pkg/testworkflows/executionworker/controller/utils.go b/pkg/testworkflows/executionworker/controller/utils.go index 4232a561f09..5eb97bdb21f 100644 --- a/pkg/testworkflows/executionworker/controller/utils.go +++ b/pkg/testworkflows/executionworker/controller/utils.go @@ -6,7 +6,7 @@ import ( corev1 "k8s.io/api/core/v1" - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite" @@ -48,8 +48,8 @@ func ExtractRefsFromActionList(list actiontypes.ActionList) (started []string, f for i := range list { switch list[i].Type() { case lite.ActionTypeSetup: - started = append(started, data.InitStepName) - finished = append(finished, data.InitStepName) + started = append(started, constants.InitStepName) + finished = append(finished, constants.InitStepName) case lite.ActionTypeStart: started = append(started, *list[i].Start) case lite.ActionTypeEnd: diff --git a/pkg/testworkflows/executionworker/controller/watchers/commons.go b/pkg/testworkflows/executionworker/controller/watchers/commons.go index 044012e517c..4c8378177da 100644 --- a/pkg/testworkflows/executionworker/controller/watchers/commons.go +++ b/pkg/testworkflows/executionworker/controller/watchers/commons.go @@ -14,7 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) @@ -325,7 +325,7 @@ func ReadContainerResult(status *corev1.ContainerStatus, errorFallback string) C } // Gather information - stepStatus := testkube.TestWorkflowStepStatus(data.StepStatusFromCode(match[1])) + stepStatus := testkube.TestWorkflowStepStatus(constants.StepStatusFromCode(match[1])) exitCode, _ := strconv.Atoi(match[2]) // Don't trust after there is `aborted` status detected diff --git a/pkg/testworkflows/executionworker/controller/watchinstrumentedpod.go b/pkg/testworkflows/executionworker/controller/watchinstrumentedpod.go index a5de517ec6a..e050805a0df 100644 --- a/pkg/testworkflows/executionworker/controller/watchinstrumentedpod.go +++ b/pkg/testworkflows/executionworker/controller/watchinstrumentedpod.go @@ -9,7 +9,6 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" "github.com/kubeshop/testkube/cmd/testworkflow-init/instructions" "github.com/kubeshop/testkube/pkg/api/v1/testkube" watchers2 "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller/watchers" @@ -101,7 +100,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf initialRefs := make([]string, len(actions)) for i := range refs { for j := range refs[i] { - if refs[i][j] == data.InitStepName { + if refs[i][j] == constants.InitStepName { initialRefs[i] = "" break } @@ -115,7 +114,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Iterate over containers - lastStarted := data.InitStepName + lastStarted := constants.InitStepName containersReady := false for containerIndex := 0; containerIndex < len(refs); containerIndex++ { aborted := false diff --git a/pkg/testworkflows/testworkflowprocessor/action/actiontypes/analysis.go b/pkg/testworkflows/testworkflowprocessor/action/actiontypes/analysis.go index 97f527085c1..db1c7af3768 100644 --- a/pkg/testworkflows/testworkflowprocessor/action/actiontypes/analysis.go +++ b/pkg/testworkflows/testworkflowprocessor/action/actiontypes/analysis.go @@ -1,7 +1,7 @@ package actiontypes import ( - "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/cmd/testworkflow-init/constants" "github.com/kubeshop/testkube/pkg/expressions" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite" ) @@ -14,7 +14,7 @@ func (a ActionList) GetLastRef() string { case lite.ActionTypeStart: return *a[i].Start case lite.ActionTypeSetup: - return data.InitStepName + return constants.InitStepName } } return "" diff --git a/pkg/testworkflows/testworkflowprocessor/secretmachine.go b/pkg/testworkflows/testworkflowprocessor/secretmachine.go index 84ba56943f8..5784f8e94a3 100644 --- a/pkg/testworkflows/testworkflowprocessor/secretmachine.go +++ b/pkg/testworkflows/testworkflowprocessor/secretmachine.go @@ -19,7 +19,7 @@ func createSecretMachine(mapEnvs map[string]corev1.EnvVarSource) expressions.Mac if values[2].IsBool() { computed, _ = values[2].BoolValue() } else { - return nil, true, fmt.Errorf(`"secret" function expects 3rd argument to be boolean, %s provided`, values[3].String()) + return nil, true, fmt.Errorf(`"secret" function expects 3rd argument to be boolean, %s provided`, values[2].String()) } } else if len(values) != 2 { return nil, true, fmt.Errorf(`"secret" function expects 2-3 arguments, %d provided`, len(values)) diff --git a/proto/service.proto b/proto/service.proto index 6911fb50b5f..7ed7f0db957 100644 --- a/proto/service.proto +++ b/proto/service.proto @@ -17,6 +17,7 @@ service TestKubeCloudAPI { rpc GetLogsStream(stream LogsStreamResponse) returns (stream LogsStreamRequest); rpc GetTestWorkflowNotificationsStream(stream TestWorkflowNotificationsResponse) returns (stream TestWorkflowNotificationsRequest); rpc GetProContext(google.protobuf.Empty) returns (ProContextResponse); + rpc GetCredential(CredentialRequest) returns (CredentialResponse); } enum LogsStreamRequestType { @@ -114,3 +115,12 @@ message WebsocketData { Opcode opcode = 1; bytes body = 2; } + +message CredentialRequest { + string name = 1; + string execution_id = 2; +} + +message CredentialResponse { + bytes content = 1; +}