diff --git a/go-chaos/backend/connection.go b/go-chaos/backend/connection.go index 3facdf25f..2131464ce 100644 --- a/go-chaos/backend/connection.go +++ b/go-chaos/backend/connection.go @@ -23,8 +23,8 @@ import ( v1 "k8s.io/api/core/v1" ) -func ConnectBrokers() error { - k8Client, err := internal.CreateK8Client() +func ConnectBrokers(kubeConfigPath string, namespace string) error { + k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace) if err != nil { return err } @@ -55,8 +55,8 @@ func ConnectBrokers() error { return nil } -func ConnectGateway() error { - k8Client, err := internal.CreateK8Client() +func ConnectGateway(kubeConfigPath string, namespace string) error { + k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace) if err != nil { return err } @@ -103,8 +103,8 @@ type DisconnectBrokerCfg struct { OneDirection bool } -func DisconnectBroker(disconnectBrokerCfg DisconnectBrokerCfg) error { - k8Client, err := prepareBrokerDisconnect() +func DisconnectBroker(kubeConfigPath string, namespace string, disconnectBrokerCfg DisconnectBrokerCfg) error { + k8Client, err := prepareBrokerDisconnect(kubeConfigPath, namespace) zbClient, closeFn, err := ConnectToZeebeCluster(k8Client) if err != nil { @@ -138,8 +138,8 @@ type DisconnectGatewayCfg struct { BrokerCfg Broker } -func DisconnectGateway(disconnectGatewayCfg DisconnectGatewayCfg) error { - k8Client, zbClient, closeFn, err := prepareGatewayDisconnect() +func DisconnectGateway(kubeConfigPath string, namespace string, disconnectGatewayCfg DisconnectGatewayCfg) error { + k8Client, zbClient, closeFn, err := prepareGatewayDisconnect(kubeConfigPath, namespace) if err != nil { return err } @@ -173,8 +173,8 @@ func DisconnectGateway(disconnectGatewayCfg DisconnectGatewayCfg) error { return nil } -func prepareGatewayDisconnect() (internal.K8Client, zbc.Client, func(), error) { - k8Client, err := prepareBrokerDisconnect() +func prepareGatewayDisconnect(kubeConfigPath string, namespace string) (internal.K8Client, zbc.Client, func(), error) { + k8Client, err := prepareBrokerDisconnect(kubeConfigPath, namespace) if err != nil { return k8Client, nil, nil, err } @@ -199,8 +199,8 @@ func prepareGatewayDisconnect() (internal.K8Client, zbc.Client, func(), error) { return k8Client, zbClient, closeFn, nil } -func prepareBrokerDisconnect() (internal.K8Client, error) { - k8Client, err := internal.CreateK8Client() +func prepareBrokerDisconnect(kubeConfigPath string, namespace string) (internal.K8Client, error) { + k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace) if err != nil { return internal.K8Client{}, err } diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go index 3e1e772a4..153048227 100644 --- a/go-chaos/cmd/backup.go +++ b/go-chaos/cmd/backup.go @@ -15,6 +15,7 @@ package cmd import ( + "context" "encoding/json" "errors" "fmt" @@ -33,78 +34,83 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -var ( - backupId string -) +func AddBackupCommand(rootCmd *cobra.Command, flags Flags) { -func init() { - rootCmd.AddCommand(backupCommand) - backupCommand.AddCommand(setupBackupCommand) - backupCommand.AddCommand(takeBackupCommand) - takeBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") - backupCommand.AddCommand(waitForBackupCommand) - waitForBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") - backupCommand.AddCommand(restoreBackupCommand) - restoreBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") -} + var backupCommand = &cobra.Command{ + Use: "backup", + Short: "Controls Zeebe backups", + Long: "Can be used to take backups and query their status", + } -var backupCommand = &cobra.Command{ - Use: "backup", - Short: "Controls Zeebe backups", - Long: "Can be used to take backups and query their status", -} + var setupBackupCommand = &cobra.Command{ + Use: "setup", + Short: "Configures a zeebe cluster's backup settings", + RunE: func(cmd *cobra.Command, args []string) error { + return setupBackup(flags.kubeConfigPath, flags.namespace) + }, + } -var setupBackupCommand = &cobra.Command{ - Use: "setup", - Short: "Configures a zeebe cluster's backup settings", - RunE: setupBackup, -} + var takeBackupCommand = &cobra.Command{ + Use: "take", + Short: "Trigger a backup", + RunE: func(cmd *cobra.Command, args []string) error { + return takeBackup(flags) + }, + } -var takeBackupCommand = &cobra.Command{ - Use: "take", - Short: "Trigger a backup", - RunE: takeBackup, -} + var waitForBackupCommand = &cobra.Command{ + Use: "wait", + Short: "Wait for a backup to complete or fail", + RunE: func(cmd *cobra.Command, args []string) error { + return waitForBackup(flags) + }, + } -var waitForBackupCommand = &cobra.Command{ - Use: "wait", - Short: "Wait for a backup to complete or fail", - RunE: waitForBackup, -} + var restoreBackupCommand = &cobra.Command{ + Use: "restore", + Short: "Restore from a given backup id", + RunE: func(cmd *cobra.Command, args []string) error { + return restoreFromBackup(flags) + }, + } -var restoreBackupCommand = &cobra.Command{ - Use: "restore", - Short: "Restore from a given backup id", - RunE: restoreFromBackup, + rootCmd.AddCommand(backupCommand) + backupCommand.AddCommand(setupBackupCommand) + backupCommand.AddCommand(takeBackupCommand) + takeBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") + backupCommand.AddCommand(waitForBackupCommand) + waitForBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") + backupCommand.AddCommand(restoreBackupCommand) + restoreBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") } -func setupBackup(cmd *cobra.Command, _ []string) error { - k8Client, err := internal.CreateK8Client() +func setupBackup(kubeConfigPath string, namespace string) error { + k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace) if err != nil { panic(err) } - namespace := k8Client.GetCurrentNamespace() + namespace = k8Client.GetCurrentNamespace() err = k8Client.PauseReconciliation() if err != nil { return err } - _, err = createBackupSecret(cmd, k8Client, namespace) + _, err = createBackupSecret(k8Client, namespace) if err != nil { return err } - err = setupStatefulSetForBackups(cmd, err, k8Client, namespace) + err = setupStatefulSetForBackups(err, k8Client, namespace) if err != nil { return err } - err = setupGatewayForBackups(cmd, err, k8Client, namespace) + err = setupGatewayForBackups(err, k8Client, namespace) return err } -func setupStatefulSetForBackups(cmd *cobra.Command, err error, k8Client internal.K8Client, namespace string) error { +func setupStatefulSetForBackups(err error, k8Client internal.K8Client, namespace string) error { sfs, err := k8Client.GetZeebeStatefulSet() if err != nil { return err @@ -122,17 +128,17 @@ func setupStatefulSetForBackups(cmd *cobra.Command, err error, k8Client internal core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", Value: "true"}, ) - _, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{}) + _, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{}) return err } -func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8Client, namespace string) error { +func setupGatewayForBackups(err error, k8Client internal.K8Client, namespace string) error { saasGatewayLabels := meta.LabelSelector{ MatchLabels: map[string]string{"app.kubernetes.io/component": "standalone-gateway"}, } var gatewayDeployments *apps.DeploymentList - gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(cmd.Context(), meta.ListOptions{LabelSelector: labels.Set(saasGatewayLabels.MatchLabels).String()}) + gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(context.TODO(), meta.ListOptions{LabelSelector: labels.Set(saasGatewayLabels.MatchLabels).String()}) if err != nil { return err } @@ -141,7 +147,7 @@ func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8C MatchLabels: map[string]string{"app.kubernetes.io/component": "zeebe-gateway"}, } gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List( - cmd.Context(), + context.TODO(), meta.ListOptions{LabelSelector: labels.Set(selector.MatchLabels).String()}, ) if err != nil { @@ -156,13 +162,13 @@ func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8C core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE", Value: "*"}, core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", Value: "true"}, ) - _, err = k8Client.Clientset.AppsV1().Deployments(namespace).Update(cmd.Context(), &gateway, meta.UpdateOptions{}) + _, err = k8Client.Clientset.AppsV1().Deployments(namespace).Update(context.TODO(), &gateway, meta.UpdateOptions{}) return err } -func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespace string) (*core.Secret, error) { +func createBackupSecret(k8Client internal.K8Client, namespace string) (*core.Secret, error) { return k8Client.Clientset.CoreV1().Secrets(namespace).Create( - cmd.Context(), + context.TODO(), &core.Secret{ Type: "Opaque", ObjectMeta: meta.ObjectMeta{Name: "zeebe-backup-store-s3"}, @@ -176,8 +182,8 @@ func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespac ) } -func takeBackup(*cobra.Command, []string) error { - k8Client, err := internal.CreateK8Client() +func takeBackup(flags Flags) error { + k8Client, err := createK8ClientWithFlags(flags) if err != nil { panic(err) } @@ -190,7 +196,7 @@ func takeBackup(*cobra.Command, []string) error { port := 9600 closePortForward := k8Client.MustGatewayPortForward(port, port) defer closePortForward() - url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId) + url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, flags.backupId) resp, err := http.Post(url, "", nil) if err != nil { return err @@ -205,8 +211,8 @@ func takeBackup(*cobra.Command, []string) error { return err } -func waitForBackup(*cobra.Command, []string) error { - k8Client, err := internal.CreateK8Client() +func waitForBackup(flags Flags) error { + k8Client, err := createK8ClientWithFlags(flags) if err != nil { panic(err) } @@ -216,7 +222,7 @@ func waitForBackup(*cobra.Command, []string) error { defer closePortForward() for { - backup, err := getBackupStatus(port, backupId) + backup, err := getBackupStatus(port, flags.backupId) if err != nil { return err } @@ -234,8 +240,8 @@ func waitForBackup(*cobra.Command, []string) error { } -func restoreFromBackup(cmd *cobra.Command, _ []string) error { - k8Client, err := internal.CreateK8Client() +func restoreFromBackup(flags Flags) error { + k8Client, err := createK8ClientWithFlags(flags) if err != nil { panic(err) } @@ -272,7 +278,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error { Name: "restore-from-backup", Image: sfs.Spec.Template.Spec.Containers[0].Image, ImagePullPolicy: core.PullAlways, - Env: restoreEnvFromSfs(sfs), + Env: restoreEnvFromSfs(flags, sfs), EnvFrom: []core.EnvFromSource{{SecretRef: &core.SecretEnvSource{LocalObjectReference: core.LocalObjectReference{Name: "zeebe-backup-store-s3"}}}}, VolumeMounts: []core.VolumeMount{ { @@ -284,7 +290,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error { } sfs.Spec.Template.Spec.InitContainers = []core.Container{deleteContainer, restoreContainer} - _, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{}) + _, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{}) if err != nil { return err } @@ -308,7 +314,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error { return nil } -func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar { +func restoreEnvFromSfs(flags Flags, sfs *apps.StatefulSet) []core.EnvVar { zeebeEnv := sfs.Spec.Template.Spec.Containers[0].Env restoreEnv := make([]core.EnvVar, 0) for _, env := range zeebeEnv { @@ -326,7 +332,7 @@ func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar { }, core.EnvVar{ Name: "ZEEBE_RESTORE_FROM_BACKUP_ID", - Value: backupId, + Value: flags.backupId, }) return restoreEnv } diff --git a/go-chaos/cmd/connect.go b/go-chaos/cmd/connect.go index 851e47148..bd47b6991 100644 --- a/go-chaos/cmd/connect.go +++ b/go-chaos/cmd/connect.go @@ -19,34 +19,34 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/backend" ) -func init() { - rootCmd.AddCommand(connect) - connect.AddCommand(connectBrokers) - connect.AddCommand(connectGateway) -} +func AddConnectCmd(rootCmd *cobra.Command, flags Flags) { + var connect = &cobra.Command{ + Use: "connect", + Short: "Connect Zeebe nodes", + Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`, + } -var connect = &cobra.Command{ - Use: "connect", - Short: "Connect Zeebe nodes", - Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`, -} + var connectBrokers = &cobra.Command{ + Use: "brokers", + Short: "Connect Zeebe Brokers", + Long: `Connect all Zeebe Brokers again, after they have been disconnected.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.ConnectBrokers(flags.kubeConfigPath, flags.namespace) + ensureNoError(err) + }, + } -var connectBrokers = &cobra.Command{ - Use: "brokers", - Short: "Connect Zeebe Brokers", - Long: `Connect all Zeebe Brokers again, after they have been disconnected.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.ConnectBrokers() - ensureNoError(err) - }, -} + var connectGateway = &cobra.Command{ + Use: "gateway", + Short: "Connect Zeebe Gateway", + Long: `Connect all Zeebe Gateway again, after it has been disconnected.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.ConnectGateway(flags.kubeConfigPath, flags.namespace) + ensureNoError(err) + }, + } -var connectGateway = &cobra.Command{ - Use: "gateway", - Short: "Connect Zeebe Gateway", - Long: `Connect all Zeebe Gateway again, after it has been disconnected.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.ConnectGateway() - ensureNoError(err) - }, + rootCmd.AddCommand(connect) + connect.AddCommand(connectBrokers) + connect.AddCommand(connectGateway) } diff --git a/go-chaos/cmd/dataloss_sim.go b/go-chaos/cmd/dataloss_sim.go index 628b34447..22368eea9 100644 --- a/go-chaos/cmd/dataloss_sim.go +++ b/go-chaos/cmd/dataloss_sim.go @@ -15,109 +15,111 @@ package cmd import ( + "time" + "github.com/spf13/cobra" "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" - "time" ) -func init() { +func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags Flags) { + + var datalossCmd = &cobra.Command{ + Use: "dataloss", + Short: "Simulate dataloss and recover", + Long: `Simulate dataloss of a broker, and recover from it.`, + } + + var prepareCmd = &cobra.Command{ + Use: "prepare", + Short: "Prepare the k8s deployment for dataloss test", + Long: `Prepares the k8s deployment - such as applying patches to statefulsets - to enable applying dataloss commands.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + if err != nil { + panic(err) + } + + // Add Init container for dataloss simulation test + err = k8Client.ApplyInitContainerPatch() + + if err != nil { + panic(err) + } + + internal.LogInfo("Prepared cluster in namesapce %s", k8Client.GetCurrentNamespace()) + }, + } + + var datalossDelete = &cobra.Command{ + Use: "delete", + Short: "Delete data of a broker", + Long: `Delete data of a broker by deleting the pvc and the pod`, + Run: func(cmd *cobra.Command, args []string) { + + k8Client, err := createK8ClientWithFlags(flags) + if err != nil { + panic(err) + } + + pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(flags.nodeId)) + + if err != nil { + internal.LogInfo("Failed to get pod with nodeId %d %s", flags.nodeId, err) + panic(err) + } + + k8Client.DeletePvcOfBroker(pod.Name) + + internal.SetInitContainerBlockFlag(k8Client, flags.nodeId, "true") + err = k8Client.RestartPod(pod.Name) + if err != nil { + internal.LogInfo("Failed to restart pod %s", pod.Name) + panic(err) + } + + internal.LogInfo("Deleted pod %s in namespace %s", pod.Name, k8Client.GetCurrentNamespace()) + }, + } + + var datalossRecover = &cobra.Command{ + Use: "recover", + Short: "Recover broker after full data loss", + Long: `Restart the broker after full data loss, wait until the data is fully recovered`, + Run: func(cmd *cobra.Command, args []string) { + + k8Client, err := createK8ClientWithFlags(flags) + if err != nil { + panic(err) + } + + err = internal.SetInitContainerBlockFlag(k8Client, flags.nodeId, "false") + if err != nil { + panic(err) + } + + pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(flags.nodeId)) + + if err != nil { + internal.LogInfo("Failed to get pod with nodeId %d %s", flags.nodeId, err) + panic(err) + } + + // The pod is restarting after dataloss, so it takes longer to be ready + err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute) + + if err != nil { + internal.LogInfo("%s", err) + panic(err) + } + internal.LogInfo("Broker %d is recovered", flags.nodeId) + }, + } + rootCmd.AddCommand(datalossCmd) datalossCmd.AddCommand(prepareCmd) datalossCmd.AddCommand(datalossDelete) datalossCmd.AddCommand(datalossRecover) - datalossDelete.Flags().IntVar(&nodeId, "nodeId", 1, "Specify the id of the broker") - datalossRecover.Flags().IntVar(&nodeId, "nodeId", 1, "Specify the id of the broker") -} - -var datalossCmd = &cobra.Command{ - Use: "dataloss", - Short: "Simulate dataloss and recover", - Long: `Simulate dataloss of a broker, and recover from it.`, -} - -var prepareCmd = &cobra.Command{ - Use: "prepare", - Short: "Prepare the k8s deployment for dataloss test", - Long: `Prepares the k8s deployment - such as applying patches to statefulsets - to enable applying dataloss commands.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - - // Add Init container for dataloss simulation test - err = k8Client.ApplyInitContainerPatch() - - if err != nil { - panic(err) - } - - internal.LogInfo("Prepared cluster in namesapce %s", k8Client.GetCurrentNamespace()) - }, -} - -var datalossDelete = &cobra.Command{ - Use: "delete", - Short: "Delete data of a broker", - Long: `Delete data of a broker by deleting the pvc and the pod`, - Run: func(cmd *cobra.Command, args []string) { - - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - - pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(nodeId)) - - if err != nil { - internal.LogInfo("Failed to get pod with nodeId %d %s", nodeId, err) - panic(err) - } - - k8Client.DeletePvcOfBroker(pod.Name) - - internal.SetInitContainerBlockFlag(k8Client, nodeId, "true") - err = k8Client.RestartPod(pod.Name) - if err != nil { - internal.LogInfo("Failed to restart pod %s", pod.Name) - panic(err) - } - - internal.LogInfo("Deleted pod %s in namespace %s", pod.Name, k8Client.GetCurrentNamespace()) - }, -} - -var datalossRecover = &cobra.Command{ - Use: "recover", - Short: "Recover broker after full data loss", - Long: `Restart the broker after full data loss, wait until the data is fully recovered`, - Run: func(cmd *cobra.Command, args []string) { - - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - - err = internal.SetInitContainerBlockFlag(k8Client, nodeId, "false") - if err != nil { - panic(err) - } - - pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(nodeId)) - - if err != nil { - internal.LogInfo("Failed to get pod with nodeId %d %s", nodeId, err) - panic(err) - } - - // The pod is restarting after dataloss, so it takes longer to be ready - err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute) - - if err != nil { - internal.LogInfo("%s", err) - panic(err) - } - internal.LogInfo("Broker %d is recovered", nodeId) - }, + datalossDelete.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker") + datalossRecover.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker") } diff --git a/go-chaos/cmd/deploy.go b/go-chaos/cmd/deploy.go index 3878ef6fd..b0992b3a2 100644 --- a/go-chaos/cmd/deploy.go +++ b/go-chaos/cmd/deploy.go @@ -20,107 +20,108 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(deployCmd) - - deployCmd.AddCommand(deployProcessModelCmd) - deployProcessModelCmd.Flags().StringVar(&processModelPath, "processModelPath", "", - "Specify the path to a BPMN process model, which should be deployed. Defaults to a benchmark process model with one task (included in zbchaos). If the path starts with 'bpmn/' zbchaos will look for a referenced model bundled within the cli, like: 'bpmn/one_task.bpmn'.") - - deployCmd.AddCommand(deployMultiVersionProcessModelCmd) - deployMultiVersionProcessModelCmd.Flags().IntVar(&versionCount, "versionCount", 10, - "Specify how many different versions of a default BPMN and DMN model should be deployed. Useful for testing deployment distribution.") - - deployCmd.AddCommand(deployWorkerCmd) - deployCmd.AddCommand(deployChaosModels) -} - -var deployCmd = &cobra.Command{ - Use: "deploy", - Short: "Deploy certain resource", - Long: `Deploy certain resource, like process model(s) or kubernetes manifest.`, -} - -var deployProcessModelCmd = &cobra.Command{ - Use: "process", - Short: "Deploy a process model to Zeebe", - Long: `Deploy a process model to Zeebe. +func AddDeployCmd(rootCmd *cobra.Command, flags Flags) { + + var deployCmd = &cobra.Command{ + Use: "deploy", + Short: "Deploy certain resource", + Long: `Deploy certain resource, like process model(s) or kubernetes manifest.`, + } + + var deployProcessModelCmd = &cobra.Command{ + Use: "process", + Short: "Deploy a process model to Zeebe", + Long: `Deploy a process model to Zeebe. Can be used to deploy a specific process model or multiple version of a default BPMN and DMN model. Defaults to the later, which is useful for experimenting with deployment distribution.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() - processDefinitionKey, err := internal.DeployModel(zbClient, processModelPath) - ensureNoError(err) + processDefinitionKey, err := internal.DeployModel(zbClient, flags.processModelPath) + ensureNoError(err) - internal.LogInfo("Deployed given process model %s, under key %d!", processModelPath, processDefinitionKey) - }, -} + internal.LogInfo("Deployed given process model %s, under key %d!", flags.processModelPath, processDefinitionKey) + }, + } -var deployMultiVersionProcessModelCmd = &cobra.Command{ - Use: "multi-version", - Short: "Deploy multiple versions to Zeebe", - Long: `Deploy multiple versions of process and dmn models to Zeebe. + var deployMultiVersionProcessModelCmd = &cobra.Command{ + Use: "multi-version", + Short: "Deploy multiple versions to Zeebe", + Long: `Deploy multiple versions of process and dmn models to Zeebe. Useful for experimenting with deployment distribution.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() - - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() - - err = internal.DeployDifferentVersions(zbClient, int32(versionCount)) - ensureNoError(err) - internal.LogInfo("Deployed different process models of different types and versions to zeebe!") - }, -} - -var deployWorkerCmd = &cobra.Command{ - Use: "worker", - Short: "Deploy a worker deployment to the Zeebe cluster", - Long: `Deploy a worker deployment to the Zeebe cluster. + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + err = internal.DeployDifferentVersions(zbClient, int32(flags.versionCount)) + ensureNoError(err) + internal.LogInfo("Deployed different process models of different types and versions to zeebe!") + }, + } + + var deployWorkerCmd = &cobra.Command{ + Use: "worker", + Short: "Deploy a worker deployment to the Zeebe cluster", + Long: `Deploy a worker deployment to the Zeebe cluster. The workers can be used as part of some chaos experiments to complete process instances etc.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) - err = k8Client.CreateWorkerDeployment() - ensureNoError(err) + err = k8Client.CreateWorkerDeployment() + ensureNoError(err) - internal.LogInfo("Worker successfully deployed to the current namespace: %s", k8Client.GetCurrentNamespace()) - }, -} + internal.LogInfo("Worker successfully deployed to the current namespace: %s", k8Client.GetCurrentNamespace()) + }, + } -var deployChaosModels = &cobra.Command{ - Use: "chaos", - Short: "Deploy all chaos BPMN models to the Zeebe cluster", - Long: `Deploy all chaos BPMN models to the to the Zeebe cluster. + var deployChaosModels = &cobra.Command{ + Use: "chaos", + Short: "Deploy all chaos BPMN models to the Zeebe cluster", + Long: `Deploy all chaos BPMN models to the to the Zeebe cluster. The process models allow to execute chaos experiments.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + zbClient, closeFn, err := backend.ConnectToZeebeCluster(k8Client) + ensureNoError(err) + defer closeFn() + + err = internal.DeployChaosModels(zbClient) + ensureNoError(err) + + internal.LogInfo("Deployed successfully process models to run chaos experiments") + }, + } + + rootCmd.AddCommand(deployCmd) - zbClient, closeFn, err := backend.ConnectToZeebeCluster(k8Client) - ensureNoError(err) - defer closeFn() + deployCmd.AddCommand(deployProcessModelCmd) + deployProcessModelCmd.Flags().StringVar(&flags.processModelPath, "processModelPath", "", + "Specify the path to a BPMN process model, which should be deployed. Defaults to a benchmark process model with one task (included in zbchaos). If the path starts with 'bpmn/' zbchaos will look for a referenced model bundled within the cli, like: 'bpmn/one_task.bpmn'.") - err = internal.DeployChaosModels(zbClient) - ensureNoError(err) + deployCmd.AddCommand(deployMultiVersionProcessModelCmd) + deployMultiVersionProcessModelCmd.Flags().IntVar(&flags.versionCount, "versionCount", 10, + "Specify how many different versions of a default BPMN and DMN model should be deployed. Useful for testing deployment distribution.") - internal.LogInfo("Deployed successfully process models to run chaos experiments") - }, + deployCmd.AddCommand(deployWorkerCmd) + deployCmd.AddCommand(deployChaosModels) } diff --git a/go-chaos/cmd/disconnect.go b/go-chaos/cmd/disconnect.go index 7a8c22b2f..a704992e6 100644 --- a/go-chaos/cmd/disconnect.go +++ b/go-chaos/cmd/disconnect.go @@ -19,87 +19,83 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/backend" ) -var ( - oneDirection bool - disconnectToAll bool -) +func ensureNoError(err error) { + if err != nil { + panic(err) + } +} + +func AddDisconnectCommand(rootCmd *cobra.Command, flags Flags) { + + var disconnect = &cobra.Command{ + Use: "disconnect", + Short: "Disconnect Zeebe nodes", + Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`, + } + + var disconnectBrokers = &cobra.Command{ + Use: "brokers", + Short: "Disconnect Zeebe Brokers", + Long: `Disconnect Zeebe Brokers with a given partition and role.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.DisconnectBroker(flags.kubeConfigPath, flags.namespace, backend.DisconnectBrokerCfg{ + Broker1Cfg: backend.Broker{ + NodeId: flags.broker1NodeId, + PartitionId: flags.broker1PartitionId, + Role: flags.broker1Role, + }, + Broker2Cfg: backend.Broker{ + NodeId: flags.broker2NodeId, + PartitionId: flags.broker2PartitionId, + Role: flags.broker2Role, + }, + OneDirection: flags.oneDirection, + }) + ensureNoError(err) + }, + } + + var disconnectGateway = &cobra.Command{ + Use: "gateway", + Short: "Disconnect Zeebe Gateway", + Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.DisconnectGateway(flags.kubeConfigPath, flags.namespace, backend.DisconnectGatewayCfg{ + OneDirection: flags.oneDirection, + DisconnectToAll: flags.disconnectToAll, + BrokerCfg: backend.Broker{ + Role: flags.role, + PartitionId: flags.partitionId, + NodeId: flags.nodeId, + }, + }) + ensureNoError(err) + }, + } -func init() { rootCmd.AddCommand(disconnect) // disconnect brokers disconnect.AddCommand(disconnectBrokers) // broker 1 - disconnectBrokers.Flags().StringVar(&broker1Role, "broker1Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the first Broker") - disconnectBrokers.Flags().IntVar(&broker1PartitionId, "broker1PartitionId", 1, "Specify the partition id of the first Broker") - disconnectBrokers.Flags().IntVar(&broker1NodeId, "broker1NodeId", -1, "Specify the nodeId of the first Broker") + disconnectBrokers.Flags().StringVar(&flags.broker1Role, "broker1Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the first Broker") + disconnectBrokers.Flags().IntVar(&flags.broker1PartitionId, "broker1PartitionId", 1, "Specify the partition id of the first Broker") + disconnectBrokers.Flags().IntVar(&flags.broker1NodeId, "broker1NodeId", -1, "Specify the nodeId of the first Broker") disconnectBrokers.MarkFlagsMutuallyExclusive("broker1PartitionId", "broker1NodeId") // broker 2 - disconnectBrokers.Flags().StringVar(&broker2Role, "broker2Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the second Broker") - disconnectBrokers.Flags().IntVar(&broker2PartitionId, "broker2PartitionId", 2, "Specify the partition id of the second Broker") - disconnectBrokers.Flags().IntVar(&broker2NodeId, "broker2NodeId", -1, "Specify the nodeId of the second Broker") + disconnectBrokers.Flags().StringVar(&flags.broker2Role, "broker2Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the second Broker") + disconnectBrokers.Flags().IntVar(&flags.broker2PartitionId, "broker2PartitionId", 2, "Specify the partition id of the second Broker") + disconnectBrokers.Flags().IntVar(&flags.broker2NodeId, "broker2NodeId", -1, "Specify the nodeId of the second Broker") // general - disconnectBrokers.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") + disconnectBrokers.Flags().BoolVar(&flags.oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") disconnectBrokers.MarkFlagsMutuallyExclusive("broker2PartitionId", "broker2NodeId") // disconnect gateway disconnect.AddCommand(disconnectGateway) - disconnectGateway.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker") - disconnectGateway.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker") - disconnectGateway.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the partition id of the Broker") - disconnectGateway.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") - disconnectGateway.Flags().BoolVar(&disconnectToAll, "all", false, "Specify whether the gateway should be disconnected to all brokers") + disconnectGateway.Flags().IntVar(&flags.nodeId, "nodeId", -1, "Specify the nodeId of the Broker") + disconnectGateway.Flags().StringVar(&flags.role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker") + disconnectGateway.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the partition id of the Broker") + disconnectGateway.Flags().BoolVar(&flags.oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") + disconnectGateway.Flags().BoolVar(&flags.disconnectToAll, "all", false, "Specify whether the gateway should be disconnected to all brokers") disconnectGateway.MarkFlagsMutuallyExclusive("all", "partitionId", "nodeId") } - -var disconnect = &cobra.Command{ - Use: "disconnect", - Short: "Disconnect Zeebe nodes", - Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`, -} - -func ensureNoError(err error) { - if err != nil { - panic(err) - } -} - -var disconnectBrokers = &cobra.Command{ - Use: "brokers", - Short: "Disconnect Zeebe Brokers", - Long: `Disconnect Zeebe Brokers with a given partition and role.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.DisconnectBroker(backend.DisconnectBrokerCfg{ - Broker1Cfg: backend.Broker{ - NodeId: broker1NodeId, - PartitionId: broker1PartitionId, - Role: broker1Role, - }, - Broker2Cfg: backend.Broker{ - NodeId: broker2NodeId, - PartitionId: broker2PartitionId, - Role: broker2Role, - }, - OneDirection: oneDirection, - }) - ensureNoError(err) - }, -} - -var disconnectGateway = &cobra.Command{ - Use: "gateway", - Short: "Disconnect Zeebe Gateway", - Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.DisconnectGateway(backend.DisconnectGatewayCfg{ - OneDirection: oneDirection, - DisconnectToAll: disconnectToAll, - BrokerCfg: backend.Broker{ - Role: role, - PartitionId: partitionId, - NodeId: nodeId, - }, - }) - ensureNoError(err) - }, -} diff --git a/go-chaos/cmd/exporting.go b/go-chaos/cmd/exporting.go index 85f2d0de7..7cd5413b6 100644 --- a/go-chaos/cmd/exporting.go +++ b/go-chaos/cmd/exporting.go @@ -22,37 +22,40 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(exportingCommand) +func AddExportingCmds(rootCmd *cobra.Command, flags Flags) { + var exportingCommand = &cobra.Command{ + Use: "exporting", + Short: "Controls Zeebe Exporting", + Long: "Can be used to start and stop exporting", + } - exportingCommand.AddCommand(pauseExportingCommand) - exportingCommand.AddCommand(resumeExportingCommand) -} + var pauseExportingCommand = &cobra.Command{ + Use: "pause", + Short: "Pause exporting on all partitions", + RunE: func(cmd *cobra.Command, args []string) error { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + return pauseExporting(k8Client) + }, + } -var exportingCommand = &cobra.Command{ - Use: "exporting", - Short: "Controls Zeebe Exporting", - Long: "Can be used to start and stop exporting", -} + var resumeExportingCommand = &cobra.Command{ + Use: "resume", + Short: "Resume exporting on all partitions", + RunE: func(cmd *cobra.Command, args []string) error { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + return resumeExporting(k8Client) + }, + } -var pauseExportingCommand = &cobra.Command{ - Use: "pause", - Short: "Pause exporting on all partitions", - RunE: pauseExporting, -} + rootCmd.AddCommand(exportingCommand) -var resumeExportingCommand = &cobra.Command{ - Use: "resume", - Short: "Resume exporting on all partitions", - RunE: resumeExporting, + exportingCommand.AddCommand(pauseExportingCommand) + exportingCommand.AddCommand(resumeExportingCommand) } -func pauseExporting(cmd *cobra.Command, args []string) error { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - +func pauseExporting(k8Client internal.K8Client) error { port := 9600 closePortForward := k8Client.MustGatewayPortForward(port, port) defer closePortForward() @@ -65,12 +68,7 @@ func pauseExporting(cmd *cobra.Command, args []string) error { return err } -func resumeExporting(cmd *cobra.Command, args []string) error { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - +func resumeExporting(k8Client internal.K8Client) error { port := 9600 closePortForward := k8Client.MustGatewayPortForward(port, port) defer closePortForward() @@ -81,5 +79,4 @@ func resumeExporting(cmd *cobra.Command, args []string) error { } defer resp.Body.Close() return err - } diff --git a/go-chaos/cmd/publish.go b/go-chaos/cmd/publish.go index 25c5a03c3..98fb5a80a 100644 --- a/go-chaos/cmd/publish.go +++ b/go-chaos/cmd/publish.go @@ -22,41 +22,42 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(publishCmd) - publishCmd.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the id of the partition") - publishCmd.Flags().StringVar(&msgName, "msgName", "msg", "Specify the name of the message, which should be published.") -} +func AddPublishCmd(rootCmd *cobra.Command, flags Flags) { -var publishCmd = &cobra.Command{ - Use: "publish", - Short: "Publish a message", - Long: `Publish a message to a certain partition.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - panicOnError(err) + var publishCmd = &cobra.Command{ + Use: "publish", + Short: "Publish a message", + Long: `Publish a message to a certain partition.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + panicOnError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() - zbClient, err := internal.CreateZeebeClient(port) - panicOnError(err) - defer zbClient.Close() + zbClient, err := internal.CreateZeebeClient(port) + panicOnError(err) + defer zbClient.Close() - topology, err := internal.GetTopology(zbClient) - panicOnError(err) + topology, err := internal.GetTopology(zbClient) + panicOnError(err) - correlationKey, err := internal.FindCorrelationKeyForPartition(partitionId, int(topology.PartitionsCount)) - panicOnError(err) + correlationKey, err := internal.FindCorrelationKeyForPartition(flags.partitionId, int(topology.PartitionsCount)) + panicOnError(err) - internal.LogVerbose("Send message '%s', with correaltion key '%s' (ASCII: %d) ", msgName, correlationKey, int(correlationKey[0])) + internal.LogVerbose("Send message '%s', with correaltion key '%s' (ASCII: %d) ", flags.msgName, correlationKey, int(correlationKey[0])) - messageResponse, err := zbClient.NewPublishMessageCommand().MessageName(msgName).CorrelationKey(correlationKey).TimeToLive(time.Minute * 5).Send(context.TODO()) - partitionIdFromKey := internal.ExtractPartitionIdFromKey(messageResponse.Key) + messageResponse, err := zbClient.NewPublishMessageCommand().MessageName(flags.msgName).CorrelationKey(correlationKey).TimeToLive(time.Minute * 5).Send(context.TODO()) + partitionIdFromKey := internal.ExtractPartitionIdFromKey(messageResponse.Key) - internal.LogInfo("Message was sent and returned key %d, which corresponds to partition: %d", messageResponse.Key, partitionIdFromKey) - }, + internal.LogInfo("Message was sent and returned key %d, which corresponds to partition: %d", messageResponse.Key, partitionIdFromKey) + }, + } + + rootCmd.AddCommand(publishCmd) + publishCmd.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the id of the partition") + publishCmd.Flags().StringVar(&flags.msgName, "msgName", "msg", "Specify the name of the message, which should be published.") } func panicOnError(err error) { diff --git a/go-chaos/cmd/restart.go b/go-chaos/cmd/restart.go index 8107d39f3..ab1cd125f 100644 --- a/go-chaos/cmd/restart.go +++ b/go-chaos/cmd/restart.go @@ -19,50 +19,57 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { +func AddRestartCmd(rootCmd *cobra.Command, flags Flags) { + + var restartCmd = &cobra.Command{ + Use: "restart", + Short: "Restarts a Zeebe node", + Long: `Restarts a Zeebe node, it can be chosen between: broker, gateway or a worker.`, + } + + var restartBrokerCmd = &cobra.Command{ + Use: "broker", + Short: "Restarts a Zeebe broker", + Long: `Restarts a Zeebe broker with a certain role and given partition.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + brokerPod := restartBroker(k8Client, flags.nodeId, flags.partitionId, flags.role, nil) + internal.LogInfo("Restarted %s", brokerPod) + }, + } + + var restartGatewayCmd = &cobra.Command{ + Use: "gateway", + Short: "Restarts a Zeebe gateway", + Long: `Restarts a Zeebe gateway.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + gatewayPod := restartGateway(k8Client, nil) + internal.LogInfo("Restarted %s", gatewayPod) + }, + } + + var restartWorkerCmd = &cobra.Command{ + Use: "worker", + Short: "Restart a Zeebe worker", + Long: `Restart a Zeebe worker.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + restartWorker(k8Client, flags.all, "Restarted", nil) + }, + } + rootCmd.AddCommand(restartCmd) restartCmd.AddCommand(restartBrokerCmd) - restartBrokerCmd.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER, INACTIVE]") - restartBrokerCmd.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the id of the partition") - restartBrokerCmd.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker") + restartBrokerCmd.Flags().StringVar(&flags.role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER, INACTIVE]") + restartBrokerCmd.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the id of the partition") + restartBrokerCmd.Flags().IntVar(&flags.nodeId, "nodeId", -1, "Specify the nodeId of the Broker") restartBrokerCmd.MarkFlagsMutuallyExclusive("partitionId", "nodeId") restartCmd.AddCommand(restartGatewayCmd) restartCmd.AddCommand(restartWorkerCmd) - restartWorkerCmd.Flags().BoolVar(&all, "all", false, "Specify whether all workers should be restarted") -} - -var restartCmd = &cobra.Command{ - Use: "restart", - Short: "Restarts a Zeebe node", - Long: `Restarts a Zeebe node, it can be chosen between: broker, gateway or a worker.`, -} - -var restartBrokerCmd = &cobra.Command{ - Use: "broker", - Short: "Restarts a Zeebe broker", - Long: `Restarts a Zeebe broker with a certain role and given partition.`, - Run: func(cmd *cobra.Command, args []string) { - brokerPod := restartBroker(nodeId, partitionId, role, nil) - internal.LogInfo("Restarted %s", brokerPod) - }, -} - -var restartGatewayCmd = &cobra.Command{ - Use: "gateway", - Short: "Restarts a Zeebe gateway", - Long: `Restarts a Zeebe gateway.`, - Run: func(cmd *cobra.Command, args []string) { - gatewayPod := restartGateway(nil) - internal.LogInfo("Restarted %s", gatewayPod) - }, -} - -var restartWorkerCmd = &cobra.Command{ - Use: "worker", - Short: "Restart a Zeebe worker", - Long: `Restart a Zeebe worker.`, - Run: func(cmd *cobra.Command, args []string) { - restartWorker(all, "Restarted", nil) - }, + restartWorkerCmd.Flags().BoolVar(&flags.all, "all", false, "Specify whether all workers should be restarted") } diff --git a/go-chaos/cmd/root.go b/go-chaos/cmd/root.go index 368c27810..9ef755760 100644 --- a/go-chaos/cmd/root.go +++ b/go-chaos/cmd/root.go @@ -18,14 +18,13 @@ import ( "fmt" "os" - "github.com/camunda/zeebe/clients/go/v8/pkg/zbc" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -var ( +type Flags struct { partitionId int role string nodeId int @@ -40,55 +39,85 @@ var ( broker2PartitionId int broker2Role string broker2NodeId int -) + // backup + backupId string + + // disconnect + oneDirection bool + disconnectToAll bool + + // stress + + cpuStress bool + memoryStress bool + ioStress bool + timeoutSec string + + // terminate + + all bool + + // verify + version int + bpmnProcessId string + timeoutInSec int + kubeConfigPath string + namespace string +} + +var Version = "development" +var Commit = "HEAD" var Verbose bool -var KubeConfigPath string -var Namespace string -var ClientId string -var ClientSecret string -var Audience string var JsonLogging bool -var rootCmd = &cobra.Command{ - Use: "zbchaos", - Short: "Zeebe chaos is a chaos experiment tool for Zeebe", - Long: `A chaos experimenting toolkit for Zeebe. +func NewCmd() *cobra.Command { + flags := Flags{} + + rootCmd := &cobra.Command{ + Use: "zbchaos", + Short: "Zeebe chaos is a chaos experiment tool for Zeebe", + Long: `A chaos experimenting toolkit for Zeebe. Perfect to inject some chaos into your brokers and gateways.`, - PersistentPreRun: func(cmd *cobra.Command, args []string) { - internal.Verbosity = Verbose - internal.JsonLogging = JsonLogging - if JsonLogging { - internal.JsonLogger = log.With().Logger() - } - internal.Namespace = Namespace - internal.KubeConfigPath = KubeConfigPath - if ClientId != "" && ClientSecret != "" { - internal.ZeebeClientCredential, _ = zbc.NewOAuthCredentialsProvider(&zbc.OAuthProviderConfig{ - ClientID: ClientId, - ClientSecret: ClientSecret, - Audience: Audience, - }) - } - }, -} + PersistentPreRun: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + internal.JsonLogging = JsonLogging + if JsonLogging { + internal.JsonLogger = log.With().Logger() + } + }, + } -func init() { rootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "v", false, "verbose output") rootCmd.PersistentFlags().BoolVarP(&JsonLogging, "jsonLogging", "", false, "json logging output") - rootCmd.PersistentFlags().StringVar(&KubeConfigPath, "kubeconfig", "", "path the the kube config that will be used") - rootCmd.PersistentFlags().StringVarP(&Namespace, "namespace", "n", "", "connect to the given namespace") - rootCmd.PersistentFlags().StringVarP(&ClientId, "clientId", "c", "", "connect using the given clientId") - rootCmd.PersistentFlags().StringVar(&ClientSecret, "clientSecret", "", "connect using the given client secret") - rootCmd.PersistentFlags().StringVar(&Audience, "audience", "", "connect using the given client secret") -} + rootCmd.PersistentFlags().StringVar(&flags.kubeConfigPath, "kubeconfig", "", "path the the kube config that will be used") + rootCmd.PersistentFlags().StringVarP(&flags.namespace, "namespace", "n", "", "connect to the given namespace") + + AddBackupCommand(rootCmd, flags) + AddBrokersCommand(rootCmd, flags) + AddConnectCmd(rootCmd, flags) + AddDatalossSimulationCmd(rootCmd, flags) + AddDeployCmd(rootCmd, flags) + AddDisconnectCommand(rootCmd, flags) + AddExportingCmds(rootCmd, flags) + AddPublishCmd(rootCmd, flags) + AddRestartCmd(rootCmd, flags) + AddStressCmd(rootCmd, flags) + AddTerminateCommand(rootCmd, flags) + AddTopologyCmd(rootCmd, flags) + AddVerifyCommands(rootCmd, flags) + AddVersionCmd(rootCmd) + AddWorkerCmd(rootCmd) -func NewCmd() *cobra.Command { return rootCmd } +func createK8ClientWithFlags(flags Flags) (internal.K8Client, error) { + return internal.CreateK8Client(flags.kubeConfigPath, flags.namespace) +} + func Execute() { - if err := rootCmd.Execute(); err != nil { + if err := NewCmd().Execute(); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/go-chaos/cmd/stress.go b/go-chaos/cmd/stress.go index f599f43f8..092335ecf 100644 --- a/go-chaos/cmd/stress.go +++ b/go-chaos/cmd/stress.go @@ -24,82 +24,74 @@ import ( v1 "k8s.io/api/core/v1" ) -var ( - cpuStress bool - memoryStress bool - ioStress bool - timeoutSec string -) +func AddStressCmd(rootCmd *cobra.Command, flags Flags) { + stress := &cobra.Command{ + Use: "stress", + Short: "Put stress on a Zeebe node", + Long: `Put stress on a Zeebe node. Node can be choose from gateway or brokers. Stress can be of different kind: memory, io or CPU. The different stress types can be combined.`, + } -func init() { - rootCmd.AddCommand(stress) + stressBroker := &cobra.Command{ + Use: "broker", + Short: "Put stress on a Zeebe Broker", + Long: `Put stress on a Zeebe Broker. Broker can be identified via ID or partition and role. Stress can be of different kinds: memory, io or CPU.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + pod := getBrokerPod(k8Client, zbClient, flags.nodeId, flags.partitionId, flags.role) + internal.LogInfo("Put stress on %s", pod.Name) + + stressType := internal.StressType{CpuStress: flags.cpuStress, IoStress: flags.ioStress, MemStress: flags.memoryStress} + err = internal.PutStressOnPod(k8Client, flags.timeoutSec, pod.Name, stressType) + ensureNoError(err) + }, + } + + stressGateway := &cobra.Command{ + Use: "gateway", + Short: "Put stress on a Zeebe Gateway", + Long: `Put stress on a Zeebe Gateway. Stress can be of different kinds: memory, io or CPU.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + pod := getGatewayPod(k8Client) + internal.LogInfo("Put stress on %s", pod.Name) + + stressType := internal.StressType{CpuStress: flags.cpuStress, IoStress: flags.ioStress, MemStress: flags.memoryStress} + err = internal.PutStressOnPod(k8Client, flags.timeoutSec, pod.Name, stressType) + ensureNoError(err) + }, + } - stress.PersistentFlags().BoolVar(&cpuStress, "cpu", true, "Specify whether CPU stress should put on the node") - stress.PersistentFlags().BoolVar(&memoryStress, "memory", false, "Specify whether memory stress should put on the node") - stress.PersistentFlags().BoolVar(&ioStress, "io", false, "Specify whether io stress should put on the node") - stressBroker.PersistentFlags().StringVar(&timeoutSec, "timeout", "30", "Specify how long the stress should be executed in seconds. Default: 30") + rootCmd.AddCommand(stress) + stress.PersistentFlags().BoolVar(&flags.cpuStress, "cpu", true, "Specify whether CPU stress should put on the node") + stress.PersistentFlags().BoolVar(&flags.memoryStress, "memory", false, "Specify whether memory stress should put on the node") + stress.PersistentFlags().BoolVar(&flags.ioStress, "io", false, "Specify whether io stress should put on the node") + stressBroker.PersistentFlags().StringVar(&flags.timeoutSec, "timeout", "30", "Specify how long the stress should be executed in seconds. Default: 30") // stress brokers stress.AddCommand(stressBroker) - stressBroker.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker") - stressBroker.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker") - stressBroker.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the partition id of the Broker") + stressBroker.Flags().IntVar(&flags.nodeId, "nodeId", -1, "Specify the nodeId of the Broker") + stressBroker.Flags().StringVar(&flags.role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker") + stressBroker.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the partition id of the Broker") stress.AddCommand(stressGateway) } -var stress = &cobra.Command{ - Use: "stress", - Short: "Put stress on a Zeebe node", - Long: `Put stress on a Zeebe node. Node can be choose from gateway or brokers. Stress can be of different kind: memory, io or CPU. The different stress types can be combined.`, -} - -var stressBroker = &cobra.Command{ - Use: "broker", - Short: "Put stress on a Zeebe Broker", - Long: `Put stress on a Zeebe Broker. Broker can be identified via ID or partition and role. Stress can be of different kinds: memory, io or CPU.`, - Run: func(cmd *cobra.Command, args []string) { - internal.Verbosity = Verbose - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() - - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() - - pod := getBrokerPod(k8Client, zbClient, nodeId, partitionId, role) - internal.LogInfo("Put stress on %s", pod.Name) - - stressType := internal.StressType{CpuStress: cpuStress, IoStress: ioStress, MemStress: memoryStress} - err = internal.PutStressOnPod(k8Client, timeoutSec, pod.Name, stressType) - ensureNoError(err) - }, -} - -var stressGateway = &cobra.Command{ - Use: "gateway", - Short: "Put stress on a Zeebe Gateway", - Long: `Put stress on a Zeebe Gateway. Stress can be of different kinds: memory, io or CPU.`, - Run: func(cmd *cobra.Command, args []string) { - internal.Verbosity = Verbose - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - pod := getGatewayPod(k8Client) - internal.LogInfo("Put stress on %s", pod.Name) - - stressType := internal.StressType{CpuStress: cpuStress, IoStress: ioStress, MemStress: memoryStress} - err = internal.PutStressOnPod(k8Client, timeoutSec, pod.Name, stressType) - ensureNoError(err) - }, -} - func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId int, brokerPartitionId int, brokerRole string) *v1.Pod { var brokerPod *v1.Pod var err error @@ -110,7 +102,7 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId } else { brokerPod, err = internal.GetBrokerPodForPartitionAndRole(k8Client, zbClient, brokerPartitionId, brokerRole) ensureNoError(err) - internal.LogVerbose("Found Broker %s as %s for partition %d.", brokerPod.Name, role, brokerPartitionId) + internal.LogVerbose("Found Broker %s as %s for partition %d.", brokerPod.Name, brokerRole, brokerPartitionId) } return brokerPod diff --git a/go-chaos/cmd/terminate.go b/go-chaos/cmd/terminate.go index 32001fa13..2e0484f39 100644 --- a/go-chaos/cmd/terminate.go +++ b/go-chaos/cmd/terminate.go @@ -22,70 +22,71 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -var ( - all bool -) +func AddTerminateCommand(rootCmd *cobra.Command, flags Flags) { + + var terminateCmd = &cobra.Command{ + Use: "terminate", + Short: "Terminates a Zeebe node", + Long: `Terminates a Zeebe node, it can be chosen between: broker, gateway or a worker.`, + } + + var terminateBrokerCmd = &cobra.Command{ + Use: "broker", + Short: "Terminates a Zeebe broker", + Long: `Terminates a Zeebe broker with a certain role and given partition.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + gracePeriodSec := int64(0) + brokerName := restartBroker(k8Client, flags.nodeId, flags.partitionId, flags.role, &gracePeriodSec) + internal.LogInfo("Terminated %s", brokerName) + }, + } + + var terminateGatewayCmd = &cobra.Command{ + Use: "gateway", + Short: "Terminates a Zeebe gateway", + Long: `Terminates a Zeebe gateway.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + gracePeriodSec := int64(0) + gatewayPod := restartGateway(k8Client, &gracePeriodSec) + internal.LogInfo("Terminated %s", gatewayPod) + }, + } + + var terminateWorkerCmd = &cobra.Command{ + Use: "worker", + Short: "Terminates a Zeebe worker", + Long: `Terminates a Zeebe worker.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + gracePeriodSec := int64(0) + restartWorker(k8Client, flags.all, "Terminated", &gracePeriodSec) + }, + } -func init() { rootCmd.AddCommand(terminateCmd) terminateCmd.AddCommand(terminateBrokerCmd) - terminateBrokerCmd.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER]") - terminateBrokerCmd.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the id of the partition") - terminateBrokerCmd.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker") + terminateBrokerCmd.Flags().StringVar(&flags.role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER]") + terminateBrokerCmd.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the id of the partition") + terminateBrokerCmd.Flags().IntVar(&flags.nodeId, "nodeId", -1, "Specify the nodeId of the Broker") terminateBrokerCmd.MarkFlagsMutuallyExclusive("partitionId", "nodeId") terminateCmd.AddCommand(terminateGatewayCmd) terminateCmd.AddCommand(terminateWorkerCmd) - terminateWorkerCmd.Flags().BoolVar(&all, "all", false, "Specify whether all workers should be terminated") -} + terminateWorkerCmd.Flags().BoolVar(&flags.all, "all", false, "Specify whether all workers should be terminated") -var terminateCmd = &cobra.Command{ - Use: "terminate", - Short: "Terminates a Zeebe node", - Long: `Terminates a Zeebe node, it can be chosen between: broker, gateway or a worker.`, -} - -var terminateBrokerCmd = &cobra.Command{ - Use: "broker", - Short: "Terminates a Zeebe broker", - Long: `Terminates a Zeebe broker with a certain role and given partition.`, - Run: func(cmd *cobra.Command, args []string) { - gracePeriodSec := int64(0) - brokerName := restartBroker(nodeId, partitionId, role, &gracePeriodSec) - internal.LogInfo("Terminated %s", brokerName) - }, -} - -var terminateGatewayCmd = &cobra.Command{ - Use: "gateway", - Short: "Terminates a Zeebe gateway", - Long: `Terminates a Zeebe gateway.`, - Run: func(cmd *cobra.Command, args []string) { - gracePeriodSec := int64(0) - gatewayPod := restartGateway(&gracePeriodSec) - internal.LogInfo("Terminated %s", gatewayPod) - }, -} - -var terminateWorkerCmd = &cobra.Command{ - Use: "worker", - Short: "Terminates a Zeebe worker", - Long: `Terminates a Zeebe worker.`, - Run: func(cmd *cobra.Command, args []string) { - gracePeriodSec := int64(0) - restartWorker(all, "Terminated", &gracePeriodSec) - }, } // Restart a broker pod. Pod is identified either by nodeId or by partitionId and role. // GracePeriod (in second) can be nil, which would mean using K8 default. // Returns the broker which has been restarted -func restartBroker(nodeId int, partitionId int, role string, gracePeriod *int64) string { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - +func restartBroker(k8Client internal.K8Client, nodeId int, partitionId int, role string, gracePeriod *int64) string { port := 26500 closeFn := k8Client.MustGatewayPortForward(port, port) defer closeFn() @@ -104,10 +105,7 @@ func restartBroker(nodeId int, partitionId int, role string, gracePeriod *int64) // Restart a gateway pod. The pod is the first from a list of existing pods. // GracePeriod (in second) can be nil, which would mean using K8 default. // Returns the gateway which has been restarted -func restartGateway(gracePeriod *int64) string { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - +func restartGateway(k8Client internal.K8Client, gracePeriod *int64) string { gatewayPodNames, err := k8Client.GetGatewayPodNames() ensureNoError(err) @@ -124,10 +122,7 @@ func restartGateway(gracePeriod *int64) string { // Restart a worker pod. The pod is the first from a list of existing pods, if all is not specified. // GracePeriod (in second) can be nil, which would mean using K8 default. // The actionName specifies whether it was restarted or terminated to log the right thing. -func restartWorker(all bool, actionName string, gracePeriod *int64) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - +func restartWorker(k8Client internal.K8Client, all bool, actionName string, gracePeriod *int64) { workerPods, err := k8Client.GetWorkerPods() ensureNoError(err) diff --git a/go-chaos/cmd/topology.go b/go-chaos/cmd/topology.go index ef2ed0ac7..66f6a1d4b 100644 --- a/go-chaos/cmd/topology.go +++ b/go-chaos/cmd/topology.go @@ -26,38 +26,38 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(topologyCmd) -} +func AddTopologyCmd(rootCmd *cobra.Command, flags Flags) { -var topologyCmd = &cobra.Command{ - Use: "topology", - Short: "Print the Zeebe topology deployed in the current namespace", - Long: `Shows the current Zeebe topology, in the current kubernetes namespace.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } + var topologyCmd = &cobra.Command{ + Use: "topology", + Short: "Print the Zeebe topology deployed in the current namespace", + Long: `Shows the current Zeebe topology, in the current kubernetes namespace.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + if err != nil { + panic(err) + } - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() - client, err := internal.CreateZeebeClient(port) - if err != nil { - panic(err) - } + client, err := internal.CreateZeebeClient(port) + if err != nil { + panic(err) + } - response, err := client.NewTopologyCommand().Send(context.TODO()) - if err != nil { - panic(err) - } + response, err := client.NewTopologyCommand().Send(context.TODO()) + if err != nil { + panic(err) + } - builder := strings.Builder{} - writeTopologyToOutput(&builder, response) - internal.LogInfo(builder.String()) - }, + builder := strings.Builder{} + writeTopologyToOutput(&builder, response) + internal.LogInfo(builder.String()) + }, + } + rootCmd.AddCommand(topologyCmd) } func writeTopologyToOutput(output io.Writer, response *pb.TopologyResponse) { diff --git a/go-chaos/cmd/verify.go b/go-chaos/cmd/verify.go index ece48afe5..53fcfc554 100644 --- a/go-chaos/cmd/verify.go +++ b/go-chaos/cmd/verify.go @@ -21,81 +21,75 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -var ( - version int - bpmnProcessId string - timeoutInSec int -) +func AddVerifyCommands(rootCmd *cobra.Command, flags Flags) { + + var verifyCmd = &cobra.Command{ + Use: "verify", + Short: "Verify certain properties", + Long: `Verify certain properties on Zeebe nodes, like readiness or steady-state.`, + } + + var verifyReadinessCmd = &cobra.Command{ + Use: "readiness", + Short: "Verify readiness of a Zeebe nodes", + Long: `Verifies the readiness of Zeebe nodes.`, + Run: func(cmd *cobra.Command, args []string) { + + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + err = k8Client.AwaitReadiness() + ensureNoError(err) + + internal.LogInfo("All Zeebe nodes are running.") + }, + } + + var verifyInstanceCreation = &cobra.Command{ + Use: "instance-creation", + Short: "Verify the instance creation", + Long: `Verifies that an instance from a specific process model can be created on a specific partition. +Process instances are created until the required partition is reached.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + processInstanceCreator, err := internal.CreateProcessInstanceCreator(zbClient, internal.ProcessInstanceCreationOptions{ + BpmnProcessId: flags.bpmnProcessId, + Version: int32(flags.version), + AwaitResult: flags.awaitResult, + Variables: flags.variables, + }) + ensureNoError(err) + if flags.awaitResult { + internal.LogVerbose("We await the result of the process instance creation, thus we skip the partition id check.") + flags.partitionId = 0 + } + err = internal.CreateProcessInstanceOnPartition(processInstanceCreator, int32(flags.partitionId), time.Duration(flags.timeoutInSec)*time.Second) + ensureNoError(err) + + internal.LogInfo("The steady-state was successfully verified!") + }, + } -func init() { rootCmd.AddCommand(verifyCmd) verifyCmd.AddCommand(verifyReadinessCmd) verifyCmd.AddCommand(verifyInstanceCreation) - verifyInstanceCreation.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the id of the partition") - verifyInstanceCreation.Flags().StringVar(&variables, "variables", "", "Specify the variables for the process instance. Expect json string.") - verifyInstanceCreation.Flags().BoolVar(&awaitResult, "awaitResult", false, + verifyInstanceCreation.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the id of the partition") + verifyInstanceCreation.Flags().StringVar(&flags.variables, "variables", "", "Specify the variables for the process instance. Expect json string.") + verifyInstanceCreation.Flags().BoolVar(&flags.awaitResult, "awaitResult", false, "Specify whether the completion of the created process instance should be awaited. Note: if this flag is specified it is expected that it doesn't matter where the instance creation is happening, partition id is not validated and creation not retried.") - verifyInstanceCreation.Flags().IntVar(&timeoutInSec, "timeoutInSec", 30, "Specify the timeout of the verification in seconds") - - verifyInstanceCreation.Flags().StringVar(&bpmnProcessId, "bpmnProcessId", "benchmark", "Specify the BPMN process ID for which the instance should be created.") - verifyInstanceCreation.Flags().IntVar(&version, "version", -1, "Specify the version for which the instance should be created, defaults to latest version.") - -} - -var verifyCmd = &cobra.Command{ - Use: "verify", - Short: "Verify certain properties", - Long: `Verify certain properties on Zeebe nodes, like readiness or steady-state.`, -} + verifyInstanceCreation.Flags().IntVar(&flags.timeoutInSec, "timeoutInSec", 30, "Specify the timeout of the verification in seconds") -var verifyReadinessCmd = &cobra.Command{ - Use: "readiness", - Short: "Verify readiness of a Zeebe nodes", - Long: `Verifies the readiness of Zeebe nodes.`, - Run: func(cmd *cobra.Command, args []string) { - - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - err = k8Client.AwaitReadiness() - ensureNoError(err) - - internal.LogInfo("All Zeebe nodes are running.") - }, -} - -var verifyInstanceCreation = &cobra.Command{ - Use: "instance-creation", - Short: "Verify the instance creation", - Long: `Verifies that an instance from a specific process model can be created on a specific partition. -Process instances are created until the required partition is reached.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() - - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() - - processInstanceCreator, err := internal.CreateProcessInstanceCreator(zbClient, internal.ProcessInstanceCreationOptions{ - BpmnProcessId: bpmnProcessId, - Version: int32(version), - AwaitResult: awaitResult, - Variables: variables, - }) - ensureNoError(err) - if awaitResult { - internal.LogVerbose("We await the result of the process instance creation, thus we skip the partition id check.") - partitionId = 0 - } - err = internal.CreateProcessInstanceOnPartition(processInstanceCreator, int32(partitionId), time.Duration(timeoutInSec)*time.Second) - ensureNoError(err) - - internal.LogInfo("The steady-state was successfully verified!") - }, + verifyInstanceCreation.Flags().StringVar(&flags.bpmnProcessId, "bpmnProcessId", "benchmark", "Specify the BPMN process ID for which the instance should be created.") + verifyInstanceCreation.Flags().IntVar(&flags.version, "version", -1, "Specify the version for which the instance should be created, defaults to latest version.") } diff --git a/go-chaos/cmd/version.go b/go-chaos/cmd/version.go index 0bacadb95..ced2f6f01 100644 --- a/go-chaos/cmd/version.go +++ b/go-chaos/cmd/version.go @@ -22,25 +22,21 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -var ( - Version = "development" - Commit = "HEAD" -) - func VersionString() string { commit := Commit[0:int(math.Min(8, float64(len(Commit))))] return fmt.Sprintf("zbchaos %s (commit: %s)", Version, commit) } -var versionCmd = &cobra.Command{ - Use: "version", - Short: "Print the version of zbchaos", - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - internal.LogInfo(VersionString()) - }, -} +func AddVersionCmd(rootCmd *cobra.Command) { + + var versionCmd = &cobra.Command{ + Use: "version", + Short: "Print the version of zbchaos", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + internal.LogInfo(VersionString()) + }, + } -func init() { rootCmd.AddCommand(versionCmd) } diff --git a/go-chaos/cmd/worker.go b/go-chaos/cmd/worker.go index aebec4156..82d4f9761 100644 --- a/go-chaos/cmd/worker.go +++ b/go-chaos/cmd/worker.go @@ -36,17 +36,16 @@ const ENV_CLIENT_ID = "CHAOS_AUTOMATION_CLUSTER_CLIENT_ID" const ENV_CLIENT_SECRET = "CHAOS_AUTOMATION_CLUSTER_CLIENT_SECRET" const ENV_ADDRESS = "CHAOS_AUTOMATION_CLUSTER_ADDRESS" -func init() { +func AddWorkerCmd(rootCmd *cobra.Command) { + var workerCommand = &cobra.Command{ + Use: "worker", + Short: "Starts a worker for zbchaos jobs", + Long: "Starts a worker for zbchaos jobs that executes zbchaos commands", + Run: start_worker, + } rootCmd.AddCommand(workerCommand) } -var workerCommand = &cobra.Command{ - Use: "worker", - Short: "Starts a worker for zbchaos jobs", - Long: "Starts a worker for zbchaos jobs that executes zbchaos commands", - Run: start_worker, -} - func start_worker(cmd *cobra.Command, args []string) { // The credentials are set via env var's. // We use here different names for the environment variables on purpose. @@ -94,8 +93,9 @@ func handleZbChaosJob(client zbworker.JobClient, job entities.Job) { func runZbChaosCommand(args []string, ctx context.Context) error { internal.LogInfo("Running command with args: %v ", args) - rootCmd.SetArgs(args) - _, err := rootCmd.ExecuteContextC(ctx) + cmd := NewCmd() + cmd.SetArgs(args) + _, err := cmd.ExecuteContextC(ctx) if err != nil { return err } diff --git a/go-chaos/cmd/zeebePods.go b/go-chaos/cmd/zeebePods.go index 790bb7976..75643e79e 100644 --- a/go-chaos/cmd/zeebePods.go +++ b/go-chaos/cmd/zeebePods.go @@ -19,27 +19,27 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(getZeebeBrokersCmd) -} +func AddBrokersCommand(rootCmd *cobra.Command, flags Flags) { + var getZeebeBrokersCmd = &cobra.Command{ + Use: "brokers", + Short: "Print the name of the Zeebe broker pods", + Long: `Show all names of deployed Zeebe brokers, in the current kubernetes namespace.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := createK8ClientWithFlags(flags) + if err != nil { + panic(err) + } -var getZeebeBrokersCmd = &cobra.Command{ - Use: "brokers", - Short: "Print the name of the Zeebe broker pods", - Long: `Show all names of deployed Zeebe brokers, in the current kubernetes namespace.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } + pods, err := k8Client.GetBrokerPodNames() + if err != nil { + panic(err) + } - pods, err := k8Client.GetBrokerPodNames() - if err != nil { - panic(err) - } + for _, item := range pods { + internal.LogInfo("%s", item) + } + }, + } - for _, item := range pods { - internal.LogInfo("%s", item) - } - }, + rootCmd.AddCommand(getZeebeBrokersCmd) } diff --git a/go-chaos/internal/flags.go b/go-chaos/internal/flags.go index d054c7e8e..829791c6d 100644 --- a/go-chaos/internal/flags.go +++ b/go-chaos/internal/flags.go @@ -15,21 +15,12 @@ package internal import ( - "github.com/camunda/zeebe/clients/go/v8/pkg/zbc" "github.com/rs/zerolog" ) -// defines whether the functions should print verbose output +// Verbosity defines whether the functions should print verbose output var Verbosity = false -// defines if a custom kube config should be used instead of the default one found by k8s -var KubeConfigPath string - -// sets the namespace to be used instead of the namespace from the current kube context -var Namespace string - -var ZeebeClientCredential zbc.CredentialsProvider - -// defines whether the logging should be structured json logging +// JsonLogging defines whether the logging should be structured json logging var JsonLogging bool var JsonLogger zerolog.Logger diff --git a/go-chaos/internal/k8helper.go b/go-chaos/internal/k8helper.go index f8fd7baea..4f847631e 100644 --- a/go-chaos/internal/k8helper.go +++ b/go-chaos/internal/k8helper.go @@ -43,8 +43,8 @@ func (c K8Client) GetCurrentNamespace() string { } // Creates a kubernetes client, based on the local kubeconfig -func CreateK8Client() (K8Client, error) { - settings := findKubernetesSettings() +func CreateK8Client(kubeConfigPath string, namespace string) (K8Client, error) { + settings := findKubernetesSettings(kubeConfigPath, namespace) return createK8Client(settings) } @@ -89,8 +89,8 @@ type KubernetesSettings struct { namespace string } -func findKubernetesSettings() KubernetesSettings { - kubeconfig := KubeConfigPath +func findKubernetesSettings(kubeConfigPath string, namespace string) KubernetesSettings { + kubeconfig := kubeConfigPath if kubeconfig == "" { // based on https://github.com/kubernetes/client-go/blob/master/examples/out-of-cluster-client-configuration/main.go if home := homedir.HomeDir(); home != "" { @@ -99,6 +99,6 @@ func findKubernetesSettings() KubernetesSettings { } return KubernetesSettings{ kubeConfigPath: kubeconfig, - namespace: Namespace, + namespace: namespace, } } diff --git a/go-chaos/internal/k8helper_test.go b/go-chaos/internal/k8helper_test.go index 391348a54..a09c8f83f 100644 --- a/go-chaos/internal/k8helper_test.go +++ b/go-chaos/internal/k8helper_test.go @@ -75,7 +75,7 @@ func Test_ResolveDefaultKubePath(t *testing.T) { home := homedir.HomeDir() // when - settings := findKubernetesSettings() + settings := findKubernetesSettings("", "") // then assert.Equal(t, strings.Join([]string{home, ".kube/config"}, "/"), settings.kubeConfigPath)