Skip to content

Commit

Permalink
Merge pull request #2053 from influxdb/backup-restore
Browse files Browse the repository at this point in the history
Backup & Restore
  • Loading branch information
pauldix committed Mar 25, 2015
2 parents b164b63 + 21782c0 commit 5e47ed1
Show file tree
Hide file tree
Showing 18 changed files with 1,926 additions and 27 deletions.
182 changes: 182 additions & 0 deletions cmd/influxd/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package main

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"

"github.com/influxdb/influxdb"
)

// BackupSuffix is a suffix added to the backup while it's in-process.
const BackupSuffix = ".pending"

// BackupCommand represents the program execution for "influxd backup".
type BackupCommand struct {
// The logger passed to the ticker during execution.
Logger *log.Logger

// Standard input/output, overridden for testing.
Stderr io.Writer
}

// NewBackupCommand returns a new instance of BackupCommand with default settings.
func NewBackupCommand() *BackupCommand {
return &BackupCommand{
Stderr: os.Stderr,
}
}

// Run excutes the program.
func (cmd *BackupCommand) Run(args ...string) error {
// Set up logger.
cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags)
cmd.Logger.Printf("influxdb backup, version %s, commit %s", version, commit)

// Parse command line arguments.
u, path, err := cmd.parseFlags(args)
if err != nil {
return err
}

// Retrieve snapshot from local file.
ss, err := influxdb.ReadFileSnapshot(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("read file snapshot: %s", err)
}

// Determine temporary path to download to.
tmppath := path + BackupSuffix

// Calculate path of next backup file.
// This uses the path if it doesn't exist.
// Otherwise it appends an autoincrementing number.
path, err = cmd.nextPath(path)
if err != nil {
return fmt.Errorf("next path: %s", err)
}

// Retrieve snapshot.
if err := cmd.download(u, ss, tmppath); err != nil {
return fmt.Errorf("download: %s", err)
}

// Rename temporary file to final path.
if err := os.Rename(tmppath, path); err != nil {
return fmt.Errorf("rename: %s", err)
}

// TODO: Check file integrity.

// Notify user of completion.
cmd.Logger.Println("backup complete")

return nil
}

// parseFlags parses and validates the command line arguments.
func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) {
fs := flag.NewFlagSet("", flag.ContinueOnError)
host := fs.String("host", DefaultSnapshotURL.String(), "")
fs.SetOutput(cmd.Stderr)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return url.URL{}, "", err
}

// Parse host.
u, err := url.Parse(*host)
if err != nil {
return url.URL{}, "", fmt.Errorf("parse host url: %s", err)
}

// Require output path.
path := fs.Arg(0)
if path == "" {
return url.URL{}, "", fmt.Errorf("snapshot path required")
}

return *u, path, nil
}

// nextPath returns the next file to write to.
func (cmd *BackupCommand) nextPath(path string) (string, error) {
// Use base path if it doesn't exist.
if _, err := os.Stat(path); os.IsNotExist(err) {
return path, nil
} else if err != nil {
return "", err
}

// Otherwise iterate through incremental files until one is available.
for i := 0; ; i++ {
s := fmt.Sprintf(path+".%d", i)
if _, err := os.Stat(s); os.IsNotExist(err) {
return s, nil
} else if err != nil {
return "", err
}
}
}

// download downloads a snapshot from a host to a given path.
func (cmd *BackupCommand) download(u url.URL, ss *influxdb.Snapshot, path string) error {
// Create local file to write to.
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("open temp file: %s", err)
}
defer f.Close()

// Encode snapshot.
var buf bytes.Buffer
if ss != nil {
if err := json.NewEncoder(&buf).Encode(ss); err != nil {
return fmt.Errorf("encode snapshot: %s", err)
}
}

// Create request with existing snapshot as the body.
u.Path = "/snapshot"
req, err := http.NewRequest("GET", u.String(), &buf)
if err != nil {
return fmt.Errorf("new request: %s", err)
}

// Fetch the archive from the server.
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("get: %s", err)
}
defer func() { _ = resp.Body.Close() }()

// Check the status code.
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("snapshot error: status=%d", resp.StatusCode)
}

// Write the archive to disk.
if _, err := io.Copy(f, resp.Body); err != nil {
fmt.Errorf("write snapshot: %s", err)
}

return nil
}

// printUsage prints the usage message to STDERR.
func (cmd *BackupCommand) printUsage() {
fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] PATH
backup downloads a snapshot of a data node and saves it to disk.
-host <url>
The host to connect to snapshot.
Defaults to http://127.0.0.1:8087.
`)
}
122 changes: 122 additions & 0 deletions cmd/influxd/backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main_test

import (
"bytes"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cmd/influxd"
)

// Ensure the backup can download from the server and save to disk.
func TestBackupCommand(t *testing.T) {
// Mock the backup endpoint.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/snapshot" {
t.Fatalf("unexpected url path: %s", r.URL.Path)
}

// Write a simple snapshot to the buffer.
sw := influxdb.NewSnapshotWriter()
sw.Snapshot = &influxdb.Snapshot{Files: []influxdb.SnapshotFile{
{Name: "meta", Size: 5, Index: 10},
}}
sw.FileWriters["meta"] = influxdb.NopWriteToCloser(bytes.NewBufferString("55555"))
if _, err := sw.WriteTo(w); err != nil {
t.Fatal(err)
}
}))
defer s.Close()

// Create a temp path and remove incremental backups at the end.
path := tempfile()
defer os.Remove(path)
defer os.Remove(path)
defer os.Remove(path)

// Execute the backup against the mock server.
for i := 0; i < 3; i++ {
if err := NewBackupCommand().Run("-host", s.URL, path); err != nil {
t.Fatal(err)
}
}

// Verify snapshot and two incremental snapshots were written.
if _, err := os.Stat(path); err != nil {
t.Fatalf("snapshot not found: %s", err)
} else if _, err = os.Stat(path + ".0"); err != nil {
t.Fatalf("incremental snapshot(0) not found: %s", err)
} else if _, err = os.Stat(path + ".1"); err != nil {
t.Fatalf("incremental snapshot(1) not found: %s", err)
}
}

// Ensure the backup command returns an error if flags cannot be parsed.
func TestBackupCommand_ErrFlagParse(t *testing.T) {
cmd := NewBackupCommand()
if err := cmd.Run("-bad-flag"); err == nil || err.Error() != `flag provided but not defined: -bad-flag` {
t.Fatal(err)
} else if !strings.Contains(cmd.Stderr.String(), "usage") {
t.Fatal("usage message not displayed")
}
}

// Ensure the backup command returns an error if the host cannot be parsed.
func TestBackupCommand_ErrInvalidHostURL(t *testing.T) {
if err := NewBackupCommand().Run("-host", "http://%f"); err == nil || err.Error() != `parse host url: parse http://%f: hexadecimal escape in host` {
t.Fatal(err)
}
}

// Ensure the backup command returns an error if the output path is not specified.
func TestBackupCommand_ErrPathRequired(t *testing.T) {
if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `snapshot path required` {
t.Fatal(err)
}
}

// Ensure the backup returns an error if it cannot connect to the server.
func TestBackupCommand_ErrConnectionRefused(t *testing.T) {
// Start and immediately stop a server so we have a dead port.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
s.Close()

// Execute the backup command.
path := tempfile()
defer os.Remove(path)
if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatal(err)
}
}

// Ensure the backup returns any non-200 status codes.
func TestBackupCommand_ErrServerError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer s.Close()

// Execute the backup command.
path := tempfile()
defer os.Remove(path)
if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || err.Error() != `download: snapshot error: status=500` {
t.Fatal(err)
}
}

// BackupCommand is a test wrapper for main.BackupCommand.
type BackupCommand struct {
*main.BackupCommand
Stderr bytes.Buffer
}

// NewBackupCommand returns a new instance of BackupCommand.
func NewBackupCommand() *BackupCommand {
cmd := &BackupCommand{BackupCommand: main.NewBackupCommand()}
cmd.BackupCommand.Stderr = &cmd.Stderr
return cmd
}
22 changes: 22 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (
// DefaultDataPort represents the default port the data server runs on.
DefaultDataPort = 8086

// DefaultSnapshotBindAddress is the default bind address to serve snapshots from.
DefaultSnapshotBindAddress = "127.0.0.1"

// DefaultSnapshotPort is the default port to serve snapshots from.
DefaultSnapshotPort = 8087

// DefaultJoinURLs represents the default URLs for joining a cluster.
DefaultJoinURLs = ""

Expand All @@ -49,6 +55,11 @@ const (
DefaultGraphiteDatabaseName = "graphite"
)

var DefaultSnapshotURL = url.URL{
Scheme: "http",
Host: net.JoinHostPort(DefaultSnapshotBindAddress, strconv.Itoa(DefaultSnapshotPort)),
}

// Config represents the configuration format for the influxd binary.
type Config struct {
Hostname string `toml:"hostname"`
Expand Down Expand Up @@ -101,6 +112,10 @@ type Config struct {
RetentionCreatePeriod Duration `toml:"retention-create-period"`
} `toml:"data"`

Snapshot struct {
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
}
Cluster struct {
Dir string `toml:"dir"`
} `toml:"cluster"`
Expand Down Expand Up @@ -168,6 +183,8 @@ func NewConfig() (*Config, error) {
c.Data.RetentionCheckEnabled = true
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod)
c.Snapshot.BindAddress = DefaultSnapshotBindAddress
c.Snapshot.Port = DefaultSnapshotPort
c.Admin.Enabled = true
c.Admin.Port = 8083
c.ContinuousQuery.RecomputePreviousN = 2
Expand Down Expand Up @@ -215,6 +232,11 @@ func (c *Config) DataURL() url.URL {
}
}

// SnapshotAddr returns the TCP binding address for the snapshot handler.
func (c *Config) SnapshotAddr() string {
return net.JoinHostPort(c.Snapshot.BindAddress, strconv.Itoa(c.Snapshot.Port))
}

// BrokerAddr returns the binding address the Broker server
func (c *Config) BrokerAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port)
Expand Down
Loading

0 comments on commit 5e47ed1

Please sign in to comment.