diff --git a/ee/updatemanifest/run.go b/ee/updatemanifest/run.go index 7535ec06c5d..9c3035ff589 100644 --- a/ee/updatemanifest/run.go +++ b/ee/updatemanifest/run.go @@ -19,11 +19,9 @@ package updatemanifest import ( - "encoding/binary" "log" "net/url" "os" - "strings" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/protos/pb" @@ -65,18 +63,6 @@ func init() { ee.RegisterEncFlag(flag) } -// Invalid bytes are replaced with the Unicode replacement rune. -// See https://golang.org/pkg/encoding/json/#Marshal -const replacementRune = rune('\ufffd') - -func parseNsAttr(attr string) (uint64, string, error) { - if strings.ContainsRune(attr, replacementRune) { - return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)", - attr, []byte(attr)) - } - return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil -} - func run() error { keys, err := ee.GetKeys(UpdateManifest.Conf) if err != nil { @@ -100,25 +86,25 @@ func run() error { for gid, preds := range manifest.Groups { parsedPreds := preds[:0] for _, pred := range preds { - ns, attr, err := parseNsAttr(pred) + attr, err := x.AttrFrom2103(pred) if err != nil { parsedPreds = append(parsedPreds, pred) logger.Printf("Unable to parse the pred: %v", pred) continue } - parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr)) + parsedPreds = append(parsedPreds, attr) } manifest.Groups[gid] = parsedPreds } for _, op := range manifest.DropOperations { if op.DropOp == pb.DropOperation_ATTR { - ns, attr, err := parseNsAttr(op.DropValue) + attr, err := x.AttrFrom2103(op.DropValue) if err != nil { logger.Printf("Unable to parse the drop operation %+v pred: %v", op, []byte(op.DropValue)) continue } - op.DropValue = x.NamespaceAttr(ns, attr) + op.DropValue = attr } } // As we have made the required changes to the manifest, we should update the version too. 
diff --git a/worker/backup_manifest.go b/worker/backup_manifest.go index 36d79ac45e5..2a6eb8ff016 100644 --- a/worker/backup_manifest.go +++ b/worker/backup_manifest.go @@ -13,7 +13,6 @@ package worker import ( - "encoding/binary" "encoding/json" "fmt" "net/url" @@ -160,18 +159,6 @@ func getConsolidatedManifest(h UriHandler, uri *url.URL) (*MasterManifest, error return &MasterManifest{Manifests: mlist}, nil } -// Invalid bytes are replaced with the Unicode replacement rune. -// See https://golang.org/pkg/encoding/json/#Marshal -const replacementRune = rune('\ufffd') - -func parseNsAttr(attr string) (uint64, string, error) { - if strings.ContainsRune(attr, replacementRune) { - return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)", - attr, []byte(attr)) - } - return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil -} - // upgradeManifest updates the in-memory manifest from various versions to the latest version. // If the manifest version is 0 (dgraph version < v21.03), attach namespace to the predicates and // the drop data/attr operation. @@ -203,23 +190,23 @@ func upgradeManifest(m *Manifest) error { for gid, preds := range m.Groups { parsedPreds := preds[:0] for _, pred := range preds { - ns, attr, err := parseNsAttr(pred) + attr, err := x.AttrFrom2103(pred) if err != nil { return errors.Errorf("while parsing predicate got: %q", err) } - parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr)) + parsedPreds = append(parsedPreds, attr) } m.Groups[gid] = parsedPreds } for _, op := range m.DropOperations { // We have a cluster wide drop data in v21.03. 
if op.DropOp == pb.DropOperation_ATTR { - ns, attr, err := parseNsAttr(op.DropValue) + attr, err := x.AttrFrom2103(op.DropValue) if err != nil { return errors.Errorf("while parsing the drop operation %+v got: %q", op, err) } - op.DropValue = x.NamespaceAttr(ns, attr) + op.DropValue = attr } } case 2105: diff --git a/worker/restore_map.go b/worker/restore_map.go index 84a9519bfe8..04f37cd635c 100644 --- a/worker/restore_map.go +++ b/worker/restore_map.go @@ -107,7 +107,7 @@ type loadBackupInput struct { restoreTs uint64 preds predicateSet dropNs map[uint64]struct{} - isOld bool + version int keepSchema bool compression string } @@ -391,11 +391,57 @@ func (m *mapper) processReqCh(ctx context.Context) error { kv.Value, err = update.Marshal() return err } - if in.isOld && parsedKey.IsType() { - if err := appendNamespace(); err != nil { - glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err) + changeFormat := func() error { + // In the backup taken on 2103, we have the schemaUpdate.Predicate in format + // [8-byte big-endian namespace]|[attribute]. That had issues with JSON marshalling. + // So, we switched over to the format [namespace]-[attribute]. + var err error + if parsedKey.IsSchema() { + var update pb.SchemaUpdate + if err := update.Unmarshal(kv.Value); err != nil { + return err + } + if update.Predicate, err = x.AttrFrom2103(update.Predicate); err != nil { + return err + } + kv.Value, err = update.Marshal() + return err + } + if parsedKey.IsType() { + var update pb.TypeUpdate + if err := update.Unmarshal(kv.Value); err != nil { + return err + } + if update.TypeName, err = x.AttrFrom2103(update.TypeName); err != nil { + return err + } + for _, sch := range update.Fields { + if sch.Predicate, err = x.AttrFrom2103(sch.Predicate); err != nil { + return err + } + } + kv.Value, err = update.Marshal() + return err + } + return nil + } + // We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have + // predicate stored within them, so they also need to be updated accordingly. 
+ switch in.version { + case 0: + if parsedKey.IsType() { + if err := appendNamespace(); err != nil { + glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err) + return nil + } + } + case 2103: + if err := changeFormat(); err != nil { + glog.Errorf("Unable to change format for: %+v Err=%+v", parsedKey, err) return nil } + default: + // for manifest versions >= 2105, do nothing. } // Reset the StreamId to prevent ordering issues while writing to stream writer. kv.StreamId = 0 @@ -657,7 +703,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) { in := &loadBackupInput{ preds: predSet, dropNs: localDropNs, - isOld: manifest.Version == 0, + version: manifest.Version, restoreTs: req.RestoreTs, // Only map the schema keys corresponding to the latest backup. keepSchema: i == 0, diff --git a/x/keys.go b/x/keys.go index 7260f78989f..14774d1f5c8 100644 --- a/x/keys.go +++ b/x/keys.go @@ -61,6 +61,19 @@ const ( NsSeparator = "-" ) +// Invalid bytes are replaced with the Unicode replacement rune. +// See https://golang.org/pkg/encoding/json/#Marshal +const replacementRune = rune('\ufffd') + +func AttrFrom2103(attr string) (string, error) { + if strings.ContainsRune(attr, replacementRune) { + return "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)", + attr, []byte(attr)) + } + ns, pred := binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:] + return NamespaceAttr(ns, pred), nil +} + func NamespaceToBytes(ns uint64) []byte { buf := make([]byte, 8) binary.BigEndian.PutUint64(buf, ns)