diff --git a/ee/backup/backup.go b/ee/backup/backup.go
index 28652ff542c..90ff91d7787 100644
--- a/ee/backup/backup.go
+++ b/ee/backup/backup.go
@@ -69,6 +69,19 @@ type Manifest struct {
 	Path string `json:"-"`
 }
 
+func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {
+	preds, ok := m.Groups[gid]
+	if !ok {
+		return nil
+	}
+
+	predSet := make(predicateSet)
+	for _, pred := range preds {
+		predSet[pred] = struct{}{}
+	}
+	return predSet
+}
+
 // WriteBackup uses the request values to create a stream writer then hand off the data
 // retrieval to stream.Orchestrate. The writer will create all the fd's needed to
 // collect the data and later move to the target.
diff --git a/ee/backup/file_handler.go b/ee/backup/file_handler.go
index fb8b12debfd..ce3836691ac 100644
--- a/ee/backup/file_handler.go
+++ b/ee/backup/file_handler.go
@@ -157,14 +157,18 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, er
 		}
 
 		path := filepath.Dir(manifests[i].Path)
-		for groupId := range manifest.Groups {
-			file := filepath.Join(path, backupName(manifest.Since, groupId))
+		for gid := range manifest.Groups {
+			file := filepath.Join(path, backupName(manifest.Since, gid))
 			fp, err := os.Open(file)
 			if err != nil {
 				return 0, errors.Wrapf(err, "Failed to open %q", file)
 			}
 			defer fp.Close()
-			if err = fn(fp, int(groupId)); err != nil {
+
+			// Only restore the predicates that were assigned to this group at the time
+			// of the last backup.
+			predSet := manifests[len(manifests)-1].getPredsInGroup(gid)
+			if err = fn(fp, int(gid), predSet); err != nil {
 				return 0, err
 			}
 		}
diff --git a/ee/backup/handler.go b/ee/backup/handler.go
index e13f59198fd..bb5d0fe57e2 100644
--- a/ee/backup/handler.go
+++ b/ee/backup/handler.go
@@ -128,9 +128,13 @@ func NewUriHandler(uri *url.URL) (UriHandler, error) {
 	return h, nil
 }
 
+// predicateSet is a map whose keys are predicates. It is meant to be used as a set.
+type predicateSet map[string]struct{}
+
 // loadFn is a function that will receive the current file being read.
-// A reader and the backup groupId are passed as arguments.
-type loadFn func(reader io.Reader, groupId int) error
+// A reader, the backup groupId, and a map whose keys are the predicates to restore
+// are passed as arguments.
+type loadFn func(reader io.Reader, groupId int, preds predicateSet) error
 
 // Load will scan location l for backup files in the given backup series and load them
 // sequentially. Returns the maximum Since value on success, otherwise an error.
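Note on the plumbing above: predicateSet (handler.go) is the usual Go string-set idiom, and getPredsInGroup (backup.go) materializes one from a manifest's group-to-predicate map, giving each loadFn invocation an O(1) membership test. Below is a self-contained sketch of that construction and lookup; the Manifest here is trimmed to the one field the sketch needs, and the group assignments and predicate names ("name", "friend", "age") are made up for illustration.

```go
package main

import "fmt"

// predicateSet mirrors the type added in ee/backup/handler.go.
type predicateSet map[string]struct{}

// Manifest is trimmed to the single field this sketch uses; the real
// struct in ee/backup/backup.go carries more metadata (Since, Path, ...).
type Manifest struct {
	// Groups maps a group ID to the predicates assigned to that group.
	Groups map[uint32][]string
}

// getPredsInGroup mirrors the method added in ee/backup/backup.go:
// it converts a group's predicate slice into a set.
func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {
	preds, ok := m.Groups[gid]
	if !ok {
		return nil
	}
	predSet := make(predicateSet)
	for _, pred := range preds {
		predSet[pred] = struct{}{}
	}
	return predSet
}

func main() {
	// Hypothetical group assignments; real manifests are written by backups.
	m := &Manifest{Groups: map[uint32][]string{
		1: {"name", "friend"},
		2: {"age"},
	}}
	predSet := m.getPredsInGroup(1)
	for _, attr := range []string{"name", "age"} {
		_, ok := predSet[attr]
		fmt.Printf("group 1 restores %q: %v\n", attr, ok)
	}
	// An unknown group yields a nil set; lookups on a nil map are safe in Go
	// and report absence, so such a group would restore nothing but type keys.
	fmt.Println(len(m.getPredsInGroup(3))) // 0
}
```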
diff --git a/ee/backup/restore.go b/ee/backup/restore.go
index 225340199e4..9e8afe13473 100644
--- a/ee/backup/restore.go
+++ b/ee/backup/restore.go
@@ -36,7 +36,7 @@ import (
 func RunRestore(pdir, location, backupId string) (uint64, error) {
 	// Scan location for backup files and load them. Each file represents a node group,
 	// and we create a new p dir for each.
-	return Load(location, backupId, func(r io.Reader, groupId int) error {
+	return Load(location, backupId, func(r io.Reader, groupId int, preds predicateSet) error {
 		dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
 		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
 			WithSyncWrites(false).
@@ -55,17 +55,17 @@ func RunRestore(pdir, location, backupId string) (uint64, error) {
 		if err != nil {
 			return nil
 		}
-		return loadFromBackup(db, gzReader, 16)
+		return loadFromBackup(db, gzReader, preds)
 	})
 }
 
 // loadFromBackup reads the backup, converts the keys and values to the required format,
 // and loads them to the given badger DB.
-func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error {
+func loadFromBackup(db *badger.DB, r io.Reader, preds predicateSet) error {
 	br := bufio.NewReaderSize(r, 16<<10)
 	unmarshalBuf := make([]byte, 1<<10)
-	loader := db.NewKVLoader(maxPendingWrites)
+	loader := db.NewKVLoader(16)
 	for {
 		var sz uint64
 		err := binary.Read(br, binary.LittleEndian, &sz)
@@ -99,6 +99,17 @@ func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error {
 			return err
 		}
 
+		// Filter keys using the preds set. Do not do this filtering for type keys
+		// as they are meant to be in every group and their Attr value does not
+		// match a predicate name.
+		parsedKey := x.Parse(restoreKey)
+		if parsedKey == nil {
+			return errors.Errorf("could not parse key %s", hex.Dump(restoreKey))
+		}
+		if _, ok := preds[parsedKey.Attr]; !parsedKey.IsType() && !ok {
+			continue
+		}
+
 		var restoreVal []byte
 		switch kv.GetUserMeta()[0] {
 		case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
diff --git a/ee/backup/s3_handler.go b/ee/backup/s3_handler.go
index b8912073b31..638db33d0ea 100644
--- a/ee/backup/s3_handler.go
+++ b/ee/backup/s3_handler.go
@@ -295,13 +295,14 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, erro
 		}
 
 		path := filepath.Dir(manifests[i].Path)
-		for groupId := range manifest.Groups {
-			object := filepath.Join(path, backupName(manifest.Since, groupId))
+		for gid := range manifest.Groups {
+			object := filepath.Join(path, backupName(manifest.Since, gid))
 			reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
 			if err != nil {
 				return 0, errors.Wrapf(err, "Failed to get %q", object)
 			}
 			defer reader.Close()
+
 			st, err := reader.Stat()
 			if err != nil {
 				return 0, errors.Wrapf(err, "Stat failed %q", object)
@@ -310,7 +311,11 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, erro
 				return 0, errors.Errorf("Remote object is empty or inaccessible: %s", object)
 			}
 			fmt.Printf("Downloading %q, %d bytes\n", object, st.Size)
-			if err = fn(reader, int(groupId)); err != nil {
+
+			// Only restore the predicates that were assigned to this group at the time
+			// of the last backup.
+			predSet := manifests[len(manifests)-1].getPredsInGroup(gid)
+			if err = fn(reader, int(gid), predSet); err != nil {
 				return 0, err
 			}
 		}
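Two details in the hunks above are worth calling out. Both Load implementations build the set from manifests[len(manifests)-1], the last manifest in the series, so a predicate that moved between groups across incremental backups is judged by its final assignment rather than by the group that held it when an older backup file was written. And the filter in loadFromBackup deliberately exempts type keys, which belong in every group and whose Attr is a type name rather than a predicate. A standalone sketch of that decision, using a stand-in parsedKey in place of Dgraph's x.ParsedKey (trimmed to the two members the filter consults) and made-up predicate names:

```go
package main

import "fmt"

type predicateSet map[string]struct{}

// parsedKey stands in for x.ParsedKey; only the two members the
// restore filter consults are modeled here.
type parsedKey struct {
	Attr   string
	isType bool
}

func (k parsedKey) IsType() bool { return k.isType }

// shouldRestore reproduces the condition added to loadFromBackup:
// type keys always pass, and data keys pass only when their
// predicate belongs to this group's set.
func shouldRestore(k parsedKey, preds predicateSet) bool {
	if k.IsType() {
		return true
	}
	_, ok := preds[k.Attr]
	return ok
}

func main() {
	// Hypothetical set for one group, as getPredsInGroup would return it.
	preds := predicateSet{"name": {}}
	keys := []parsedKey{
		{Attr: "name"},                 // in the set: restored
		{Attr: "age"},                  // assigned elsewhere: skipped
		{Attr: "Person", isType: true}, // type key: always restored
	}
	for _, k := range keys {
		fmt.Printf("attr=%-8q type=%-5v restore=%v\n", k.Attr, k.IsType(), shouldRestore(k, preds))
	}
}
```

The diff's one-line form, if _, ok := preds[parsedKey.Attr]; !parsedKey.IsType() && !ok { continue }, is the same logic compressed into a single if statement with an init clause.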