diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go new file mode 100644 index 00000000000..b8de924152d --- /dev/null +++ b/cmd/influxd/backup.go @@ -0,0 +1,182 @@ +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + + "github.com/influxdb/influxdb" +) + +// BackupSuffix is a suffix added to the backup while it's in-process. +const BackupSuffix = ".pending" + +// BackupCommand represents the program execution for "influxd backup". +type BackupCommand struct { + // The logger passed to the ticker during execution. + Logger *log.Logger + + // Standard input/output, overridden for testing. + Stderr io.Writer +} + +// NewBackupCommand returns a new instance of BackupCommand with default settings. +func NewBackupCommand() *BackupCommand { + return &BackupCommand{ + Stderr: os.Stderr, + } +} + +// Run excutes the program. +func (cmd *BackupCommand) Run(args ...string) error { + // Set up logger. + cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags) + cmd.Logger.Printf("influxdb backup, version %s, commit %s", version, commit) + + // Parse command line arguments. + u, path, err := cmd.parseFlags(args) + if err != nil { + return err + } + + // Retrieve snapshot from local file. + ss, err := influxdb.ReadFileSnapshot(path) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("read file snapshot: %s", err) + } + + // Determine temporary path to download to. + tmppath := path + BackupSuffix + + // Calculate path of next backup file. + // This uses the path if it doesn't exist. + // Otherwise it appends an autoincrementing number. + path, err = cmd.nextPath(path) + if err != nil { + return fmt.Errorf("next path: %s", err) + } + + // Retrieve snapshot. + if err := cmd.download(u, ss, tmppath); err != nil { + return fmt.Errorf("download: %s", err) + } + + // Rename temporary file to final path. 
+ if err := os.Rename(tmppath, path); err != nil { + return fmt.Errorf("rename: %s", err) + } + + // TODO: Check file integrity. + + // Notify user of completion. + cmd.Logger.Println("backup complete") + + return nil +} + +// parseFlags parses and validates the command line arguments. +func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) { + fs := flag.NewFlagSet("", flag.ContinueOnError) + host := fs.String("host", DefaultSnapshotURL.String(), "") + fs.SetOutput(cmd.Stderr) + fs.Usage = cmd.printUsage + if err := fs.Parse(args); err != nil { + return url.URL{}, "", err + } + + // Parse host. + u, err := url.Parse(*host) + if err != nil { + return url.URL{}, "", fmt.Errorf("parse host url: %s", err) + } + + // Require output path. + path := fs.Arg(0) + if path == "" { + return url.URL{}, "", fmt.Errorf("snapshot path required") + } + + return *u, path, nil +} + +// nextPath returns the next file to write to. +func (cmd *BackupCommand) nextPath(path string) (string, error) { + // Use base path if it doesn't exist. + if _, err := os.Stat(path); os.IsNotExist(err) { + return path, nil + } else if err != nil { + return "", err + } + + // Otherwise iterate through incremental files until one is available. + for i := 0; ; i++ { + s := fmt.Sprintf(path+".%d", i) + if _, err := os.Stat(s); os.IsNotExist(err) { + return s, nil + } else if err != nil { + return "", err + } + } +} + +// download downloads a snapshot from a host to a given path. +func (cmd *BackupCommand) download(u url.URL, ss *influxdb.Snapshot, path string) error { + // Create local file to write to. + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("open temp file: %s", err) + } + defer f.Close() + + // Encode snapshot. + var buf bytes.Buffer + if ss != nil { + if err := json.NewEncoder(&buf).Encode(ss); err != nil { + return fmt.Errorf("encode snapshot: %s", err) + } + } + + // Create request with existing snapshot as the body. 
+	u.Path = "/snapshot"
+	req, err := http.NewRequest("GET", u.String(), &buf)
+	if err != nil {
+		return fmt.Errorf("new request: %s", err)
+	}
+
+	// Fetch the archive from the server.
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("get: %s", err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	// Check the status code.
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("snapshot error: status=%d", resp.StatusCode)
+	}
+
+	// Write the archive to disk; a failed copy must abort the backup.
+	if _, err := io.Copy(f, resp.Body); err != nil {
+		return fmt.Errorf("write snapshot: %s", err)
+	}
+
+	return nil
+}
+
+// printUsage prints the usage message to STDERR.
+func (cmd *BackupCommand) printUsage() {
+	fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] PATH
+
+backup downloads a snapshot of a data node and saves it to disk.
+
+        -host
+                The host to connect to snapshot.
+                Defaults to http://127.0.0.1:8087.
+`)
+}
diff --git a/cmd/influxd/backup_test.go b/cmd/influxd/backup_test.go
new file mode 100644
index 00000000000..e6a946b3d14
--- /dev/null
+++ b/cmd/influxd/backup_test.go
@@ -0,0 +1,122 @@
+package main_test
+
+import (
+	"bytes"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/influxdb/influxdb"
+	"github.com/influxdb/influxdb/cmd/influxd"
+)
+
+// Ensure the backup can download from the server and save to disk.
+func TestBackupCommand(t *testing.T) {
+	// Mock the backup endpoint.
+	s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/snapshot" {
+			t.Fatalf("unexpected url path: %s", r.URL.Path)
+		}
+
+		// Write a simple snapshot to the buffer.
+		sw := influxdb.NewSnapshotWriter()
+		sw.Snapshot = &influxdb.Snapshot{Files: []influxdb.SnapshotFile{
+			{Name: "meta", Size: 5, Index: 10},
+		}}
+		sw.FileWriters["meta"] = influxdb.NopWriteToCloser(bytes.NewBufferString("55555"))
+		if _, err := sw.WriteTo(w); err != nil {
+			t.Fatal(err)
+		}
+	}))
+	defer s.Close()
+
+	// Create a temp path and remove incremental backups at the end.
+	path := tempfile()
+	defer os.Remove(path)
+	defer os.Remove(path + ".0")
+	defer os.Remove(path + ".1")
+
+	// Execute the backup against the mock server.
+	for i := 0; i < 3; i++ {
+		if err := NewBackupCommand().Run("-host", s.URL, path); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Verify snapshot and two incremental snapshots were written.
+	if _, err := os.Stat(path); err != nil {
+		t.Fatalf("snapshot not found: %s", err)
+	} else if _, err = os.Stat(path + ".0"); err != nil {
+		t.Fatalf("incremental snapshot(0) not found: %s", err)
+	} else if _, err = os.Stat(path + ".1"); err != nil {
+		t.Fatalf("incremental snapshot(1) not found: %s", err)
+	}
+}
+
+// Ensure the backup command returns an error if flags cannot be parsed.
+func TestBackupCommand_ErrFlagParse(t *testing.T) {
+	cmd := NewBackupCommand()
+	if err := cmd.Run("-bad-flag"); err == nil || err.Error() != `flag provided but not defined: -bad-flag` {
+		t.Fatal(err)
+	} else if !strings.Contains(cmd.Stderr.String(), "usage") {
+		t.Fatal("usage message not displayed")
+	}
+}
+
+// Ensure the backup command returns an error if the host cannot be parsed.
+func TestBackupCommand_ErrInvalidHostURL(t *testing.T) {
+	if err := NewBackupCommand().Run("-host", "http://%f"); err == nil || err.Error() != `parse host url: parse http://%f: hexadecimal escape in host` {
+		t.Fatal(err)
+	}
+}
+
+// Ensure the backup command returns an error if the output path is not specified.
+func TestBackupCommand_ErrPathRequired(t *testing.T) { + if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `snapshot path required` { + t.Fatal(err) + } +} + +// Ensure the backup returns an error if it cannot connect to the server. +func TestBackupCommand_ErrConnectionRefused(t *testing.T) { + // Start and immediately stop a server so we have a dead port. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + s.Close() + + // Execute the backup command. + path := tempfile() + defer os.Remove(path) + if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || !strings.Contains(err.Error(), `connection refused`) { + t.Fatal(err) + } +} + +// Ensure the backup returns any non-200 status codes. +func TestBackupCommand_ErrServerError(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer s.Close() + + // Execute the backup command. + path := tempfile() + defer os.Remove(path) + if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || err.Error() != `download: snapshot error: status=500` { + t.Fatal(err) + } +} + +// BackupCommand is a test wrapper for main.BackupCommand. +type BackupCommand struct { + *main.BackupCommand + Stderr bytes.Buffer +} + +// NewBackupCommand returns a new instance of BackupCommand. +func NewBackupCommand() *BackupCommand { + cmd := &BackupCommand{BackupCommand: main.NewBackupCommand()} + cmd.BackupCommand.Stderr = &cmd.Stderr + return cmd +} diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 926c43935a1..162f4cf2189 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -38,6 +38,12 @@ const ( // DefaultDataPort represents the default port the data server runs on. DefaultDataPort = 8086 + // DefaultSnapshotBindAddress is the default bind address to serve snapshots from. 
+ DefaultSnapshotBindAddress = "127.0.0.1" + + // DefaultSnapshotPort is the default port to serve snapshots from. + DefaultSnapshotPort = 8087 + // DefaultJoinURLs represents the default URLs for joining a cluster. DefaultJoinURLs = "" @@ -49,6 +55,11 @@ const ( DefaultGraphiteDatabaseName = "graphite" ) +var DefaultSnapshotURL = url.URL{ + Scheme: "http", + Host: net.JoinHostPort(DefaultSnapshotBindAddress, strconv.Itoa(DefaultSnapshotPort)), +} + // Config represents the configuration format for the influxd binary. type Config struct { Hostname string `toml:"hostname"` @@ -101,6 +112,10 @@ type Config struct { RetentionCreatePeriod Duration `toml:"retention-create-period"` } `toml:"data"` + Snapshot struct { + BindAddress string `toml:"bind-address"` + Port int `toml:"port"` + } Cluster struct { Dir string `toml:"dir"` } `toml:"cluster"` @@ -168,6 +183,8 @@ func NewConfig() (*Config, error) { c.Data.RetentionCheckEnabled = true c.Data.RetentionCheckPeriod = Duration(10 * time.Minute) c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod) + c.Snapshot.BindAddress = DefaultSnapshotBindAddress + c.Snapshot.Port = DefaultSnapshotPort c.Admin.Enabled = true c.Admin.Port = 8083 c.ContinuousQuery.RecomputePreviousN = 2 @@ -215,6 +232,11 @@ func (c *Config) DataURL() url.URL { } } +// SnapshotAddr returns the TCP binding address for the snapshot handler. 
+func (c *Config) SnapshotAddr() string { + return net.JoinHostPort(c.Snapshot.BindAddress, strconv.Itoa(c.Snapshot.Port)) +} + // BrokerAddr returns the binding address the Broker server func (c *Config) BrokerAddr() string { return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port) diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 409af0c65a3..284a48bf102 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -4,11 +4,13 @@ import ( "flag" "fmt" "log" + "math/rand" "os" "os/signal" "runtime" "runtime/pprof" "strings" + "time" ) const logo = ` @@ -36,6 +38,7 @@ const ( func main() { log.SetFlags(0) + rand.Seed(time.Now().UnixNano()) // If commit not set, make that clear. if commit == "" { @@ -68,6 +71,16 @@ func main() { execRun(args[1:]) case "": execRun(args) + case "backup": + cmd := NewBackupCommand() + if err := cmd.Run(args[1:]...); err != nil { + log.Fatalf("backup: %s", err) + } + case "restore": + cmd := NewRestoreCommand() + if err := cmd.Run(args[1:]...); err != nil { + log.Fatalf("restore: %s", err) + } case "version": execVersion(args[1:]) case "config": @@ -104,10 +117,13 @@ func execRun(args []string) { log.SetFlags(log.LstdFlags) writePIDFile(*pidPath) - if *configPath == "" { + // Parse configuration file from disk. + config, err := parseConfig(*configPath, *hostname) + if err != nil { + log.Fatal(err) + } else if *configPath == "" { log.Println("No config provided, using default settings") } - config := parseConfig(*configPath, *hostname) // Create a logging writer. logWriter := os.Stderr @@ -162,7 +178,10 @@ func execConfig(args []string) { ) fs.Parse(args) - config := parseConfig(*configPath, *hostname) + config, err := parseConfig(*configPath, *hostname) + if err != nil { + log.Fatalf("parse config: %s", err) + } config.Write(os.Stdout) } @@ -240,3 +259,6 @@ func stopProfiling() { prof.mem.Close() } } + +func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) 
} +func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } diff --git a/cmd/influxd/restore.go b/cmd/influxd/restore.go new file mode 100644 index 00000000000..b6892f04569 --- /dev/null +++ b/cmd/influxd/restore.go @@ -0,0 +1,265 @@ +package main + +import ( + "encoding/binary" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math/rand" + "net/url" + "os" + "path/filepath" + "time" + + "github.com/boltdb/bolt" + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/raft" +) + +// RestoreCommand represents the program execution for "influxd restore". +type RestoreCommand struct { + // The logger passed to the ticker during execution. + Logger *log.Logger + + // Standard input/output, overridden for testing. + Stderr io.Writer +} + +// NewRestoreCommand returns a new instance of RestoreCommand with default settings. +func NewRestoreCommand() *RestoreCommand { + return &RestoreCommand{ + Stderr: os.Stderr, + } +} + +// Run excutes the program. +func (cmd *RestoreCommand) Run(args ...string) error { + // Set up logger. + cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags) + cmd.Logger.Printf("influxdb restore, version %s, commit %s", version, commit) + + // Parse command line arguments. + config, path, err := cmd.parseFlags(args) + if err != nil { + return err + } + + // Remove broker & data directories. + if err := os.RemoveAll(config.BrokerDir()); err != nil { + return fmt.Errorf("remove broker dir: %s", err) + } else if err := os.RemoveAll(config.DataDir()); err != nil { + return fmt.Errorf("remove data dir: %s", err) + } + + // Open snapshot file and all incremental backups. + ssr, files, err := influxdb.OpenFileSnapshotsReader(path) + if err != nil { + return fmt.Errorf("open: %s", err) + } + defer closeAll(files) + + // Extract manifest. + ss, err := ssr.Snapshot() + if err != nil { + return fmt.Errorf("snapshot: %s", err) + } + + // Unpack snapshot files into data directory. 
+	if err := cmd.unpack(config.DataDir(), ssr); err != nil {
+		return fmt.Errorf("unpack: %s", err)
+	}
+
+	// Generate broker & raft directories from manifest.
+	if err := cmd.materialize(config.BrokerDir(), ss, config.BrokerURL()); err != nil {
+		return fmt.Errorf("materialize: %s", err)
+	}
+
+	// Notify user of completion.
+	cmd.Logger.Printf("restore complete using %s", path)
+
+	return nil
+}
+
+// parseFlags parses and validates the command line arguments.
+func (cmd *RestoreCommand) parseFlags(args []string) (*Config, string, error) {
+	fs := flag.NewFlagSet("", flag.ContinueOnError)
+	configPath := fs.String("config", "", "")
+	fs.SetOutput(cmd.Stderr)
+	fs.Usage = cmd.printUsage
+	if err := fs.Parse(args); err != nil {
+		return nil, "", err
+	}
+
+	// Parse configuration file from disk.
+	config, err := parseConfig(*configPath, "")
+	if err != nil {
+		return nil, "", fmt.Errorf("parse config: %s", err)
+	} else if *configPath == "" {
+		log.Println("No config provided, using default settings")
+	}
+
+	// Require output path.
+	path := fs.Arg(0)
+	if path == "" {
+		return nil, "", fmt.Errorf("snapshot path required")
+	}
+
+	return config, path, nil
+}
+
+func closeAll(a []io.Closer) {
+	for _, c := range a {
+		_ = c.Close()
+	}
+}
+
+// unpack expands the files in the snapshot archive into a directory.
+func (cmd *RestoreCommand) unpack(path string, ssr *influxdb.SnapshotsReader) error {
+	// Create root directory.
+	if err := os.MkdirAll(path, 0777); err != nil {
+		return fmt.Errorf("mkdir: err=%s", err)
+	}
+
+	// Loop over files and extract.
+	for {
+		// Read entry header.
+		sf, err := ssr.Next()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			return fmt.Errorf("next: entry=%s, err=%s", sf.Name, err)
+		}
+
+		// Log progress.
+		cmd.Logger.Printf("unpacking: %s / idx=%d (%d bytes)", sf.Name, sf.Index, sf.Size)
+
+		// Create parent directory for output file.
+ if err := os.MkdirAll(filepath.Dir(filepath.Join(path, sf.Name)), 0777); err != nil { + return fmt.Errorf("mkdir: entry=%s, err=%s", sf.Name, err) + } + + if err := func() error { + // Create output file. + f, err := os.Create(filepath.Join(path, sf.Name)) + if err != nil { + return fmt.Errorf("create: entry=%s, err=%s", sf.Name, err) + } + defer f.Close() + + // Copy contents from reader. + if _, err := io.CopyN(f, ssr, sf.Size); err != nil { + return fmt.Errorf("copy: entry=%s, err=%s", sf.Name, err) + } + + return nil + }(); err != nil { + return err + } + } + + return nil +} + +// materialize creates broker & raft directories based on the snapshot. +func (cmd *RestoreCommand) materialize(path string, ss *influxdb.Snapshot, u url.URL) error { + // Materialize broker. + if err := cmd.materializeBroker(path, ss.Index()); err != nil { + return fmt.Errorf("broker: %s", err) + } + + // Materialize raft. + if err := cmd.materializeRaft(filepath.Join(path, "raft"), u); err != nil { + return fmt.Errorf("raft: %s", err) + } + + return nil +} + +func (cmd *RestoreCommand) materializeBroker(path string, index uint64) error { + // Create root directory. + if err := os.MkdirAll(path, 0777); err != nil { + return fmt.Errorf("mkdir: err=%s", err) + } + + // Create broker meta store. + meta, err := bolt.Open(filepath.Join(path, "meta"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return fmt.Errorf("open broker meta: %s", err) + } + defer meta.Close() + + // Write highest index to meta store. 
+ if err := meta.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("meta")) + if err != nil { + return fmt.Errorf("create meta bucket: %s", err) + } + + if err := b.Put([]byte("index"), u64tob(index)); err != nil { + return fmt.Errorf("put: %s", err) + } + return nil + }); err != nil { + return fmt.Errorf("update broker meta: %s", err) + } + + return nil +} + +func (cmd *RestoreCommand) materializeRaft(path string, u url.URL) error { + // Create raft directory. + if err := os.MkdirAll(path, 0777); err != nil { + return fmt.Errorf("mkdir raft: err=%s", err) + } + + // Write raft id & term. + if err := ioutil.WriteFile(filepath.Join(path, "id"), []byte(`1`), 0666); err != nil { + return fmt.Errorf("write raft/id: %s", err) + } + if err := ioutil.WriteFile(filepath.Join(path, "term"), []byte(`1`), 0666); err != nil { + return fmt.Errorf("write raft/term: %s", err) + } + + // Generate configuration. + var rc raft.Config + rc.ClusterID = uint64(rand.Int()) + rc.MaxNodeID = 1 + rc.AddNode(1, u) + + // Marshal config. + f, err := os.Create(filepath.Join(path, "config")) + if err != nil { + return fmt.Errorf("create config: %s", err) + } + defer f.Close() + + // Write config. + if err := raft.NewConfigEncoder(f).Encode(&rc); err != nil { + return fmt.Errorf("encode config: %s", err) + } + + return nil +} + +// printUsage prints the usage message to STDERR. +func (cmd *RestoreCommand) printUsage() { + fmt.Fprintf(cmd.Stderr, `usage: influxd restore [flags] PATH + +restore uses a snapshot of a data node to rebuild a cluster. + + -config + Set the path to the configuration file. +`) +} + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// btou64 converts an 8-byte slice into an uint64. 
+func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/cmd/influxd/restore_test.go b/cmd/influxd/restore_test.go new file mode 100644 index 00000000000..8b073dc4743 --- /dev/null +++ b/cmd/influxd/restore_test.go @@ -0,0 +1,170 @@ +package main_test + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/cmd/influxd" +) + +// Ensure the restore command can expand a snapshot and bootstrap a broker. +func TestRestoreCommand(t *testing.T) { + now := time.Now() + + // Create root path to server. + path := tempfile() + defer os.Remove(path) + + // Create a config template that can use different ports. + var configString = fmt.Sprintf(` + [broker] + port=%%d + dir=%q + + [data] + port=%%d + dir = %q + + [snapshot] + port=%%d + `, + filepath.Join(path, "broker"), + filepath.Join(path, "data"), + ) + + // Create configuration file. + configPath := tempfile() + defer os.Remove(configPath) + + // Parse configuration. + MustWriteFile(configPath, []byte(fmt.Sprintf(configString, 8900, 8900, 8901))) + c, err := main.ParseConfigFile(configPath) + if err != nil { + t.Fatalf("parse config: %s", err) + } + + // Start server. + b, s := main.Run(c, "", "x.x", os.Stderr) + if b == nil { + t.Fatal("cannot run broker") + } else if s == nil { + t.Fatal("cannot run server") + } + + // Create data. + if err := s.CreateDatabase("db"); err != nil { + t.Fatalf("cannot create database: %s", err) + } + if index, err := s.WriteSeries("db", "default", []influxdb.Point{{Name: "cpu", Timestamp: now, Fields: map[string]interface{}{"value": float64(100)}}}); err != nil { + t.Fatalf("cannot write series: %s", err) + } else if err = s.Sync(1, index); err != nil { + t.Fatalf("shard sync: %s", err) + } + + // Create snapshot writer. + sw, err := s.CreateSnapshotWriter() + if err != nil { + t.Fatalf("create snapshot writer: %s", err) + } + + // Snapshot to file. 
+ sspath := tempfile() + f, err := os.Create(sspath) + if err != nil { + t.Fatal(err) + } + sw.WriteTo(f) + f.Close() + + // Stop server. + s.Close() + b.Close() + + // Remove data & broker directories. + if err := os.RemoveAll(path); err != nil { + t.Fatalf("remove: %s", err) + } + + // Rewrite config to a new port and re-parse. + MustWriteFile(configPath, []byte(fmt.Sprintf(configString, 8910, 8910, 8911))) + c, err = main.ParseConfigFile(configPath) + if err != nil { + t.Fatalf("parse config: %s", err) + } + + // Execute the restore. + if err := NewRestoreCommand().Run("-config", configPath, sspath); err != nil { + t.Fatal(err) + } + + // Restart server. + b, s = main.Run(c, "", "x.x", os.Stderr) + if b == nil { + t.Fatal("cannot run broker") + } else if s == nil { + t.Fatal("cannot run server") + } + + // Write new data. + if err := s.CreateDatabase("newdb"); err != nil { + t.Fatalf("cannot create new database: %s", err) + } + if index, err := s.WriteSeries("newdb", "default", []influxdb.Point{{Name: "mem", Timestamp: now, Fields: map[string]interface{}{"value": float64(1000)}}}); err != nil { + t.Fatalf("cannot write new series: %s", err) + } else if err = s.Sync(2, index); err != nil { + t.Fatalf("shard sync: %s", err) + } + + // Read series data. + if v, err := s.ReadSeries("db", "default", "cpu", nil, now); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(100)}) { + t.Fatalf("read series(0) mismatch: %#v", v) + } + + // Read new series data. + if v, err := s.ReadSeries("newdb", "default", "mem", nil, now); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(1000)}) { + t.Fatalf("read series(1) mismatch: %#v", v) + } +} + +// RestoreCommand is a test wrapper for main.RestoreCommand. +type RestoreCommand struct { + *main.RestoreCommand + Stderr bytes.Buffer +} + +// NewRestoreCommand returns a new instance of RestoreCommand. 
+func NewRestoreCommand() *RestoreCommand { + cmd := &RestoreCommand{RestoreCommand: main.NewRestoreCommand()} + cmd.RestoreCommand.Stderr = &cmd.Stderr + return cmd +} + +// MustReadFile reads data from a file. Panic on error. +func MustReadFile(filename string) []byte { + b, err := ioutil.ReadFile(filename) + if err != nil { + panic(err.Error()) + } + return b +} + +// MustWriteFile writes data to a file. Panic on error. +func MustWriteFile(filename string, data []byte) { + if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil { + panic(err.Error()) + } + if err := ioutil.WriteFile(filename, data, 0666); err != nil { + panic(err.Error()) + } +} diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 935b413936d..160957d1fe8 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -117,6 +117,17 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B } log.Printf("data node #%d listening on %s", s.ID(), config.DataAddr()) + // Start snapshot handler. + go func() { + log.Fatal(http.ListenAndServe( + config.SnapshotAddr(), + &httpd.SnapshotHandler{ + CreateSnapshotWriter: s.CreateSnapshotWriter, + }, + )) + }() + log.Printf("snapshot endpoint listening on %s", config.SnapshotAddr()) + // Start the admin interface on the default port if config.Admin.Enabled { port := fmt.Sprintf(":%d", config.Admin.Port) @@ -227,21 +238,20 @@ func writePIDFile(path string) { } } -// parses the configuration from a given path. Sets overrides as needed. -func parseConfig(path, hostname string) *Config { +// parseConfig parses the configuration from a given path. Sets overrides as needed. +func parseConfig(path, hostname string) (*Config, error) { if path == "" { c, err := NewConfig() if err != nil { - log.Fatalf("failed to generate default config: %s. Please supply an explicit configuration file", - err.Error()) + return nil, fmt.Errorf("failed to generate default config: %s. 
Please supply an explicit configuration file", err.Error()) } - return c + return c, nil } // Parse configuration. config, err := ParseConfigFile(path) if err != nil { - log.Fatalf("config: %s", err) + return nil, fmt.Errorf("config: %s", err) } // Override config properties. @@ -249,7 +259,7 @@ func parseConfig(path, hostname string) *Config { config.Hostname = hostname } - return config + return config, nil } // creates and initializes a broker. diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 25eead9951a..7f8b10452ad 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -98,6 +98,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort)) c.Broker.Port = basePort c.Data.Port = basePort + c.Snapshot.Port = basePort + 1 c.Admin.Enabled = false c.ReportingDisabled = true @@ -1455,3 +1456,6 @@ func mustMarshalJSON(v interface{}) string { return string(b) } + +func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } +func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } diff --git a/httpd/handler.go b/httpd/handler.go index 04773be3fbf..ac75dcb11b4 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -771,3 +771,34 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler } }) } + +// SnapshotHandler streams out a snapshot from the server. +type SnapshotHandler struct { + CreateSnapshotWriter func() (*influxdb.SnapshotWriter, error) +} + +func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Read in previous snapshot from request body. + var prev influxdb.Snapshot + if err := json.NewDecoder(r.Body).Decode(&prev); err != nil && err != io.EOF { + httpError(w, "error reading previous snapshot: "+err.Error(), false, http.StatusBadRequest) + return + } + + // Retrieve a snapshot from the server. 
+ sw, err := h.CreateSnapshotWriter() + if err != nil { + httpError(w, "error creating snapshot writer: "+err.Error(), false, http.StatusInternalServerError) + return + } + defer sw.Close() + + // Subtract existing snapshot from writer. + sw.Snapshot = sw.Snapshot.Diff(&prev) + + // Write to response. + if _, err := sw.WriteTo(w); err != nil { + httpError(w, "error writing snapshot: "+err.Error(), false, http.StatusInternalServerError) + return + } +} diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 32d7cc50426..aa0e8c3a463 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1,6 +1,7 @@ package httpd_test import ( + "archive/tar" "bytes" "encoding/base64" "encoding/json" @@ -1619,6 +1620,55 @@ func TestHandler_ProcessContinousQueries(t *testing.T) { } } +// Ensure the snapshot handler can write a snapshot as a tar archive over HTTP. +func TestSnapshotHandler(t *testing.T) { + // Create handler and mock the snapshot creator. + var h httpd.SnapshotHandler + h.CreateSnapshotWriter = func() (*influxdb.SnapshotWriter, error) { + return &influxdb.SnapshotWriter{ + Snapshot: &influxdb.Snapshot{ + Files: []influxdb.SnapshotFile{ + {Name: "meta", Size: 5, Index: 12}, + {Name: "shards/1", Size: 6, Index: 15}, + }, + }, + FileWriters: map[string]influxdb.SnapshotFileWriter{ + "meta": influxdb.NopWriteToCloser(bytes.NewBufferString("55555")), + "shards/1": influxdb.NopWriteToCloser(bytes.NewBufferString("666666")), + }, + }, nil + } + + // Execute handler with an existing snapshot to diff. + // The "shards/1" has a higher index in the diff so it won't be included in the snapshot. + w := httptest.NewRecorder() + r, _ := http.NewRequest( + "GET", "http://localhost/snapshot", + strings.NewReader(`{"files":[{"name":"meta","index":10},{"name":"shards/1","index":20}]}`), + ) + h.ServeHTTP(w, r) + + // Verify status code is successful and the snapshot was written. 
+ if w.Code != http.StatusOK { + t.Fatalf("unexpected status: %d", w.Code) + } else if w.Body == nil { + t.Fatal("body not written") + } + + // Read snapshot. + tr := tar.NewReader(w.Body) + if hdr, err := tr.Next(); err != nil { + t.Fatal(err) + } else if hdr.Name != "manifest" { + t.Fatalf("unexpected snapshot file: %s", hdr.Name) + } + if b, err := ioutil.ReadAll(tr); err != nil { + t.Fatal(err) + } else if string(b) != `{"files":[{"name":"meta","size":5,"index":12}]}` { + t.Fatalf("unexpected manifest: %s", b) + } +} + // batchWrite JSON Unmarshal tests // Utility functions for this test suite. diff --git a/messaging/broker.go b/messaging/broker.go index e26745c1a84..b3f45a0be6a 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -928,7 +928,7 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { } else if index == 0 { return segments[0], nil } else if index < segments[0].Index { - return nil, ErrSegmentReclaimed + return segments[0], nil } // Find segment that contains index. diff --git a/messaging/broker_test.go b/messaging/broker_test.go index f524af1dbf5..d0ae7765e9c 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -407,7 +407,6 @@ func TestReadSegmentByIndex(t *testing.T) { {index: 19, segmentIndex: 12}, {index: 20, segmentIndex: 20}, {index: 21, segmentIndex: 20}, - {index: 5, segmentIndex: 6, err: messaging.ErrSegmentReclaimed}, } { segment, err := messaging.ReadSegmentByIndex(path, tt.index) if tt.err != nil { diff --git a/raft/log.go b/raft/log.go index e61cb343d53..f9f4c198d50 100644 --- a/raft/log.go +++ b/raft/log.go @@ -293,7 +293,7 @@ func (l *Log) Open(path string) error { c, err := l.readConfig() if err != nil { _ = l.close() - return err + return fmt.Errorf("read config: %s", err) } l.config = c @@ -439,18 +439,17 @@ func (l *Log) writeTerm(term uint64) error { func (l *Log) readConfig() (*Config, error) { // Read config from disk. 
f, err := os.Open(l.configPath()) - if err != nil && !os.IsNotExist(err) { + if os.IsNotExist(err) { + return nil, nil + } else if err != nil { return nil, err } defer func() { _ = f.Close() }() // Marshal file to a config type. - var config *Config - if f != nil { - config = &Config{} - if err := NewConfigDecoder(f).Decode(config); err != nil { - return nil, err - } + config := &Config{} + if err := NewConfigDecoder(f).Decode(config); err != nil { + return nil, err } return config, nil } diff --git a/server.go b/server.go index eca0105d448..0cdc92d090a 100644 --- a/server.go +++ b/server.go @@ -555,14 +555,34 @@ func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, er } // Wait for the server to receive the message. - err = s.Sync(index) + err = s.Sync(BroadcastTopicID, index) return index, err } // Sync blocks until a given index (or a higher index) has been applied. // Returns any error associated with the command. -func (s *Server) Sync(index uint64) error { +func (s *Server) Sync(topicID, index uint64) error { + // Sync to the broadcast topic if specified. + if topicID == 0 { + return s.syncBroadcast(index) + } + + // Otherwise retrieve shard by id. + s.mu.RLock() + sh := s.shards[topicID] + s.mu.RUnlock() + + // Return error if there is no shard. + if sh == nil || sh.store == nil { + return errors.New("shard not owned") + } + + return sh.sync(index) +} + +// syncBroadcast syncs the broadcast topic. +func (s *Server) syncBroadcast(index uint64) error { for { // Check if index has occurred. If so, retrieve the error and return. s.mu.RLock() @@ -3833,3 +3853,10 @@ func (s *Server) reportStats(clusterID uint64) { client := http.Client{Timeout: time.Duration(5 * time.Second)} go client.Post("http://m.influxdb.com:8086/db/reporting/series?u=reporter&p=influxdb", "application/json", data) } + +// CreateSnapshotWriter returns a writer for the current snapshot. 
+func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error) { + s.mu.RLock() + defer s.mu.RUnlock() + return createServerSnapshotWriter(s) +} diff --git a/server_test.go b/server_test.go index 665b57f413a..2a62597c199 100644 --- a/server_test.go +++ b/server_test.go @@ -1725,6 +1725,50 @@ func TestServer_RunContinuousQueries(t *testing.T) { verify(3, `{"series":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",75]]}]}`) } +// Ensure the server can create a snapshot writer. +func TestServer_CreateSnapshotWriter(t *testing.T) { + c := test.NewMessagingClient() + s := OpenServer(c) + defer s.Close() + + // Write metadata. + s.CreateDatabase("db") + s.CreateRetentionPolicy("db", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.CreateUser("susy", "pass", false) + + // Write one point. + index, err := s.WriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}}) + if err != nil { + t.Fatal(err) + } + time.Sleep(1 * time.Second) // FIX: Sync on shard. + + // Create snapshot writer. + sw, err := s.CreateSnapshotWriter() + if err != nil { + t.Fatal(err) + } + defer sw.Close() + + // Verify snapshot is correct. + // + // NOTE: Sizes and indices here are subject to change. + // They are tracked here so that we can see when they change over time. 
+ if len(sw.Snapshot.Files) != 2 { + t.Fatalf("unexpected file count: %d", len(sw.Snapshot.Files)) + } else if !reflect.DeepEqual(sw.Snapshot.Files[0], influxdb.SnapshotFile{Name: "meta", Size: 45056, Index: 6}) { + t.Fatalf("unexpected file(0): %#v", sw.Snapshot.Files[0]) + } else if !reflect.DeepEqual(sw.Snapshot.Files[1], influxdb.SnapshotFile{Name: "shards/1", Size: 24576, Index: index}) { + t.Fatalf("unexpected file(1): %#v", sw.Snapshot.Files[1]) + } + + // Write to buffer to verify that it does not error or panic. + var buf bytes.Buffer + if _, err := sw.WriteTo(&buf); err != nil { + t.Fatal(err) + } +} + func mustMarshalJSON(v interface{}) string { b, err := json.Marshal(v) if err != nil { diff --git a/shard.go b/shard.go index 813349b7073..bfa5a47f9dd 100644 --- a/shard.go +++ b/shard.go @@ -94,15 +94,12 @@ func (s *Shard) open(path string, conn MessagingConn) error { s.store = store // Initialize store. - s.index = 0 if err := s.store.Update(func(tx *bolt.Tx) error { + _, _ = tx.CreateBucketIfNotExists([]byte("meta")) _, _ = tx.CreateBucketIfNotExists([]byte("values")) // Find highest replicated index. - b, _ := tx.CreateBucketIfNotExists([]byte("meta")) - if buf := b.Get([]byte("index")); len(buf) > 0 { - s.index = btou64(buf) - } + s.index = shardMetaIndex(tx) // Open connection. if err := conn.Open(s.index, true); err != nil { @@ -123,6 +120,15 @@ func (s *Shard) open(path string, conn MessagingConn) error { return nil } +// shardMetaIndex returns the index from the "meta" bucket on a transaction. +func shardMetaIndex(tx *bolt.Tx) uint64 { + var index uint64 + if buf := tx.Bucket([]byte("meta")).Get([]byte("index")); len(buf) > 0 { + index = btou64(buf) + } + return index +} + // close shuts down the shard's store. func (s *Shard) close() error { // Wait for goroutines to stop. @@ -139,6 +145,19 @@ func (s *Shard) close() error { return nil } +// sync returns after a given index has been reached. 
+func (s *Shard) sync(index uint64) error { + for { + // Check if index has occurred. + if s.index >= index { + return nil + } + + // Otherwise wait momentarily and check again. + time.Sleep(1 * time.Millisecond) + } +} + // HasDataNodeID return true if the data node owns the shard. func (s *Shard) HasDataNodeID(id uint64) bool { for _, dataNodeID := range s.DataNodeIDs { diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 00000000000..4ef85e510d9 --- /dev/null +++ b/snapshot.go @@ -0,0 +1,641 @@ +package influxdb + +import ( + "archive/tar" + "encoding/json" + "fmt" + "io" + "os" + "path" + "path/filepath" + "sort" + "time" + + "github.com/boltdb/bolt" +) + +// manifestName is the name of the manifest file in the snapshot. +const manifestName = "manifest" + +// Snapshot represents the state of the Server at a given time. +type Snapshot struct { + Files []SnapshotFile `json:"files"` +} + +// Index returns the highest index across all files. +func (s *Snapshot) Index() uint64 { + var index uint64 + for _, f := range s.Files { + if f.Index > index { + index = f.Index + } + } + return index +} + +// Diff returns a Snapshot of files that are newer in s than other. +func (s *Snapshot) Diff(other *Snapshot) *Snapshot { + diff := &Snapshot{} + + // Find versions of files that are newer in s. +loop: + for _, a := range s.Files { + // Try to find a newer version of the file in other. + // If found then don't append this file and move to the next file. + for _, b := range other.Files { + if a.Name != b.Name { + continue + } else if a.Index <= b.Index { + continue loop + } else { + break + } + } + + // Append the newest version. + diff.Files = append(diff.Files, a) + } + + // Sort files. + sort.Sort(SnapshotFiles(diff.Files)) + + return diff +} + +// Merge returns a Snapshot that combines s with other. +// Only the newest file between the two snapshots is returned. 
+func (s *Snapshot) Merge(other *Snapshot) *Snapshot { + ret := &Snapshot{} + ret.Files = make([]SnapshotFile, len(s.Files)) + copy(ret.Files, s.Files) + + // Update/insert versions of files that are newer in other. +loop: + for _, a := range other.Files { + for i, b := range ret.Files { + // Ignore if it doesn't match. + if a.Name != b.Name { + continue + } + + // Update if it's newer and then start the next file. + if a.Index > b.Index { + ret.Files[i] = a + } + continue loop + } + + // If the file wasn't found then append it. + ret.Files = append(ret.Files, a) + } + + // Sort files. + sort.Sort(SnapshotFiles(ret.Files)) + + return ret +} + +// SnapshotFile represents a single file in a Snapshot. +type SnapshotFile struct { + Name string `json:"name"` // filename + Size int64 `json:"size"` // file size + Index uint64 `json:"index"` // highest index applied +} + +// SnapshotFiles represents a sortable list of snapshot files. +type SnapshotFiles []SnapshotFile + +func (p SnapshotFiles) Len() int { return len(p) } +func (p SnapshotFiles) Less(i, j int) bool { return p[i].Name < p[j].Name } +func (p SnapshotFiles) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// SnapshotReader reads a snapshot from a Reader. +// This type is not safe for concurrent use. +type SnapshotReader struct { + tr *tar.Reader + snapshot *Snapshot +} + +// NewSnapshotReader returns a new SnapshotReader reading from r. +func NewSnapshotReader(r io.Reader) *SnapshotReader { + return &SnapshotReader{ + tr: tar.NewReader(r), + } +} + +// Snapshot returns the snapshot meta data. +func (sr *SnapshotReader) Snapshot() (*Snapshot, error) { + if err := sr.readSnapshot(); err != nil { + return nil, err + } + return sr.snapshot, nil +} + +// readSnapshot reads the first entry from the snapshot and materializes the snapshot. +// This is skipped if the snapshot manifest has already been read. +func (sr *SnapshotReader) readSnapshot() error { + // Already read, ignore. 
+ if sr.snapshot != nil { + return nil + } + + // Read manifest header. + hdr, err := sr.tr.Next() + if err != nil { + return fmt.Errorf("snapshot header: %s", err) + } else if hdr.Name != manifestName { + return fmt.Errorf("invalid snapshot header: expected manifest") + } + + // Materialize snapshot. + var snapshot Snapshot + if err := json.NewDecoder(sr.tr).Decode(&snapshot); err != nil { + return fmt.Errorf("decode manifest: %s", err) + } + sr.snapshot = &snapshot + + return nil +} + +// Next returns the next file in the snapshot. +func (sr *SnapshotReader) Next() (SnapshotFile, error) { + // Read snapshot if it hasn't been read yet. + if err := sr.readSnapshot(); err != nil { + return SnapshotFile{}, err + } + + // Read next header. + hdr, err := sr.tr.Next() + if err != nil { + return SnapshotFile{}, err + } + + // Match header to file in snapshot. + for i := range sr.snapshot.Files { + if sr.snapshot.Files[i].Name == hdr.Name { + return sr.snapshot.Files[i], nil + } + } + + // Return error if file is not in the snapshot. + return SnapshotFile{}, fmt.Errorf("snapshot entry not found in manifest: %s", hdr.Name) +} + +// Read reads the current entry in the snapshot. +func (sr *SnapshotReader) Read(b []byte) (n int, err error) { + // Read snapshot if it hasn't been read yet. + if err := sr.readSnapshot(); err != nil { + return 0, err + } + + // Pass read through to the tar reader. + return sr.tr.Read(b) +} + +// SnapshotsReader reads from a collection of snapshots. +// Only files with the highest index are read from the reader. +// This type is not safe for concurrent use. +type SnapshotsReader struct { + readers []*SnapshotReader // underlying snapshot readers + files []*SnapshotFile // current file for each reader + + snapshot *Snapshot // combined snapshot from all readers + index int // index of file in snapshot to read + curr *SnapshotReader // current reader +} + +// NewSnapshotsReader returns a new SnapshotsReader reading from a list of readers. 
+func NewSnapshotsReader(readers ...io.Reader) *SnapshotsReader { + r := &SnapshotsReader{ + readers: make([]*SnapshotReader, len(readers)), + files: make([]*SnapshotFile, len(readers)), + index: -1, + } + for i := range readers { + r.readers[i] = NewSnapshotReader(readers[i]) + } + return r +} + +// Snapshot returns the combined snapshot from all readers. +func (ssr *SnapshotsReader) Snapshot() (*Snapshot, error) { + // Use snapshot if it's already been calculated. + if ssr.snapshot != nil { + return ssr.snapshot, nil + } + + // Build snapshot from other readers. + ss := &Snapshot{} + for i, sr := range ssr.readers { + other, err := sr.Snapshot() + if err != nil { + return nil, fmt.Errorf("snapshot: idx=%d, err=%s", i, err) + } + ss = ss.Merge(other) + } + + // Cache snapshot and return. + ssr.snapshot = ss + return ss, nil +} + +// Next returns the next file in the reader. +func (ssr *SnapshotsReader) Next() (SnapshotFile, error) { + ss, err := ssr.Snapshot() + if err != nil { + return SnapshotFile{}, fmt.Errorf("snapshot: %s", err) + } + + // Return EOF if there are no more files in snapshot. + if ssr.index == len(ss.Files)-1 { + ssr.curr = nil + return SnapshotFile{}, io.EOF + } + + // Queue up next files. + if err := ssr.nextFiles(); err != nil { + return SnapshotFile{}, fmt.Errorf("next files: %s", err) + } + + // Increment the file index. + ssr.index++ + sf := ss.Files[ssr.index] + + // Find the matching reader. Clear other readers. + var sr *SnapshotReader + for i, f := range ssr.files { + if f == nil || f.Name != sf.Name { + continue + } + + // Set reader to the first match. + if sr == nil && *f == sf { + sr = ssr.readers[i] + } + ssr.files[i] = nil + } + + // Return an error if file doesn't match. + // This shouldn't happen unless the underlying snapshot is altered. + if sr == nil { + return SnapshotFile{}, fmt.Errorf("snapshot file not found in readers: %s", sf.Name) + } + + // Set current reader. + ssr.curr = sr + + // Return file. 
+ return sf, nil +} + +// nextFiles queues up a next file for all readers. +func (ssr *SnapshotsReader) nextFiles() error { + for i, sr := range ssr.readers { + if ssr.files[i] == nil { + // Read next file. + sf, err := sr.Next() + if err == io.EOF { + ssr.files[i] = nil + continue + } else if err != nil { + return fmt.Errorf("next: reader=%d, err=%s", i, err) + } + + // Cache file. + ssr.files[i] = &sf + } + } + + return nil +} + +// nextIndex returns the index of the next reader to read from. +// Returns -1 if all readers are at EOF. +func (ssr *SnapshotsReader) nextIndex() int { + // Find the next file by name and lowest index. + index := -1 + for i, f := range ssr.files { + if f == nil { + continue + } else if index == -1 { + index = i + } else if f.Name < ssr.files[index].Name { + index = i + } else if f.Name == ssr.files[index].Name && f.Index > ssr.files[index].Index { + index = i + } + } + return index +} + +// Read reads the current entry in the reader. +func (ssr *SnapshotsReader) Read(b []byte) (n int, err error) { + if ssr.curr == nil { + return 0, io.EOF + } + return ssr.curr.Read(b) +} + +// OpenFileSnapshotsReader returns a SnapshotsReader based on the path of the base snapshot. +// Returns the underlying files which need to be closed separately. +func OpenFileSnapshotsReader(path string) (*SnapshotsReader, []io.Closer, error) { + var readers []io.Reader + var closers []io.Closer + if err := func() error { + // Open original snapshot file. + f, err := os.Open(path) + if os.IsNotExist(err) { + return err + } else if err != nil { + return fmt.Errorf("open snapshot: %s", err) + } + readers = append(readers, f) + closers = append(closers, f) + + // Open all incremental snapshots. 
+ for i := 0; ; i++ { + filename := path + fmt.Sprintf(".%d", i) + f, err := os.Open(filename) + if os.IsNotExist(err) { + break + } else if err != nil { + return fmt.Errorf("open incremental snapshot: file=%s, err=%s", filename, err) + } + readers = append(readers, f) + closers = append(closers, f) + } + + return nil + }(); err != nil { + closeAll(closers) + return nil, nil, err + } + + return NewSnapshotsReader(readers...), closers, nil +} + +// ReadFileSnapshot returns a Snapshot for a given base snapshot path. +// This snapshot merges all incremental backup snapshots as well. +func ReadFileSnapshot(path string) (*Snapshot, error) { + // Open a multi-snapshot reader. + ssr, files, err := OpenFileSnapshotsReader(path) + if os.IsNotExist(err) { + return nil, err + } else if err != nil { + return nil, fmt.Errorf("open file snapshots reader: %s", err) + } + defer closeAll(files) + + // Read snapshot. + ss, err := ssr.Snapshot() + if err != nil { + return nil, fmt.Errorf("snapshot: %s", err) + } + + return ss, nil +} + +func closeAll(a []io.Closer) { + for _, c := range a { + _ = c.Close() + } +} + +// SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive. +type SnapshotWriter struct { + // The snapshot to write from. + // Removing files from the snapshot after creation will cause those files to be ignored. + Snapshot *Snapshot + + // Writers for each file by filename. + // Writers will be closed as they're processed and will close by the end of WriteTo(). + FileWriters map[string]SnapshotFileWriter +} + +// NewSnapshotWriter returns a new instance of SnapshotWriter. +func NewSnapshotWriter() *SnapshotWriter { + return &SnapshotWriter{ + Snapshot: &Snapshot{}, + FileWriters: make(map[string]SnapshotFileWriter), + } +} + +// Close closes all file writers on the snapshot. 
+func (sw *SnapshotWriter) Close() error { + for _, fw := range sw.FileWriters { + _ = fw.Close() + } + return nil +} + +// closeUnusedWriters closes all file writers not on the snapshot. +// This allows transactions on these files to be short lived. +func (sw *SnapshotWriter) closeUnusedWriters() { +loop: + for name, fw := range sw.FileWriters { + // Find writer in snapshot. + for _, f := range sw.Snapshot.Files { + if f.Name == name { + continue loop + } + } + + // If not found then close it. + _ = fw.Close() + } +} + +// WriteTo writes the snapshot to the writer. +// File writers are closed as they are written. +// This function will always return n == 0. +func (sw *SnapshotWriter) WriteTo(w io.Writer) (n int64, err error) { + // Close any file writers that aren't required. + sw.closeUnusedWriters() + + // Sort snapshot files. + // This is required for combining multiple snapshots together. + sort.Sort(SnapshotFiles(sw.Snapshot.Files)) + + // Begin writing a tar file to the output. + tw := tar.NewWriter(w) + defer tw.Close() + + // Write manifest file. + if err := sw.writeManifestTo(tw); err != nil { + return 0, fmt.Errorf("write manifest: %s", err) + } + + // Write each backup file. + for _, f := range sw.Snapshot.Files { + if err := sw.writeFileTo(tw, &f); err != nil { + return 0, fmt.Errorf("write file: %s", err) + } + } + + // Close tar writer and check error. + if err := tw.Close(); err != nil { + return 0, fmt.Errorf("tar close: %s", err) + } + + return 0, nil +} + +// writeManifestTo writes a manifest of the contents of the snapshot to the archive. +func (sw *SnapshotWriter) writeManifestTo(tw *tar.Writer) error { + // Convert snapshot to JSON. + b, err := json.Marshal(sw.Snapshot) + if err != nil { + return fmt.Errorf("marshal json: %s", err) + } + + // Write header & file. 
+ if err := tw.WriteHeader(&tar.Header{ + Name: manifestName, + Size: int64(len(b)), + Mode: 0666, + ModTime: time.Now(), + }); err != nil { + return fmt.Errorf("write header: %s", err) + } + if _, err := tw.Write(b); err != nil { + return fmt.Errorf("write: %s", err) + } + + return nil +} + +// writeFileTo writes a single file to the archive. +func (sw *SnapshotWriter) writeFileTo(tw *tar.Writer, f *SnapshotFile) error { + // Retrieve the file writer by filename. + fw := sw.FileWriters[f.Name] + if fw == nil { + return fmt.Errorf("file writer not found: name=%s", f.Name) + } + + // Write file header. + if err := tw.WriteHeader(&tar.Header{ + Name: f.Name, + Size: f.Size, + Mode: 0666, + ModTime: time.Now(), + }); err != nil { + return fmt.Errorf("write header: file=%s, err=%s", f.Name, err) + } + + // Copy the database to the writer. + if nn, err := fw.WriteTo(tw); err != nil { + return fmt.Errorf("write: file=%s, err=%s", f.Name, err) + } else if nn != f.Size { + return fmt.Errorf("short write: file=%s", f.Name) + } + + // Close the writer. + if err := fw.Close(); err != nil { + return fmt.Errorf("close: file=%s, err=%s", f.Name, err) + } + + return nil +} + +// createServerSnapshotWriter creates a snapshot writer from a locked server. +func createServerSnapshotWriter(s *Server) (*SnapshotWriter, error) { + // Exit if the server is closed. + if !s.opened() { + return nil, ErrServerClosed + } + + // Create snapshot writer. + sw := NewSnapshotWriter() + + if err := func() error { + f, fw, err := createMetaSnapshotFile(s.meta) + if err != nil { + return fmt.Errorf("create meta snapshot file: %s", err) + } + sw.Snapshot.Files = append(sw.Snapshot.Files, *f) + sw.FileWriters[f.Name] = fw + + // Create files for each shard. 
+ for _, sh := range s.shards { + f, fw, err := createShardSnapshotFile(sh) + if err != nil { + return fmt.Errorf("create shard snapshot file: id=%d, err=%s", sh.ID, err) + } else if f != nil { + sw.Snapshot.Files = append(sw.Snapshot.Files, *f) + sw.FileWriters[f.Name] = fw + } + } + + return nil + }(); err != nil { + _ = sw.Close() + return nil, err + } + + return sw, nil +} + +func createMetaSnapshotFile(meta *metastore) (*SnapshotFile, SnapshotFileWriter, error) { + // Begin transaction. + tx, err := meta.db.Begin(false) + if err != nil { + return nil, nil, fmt.Errorf("begin: %s", err) + } + + // Create and return file and writer. + f := &SnapshotFile{ + Name: "meta", + Size: tx.Size(), + Index: (&metatx{tx}).index(), + } + return f, &boltTxCloser{tx}, nil +} + +func createShardSnapshotFile(sh *Shard) (*SnapshotFile, SnapshotFileWriter, error) { + // Ignore shard if it's not owned by the server. + if sh.store == nil { + return nil, nil, nil + } + + // Begin transaction. + tx, err := sh.store.Begin(false) + if err != nil { + return nil, nil, fmt.Errorf("begin: %s", err) + } + + // Create and return file and writer. + f := &SnapshotFile{ + Name: path.Join("shards", filepath.Base(sh.store.Path())), + Size: tx.Size(), + Index: shardMetaIndex(tx), + } + return f, &boltTxCloser{tx}, nil +} + +// SnapshotFileWriter is the interface used for writing a file to a snapshot. +type SnapshotFileWriter interface { + io.WriterTo + io.Closer +} + +// boltTxCloser wraps a Bolt transaction to implement io.Closer. +type boltTxCloser struct { + *bolt.Tx +} + +// Close rolls back the transaction. +func (tx *boltTxCloser) Close() error { return tx.Rollback() } + +// NopWriteToCloser returns an io.WriterTo that implements io.Closer. 
+func NopWriteToCloser(w io.WriterTo) interface { + io.WriterTo + io.Closer +} { + return &nopWriteToCloser{w} +} + +type nopWriteToCloser struct { + io.WriterTo +} + +func (w *nopWriteToCloser) Close() error { return nil } diff --git a/snapshot_test.go b/snapshot_test.go new file mode 100644 index 00000000000..c2fa151e8e9 --- /dev/null +++ b/snapshot_test.go @@ -0,0 +1,292 @@ +package influxdb_test + +import ( + "bytes" + "io" + "io/ioutil" + "reflect" + "testing" + + "github.com/influxdb/influxdb" +) + +// Ensure a snapshot can be diff'd so that only newer files are retrieved. +func TestSnapshot_Diff(t *testing.T) { + for i, tt := range []struct { + s *influxdb.Snapshot + other *influxdb.Snapshot + result *influxdb.Snapshot + }{ + // 0. Mixed higher, lower, equal indices. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 1}, // remove: lower index + {Name: "b", Index: 10}, // remove: equal index + {Name: "c", Index: 21}, // keep: higher index + {Name: "d", Index: 15}, // keep: higher index + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + {Name: "b", Index: 10}, + {Name: "c", Index: 11}, + {Name: "d", Index: 14}, + }}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "c", Index: 21}, + {Name: "d", Index: 15}, + }}, + }, + + // 1. Files in other-only should not be added to diff. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 1}, + {Name: "b", Index: 10}, + }}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + }, + + // 2. Files in s-only should be added to diff. 
+ { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{}}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + }, + + // 3. Empty snapshots should return empty diffs. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{}}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{}}, + result: &influxdb.Snapshot{Files: nil}, + }, + } { + result := tt.s.Diff(tt.other) + if !reflect.DeepEqual(tt.result, result) { + t.Errorf("%d. mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.result, result) + } + } +} + +// Ensure a snapshot can be merged so that the newest files from the two snapshots are returned. +func TestSnapshot_Merge(t *testing.T) { + for i, tt := range []struct { + s *influxdb.Snapshot + other *influxdb.Snapshot + result *influxdb.Snapshot + }{ + // 0. Mixed higher, lower, equal indices. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Size: 10, Index: 1}, + {Name: "b", Size: 10, Index: 10}, // keep: same, first + {Name: "c", Size: 10, Index: 21}, // keep: higher + {Name: "e", Size: 10, Index: 15}, // keep: higher + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Size: 20, Index: 2}, // keep: higher + {Name: "b", Size: 20, Index: 10}, + {Name: "c", Size: 20, Index: 11}, + {Name: "d", Size: 20, Index: 14}, // keep: new + {Name: "e", Size: 20, Index: 12}, + }}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Size: 20, Index: 2}, + {Name: "b", Size: 10, Index: 10}, + {Name: "c", Size: 10, Index: 21}, + {Name: "d", Size: 20, Index: 14}, + {Name: "e", Size: 10, Index: 15}, + }}, + }, + } { + result := tt.s.Merge(tt.other) + if !reflect.DeepEqual(tt.result, result) { + t.Errorf("%d. 
mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.result, result) + } + } +} + +// Ensure a snapshot writer can write a set of files to an archive +func TestSnapshotWriter(t *testing.T) { + // Create a new writer with a snapshot and file writers. + sw := influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3, Index: 12}, + {Name: "shards/1", Size: 5, Index: 15}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")} + sw.FileWriters["shards/1"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")} + + // Write the snapshot to a buffer. + var buf bytes.Buffer + if _, err := sw.WriteTo(&buf); err != nil { + t.Fatal(err) + } + + // Ensure file writers are closed as they're writing. + if !sw.FileWriters["meta"].(*bufCloser).closed { + t.Fatal("meta file writer not closed") + } else if !sw.FileWriters["shards/1"].(*bufCloser).closed { + t.Fatal("shards/1 file writer not closed") + } + + // Close writer. + if err := sw.Close(); err != nil { + t.Fatal(err) + } + + // Read snapshot from buffer. + sr := influxdb.NewSnapshotReader(&buf) + + // Read the manifest. + if ss, err := sr.Snapshot(); err != nil { + t.Fatalf("unexpected error(snapshot): %s", err) + } else if !reflect.DeepEqual(sw.Snapshot, ss) { + t.Fatalf("snapshot mismatch:\n\nexp=%#v\n\ngot=%#v", sw.Snapshot, ss) + } + + // Next should be the meta file. + if f, err := sr.Next(); err != nil { + t.Fatalf("unexpected error(meta): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "meta", Size: 3, Index: 12}) { + t.Fatalf("file mismatch(meta): %#v", f) + } else if b := MustReadAll(sr); string(b) != `foo` { + t.Fatalf("unexpected file(meta): %s", b) + } + + // Next should be the shard file. 
+ if f, err := sr.Next(); err != nil { + t.Fatalf("unexpected error(shards/1): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/1", Size: 5, Index: 15}) { + t.Fatalf("file mismatch(shards/1): %#v", f) + } else if b := MustReadAll(sr); string(b) != `55555` { + t.Fatalf("unexpected file(shards/1): %s", b) + } + + // Check for end of snapshot. + if _, err := sr.Next(); err != io.EOF { + t.Fatalf("expected EOF: %s", err) + } +} + +// Ensure a snapshot writer closes unused file writers. +func TestSnapshotWriter_CloseUnused(t *testing.T) { + // Create a new writer with a snapshot and file writers. + sw := influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")} + sw.FileWriters["other"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")} + + // Write the snapshot to a buffer. + var buf bytes.Buffer + if _, err := sw.WriteTo(&buf); err != nil { + t.Fatal(err) + } + + // Ensure other writer is closed. + // This should happen at the beginning of the write so that it doesn't have + // to wait until the close of the whole writer. + if !sw.FileWriters["other"].(*bufCloser).closed { + t.Fatal("'other' file writer not closed") + } +} + +// Ensure a SnapshotsReader can read from multiple snapshots. 
+func TestSnapshotsReader(t *testing.T) { + var sw *influxdb.SnapshotWriter + bufs := make([]bytes.Buffer, 2) + + // Snapshot #1 + sw = influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3, Index: 12}, + {Name: "shards/1", Size: 5, Index: 15}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")} + sw.FileWriters["shards/1"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")} + if _, err := sw.WriteTo(&bufs[0]); err != nil { + t.Fatal(err) + } else if err = sw.Close(); err != nil { + t.Fatal(err) + } + + // Snapshot #2 + sw = influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3, Index: 20}, + {Name: "shards/2", Size: 6, Index: 30}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("bar")} + sw.FileWriters["shards/2"] = &bufCloser{Buffer: *bytes.NewBufferString("666666")} + if _, err := sw.WriteTo(&bufs[1]); err != nil { + t.Fatal(err) + } else if err = sw.Close(); err != nil { + t.Fatal(err) + } + + // Read and merge snapshots. + ssr := influxdb.NewSnapshotsReader(&bufs[0], &bufs[1]) + + // Next should be the second meta file. + if f, err := ssr.Next(); err != nil { + t.Fatalf("unexpected error(meta): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "meta", Size: 3, Index: 20}) { + t.Fatalf("file mismatch(meta): %#v", f) + } else if b := MustReadAll(ssr); string(b) != `bar` { + t.Fatalf("unexpected file(meta): %s", b) + } + + // Next should be shards/1. + if f, err := ssr.Next(); err != nil { + t.Fatalf("unexpected error(shards/1): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/1", Size: 5, Index: 15}) { + t.Fatalf("file mismatch(shards/1): %#v", f) + } else if b := MustReadAll(ssr); string(b) != `55555` { + t.Fatalf("unexpected file(shards/1): %s", b) + } + + // Next should be shards/2. 
+ if f, err := ssr.Next(); err != nil { + t.Fatalf("unexpected error(shards/2): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/2", Size: 6, Index: 30}) { + t.Fatalf("file mismatch(shards/2): %#v", f) + } else if b := MustReadAll(ssr); string(b) != `666666` { + t.Fatalf("unexpected file(shards/2): %s", b) + } + + // Check for end of snapshot. + if _, err := ssr.Next(); err != io.EOF { + t.Fatalf("expected EOF: %s", err) + } +} + +// bufCloser adds a Close() method to a bytes.Buffer +type bufCloser struct { + bytes.Buffer + closed bool +} + +// Close marks the buffer as closed. +func (b *bufCloser) Close() error { + b.closed = true + return nil +} + +// Reads all data from the reader. Panic on error. +func MustReadAll(r io.Reader) []byte { + b, err := ioutil.ReadAll(r) + if err != nil { + panic(err.Error()) + } + return b +}