Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(restore): update the schema and type from 2103 #7838

Merged
merged 3 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions ee/updatemanifest/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
package updatemanifest

import (
"encoding/binary"
"log"
"net/url"
"os"
"strings"

"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/protos/pb"
Expand Down Expand Up @@ -65,18 +63,6 @@ func init() {
ee.RegisterEncFlag(flag)
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func parseNsAttr(attr string) (uint64, string, error) {
if strings.ContainsRune(attr, replacementRune) {
return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
attr, []byte(attr))
}
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

func run() error {
keys, err := ee.GetKeys(UpdateManifest.Conf)
if err != nil {
Expand All @@ -100,25 +86,25 @@ func run() error {
for gid, preds := range manifest.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
attr, err := x.AttrFrom2103(pred)
if err != nil {
parsedPreds = append(parsedPreds, pred)
logger.Printf("Unable to parse the pred: %v", pred)
continue
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
parsedPreds = append(parsedPreds, attr)
}
manifest.Groups[gid] = parsedPreds
}
for _, op := range manifest.DropOperations {
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
attr, err := x.AttrFrom2103(op.DropValue)
if err != nil {
logger.Printf("Unable to parse the drop operation %+v pred: %v",
op, []byte(op.DropValue))
continue
}
op.DropValue = x.NamespaceAttr(ns, attr)
op.DropValue = attr
}
}
// As we have made the required changes to the manifest, we should update the version too.
Expand Down
21 changes: 4 additions & 17 deletions worker/backup_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package worker

import (
"encoding/binary"
"encoding/json"
"fmt"
"net/url"
Expand Down Expand Up @@ -160,18 +159,6 @@ func getConsolidatedManifest(h UriHandler, uri *url.URL) (*MasterManifest, error
return &MasterManifest{Manifests: mlist}, nil
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func parseNsAttr(attr string) (uint64, string, error) {
if strings.ContainsRune(attr, replacementRune) {
return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
attr, []byte(attr))
}
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

// upgradeManifest updates the in-memory manifest from various versions to the latest version.
// If the manifest version is 0 (dgraph version < v21.03), attach namespace to the predicates and
// the drop data/attr operation.
Expand Down Expand Up @@ -203,23 +190,23 @@ func upgradeManifest(m *Manifest) error {
for gid, preds := range m.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
attr, err := x.AttrFrom2103(pred)
if err != nil {
return errors.Errorf("while parsing predicate got: %q", err)
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
parsedPreds = append(parsedPreds, attr)
}
m.Groups[gid] = parsedPreds
}
for _, op := range m.DropOperations {
// We have a cluster wide drop data in v21.03.
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
attr, err := x.AttrFrom2103(op.DropValue)
if err != nil {
return errors.Errorf("while parsing the drop operation %+v got: %q",
op, err)
}
op.DropValue = x.NamespaceAttr(ns, attr)
op.DropValue = attr
}
}
case 2105:
Expand Down
56 changes: 51 additions & 5 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type loadBackupInput struct {
restoreTs uint64
preds predicateSet
dropNs map[uint64]struct{}
isOld bool
version int
keepSchema bool
compression string
}
Expand Down Expand Up @@ -391,11 +391,57 @@ func (m *mapper) processReqCh(ctx context.Context) error {
kv.Value, err = update.Marshal()
return err
}
if in.isOld && parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
changeFormat := func() error {
// In the backup taken on 2103, we have the schemaUpdate.Predicate in format
// <namespace 8 bytes>|<attribute>. That had issues with JSON marshalling.
// So, we switched over to the format <namespace hex string>-<attribute>.
var err error
if parsedKey.IsSchema() {
var update pb.SchemaUpdate
if err := update.Unmarshal(kv.Value); err != nil {
return err
}
if update.Predicate, err = x.AttrFrom2103(update.Predicate); err != nil {
return err
}
kv.Value, err = update.Marshal()
return err
}
if parsedKey.IsType() {
var update pb.TypeUpdate
if err := update.Unmarshal(kv.Value); err != nil {
return err
}
if update.TypeName, err = x.AttrFrom2103(update.TypeName); err != nil {
return err
}
for _, sch := range update.Fields {
if sch.Predicate, err = x.AttrFrom2103(sch.Predicate); err != nil {
return err
}
}
kv.Value, err = update.Marshal()
return err
}
return nil
}
// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
// predicate stored within them, so they also need to be updated accordingly.
switch in.version {
NamanJain8 marked this conversation as resolved.
Show resolved Hide resolved
case 0:
if parsedKey.IsType() {
if err := appendNamespace(); err != nil {
glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
return nil
}
}
case 2103:
if err := changeFormat(); err != nil {
glog.Errorf("Unable to change format for: %+v Err=%+v", parsedKey, err)
return nil
}
default:
// for manifest versions >= 2015, do nothing.
}
// Reset the StreamId to prevent ordering issues while writing to stream writer.
kv.StreamId = 0
Expand Down Expand Up @@ -657,7 +703,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
in := &loadBackupInput{
preds: predSet,
dropNs: localDropNs,
isOld: manifest.Version == 0,
version: manifest.Version,
restoreTs: req.RestoreTs,
// Only map the schema keys corresponding to the latest backup.
keepSchema: i == 0,
Expand Down
13 changes: 13 additions & 0 deletions x/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ const (
NsSeparator = "-"
)

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func AttrFrom2103(attr string) (string, error) {
if strings.ContainsRune(attr, replacementRune) {
return "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
attr, []byte(attr))
}
ns, pred := binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:]
return NamespaceAttr(ns, pred), nil
}

func NamespaceToBytes(ns uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, ns)
Expand Down