Skip to content

Commit

Permalink
Encapsulate all commands (#289)
Browse files Browse the repository at this point in the history
In order to not run into issues with global variables reused and reset
during worker execute, we encapsulated all commands and make it
available via `NewCMD` so everytime we have to create a new command to
execute zbchaos in the workers.

This allows us to always use a clean context.
  • Loading branch information
ChrisKujawa authored Dec 9, 2022
2 parents e08d6ac + b5f573f commit d4e5724
Show file tree
Hide file tree
Showing 20 changed files with 779 additions and 772 deletions.
24 changes: 12 additions & 12 deletions go-chaos/backend/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
v1 "k8s.io/api/core/v1"
)

func ConnectBrokers() error {
k8Client, err := internal.CreateK8Client()
func ConnectBrokers(kubeConfigPath string, namespace string) error {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -55,8 +55,8 @@ func ConnectBrokers() error {
return nil
}

func ConnectGateway() error {
k8Client, err := internal.CreateK8Client()
func ConnectGateway(kubeConfigPath string, namespace string) error {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,8 +103,8 @@ type DisconnectBrokerCfg struct {
OneDirection bool
}

func DisconnectBroker(disconnectBrokerCfg DisconnectBrokerCfg) error {
k8Client, err := prepareBrokerDisconnect()
func DisconnectBroker(kubeConfigPath string, namespace string, disconnectBrokerCfg DisconnectBrokerCfg) error {
k8Client, err := prepareBrokerDisconnect(kubeConfigPath, namespace)

zbClient, closeFn, err := ConnectToZeebeCluster(k8Client)
if err != nil {
Expand Down Expand Up @@ -138,8 +138,8 @@ type DisconnectGatewayCfg struct {
BrokerCfg Broker
}

func DisconnectGateway(disconnectGatewayCfg DisconnectGatewayCfg) error {
k8Client, zbClient, closeFn, err := prepareGatewayDisconnect()
func DisconnectGateway(kubeConfigPath string, namespace string, disconnectGatewayCfg DisconnectGatewayCfg) error {
k8Client, zbClient, closeFn, err := prepareGatewayDisconnect(kubeConfigPath, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -173,8 +173,8 @@ func DisconnectGateway(disconnectGatewayCfg DisconnectGatewayCfg) error {
return nil
}

func prepareGatewayDisconnect() (internal.K8Client, zbc.Client, func(), error) {
k8Client, err := prepareBrokerDisconnect()
func prepareGatewayDisconnect(kubeConfigPath string, namespace string) (internal.K8Client, zbc.Client, func(), error) {
k8Client, err := prepareBrokerDisconnect(kubeConfigPath, namespace)
if err != nil {
return k8Client, nil, nil, err
}
Expand All @@ -199,8 +199,8 @@ func prepareGatewayDisconnect() (internal.K8Client, zbc.Client, func(), error) {
return k8Client, zbClient, closeFn, nil
}

func prepareBrokerDisconnect() (internal.K8Client, error) {
k8Client, err := internal.CreateK8Client()
func prepareBrokerDisconnect(kubeConfigPath string, namespace string) (internal.K8Client, error) {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
return internal.K8Client{}, err
}
Expand Down
132 changes: 69 additions & 63 deletions go-chaos/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -33,78 +34,83 @@ import (
"github.com/zeebe-io/zeebe-chaos/go-chaos/internal"
)

var (
backupId string
)
func AddBackupCommand(rootCmd *cobra.Command, flags Flags) {

func init() {
rootCmd.AddCommand(backupCommand)
backupCommand.AddCommand(setupBackupCommand)
backupCommand.AddCommand(takeBackupCommand)
takeBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(waitForBackupCommand)
waitForBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(restoreBackupCommand)
restoreBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
}
var backupCommand = &cobra.Command{
Use: "backup",
Short: "Controls Zeebe backups",
Long: "Can be used to take backups and query their status",
}

var backupCommand = &cobra.Command{
Use: "backup",
Short: "Controls Zeebe backups",
Long: "Can be used to take backups and query their status",
}
var setupBackupCommand = &cobra.Command{
Use: "setup",
Short: "Configures a zeebe cluster's backup settings",
RunE: func(cmd *cobra.Command, args []string) error {
return setupBackup(flags.kubeConfigPath, flags.namespace)
},
}

var setupBackupCommand = &cobra.Command{
Use: "setup",
Short: "Configures a zeebe cluster's backup settings",
RunE: setupBackup,
}
var takeBackupCommand = &cobra.Command{
Use: "take",
Short: "Trigger a backup",
RunE: func(cmd *cobra.Command, args []string) error {
return takeBackup(flags)
},
}

var takeBackupCommand = &cobra.Command{
Use: "take",
Short: "Trigger a backup",
RunE: takeBackup,
}
var waitForBackupCommand = &cobra.Command{
Use: "wait",
Short: "Wait for a backup to complete or fail",
RunE: func(cmd *cobra.Command, args []string) error {
return waitForBackup(flags)
},
}

var waitForBackupCommand = &cobra.Command{
Use: "wait",
Short: "Wait for a backup to complete or fail",
RunE: waitForBackup,
}
var restoreBackupCommand = &cobra.Command{
Use: "restore",
Short: "Restore from a given backup id",
RunE: func(cmd *cobra.Command, args []string) error {
return restoreFromBackup(flags)
},
}

var restoreBackupCommand = &cobra.Command{
Use: "restore",
Short: "Restore from a given backup id",
RunE: restoreFromBackup,
rootCmd.AddCommand(backupCommand)
backupCommand.AddCommand(setupBackupCommand)
backupCommand.AddCommand(takeBackupCommand)
takeBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(waitForBackupCommand)
waitForBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(restoreBackupCommand)
restoreBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
}

func setupBackup(cmd *cobra.Command, _ []string) error {
k8Client, err := internal.CreateK8Client()
func setupBackup(kubeConfigPath string, namespace string) error {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
panic(err)
}

namespace := k8Client.GetCurrentNamespace()
namespace = k8Client.GetCurrentNamespace()

err = k8Client.PauseReconciliation()
if err != nil {
return err
}

_, err = createBackupSecret(cmd, k8Client, namespace)
_, err = createBackupSecret(k8Client, namespace)
if err != nil {
return err
}

err = setupStatefulSetForBackups(cmd, err, k8Client, namespace)
err = setupStatefulSetForBackups(err, k8Client, namespace)
if err != nil {
return err
}
err = setupGatewayForBackups(cmd, err, k8Client, namespace)
err = setupGatewayForBackups(err, k8Client, namespace)
return err
}

func setupStatefulSetForBackups(cmd *cobra.Command, err error, k8Client internal.K8Client, namespace string) error {
func setupStatefulSetForBackups(err error, k8Client internal.K8Client, namespace string) error {
sfs, err := k8Client.GetZeebeStatefulSet()
if err != nil {
return err
Expand All @@ -122,17 +128,17 @@ func setupStatefulSetForBackups(cmd *cobra.Command, err error, k8Client internal
core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", Value: "true"},
)

_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{})
_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{})
return err
}

func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8Client, namespace string) error {
func setupGatewayForBackups(err error, k8Client internal.K8Client, namespace string) error {
saasGatewayLabels := meta.LabelSelector{
MatchLabels: map[string]string{"app.kubernetes.io/component": "standalone-gateway"},
}
var gatewayDeployments *apps.DeploymentList

gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(cmd.Context(), meta.ListOptions{LabelSelector: labels.Set(saasGatewayLabels.MatchLabels).String()})
gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(context.TODO(), meta.ListOptions{LabelSelector: labels.Set(saasGatewayLabels.MatchLabels).String()})
if err != nil {
return err
}
Expand All @@ -141,7 +147,7 @@ func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8C
MatchLabels: map[string]string{"app.kubernetes.io/component": "zeebe-gateway"},
}
gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(
cmd.Context(),
context.TODO(),
meta.ListOptions{LabelSelector: labels.Set(selector.MatchLabels).String()},
)
if err != nil {
Expand All @@ -156,13 +162,13 @@ func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8C
core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE", Value: "*"},
core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", Value: "true"},
)
_, err = k8Client.Clientset.AppsV1().Deployments(namespace).Update(cmd.Context(), &gateway, meta.UpdateOptions{})
_, err = k8Client.Clientset.AppsV1().Deployments(namespace).Update(context.TODO(), &gateway, meta.UpdateOptions{})
return err
}

func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespace string) (*core.Secret, error) {
func createBackupSecret(k8Client internal.K8Client, namespace string) (*core.Secret, error) {
return k8Client.Clientset.CoreV1().Secrets(namespace).Create(
cmd.Context(),
context.TODO(),
&core.Secret{
Type: "Opaque",
ObjectMeta: meta.ObjectMeta{Name: "zeebe-backup-store-s3"},
Expand All @@ -176,8 +182,8 @@ func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespac
)
}

func takeBackup(*cobra.Command, []string) error {
k8Client, err := internal.CreateK8Client()
func takeBackup(flags Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}
Expand All @@ -190,7 +196,7 @@ func takeBackup(*cobra.Command, []string) error {
port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
defer closePortForward()
url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId)
url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, flags.backupId)
resp, err := http.Post(url, "", nil)
if err != nil {
return err
Expand All @@ -205,8 +211,8 @@ func takeBackup(*cobra.Command, []string) error {
return err
}

func waitForBackup(*cobra.Command, []string) error {
k8Client, err := internal.CreateK8Client()
func waitForBackup(flags Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}
Expand All @@ -216,7 +222,7 @@ func waitForBackup(*cobra.Command, []string) error {
defer closePortForward()

for {
backup, err := getBackupStatus(port, backupId)
backup, err := getBackupStatus(port, flags.backupId)
if err != nil {
return err
}
Expand All @@ -234,8 +240,8 @@ func waitForBackup(*cobra.Command, []string) error {

}

func restoreFromBackup(cmd *cobra.Command, _ []string) error {
k8Client, err := internal.CreateK8Client()
func restoreFromBackup(flags Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -272,7 +278,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error {
Name: "restore-from-backup",
Image: sfs.Spec.Template.Spec.Containers[0].Image,
ImagePullPolicy: core.PullAlways,
Env: restoreEnvFromSfs(sfs),
Env: restoreEnvFromSfs(flags, sfs),
EnvFrom: []core.EnvFromSource{{SecretRef: &core.SecretEnvSource{LocalObjectReference: core.LocalObjectReference{Name: "zeebe-backup-store-s3"}}}},
VolumeMounts: []core.VolumeMount{
{
Expand All @@ -284,7 +290,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error {
}
sfs.Spec.Template.Spec.InitContainers = []core.Container{deleteContainer, restoreContainer}

_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{})
_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -308,7 +314,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error {
return nil
}

func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar {
func restoreEnvFromSfs(flags Flags, sfs *apps.StatefulSet) []core.EnvVar {
zeebeEnv := sfs.Spec.Template.Spec.Containers[0].Env
restoreEnv := make([]core.EnvVar, 0)
for _, env := range zeebeEnv {
Expand All @@ -326,7 +332,7 @@ func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar {
},
core.EnvVar{
Name: "ZEEBE_RESTORE_FROM_BACKUP_ID",
Value: backupId,
Value: flags.backupId,
})
return restoreEnv
}
Expand Down
54 changes: 27 additions & 27 deletions go-chaos/cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,34 @@ import (
"github.com/zeebe-io/zeebe-chaos/go-chaos/backend"
)

func init() {
rootCmd.AddCommand(connect)
connect.AddCommand(connectBrokers)
connect.AddCommand(connectGateway)
}
func AddConnectCmd(rootCmd *cobra.Command, flags Flags) {
var connect = &cobra.Command{
Use: "connect",
Short: "Connect Zeebe nodes",
Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`,
}

var connect = &cobra.Command{
Use: "connect",
Short: "Connect Zeebe nodes",
Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`,
}
var connectBrokers = &cobra.Command{
Use: "brokers",
Short: "Connect Zeebe Brokers",
Long: `Connect all Zeebe Brokers again, after they have been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectBrokers(flags.kubeConfigPath, flags.namespace)
ensureNoError(err)
},
}

var connectBrokers = &cobra.Command{
Use: "brokers",
Short: "Connect Zeebe Brokers",
Long: `Connect all Zeebe Brokers again, after they have been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectBrokers()
ensureNoError(err)
},
}
var connectGateway = &cobra.Command{
Use: "gateway",
Short: "Connect Zeebe Gateway",
Long: `Connect all Zeebe Gateway again, after it has been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectGateway(flags.kubeConfigPath, flags.namespace)
ensureNoError(err)
},
}

var connectGateway = &cobra.Command{
Use: "gateway",
Short: "Connect Zeebe Gateway",
Long: `Connect all Zeebe Gateway again, after it has been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectGateway()
ensureNoError(err)
},
rootCmd.AddCommand(connect)
connect.AddCommand(connectBrokers)
connect.AddCommand(connectGateway)
}
Loading

0 comments on commit d4e5724

Please sign in to comment.