Commit 7443a10

Store namespace in predicate as a hex separated by a hyphen to prevent json marshal issues (#8601)

We used to store the predicate as <namespace>|<attribute> (the pipe | signifies concatenation), held as a single string. <namespace> is an 8-byte uint64, and marshaling those raw bytes to JSON corrupts the predicate: a namespace byte greater than 127 is not valid UTF-8 on its own, so encoding it may take several bytes (and where no valid mapping exists, the byte is replaced with a different rune). This affects the following identified places in Dgraph:

- Live loader using the Guardian of Galaxy
- Backup and List Backup
- HTTP clients and Ratel
- Schema and predicate

Fix:
The fix is to use a valid UTF-8 string whenever we deal with JSON. Better still, we use the UTF-8 string form for internal operations as well; only when we read from or write to Badger do we convert it back into the byte format.
New format: <namespace>-<attribute> (- is the literal hyphen), where <namespace> is the uint64 namespace rendered as a hex string, e.g. "81".
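
For illustration, here is a minimal Go sketch of the two encodings; the helper names are hypothetical and not Dgraph's actual API, but the JSON behaviour is the point:

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"strconv"
)

// oldAttr mimics the old storage format: 8 raw namespace bytes concatenated
// with the attribute name.
func oldAttr(ns uint64, attr string) string {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, ns)
	return string(buf) + attr
}

// newAttr mimics the new storage format: the namespace as a hex string,
// a literal hyphen, then the attribute name.
func newAttr(ns uint64, attr string) string {
	return strconv.FormatUint(ns, 16) + "-" + attr
}

func main() {
	// Namespace 0x81 (129) is the interesting case: as a lone byte it is not
	// valid UTF-8, so encoding/json coerces it to the replacement rune and the
	// original namespace can no longer be recovered.
	old, _ := json.Marshal(oldAttr(0x81, "name"))
	fmt.Println(string(old)) // the 0x81 byte comes out as U+FFFD – the namespace is lost

	cur, _ := json.Marshal(newAttr(0x81, "name"))
	fmt.Println(string(cur)) // "81-name" – round-trips through JSON unchanged
}
```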

We also bump the manifest version as part of this change. This diff ensures that older backups remain compatible and can still be restored.

Contains: 
#7838
#7828
#7825
#7815
#7810
harshil-goel authored and all-seeing-code committed Feb 8, 2023
1 parent b70d235 commit 7443a10
Showing 27 changed files with 312 additions and 207 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
@@ -428,7 +428,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, 2, len(mr.preds))
var parsedPreds []string
for _, pred := range mr.preds {
p := strings.Split(pred, "-")[1]
p := strings.SplitN(pred, "-", 2)[1]
parsedPreds = append(parsedPreds, x.ParseAttr(p))
}
sort.Strings(parsedPreds)
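
A side note on the strings.Split → strings.SplitN changes in this and the following files: with the new format, only the first hyphen separates the hex namespace from the attribute, and the attribute itself may contain hyphens, so the split has to be limited to two parts. A tiny example with a hypothetical predicate name:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	pred := "81-my-pred" // hypothetical predicate in the new <hex-namespace>-<attribute> format

	fmt.Println(strings.Split(pred, "-"))     // [81 my pred]  – the attribute gets broken apart
	fmt.Println(strings.SplitN(pred, "-", 2)) // [81 my-pred]  – namespace plus the intact attribute
}
```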
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/upsert_test.go
@@ -40,7 +40,7 @@ type QueryResult struct {

func splitPreds(ps []string) []string {
for i, p := range ps {
ps[i] = x.ParseAttr(strings.Split(p, "-")[1])
ps[i] = x.ParseAttr(strings.SplitN(p, "-", 2)[1])
}

return ps
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/oracle.go
@@ -365,15 +365,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
checkPreds := func() error {
// Check if any of these tablets is being moved. If so, abort the transaction.
for _, pkey := range src.Preds {
splits := strings.Split(pkey, "-")
splits := strings.SplitN(pkey, "-", 2)
if len(splits) < 2 {
return errors.Errorf("Unable to find group id in %s", pkey)
}
gid, err := strconv.Atoi(splits[0])
if err != nil {
return errors.Wrapf(err, "unable to parse group id from %s", pkey)
}
pred := strings.Join(splits[1:], "-")
pred := splits[1]
tablet := s.ServingTablet(pred)
if tablet == nil {
return errors.Errorf("Tablet for %s is nil", pred)
11 changes: 1 addition & 10 deletions edgraph/server.go
@@ -1110,16 +1110,7 @@ func filterTablets(ctx context.Context, ms *pb.MembershipState) error {
return errors.Errorf("Namespace not found in JWT.")
}
if namespace == x.GalaxyNamespace {
// For galaxy namespace, we don't want to filter out the predicates. We only format the
// namespace to human readable form.
for _, group := range ms.Groups {
tablets := make(map[string]*pb.Tablet)
for tabletName, tablet := range group.Tablets {
tablet.Predicate = x.FormatNsAttr(tablet.Predicate)
tablets[x.FormatNsAttr(tabletName)] = tablet
}
group.Tablets = tablets
}
// For galaxy namespace, we don't want to filter out the predicates.
return nil
}
for _, group := range ms.GetGroups() {
13 changes: 6 additions & 7 deletions graphql/admin/state.go
@@ -47,13 +47,13 @@ func resolveState(ctx context.Context, q schema.Query) *resolve.Resolved {
u := jsonpb.Unmarshaler{}
var ms pb.MembershipState
err = u.Unmarshal(bytes.NewReader(resp.GetJson()), &ms)

if err != nil {
return resolve.EmptyResult(q, err)
}

// map to graphql response structure
state := convertToGraphQLResp(ms)
ns, _ := x.ExtractNamespace(ctx)
// map to graphql response structure. Only guardian of galaxy can list the namespaces.
state := convertToGraphQLResp(ms, ns == x.GalaxyNamespace)
b, err := json.Marshal(state)
if err != nil {
return resolve.EmptyResult(q, err)
@@ -77,7 +77,7 @@ func resolveState(ctx context.Context, q schema.Query) *resolve.Resolved {
// values and not the keys. For pb.MembershipState.Group, the keys are the group IDs
// and pb.Group didn't contain this ID, so we are creating a custom clusterGroup type,
// which is same as pb.Group and also contains the ID for the group.
func convertToGraphQLResp(ms pb.MembershipState) membershipState {
func convertToGraphQLResp(ms pb.MembershipState, listNs bool) membershipState {
var state membershipState

// namespaces stores set of namespaces
@@ -92,9 +92,8 @@ func convertToGraphQLResp(ms pb.MembershipState) membershipState {
var tablets = make([]*pb.Tablet, 0, len(v.Tablets))
for name, v1 := range v.Tablets {
tablets = append(tablets, v1)
val, err := x.ExtractNamespaceFromPredicate(name)
if err == nil {
namespaces[val] = struct{}{}
if listNs {
namespaces[x.ParseNamespace(name)] = struct{}{}
}
}
state.Groups = append(state.Groups, clusterGroup{
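
The x.ParseNamespace call above presumably pulls the uint64 namespace back out of a predicate in the new format; a hedged, self-contained sketch of that idea (not the actual implementation in the x package):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseNamespace extracts the namespace from a "<hex-namespace>-<attribute>"
// predicate. Illustrative assumption only; the real helper lives in Dgraph's x package.
func parseNamespace(pred string) uint64 {
	hexNs := strings.SplitN(pred, "-", 2)[0]
	ns, err := strconv.ParseUint(hexNs, 16, 64)
	if err != nil {
		return 0 // fall back to the galaxy (default) namespace
	}
	return ns
}

func main() {
	fmt.Println(parseNamespace("81-name"))       // 129
	fmt.Println(parseNamespace("0-dgraph.type")) // 0 (galaxy namespace)
}
```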
18 changes: 7 additions & 11 deletions posting/index.go
@@ -592,16 +592,15 @@ func (r *rebuilder) Run(ctx context.Context) error {

glog.V(1).Infof(
"Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n",
x.FormatNsAttr(r.attr), r.startTs, hex.Dump(r.prefix))
r.attr, r.startTs, hex.Dump(r.prefix))

// Counter is used here to ensure that all keys are committed at different timestamp.
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):",
x.FormatNsAttr(r.attr))
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
@@ -663,21 +662,19 @@ func (r *rebuilder) Run(ctx context.Context) error {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: building temp index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))

// Now we write all the created posting lists to disk.
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", r.attr)
start = time.Now()
defer func() {
glog.V(1).Infof("Rebuilding index for predicate %s: writing index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))
}()

writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):",
x.FormatNsAttr(r.attr))
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := ReadPostingList(key, itr)
if err != nil {
@@ -720,8 +717,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
if err := tmpStream.Orchestrate(ctx); err != nil {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr)
return writer.Flush()
}

16 changes: 8 additions & 8 deletions posting/list_test.go
@@ -559,7 +559,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
}

func TestAddMutation_gru(t *testing.T) {
key := x.DataKey("question.tag", 0x01)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

@@ -592,7 +592,7 @@ func TestAddMutation_gru(t *testing.T) {
}

func TestAddMutation_gru2(t *testing.T) {
key := x.DataKey("question.tag", 0x100)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

@@ -639,7 +639,7 @@ func TestAddMutation_gru2(t *testing.T) {
func TestAddAndDelMutation(t *testing.T) {
// Ensure each test uses unique key since we don't clear the postings
// after each test
key := x.DataKey("dummy_key", 0x927)
key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

@@ -878,7 +878,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 5000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
@@ -926,7 +926,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 10000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
@@ -1087,7 +1087,7 @@ func TestBinSplit(t *testing.T) {
defer func() {
maxListSize = originalListSize
}()
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
@@ -1268,7 +1268,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
maxListSize = 5000

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
@@ -1407,7 +1407,7 @@ func TestRecursiveSplits(t *testing.T) {

// Create a list that should be split recursively.
size := int(1e5)
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
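
The test changes in this file follow from x.DataKey now expecting a namespaced attribute, so bare attribute names are wrapped with x.GalaxyAttr. A hedged sketch of what that wrapper amounts to under the new format (names and behaviour are assumptions, not Dgraph's actual code):

```go
package main

import "fmt"

// galaxyAttr is a hypothetical stand-in for x.GalaxyAttr: it tags an attribute
// with the default (galaxy) namespace, 0, in the new hex-hyphen format.
func galaxyAttr(attr string) string {
	return fmt.Sprintf("%x-%s", uint64(0), attr)
}

func main() {
	// The tests above now build keys as x.DataKey(x.GalaxyAttr("question.tag"), 0x01)
	// rather than x.DataKey("question.tag", 0x01).
	fmt.Println(galaxyAttr("question.tag")) // 0-question.tag
}
```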
20 changes: 14 additions & 6 deletions systest/backup/filesystem/backup_test.go
@@ -48,6 +48,7 @@ var (
alphaBackupDir = "/data/backups"
oldBackupDir1 = "/data/to_restore/1"
oldBackupDir2 = "/data/to_restore/2"
oldBackupDir3 = "/data/to_restore/3"
alphaContainers = []string{
"alpha1",
"alpha2",
@@ -92,7 +93,8 @@ func TestBackupOfOldRestore(t *testing.T) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)
@@ -105,7 +107,8 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, oldBackupDir1)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
q := `{ authors(func: has(Author.name)) { count(uid) } }`
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))

@@ -117,7 +120,7 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, alphaBackupDir)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
resp, err = dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
}
@@ -151,16 +154,19 @@ func TestRestoreOfOldBackup(t *testing.T) {
require.NoError(t, err)
require.JSONEq(t, r, string(resp.Json))
}

queryAndCheck("p1", 0)
queryAndCheck("p2", 2)
queryAndCheck("p3", 0)
queryAndCheck("p4", 2)
}
t.Run("backup of 20.11", func(t *testing.T) { test(oldBackupDir2) })
t.Run("backup of 21.03", func(t *testing.T) { test(oldBackupDir3) })
}

func TestBackupFilesystem(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

@@ -432,15 +438,17 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,

var data interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&data))
require.Equal(t, "Success", testutil.JsonGet(data, "data", "backup", "response", "code").(string))
require.Equal(t, "Success",
testutil.JsonGet(data, "data", "backup", "response", "code").(string))
taskId := testutil.JsonGet(data, "data", "backup", "taskId").(string)
testutil.WaitForTask(t, taskId, true, testutil.SockAddrHttp)

// Verify that the right amount of files and directories were created.
common.CopyToLocalFs(t)

files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
return !isdir && strings.HasSuffix(path, ".backup") &&
strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedFiles, len(files))

Binary files not shown (9 files).
1 change: 1 addition & 0 deletions systest/backup/filesystem/data/to_restore/3/manifest.json
@@ -0,0 +1 @@
{"Manifests":[{"type":"full","since":0,"read_ts":9,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":1,"version":2103,"path":"dgraph.20210517.095641.969","encrypted":false,"drop_operations":null,"compression":"snappy"},{"type":"incremental","since":0,"read_ts":21,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":2,"version":2103,"path":"dgraph.20210517.095716.130","encrypted":false,"drop_operations":[{"drop_op":1}],"compression":"snappy"},{"type":"incremental","since":0,"read_ts":26,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":3,"version":2103,"path":"dgraph.20210517.095726.320","encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3"}],"compression":"snappy"}]}
4 changes: 0 additions & 4 deletions worker/backup.go
@@ -89,10 +89,6 @@ func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {

predSet := make(predicateSet)
for _, pred := range preds {
if m.Version == 0 {
// For older versions, preds set will contain attribute without namespace.
pred = x.NamespaceAttr(x.GalaxyNamespace, pred)
}
predSet[pred] = struct{}{}
}
return predSet
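
The removed Version == 0 branch, together with the switch to GetManifestNoUpgrade in worker/backup_ee.go below, suggests that old backups are now upgraded once when the manifest is read rather than at every use. A hedged, self-contained sketch of that upgrade step (type and function names are assumptions, not the actual Dgraph code):

```go
package main

import "fmt"

// manifest is a minimal stand-in for the real Manifest type.
type manifest struct {
	Version int
	Groups  map[uint32][]string
}

// namespaceAttr mimics prefixing a namespace in the new <hex-namespace>-<attribute> format.
func namespaceAttr(ns uint64, attr string) string {
	return fmt.Sprintf("%x-%s", ns, attr)
}

// upgradeManifest rewrites predicates from pre-namespace manifests (Version == 0)
// so the rest of the backup/restore path can assume the new format everywhere.
func upgradeManifest(m *manifest) {
	if m.Version != 0 {
		return // already written with namespaced predicates
	}
	for gid, preds := range m.Groups {
		for i, p := range preds {
			m.Groups[gid][i] = namespaceAttr(0, p) // old backups only carried the galaxy namespace
		}
	}
}

func main() {
	m := &manifest{Version: 0, Groups: map[uint32][]string{1: {"p1", "dgraph.type"}}}
	upgradeManifest(m)
	fmt.Println(m.Groups[1]) // [0-p1 0-dgraph.type]
}
```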
8 changes: 4 additions & 4 deletions worker/backup_ee.go
@@ -232,7 +232,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
Version: x.ManifestVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
@@ -555,14 +555,14 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro
return err
}

manifest, err := GetManifest(handler, uri)
manifest, err := GetManifestNoUpgrade(handler, uri)
if err != nil {
return err
}
manifest.Manifests = append(manifest.Manifests, m)

if err := createManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
if err := CreateManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "complete backup failed")
}
glog.Infof("Backup completed OK.")
return nil