Skip to content

Commit

Permalink
[cain/restore] add option to restore schema, fixes #4
Browse files Browse the repository at this point in the history
  • Loading branch information
maorfr committed Dec 26, 2018
1 parent 426ddab commit cb98729
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Cain is a backup and restore tool for Cassandra on Kubernetes. It is named after
Cain supports the following cloud storage services:

* AWS S3
* Minio S3
* Azure Blob Storage

Cain is now an official part of the Helm [incubator/cassandra](https://github.com/helm/charts/tree/master/incubator/cassandra) chart!
Expand Down Expand Up @@ -109,6 +110,7 @@ Flags:
-k, --keyspace string keyspace to act on
-n, --namespace string namespace to find cassandra cluster
-p, --parallel int number of files to copy in parallel. set this flag to 0 for full parallelism (default 1)
-s, --schema string schema version to restore (optional)
-l, --selector string selector to filter on
--src string source to restore from. Example: s3://bucket/cassandra/namespace/cluster-name
-t, --tag string tag to restore
Expand Down
2 changes: 1 addition & 1 deletion cmd/cain.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func NewRestoreCmd(out io.Writer) *cobra.Command {
f.StringVar(&r.src, "src", "", "source to restore from. Example: s3://bucket/cassandra/namespace/cluster-name")
f.StringVarP(&r.keyspace, "keyspace", "k", "", "keyspace to act on")
f.StringVarP(&r.tag, "tag", "t", "", "tag to restore")
f.StringVarP(&r.schema, "schema", "s", "", "schema to restore")
f.StringVarP(&r.schema, "schema", "s", "", "schema version to restore (optional)")
f.StringVarP(&r.namespace, "namespace", "n", "", "namespace to find cassandra cluster")
f.StringVarP(&r.selector, "selector", "l", "", "selector to filter on")
f.StringVarP(&r.container, "container", "c", "cassandra", "container name to act on")
Expand Down
16 changes: 15 additions & 1 deletion pkg/cain/cain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cain

import (
"fmt"
"log"
"path/filepath"

Expand Down Expand Up @@ -100,8 +101,21 @@ func Restore(o RestoreOptions) error {
log.Println("Getting current schema")
_, sum, err := DescribeKeyspaceSchema(k8sClient, o.Namespace, existingPods[0], o.Container, o.Keyspace)
if err != nil {
return err
if o.Schema == "" {
return err
}
log.Println("Schema not found, restoring schema", o.Schema)
sum, err = RestoreKeyspaceSchema(srcClient, k8sClient, srcPrefix, srcBasePath, o.Namespace, existingPods[0], o.Container, o.Keyspace, o.Schema, o.Parallel, o.BufferSize)
if err != nil {
return err
}
log.Println("Restored schema:", sum)
}

if o.Schema != "" && sum != o.Schema {
return fmt.Errorf("specified schema %s is not the same as found schema %s", o.Schema, sum)
}

log.Println("Found schema:", sum)

log.Println("Calculating paths. This may take a while...")
Expand Down
38 changes: 37 additions & 1 deletion pkg/cain/cqlsh.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func DescribeKeyspaceSchema(iK8sClient interface{}, namespace, pod, container, k
command := []string{fmt.Sprintf("DESC %s;", keyspace)}
schema, err := Cqlsh(iK8sClient, namespace, pod, container, command)
if err != nil {
return nil, "", fmt.Errorf("Could not describe schema. make sure a schema exists for keyspace \"%s\". %s", keyspace, err)
return nil, "", fmt.Errorf("Could not describe schema. make sure a schema exists for keyspace \"%s\" or restore it using \"--schema\". %s", keyspace, err)
}
h := sha256.New()
h.Write(schema)
Expand All @@ -49,6 +49,24 @@ func DescribeKeyspaceSchema(iK8sClient interface{}, namespace, pod, container, k
return schema, sum, nil
}

// RestoreKeyspaceSchema restores a keyspace schema
func RestoreKeyspaceSchema(srcClient, iK8sClient interface{}, srcPrefix, srcPath, namespace, pod, container, keyspace, schema string, parallel int, bufferSize float64) (string, error) {
schemaTmpFile := fmt.Sprintf("/tmp/%s/schema.cql", keyspace)
fromTo := skbn.FromToPair{
FromPath: filepath.Join(srcPath, keyspace, schema, "schema.cql"),
ToPath: filepath.Join(namespace, pod, container, schemaTmpFile),
}
if err := skbn.PerformCopy(srcClient, iK8sClient, srcPrefix, "k8s", []skbn.FromToPair{fromTo}, parallel, bufferSize); err != nil {
return "", err
}
if _, err := CqlshF(iK8sClient, namespace, pod, container, schemaTmpFile); err != nil {
return "", err
}
_, sum, err := DescribeKeyspaceSchema(iK8sClient, namespace, pod, container, keyspace)

return sum, err
}

// TruncateTables truncates the provided tables in all pods
func TruncateTables(iK8sClient interface{}, namespace, container, keyspace string, pods, tables, materializedViews []string) {
bwgSize := len(pods)
Expand Down Expand Up @@ -123,6 +141,24 @@ func Cqlsh(iK8sClient interface{}, namespace, pod, container string, command []s
return removeWarning(stdout.Bytes()), nil
}

// CqlshF executes cqlsh -f file in a given pod
func CqlshF(iK8sClient interface{}, namespace, pod, container string, file string) ([]byte, error) {
k8sClient := iK8sClient.(*skbn.K8sClient)

command := []string{"cqlsh", "-f", file}
stdout := new(bytes.Buffer)
stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil, stdout)

if len(stderr) != 0 {
return nil, fmt.Errorf("STDERR: " + (string)(stderr))
}
if err != nil {
return nil, err
}

return removeWarning(stdout.Bytes()), nil
}

func removeWarning(b []byte) []byte {
const warning = "Warning: Cannot create directory at `/home/cassandra/.cassandra`. Command history will not be saved."
return []byte(strings.Replace((string)(b), warning, "", 1))
Expand Down

0 comments on commit cb98729

Please sign in to comment.