Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): implement backup and restore #16504

Merged
merged 15 commits into from
Jan 21, 2020
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
1. [16523](https://github.com/influxdata/influxdb/pull/16523): Change influx packages to be CRD compliant
1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration
1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types
1. [16504](https://github.com/influxdata/influxdb/pull/16504): Add backup and restore

### UI Improvements

Expand Down
16 changes: 12 additions & 4 deletions authorizer/authorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@ import (
// IsAllowed checks to see if an action is authorized by retrieving the authorizer
// off of context and authorizing the action appropriately.
func IsAllowed(ctx context.Context, p influxdb.Permission) error {
return IsAllowedAll(ctx, []influxdb.Permission{p})
}

// IsAllowedAll checks to see if an action is authorized by ALL permissions.
// Also see IsAllowed.
func IsAllowedAll(ctx context.Context, permissions []influxdb.Permission) error {
a, err := influxdbcontext.GetAuthorizer(ctx)
if err != nil {
return err
}

if !a.Allowed(p) {
return &influxdb.Error{
Code: influxdb.EUnauthorized,
Msg: fmt.Sprintf("%s is unauthorized", p),
for _, p := range permissions {
if !a.Allowed(p) {
return &influxdb.Error{
Code: influxdb.EUnauthorized,
Msg: fmt.Sprintf("%s is unauthorized", p),
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions authorizer/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package authorizer

import (
"context"
"io"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
)

var _ influxdb.BackupService = (*BackupService)(nil)

// BackupService wraps a influxdb.BackupService and authorizes actions
// against it appropriately.
type BackupService struct {
s influxdb.BackupService
}

// NewBackupService constructs an instance of an authorizing backup service.
func NewBackupService(s influxdb.BackupService) *BackupService {
return &BackupService{
s: s,
}
}

func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
return 0, nil, err
}
return b.s.CreateBackup(ctx)
}

func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
return err
}
return b.s.FetchBackupFile(ctx, backupID, backupFile, w)
}

func (b BackupService) InternalBackupPath(backupID int) string {
return b.s.InternalBackupPath(backupID)
}
10 changes: 10 additions & 0 deletions authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ func OperPermissions() []Permission {
return ps
}

// ReadAllPermissions represents permission to read all data and metadata.
// Like OperPermissions, but allows read-only users.
func ReadAllPermissions() []Permission {
ps := make([]Permission, len(AllResourceTypes))
for i, t := range AllResourceTypes {
ps[i] = Permission{Action: ReadAction, Resource: Resource{Type: t}}
}
return ps
}

// OwnerPermissions are the default permissions for those who own a resource.
func OwnerPermissions(orgID ID) []Permission {
ps := []Permission{}
Expand Down
23 changes: 23 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package influxdb

import (
"context"
"io"
)

// BackupService represents the data backup functions of InfluxDB.
type BackupService interface {
// CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets.
// The return values are used to download each backup file.
CreateBackup(context.Context) (backupID int, backupFiles []string, err error)
// FetchBackupFile downloads one backup file, data or metadata.
FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error
// InternalBackupPath is a utility to determine the on-disk location of a backup fileset.
InternalBackupPath(backupID int) string
}

// KVBackupService represents the meta data backup functions of InfluxDB.
type KVBackupService interface {
// Backup creates a live backup copy of the metadata database.
Backup(ctx context.Context, w io.Writer) error
}
2 changes: 2 additions & 0 deletions bolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"go.uber.org/zap"
)

const DefaultFilename = "influxd.bolt"

// Client is a client for the boltDB data store.
type Client struct {
Path string
Expand Down
12 changes: 12 additions & 0 deletions bolt/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -124,6 +125,17 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
})
}

// Backup copies all K:Vs to a writer, in BoltDB format.
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

return s.db.View(func(tx *bolt.Tx) error {
_, err := tx.WriteTo(w)
return err
})
}

// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx.
type Tx struct {
tx *bolt.Tx
Expand Down
111 changes: 111 additions & 0 deletions cmd/influx/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
"github.com/influxdata/influxdb/http"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/multierr"
)

func cmdBackup() *cobra.Command {
cmd := &cobra.Command{
Use: "backup",
Short: "Backup the data in InfluxDB",
Long: fmt.Sprintf(
`Backs up data and meta data for the running InfluxDB instance.
Downloaded files are written to the directory indicated by --path.
The target directory, and any parent directories, are created automatically.
Data file have extension .tsm; meta data is written to %s in the same directory.`,
bolt.DefaultFilename),
RunE: backupF,
}
opts := flagOpts{
{
DestP: &backupFlags.Path,
Flag: "path",
Short: 'p',
EnvVar: "PATH",
Desc: "directory path to write backup files to",
Required: true,
},
}
opts.mustRegister(cmd)

return cmd
}

var backupFlags struct {
Path string
}

func init() {
jacobmarble marked this conversation as resolved.
Show resolved Hide resolved
err := viper.BindEnv("PATH")
if err != nil {
panic(err)
}
if h := viper.GetString("PATH"); h != "" {
backupFlags.Path = h
}
}

func newBackupService() (influxdb.BackupService, error) {
return &http.BackupService{
Addr: flags.host,
Token: flags.token,
}, nil
}

func backupF(cmd *cobra.Command, args []string) error {
ctx := context.Background()

if flags.local {
return fmt.Errorf("local flag not supported for backup command")
}

if backupFlags.Path == "" {
return fmt.Errorf("must specify path")
}

err := os.MkdirAll(backupFlags.Path, 0777)
if err != nil && !os.IsExist(err) {
return err
}

backupService, err := newBackupService()
if err != nil {
return err
}

id, backupFilenames, err := backupService.CreateBackup(ctx)
if err != nil {
return err
}

fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames))

for _, backupFilename := range backupFilenames {
dest := filepath.Join(backupFlags.Path, backupFilename)
w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return err
}
err = backupService.FetchBackupFile(ctx, id, backupFilename, w)
if err != nil {
return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close())
}
if err = w.Close(); err != nil {
return err
}
}

fmt.Printf("Backup complete")

return nil
}
3 changes: 2 additions & 1 deletion cmd/influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command {

cmd.AddCommand(
cmdAuth(),
cmdBackup(),
cmdBucket(runEWrapper),
cmdDelete(),
cmdOrganization(runEWrapper),
Expand Down Expand Up @@ -199,7 +200,7 @@ func defaultTokenPath() (string, string, error) {
if err != nil {
return "", "", err
}
return filepath.Join(dir, "credentials"), dir, nil
return filepath.Join(dir, http.DefaultTokenFile), dir, nil
}

func getTokenFromDefaultPath() string {
Expand Down
14 changes: 14 additions & 0 deletions cmd/influxd/launcher/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package launcher

import (
"context"
"io"
"io/ioutil"
"os"
"sync"
Expand Down Expand Up @@ -29,6 +30,7 @@ type Engine interface {
storage.PointsWriter
storage.BucketDeleter
prom.PrometheusCollector
influxdb.BackupService

SeriesCardinality() int64

Expand Down Expand Up @@ -165,3 +167,15 @@ func (t *TemporaryEngine) Flush(ctx context.Context) {
t.log.Fatal("unable to open engine", zap.Error(err))
}
}

func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) {
return t.engine.CreateBackup(ctx)
}

func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
return t.engine.FetchBackupFile(ctx, backupID, backupFile, w)
}

func (t *TemporaryEngine) InternalBackupPath(backupID int) string {
return t.engine.InternalBackupPath(backupID)
}
5 changes: 4 additions & 1 deletion cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
{
DestP: &l.boltPath,
Flag: "bolt-path",
Default: filepath.Join(dir, "influxd.bolt"),
Default: filepath.Join(dir, bolt.DefaultFilename),
Desc: "path to boltdb database",
},
{
Expand Down Expand Up @@ -581,6 +581,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var (
deleteService platform.DeleteService = m.engine
pointsWriter storage.PointsWriter = m.engine
backupService platform.BackupService = m.engine
)

// TODO(cwolff): Figure out a good default per-query memory limit:
Expand Down Expand Up @@ -772,6 +773,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
NewQueryService: source.NewQueryService,
PointsWriter: pointsWriter,
DeleteService: deleteService,
BackupService: backupService,
KVBackupService: m.kvService,
AuthorizationService: authSvc,
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
BucketService: storage.NewBucketService(bucketSvc, m.engine),
Expand Down
3 changes: 2 additions & 1 deletion cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
influxdbcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/kv"
Expand Down Expand Up @@ -79,7 +80,7 @@ func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) *

// Run executes the program with additional arguments to set paths and ports.
func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
args = append(args, "--bolt-path", filepath.Join(tl.Path, "influxd.bolt"))
args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename))
args = append(args, "--engine-path", filepath.Join(tl.Path, "engine"))
args = append(args, "--http-bind-address", "127.0.0.1:0")
args = append(args, "--log-level", "debug")
Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/generate"
"github.com/influxdata/influxdb/cmd/influxd/inspect"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/cmd/influxd/restore"
_ "github.com/influxdata/influxdb/query/builtin"
_ "github.com/influxdata/influxdb/tsdb/tsi1"
_ "github.com/influxdata/influxdb/tsdb/tsm1"
Expand Down Expand Up @@ -46,6 +47,7 @@ func init() {
rootCmd.AddCommand(launcher.NewCommand())
rootCmd.AddCommand(generate.Command)
rootCmd.AddCommand(inspect.NewCommand())
rootCmd.AddCommand(restore.Command)

// TODO: this should be removed in the future: https://github.com/influxdata/influxdb/issues/16220
if os.Getenv("QUERY_TRACING") == "1" {
Expand Down
Loading