Skip to content

Commit

Permalink
fix(restore): update the schema and type from 2103 (#7838)
Browse files Browse the repository at this point in the history
In #7810 we changed the format of the predicate, but missed updating the predicates stored inside the schema and type entries. This PR fixes that.
  • Loading branch information
NamanJain8 authored May 19, 2021
1 parent 70812e4 commit 8504fb1
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 40 deletions.
22 changes: 4 additions & 18 deletions ee/updatemanifest/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
package updatemanifest

import (
"encoding/binary"
"log"
"net/url"
"os"
"strings"

"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/protos/pb"
Expand Down Expand Up @@ -65,18 +63,6 @@ func init() {
ee.RegisterEncFlag(flag)
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

// parseNsAttr splits attr into a namespace and an attribute: the first 8 bytes are decoded
// as a big-endian uint64 namespace and the remaining bytes are returned as the attribute.
//
// It returns an error if attr contains the Unicode replacement rune, or if attr is shorter
// than the 8 bytes needed to hold the namespace.
func parseNsAttr(attr string) (uint64, string, error) {
	if strings.ContainsRune(attr, replacementRune) {
		return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
			attr, []byte(attr))
	}
	// Slicing attr[:8] panics on shorter inputs, so reject them with an error instead.
	if len(attr) < 8 {
		return 0, "", errors.Errorf("attr is too short to contain a namespace: %q (%+v)",
			attr, []byte(attr))
	}
	return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

func run() error {
keys, err := ee.GetKeys(UpdateManifest.Conf)
if err != nil {
Expand All @@ -100,25 +86,25 @@ func run() error {
for gid, preds := range manifest.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
attr, err := x.AttrFrom2103(pred)
if err != nil {
parsedPreds = append(parsedPreds, pred)
logger.Printf("Unable to parse the pred: %v", pred)
continue
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
parsedPreds = append(parsedPreds, attr)
}
manifest.Groups[gid] = parsedPreds
}
for _, op := range manifest.DropOperations {
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
attr, err := x.AttrFrom2103(op.DropValue)
if err != nil {
logger.Printf("Unable to parse the drop operation %+v pred: %v",
op, []byte(op.DropValue))
continue
}
op.DropValue = x.NamespaceAttr(ns, attr)
op.DropValue = attr
}
}
// As we have made the required changes to the manifest, we should update the version too.
Expand Down
21 changes: 4 additions & 17 deletions worker/backup_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package worker

import (
"encoding/binary"
"encoding/json"
"fmt"
"net/url"
Expand Down Expand Up @@ -160,18 +159,6 @@ func getConsolidatedManifest(h UriHandler, uri *url.URL) (*MasterManifest, error
return &MasterManifest{Manifests: mlist}, nil
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

// parseNsAttr splits attr into a namespace and an attribute: the first 8 bytes are decoded
// as a big-endian uint64 namespace and the remaining bytes are returned as the attribute.
//
// It returns an error if attr contains the Unicode replacement rune, or if attr is shorter
// than the 8 bytes needed to hold the namespace.
func parseNsAttr(attr string) (uint64, string, error) {
	if strings.ContainsRune(attr, replacementRune) {
		return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
			attr, []byte(attr))
	}
	// Slicing attr[:8] panics on shorter inputs, so reject them with an error instead.
	if len(attr) < 8 {
		return 0, "", errors.Errorf("attr is too short to contain a namespace: %q (%+v)",
			attr, []byte(attr))
	}
	return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

// upgradeManifest updates the in-memory manifest from various versions to the latest version.
// If the manifest version is 0 (dgraph version < v21.03), attach namespace to the predicates and
// the drop data/attr operation.
Expand Down Expand Up @@ -203,23 +190,23 @@ func upgradeManifest(m *Manifest) error {
for gid, preds := range m.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
attr, err := x.AttrFrom2103(pred)
if err != nil {
return errors.Errorf("while parsing predicate got: %q", err)
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
parsedPreds = append(parsedPreds, attr)
}
m.Groups[gid] = parsedPreds
}
for _, op := range m.DropOperations {
// We have a cluster wide drop data in v21.03.
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
attr, err := x.AttrFrom2103(op.DropValue)
if err != nil {
return errors.Errorf("while parsing the drop operation %+v got: %q",
op, err)
}
op.DropValue = x.NamespaceAttr(ns, attr)
op.DropValue = attr
}
}
case 2105:
Expand Down
56 changes: 51 additions & 5 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type loadBackupInput struct {
restoreTs uint64
preds predicateSet
dropNs map[uint64]struct{}
isOld bool
version int
keepSchema bool
compression string
}
Expand Down Expand Up @@ -391,11 +391,57 @@ func (m *mapper) processReqCh(ctx context.Context) error {
kv.Value, err = update.Marshal()
return err
}
if in.isOld && parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
changeFormat := func() error {
	// Backups taken on 2103 keep the predicate inside the schema/type update in the form
	// <namespace 8 bytes>|<attribute>, which had issues with JSON marshalling. Rewrite it
	// into the newer <namespace hex string>-<attribute> form.
	switch {
	case parsedKey.IsSchema():
		var su pb.SchemaUpdate
		if uerr := su.Unmarshal(kv.Value); uerr != nil {
			return uerr
		}
		var err error
		su.Predicate, err = x.AttrFrom2103(su.Predicate)
		if err != nil {
			return err
		}
		kv.Value, err = su.Marshal()
		return err
	case parsedKey.IsType():
		var tu pb.TypeUpdate
		if uerr := tu.Unmarshal(kv.Value); uerr != nil {
			return uerr
		}
		var err error
		tu.TypeName, err = x.AttrFrom2103(tu.TypeName)
		if err != nil {
			return err
		}
		// Every field of the type carries its own predicate that needs converting too.
		for _, field := range tu.Fields {
			field.Predicate, err = x.AttrFrom2103(field.Predicate)
			if err != nil {
				return err
			}
		}
		kv.Value, err = tu.Marshal()
		return err
	default:
		// Neither a schema nor a type key: nothing to convert.
		return nil
	}
}
// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
// predicate stored within them, so they also need to be updated accordingly.
switch in.version {
case 0:
if parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
return nil
}
}
case 2103:
if err := changeFormat(); err != nil {
glog.Errorf("Unable to change format for: %+v Err=%+v", parsedKey, err)
return nil
}
default:
// for manifest versions >= 2105, do nothing.
}
// Reset the StreamId to prevent ordering issues while writing to stream writer.
kv.StreamId = 0
Expand Down Expand Up @@ -657,7 +703,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
in := &loadBackupInput{
preds: predSet,
dropNs: localDropNs,
isOld: manifest.Version == 0,
version: manifest.Version,
restoreTs: req.RestoreTs,
// Only map the schema keys corresponding to the latest backup.
keepSchema: i == 0,
Expand Down
13 changes: 13 additions & 0 deletions x/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ const (
NsSeparator = "-"
)

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

// AttrFrom2103 converts an attribute whose first 8 bytes hold a big-endian namespace and
// whose remaining bytes hold the predicate name into the value returned by NamespaceAttr
// for that namespace and predicate.
//
// It returns an error if attr contains the Unicode replacement rune, or if attr is shorter
// than the 8 bytes needed to hold the namespace.
func AttrFrom2103(attr string) (string, error) {
	if strings.ContainsRune(attr, replacementRune) {
		return "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
			attr, []byte(attr))
	}
	// Slicing attr[:8] panics on shorter inputs, so reject them with an error instead.
	if len(attr) < 8 {
		return "", errors.Errorf("attr is too short to contain a namespace: %q (%+v)",
			attr, []byte(attr))
	}
	ns, pred := binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:]
	return NamespaceAttr(ns, pred), nil
}

func NamespaceToBytes(ns uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, ns)
Expand Down

0 comments on commit 8504fb1

Please sign in to comment.