diff --git a/cmd/cain.go b/cmd/cain.go index a95df55..eef9ead 100644 --- a/cmd/cain.go +++ b/cmd/cain.go @@ -37,12 +37,13 @@ func NewRootCmd(args []string) *cobra.Command { } type backupCmd struct { - namespace string - selector string - container string - keyspace string - dst string - parallel int + namespace string + selector string + container string + keyspace string + dst string + parallel int + bufferSize float64 out io.Writer } @@ -56,7 +57,7 @@ func NewBackupCmd(out io.Writer) *cobra.Command { Short: "backup cassandra cluster to cloud storage", Long: ``, Run: func(cmd *cobra.Command, args []string) { - if _, err := cain.Backup(b.namespace, b.selector, b.container, b.keyspace, b.dst, b.parallel); err != nil { + if _, err := cain.Backup(b.namespace, b.selector, b.container, b.keyspace, b.dst, b.parallel, b.bufferSize); err != nil { log.Fatal(err) } }, @@ -69,18 +70,20 @@ func NewBackupCmd(out io.Writer) *cobra.Command { f.StringVarP(&b.keyspace, "keyspace", "k", "", "keyspace to act on") f.StringVar(&b.dst, "dst", "", "destination to backup to. Example: s3://bucket/cassandra") f.IntVarP(&b.parallel, "parallel", "p", 1, "number of files to copy in parallel. set this flag to 0 for full parallelism") + f.Float64VarP(&b.bufferSize, "buffer-size", "b", 6.75, "in memory buffer size (MB) to use for files copy (buffer per file)") return cmd } type restoreCmd struct { - src string - keyspace string - tag string - namespace string - selector string - container string - parallel int + src string + keyspace string + tag string + namespace string + selector string + container string + parallel int + bufferSize float64 out io.Writer } @@ -94,7 +97,7 @@ func NewRestoreCmd(out io.Writer) *cobra.Command { Short: "restore cassandra cluster from cloud storage", Long: ``, Run: func(cmd *cobra.Command, args []string) { - if err := cain.Restore(r.src, r.keyspace, r.tag, r.namespace, r.selector, r.container, r.parallel); err != nil { + if err := cain.Restore(r.src, r.keyspace, r.tag, r.namespace, r.selector, r.container, r.parallel, r.bufferSize); err != nil { log.Fatal(err) } }, @@ -108,6 +111,7 @@ func NewRestoreCmd(out io.Writer) *cobra.Command { f.StringVarP(&r.selector, "selector", "l", "", "selector to filter on") f.StringVarP(&r.container, "container", "c", "cassandra", "container name to act on") f.IntVarP(&r.parallel, "parallel", "p", 1, "number of files to copy in parallel. set this flag to 0 for full parallelism") + f.Float64VarP(&r.bufferSize, "buffer-size", "b", 6.75, "in memory buffer size (MB) to use for files copy (buffer per file)") return cmd } diff --git a/pkg/cain/cain.go b/pkg/cain/cain.go index 3bcc3e4..6ee5b29 100644 --- a/pkg/cain/cain.go +++ b/pkg/cain/cain.go @@ -9,7 +9,7 @@ import ( ) // Backup performs backup -func Backup(namespace, selector, container, keyspace, dst string, parallel int) (string, error) { +func Backup(namespace, selector, container, keyspace, dst string, parallel int, bufferSize float64) (string, error) { log.Println("Backup started!") dstPrefix, dstPath := utils.SplitInTwo(dst, "://") @@ -45,7 +45,7 @@ func Backup(namespace, selector, container, keyspace, dst string, parallel int) } log.Println("Starting files copy") - if err := skbn.PerformCopy(k8sClient, dstClient, "k8s", dstPrefix, fromToPathsAllPods, parallel); err != nil { + if err := skbn.PerformCopy(k8sClient, dstClient, "k8s", dstPrefix, fromToPathsAllPods, parallel, bufferSize); err != nil { return "", err } @@ -57,7 +57,7 @@ func Backup(namespace, selector, container, keyspace, dst string, parallel int) } // Restore performs restore -func Restore(src, keyspace, tag, namespace, selector, container string, parallel int) error { +func Restore(src, keyspace, tag, namespace, selector, container string, parallel int, bufferSize float64) error { log.Println("Restore started!") srcPrefix, srcBasePath := utils.SplitInTwo(src, "://") @@ -102,7 +102,7 @@ func Restore(src, keyspace, tag, namespace, selector, container string, parallel TruncateTables(k8sClient, namespace, container, keyspace, existingPods, tablesToRefresh, materializedViews) log.Println("Starting files copy") - if err := skbn.PerformCopy(srcClient, k8sClient, srcPrefix, "k8s", fromToPaths, parallel); err != nil { + if err := skbn.PerformCopy(srcClient, k8sClient, srcPrefix, "k8s", fromToPaths, parallel, bufferSize); err != nil { return err } diff --git a/pkg/cain/cqlsh.go b/pkg/cain/cqlsh.go index 96fc272..fca4a1c 100644 --- a/pkg/cain/cqlsh.go +++ b/pkg/cain/cqlsh.go @@ -1,6 +1,7 @@ package cain import ( + "bytes" "crypto/sha256" "fmt" "log" @@ -26,7 +27,8 @@ func BackupKeyspaceSchema(iK8sClient, iDstClient interface{}, namespace, pod, co dstBasePath := filepath.Join(dstPath, namespace, clusterName, keyspace, sum) schemaToPath := filepath.Join(dstBasePath, "schema.cql") - if err := skbn.Upload(iDstClient, dstPrefix, schemaToPath, "", schema); err != nil { + reader := bytes.NewReader(schema) + if err := skbn.Upload(iDstClient, dstPrefix, schemaToPath, "", reader); err != nil { return "", nil } @@ -108,7 +110,8 @@ func Cqlsh(iK8sClient interface{}, namespace, pod, container string, command []s k8sClient := iK8sClient.(*skbn.K8sClient) command = append([]string{"cqlsh", "-e"}, command...) - stdout, stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil) + stdout := new(bytes.Buffer) + stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil, stdout) if len(stderr) != 0 { return nil, fmt.Errorf("STDERR: " + (string)(stderr)) @@ -117,7 +120,7 @@ func Cqlsh(iK8sClient interface{}, namespace, pod, container string, command []s return nil, err } - return removeWarning(stdout), nil + return removeWarning(stdout.Bytes()), nil } func removeWarning(b []byte) []byte { diff --git a/pkg/cain/nodetool.go b/pkg/cain/nodetool.go index 4f2c014..8c1e2b5 100644 --- a/pkg/cain/nodetool.go +++ b/pkg/cain/nodetool.go @@ -1,6 +1,7 @@ package cain import ( + "bytes" "fmt" "log" "strings" @@ -123,7 +124,8 @@ func refreshTable(k8sClient *skbn.K8sClient, namespace, pod, container, keyspace func nodetool(k8sClient *skbn.K8sClient, namespace, pod, container string, command []string) (string, error) { command = append([]string{"nodetool"}, command...) - stdout, stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil) + stdout := new(bytes.Buffer) + stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil, stdout) if len(stderr) != 0 { return "", fmt.Errorf("STDERR: " + (string)(stderr)) } @@ -131,7 +133,7 @@ func nodetool(k8sClient *skbn.K8sClient, namespace, pod, container string, comma return "", err } - return (string)(stdout), nil + return stdout.String(), nil } func printOutput(output, pod string) {