From 057b98943b526f57b47c188cd962b7840b15ed72 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Wed, 12 May 2021 21:30:36 +0530 Subject: [PATCH 01/10] fix json marshal unmarshal for namespace > 127 --- dgraph/cmd/live/run.go | 2 +- dgraph/cmd/zero/oracle.go | 8 +++-- edgraph/server.go | 8 +++++ ee/backup/run.go | 2 +- graphql/e2e/custom_logic/cmd/main.go | 4 +-- posting/lists.go | 2 +- worker/backup_ee.go | 3 +- worker/backup_manifest.go | 40 ++++++++++++++++++++++- x/keys.go | 10 ++++++ x/x_test.go | 48 ++++++++++++++++++++++++++++ 10 files changed, 118 insertions(+), 9 deletions(-) diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 8bb1d01c165..efcdf9f6d1f 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -682,7 +682,7 @@ func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, singleN } for _, pred := range sch.Predicates { - ns := x.ParseNamespace(pred.Predicate) + ns, _ := x.ParseNsAttrFromJson(pred.Predicate) l.namespaces[ns] = struct{}{} } return nil diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 4df2de2af37..17e2811be32 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -370,14 +370,18 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { // Check if any of these tablets is being moved. If so, abort the transaction. for _, pkey := range src.Preds { splits := strings.Split(pkey, "-") - if len(splits) < 2 { + if len(splits) < 3 { return errors.Errorf("Unable to find group id in %s", pkey) } gid, err := strconv.Atoi(splits[0]) if err != nil { return errors.Wrapf(err, "unable to parse group id from %s", pkey) } - pred := strings.Join(splits[1:], "-") + ns, err := strconv.ParseUint(splits[1], 0, 64) + if err != nil { + return errors.Wrapf(err, "unable to parse namespace from %s", pkey) + } + pred := string(x.NamespaceToBytes(ns)) + strings.Join(splits[2:], "-") tablet := s.ServingTablet(pred) if tablet == nil { return errors.Errorf("Tablet for %s is nil", pred) diff --git a/edgraph/server.go b/edgraph/server.go index e2d33c4e3b4..cb1104a71a0 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -1374,6 +1374,14 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error) return er.Types[i].TypeName < er.Types[j].TypeName }) + if x.IsGalaxyOperation(ctx) { + // Schema contains ns|attr in the predicates. Format it, so that it is parsed by json + // correctly. + for _, node := range er.SchemaNode { + node.Predicate = x.FormatNsAttr(node.Predicate) + } + } + respMap := make(map[string]interface{}) if len(er.SchemaNode) > 0 { respMap["schema"] = er.SchemaNode diff --git a/ee/backup/run.go b/ee/backup/run.go index d42f0168b35..ebf032b9a0f 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -257,7 +257,7 @@ func runExportBackup() error { glog.Infof("Created temporary map directory: %s\n", mapDir) // TODO: Can probably make this procesing concurrent. - for gid, _ := range latestManifest.Groups { + for gid := range latestManifest.Groups { glog.Infof("Exporting group: %d", gid) req := &pb.RestoreRequest{ GroupId: gid, diff --git a/graphql/e2e/custom_logic/cmd/main.go b/graphql/e2e/custom_logic/cmd/main.go index a639235a661..bb197a613be 100644 --- a/graphql/e2e/custom_logic/cmd/main.go +++ b/graphql/e2e/custom_logic/cmd/main.go @@ -1264,10 +1264,10 @@ func gqlCarsWithErrorHandler(w http.ResponseWriter, r *http.Request) { "cars": output, }, "errors": []map[string]interface{}{ - map[string]interface{}{ + { "message": "error-1 from cars", }, - map[string]interface{}{ + { "message": "error-2 from cars", }, }, diff --git a/posting/lists.go b/posting/lists.go index 76315788cf2..1b90e6fca11 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -240,7 +240,7 @@ func (lc *LocalCache) fillPreds(ctx *api.TxnContext, gid uint32) { } // Also send the group id that the predicate was being served by. This is useful when // checking if Zero should allow a commit during a predicate move. - predKey := fmt.Sprintf("%d-%s", gid, pk.Attr) + predKey := fmt.Sprintf("%d-%s", gid, x.FormatNsAttr(pk.Attr)) ctx.Preds = append(ctx.Preds, predKey) } ctx.Preds = x.Unique(ctx.Preds) diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 04bd725baf2..bf313aa463f 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -189,7 +189,8 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error { } } - glog.Infof("Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\" . Groups=%v\n", req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) + glog.Infof("Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\" ."+ + " Groups=%v\n", req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/worker/backup_manifest.go b/worker/backup_manifest.go index e098e1bae46..1bdc1a9e288 100644 --- a/worker/backup_manifest.go +++ b/worker/backup_manifest.go @@ -99,7 +99,7 @@ func getFilteredManifests(h UriHandler, manifests []*Manifest, var validManifests []*Manifest for _, m := range manifests { missingFiles := false - for g, _ := range m.Groups { + for g := range m.Groups { path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g)) if !h.FileExists(path) { missingFiles = true @@ -172,6 +172,7 @@ func readManifest(h UriHandler, path string) (*Manifest, error) { if err := json.Unmarshal(b, &m); err != nil { return &m, errors.Wrap(err, "readManifest failed to unmarshal: ") } + convertJsonToMemory(&m) return &m, nil } @@ -195,9 +196,42 @@ func readMasterManifest(h UriHandler, path string) (*MasterManifest, error) { if err := json.Unmarshal(b, &m); err != nil { return &m, errors.Wrap(err, "readMasterManifest failed to unmarshal: ") } + for _, manifest := range m.Manifests { + convertJsonToMemory(manifest) + } return &m, nil } +// In the manifest, we store the predicates in the format: ns-attr +// But in-memory, we have ns|attr (where | denotes concatenation). +func convertJsonToMemory(m *Manifest) { + if m.Version == 0 { + return + } + for gid, preds := range m.Groups { + parsedPreds := preds[:0] + for _, pred := range preds { + ns, attr := x.ParseNsAttrFromJson(pred) + parsedPreds = append(parsedPreds, string(x.NamespaceToBytes(ns))+attr) + } + m.Groups[gid] = parsedPreds + } +} + +func convertMemoryToJson(m *Manifest) { + if m.Version == 0 { + // TODO(Naman): Check if m.Version can be zero while creating a manifest/masterManifest. + return + } + for gid, preds := range m.Groups { + parsedPreds := preds[:0] + for _, pred := range preds { + parsedPreds = append(parsedPreds, x.FormatNsAttr(pred)) + } + m.Groups[gid] = parsedPreds + } +} + func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) { if !h.DirExists("") { return &MasterManifest{}, errors.Errorf("getManifest: The uri path: %q doesn't exists", @@ -218,6 +252,10 @@ func createManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error } } + for _, m := range manifest.Manifests { + convertMemoryToJson(m) + } + w, err := h.CreateFile(tmpManifest) if err != nil { return errors.Wrap(err, "createManifest failed to create tmp path") diff --git a/x/keys.go b/x/keys.go index 418cba0c492..2b8f6c5d2a6 100644 --- a/x/keys.go +++ b/x/keys.go @@ -101,6 +101,16 @@ func ParseNamespace(attr string) uint64 { return binary.BigEndian.Uint64([]byte(attr[:8])) } +// ParseNamespaceFromJson returns the namespace from the given value. +// Format: ns-attr +func ParseNsAttrFromJson(attr string) (uint64, string) { + splits := strings.Split(attr, "-") + AssertTrue(len(splits) >= 2) + ns, err := strconv.ParseUint(splits[0], 0, 64) + Check(err) + return ns, strings.Join(splits[1:], "-") +} + func ParseAttrList(attrs []string) []string { var resp []string for _, attr := range attrs { diff --git a/x/x_test.go b/x/x_test.go index e910f4f2db6..135b6529159 100644 --- a/x/x_test.go +++ b/x/x_test.go @@ -17,6 +17,7 @@ package x import ( + "encoding/json" "fmt" "math" "testing" @@ -197,3 +198,50 @@ func TestToHex(t *testing.T) { require.Equal(t, []byte(`"0xffffffffffffffff"`), ToHex(math.MaxUint64, false)) require.Equal(t, []byte(`<0xffffffffffffffff>`), ToHex(math.MaxUint64, true)) } + +// func TestSchemaUnmarshal(t *testing.T) { +// type predicate struct { +// Predicate string `json:"predicate,omitempty"` +// } +// type schema struct { +// Predicates []*predicate `json:"schema,omitempty"` +// } + +// var nodes []*pb.SchemaNode +// nodes = append(nodes, &pb.SchemaNode{Predicate: NamespaceAttr(129, "name")}) +// for _, node := range nodes { +// node.Predicate = SchemaEncode(node.Predicate) +// } + +// respMap := make(map[string]interface{}) +// respMap["schema"] = nodes +// resp := &dgoapi.Response{} +// var err error +// resp.Json, err = json.Marshal(respMap) +// require.NoError(t, err) +// t.Log(string(resp.Json)) + +// var sch schema +// require.NoError(t, json.Unmarshal(resp.GetJson(), &sch)) + +// for _, pred := range sch.Predicates { +// pred.Predicate = SchemaDecode(pred.Predicate) +// t.Logf("Unmarshalled %s %v", FormatNsAttr(pred.Predicate), []byte(pred.Predicate)) +// } +// } + +func TestSchemaUnmarshalSimple(t *testing.T) { + type predicate struct { + Predicate string `json:"predicate,omitempty"` + } + + p := &predicate{Predicate: NamespaceAttr(128, "name")} + b, err := json.Marshal(p) + require.NoError(t, err) + t.Log(string(b)) + + var p2 predicate + require.NoError(t, json.Unmarshal(b, &p2)) + t.Logf("Unmarshalled %s %v", FormatNsAttr(p2.Predicate), []byte(p2.Predicate)) + +} From 62931c166d33c6e1a446a0d36b59a368a535ac11 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Wed, 12 May 2021 22:51:27 +0530 Subject: [PATCH 02/10] Revert "fix json marshal unmarshal for namespace > 127" This reverts commit 057b98943b526f57b47c188cd962b7840b15ed72. --- dgraph/cmd/live/run.go | 2 +- dgraph/cmd/zero/oracle.go | 8 ++--- edgraph/server.go | 8 ----- ee/backup/run.go | 2 +- graphql/e2e/custom_logic/cmd/main.go | 4 +-- posting/lists.go | 2 +- worker/backup_ee.go | 3 +- worker/backup_manifest.go | 40 +---------------------- x/keys.go | 10 ------ x/x_test.go | 48 ---------------------------- 10 files changed, 9 insertions(+), 118 deletions(-) diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index efcdf9f6d1f..8bb1d01c165 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -682,7 +682,7 @@ func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, singleN } for _, pred := range sch.Predicates { - ns, _ := x.ParseNsAttrFromJson(pred.Predicate) + ns := x.ParseNamespace(pred.Predicate) l.namespaces[ns] = struct{}{} } return nil diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 17e2811be32..4df2de2af37 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -370,18 +370,14 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { // Check if any of these tablets is being moved. If so, abort the transaction. for _, pkey := range src.Preds { splits := strings.Split(pkey, "-") - if len(splits) < 3 { + if len(splits) < 2 { return errors.Errorf("Unable to find group id in %s", pkey) } gid, err := strconv.Atoi(splits[0]) if err != nil { return errors.Wrapf(err, "unable to parse group id from %s", pkey) } - ns, err := strconv.ParseUint(splits[1], 0, 64) - if err != nil { - return errors.Wrapf(err, "unable to parse namespace from %s", pkey) - } - pred := string(x.NamespaceToBytes(ns)) + strings.Join(splits[2:], "-") + pred := strings.Join(splits[1:], "-") tablet := s.ServingTablet(pred) if tablet == nil { return errors.Errorf("Tablet for %s is nil", pred) diff --git a/edgraph/server.go b/edgraph/server.go index cb1104a71a0..e2d33c4e3b4 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -1374,14 +1374,6 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error) return er.Types[i].TypeName < er.Types[j].TypeName }) - if x.IsGalaxyOperation(ctx) { - // Schema contains ns|attr in the predicates. Format it, so that it is parsed by json - // correctly. - for _, node := range er.SchemaNode { - node.Predicate = x.FormatNsAttr(node.Predicate) - } - } - respMap := make(map[string]interface{}) if len(er.SchemaNode) > 0 { respMap["schema"] = er.SchemaNode diff --git a/ee/backup/run.go b/ee/backup/run.go index ebf032b9a0f..d42f0168b35 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -257,7 +257,7 @@ func runExportBackup() error { glog.Infof("Created temporary map directory: %s\n", mapDir) // TODO: Can probably make this procesing concurrent. - for gid := range latestManifest.Groups { + for gid, _ := range latestManifest.Groups { glog.Infof("Exporting group: %d", gid) req := &pb.RestoreRequest{ GroupId: gid, diff --git a/graphql/e2e/custom_logic/cmd/main.go b/graphql/e2e/custom_logic/cmd/main.go index bb197a613be..a639235a661 100644 --- a/graphql/e2e/custom_logic/cmd/main.go +++ b/graphql/e2e/custom_logic/cmd/main.go @@ -1264,10 +1264,10 @@ func gqlCarsWithErrorHandler(w http.ResponseWriter, r *http.Request) { "cars": output, }, "errors": []map[string]interface{}{ - { + map[string]interface{}{ "message": "error-1 from cars", }, - { + map[string]interface{}{ "message": "error-2 from cars", }, }, diff --git a/posting/lists.go b/posting/lists.go index 1b90e6fca11..76315788cf2 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -240,7 +240,7 @@ func (lc *LocalCache) fillPreds(ctx *api.TxnContext, gid uint32) { } // Also send the group id that the predicate was being served by. This is useful when // checking if Zero should allow a commit during a predicate move. - predKey := fmt.Sprintf("%d-%s", gid, x.FormatNsAttr(pk.Attr)) + predKey := fmt.Sprintf("%d-%s", gid, pk.Attr) ctx.Preds = append(ctx.Preds, predKey) } ctx.Preds = x.Unique(ctx.Preds) diff --git a/worker/backup_ee.go b/worker/backup_ee.go index bf313aa463f..04bd725baf2 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -189,8 +189,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error { } } - glog.Infof("Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\" ."+ - " Groups=%v\n", req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) + glog.Infof("Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\" . Groups=%v\n", req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/worker/backup_manifest.go b/worker/backup_manifest.go index 1bdc1a9e288..e098e1bae46 100644 --- a/worker/backup_manifest.go +++ b/worker/backup_manifest.go @@ -99,7 +99,7 @@ func getFilteredManifests(h UriHandler, manifests []*Manifest, var validManifests []*Manifest for _, m := range manifests { missingFiles := false - for g := range m.Groups { + for g, _ := range m.Groups { path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g)) if !h.FileExists(path) { missingFiles = true @@ -172,7 +172,6 @@ func readManifest(h UriHandler, path string) (*Manifest, error) { if err := json.Unmarshal(b, &m); err != nil { return &m, errors.Wrap(err, "readManifest failed to unmarshal: ") } - convertJsonToMemory(&m) return &m, nil } @@ -196,42 +195,9 @@ func readMasterManifest(h UriHandler, path string) (*MasterManifest, error) { if err := json.Unmarshal(b, &m); err != nil { return &m, errors.Wrap(err, "readMasterManifest failed to unmarshal: ") } - for _, manifest := range m.Manifests { - convertJsonToMemory(manifest) - } return &m, nil } -// In the manifest, we store the predicates in the format: ns-attr -// But in-memory, we have ns|attr (where | denotes concatenation). -func convertJsonToMemory(m *Manifest) { - if m.Version == 0 { - return - } - for gid, preds := range m.Groups { - parsedPreds := preds[:0] - for _, pred := range preds { - ns, attr := x.ParseNsAttrFromJson(pred) - parsedPreds = append(parsedPreds, string(x.NamespaceToBytes(ns))+attr) - } - m.Groups[gid] = parsedPreds - } -} - -func convertMemoryToJson(m *Manifest) { - if m.Version == 0 { - // TODO(Naman): Check if m.Version can be zero while creating a manifest/masterManifest. - return - } - for gid, preds := range m.Groups { - parsedPreds := preds[:0] - for _, pred := range preds { - parsedPreds = append(parsedPreds, x.FormatNsAttr(pred)) - } - m.Groups[gid] = parsedPreds - } -} - func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) { if !h.DirExists("") { return &MasterManifest{}, errors.Errorf("getManifest: The uri path: %q doesn't exists", @@ -252,10 +218,6 @@ func createManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error } } - for _, m := range manifest.Manifests { - convertMemoryToJson(m) - } - w, err := h.CreateFile(tmpManifest) if err != nil { return errors.Wrap(err, "createManifest failed to create tmp path") diff --git a/x/keys.go b/x/keys.go index 2b8f6c5d2a6..418cba0c492 100644 --- a/x/keys.go +++ b/x/keys.go @@ -101,16 +101,6 @@ func ParseNamespace(attr string) uint64 { return binary.BigEndian.Uint64([]byte(attr[:8])) } -// ParseNamespaceFromJson returns the namespace from the given value. -// Format: ns-attr -func ParseNsAttrFromJson(attr string) (uint64, string) { - splits := strings.Split(attr, "-") - AssertTrue(len(splits) >= 2) - ns, err := strconv.ParseUint(splits[0], 0, 64) - Check(err) - return ns, strings.Join(splits[1:], "-") -} - func ParseAttrList(attrs []string) []string { var resp []string for _, attr := range attrs { diff --git a/x/x_test.go b/x/x_test.go index 135b6529159..e910f4f2db6 100644 --- a/x/x_test.go +++ b/x/x_test.go @@ -17,7 +17,6 @@ package x import ( - "encoding/json" "fmt" "math" "testing" @@ -198,50 +197,3 @@ func TestToHex(t *testing.T) { require.Equal(t, []byte(`"0xffffffffffffffff"`), ToHex(math.MaxUint64, false)) require.Equal(t, []byte(`<0xffffffffffffffff>`), ToHex(math.MaxUint64, true)) } - -// func TestSchemaUnmarshal(t *testing.T) { -// type predicate struct { -// Predicate string `json:"predicate,omitempty"` -// } -// type schema struct { -// Predicates []*predicate `json:"schema,omitempty"` -// } - -// var nodes []*pb.SchemaNode -// nodes = append(nodes, &pb.SchemaNode{Predicate: NamespaceAttr(129, "name")}) -// for _, node := range nodes { -// node.Predicate = SchemaEncode(node.Predicate) -// } - -// respMap := make(map[string]interface{}) -// respMap["schema"] = nodes -// resp := &dgoapi.Response{} -// var err error -// resp.Json, err = json.Marshal(respMap) -// require.NoError(t, err) -// t.Log(string(resp.Json)) - -// var sch schema -// require.NoError(t, json.Unmarshal(resp.GetJson(), &sch)) - -// for _, pred := range sch.Predicates { -// pred.Predicate = SchemaDecode(pred.Predicate) -// t.Logf("Unmarshalled %s %v", FormatNsAttr(pred.Predicate), []byte(pred.Predicate)) -// } -// } - -func TestSchemaUnmarshalSimple(t *testing.T) { - type predicate struct { - Predicate string `json:"predicate,omitempty"` - } - - p := &predicate{Predicate: NamespaceAttr(128, "name")} - b, err := json.Marshal(p) - require.NoError(t, err) - t.Log(string(b)) - - var p2 predicate - require.NoError(t, json.Unmarshal(b, &p2)) - t.Logf("Unmarshalled %s %v", FormatNsAttr(p2.Predicate), []byte(p2.Predicate)) - -} From 83b217c8294f62dbf4b3b3f1fe49f4aeb2932866 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 00:41:19 +0530 Subject: [PATCH 03/10] use ns-attr format internally for dgraph while ns|attr for badger --- edgraph/server.go | 11 +--- posting/index.go | 18 +++--- posting/list_test.go | 16 ++--- t/t.go | 2 +- worker/groups.go | 2 +- x/keys.go | 135 ++++++++++++++++++------------------------- 6 files changed, 74 insertions(+), 110 deletions(-) diff --git a/edgraph/server.go b/edgraph/server.go index e2d33c4e3b4..a58b7b5d613 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -1039,16 +1039,7 @@ func filterTablets(ctx context.Context, ms *pb.MembershipState) error { return errors.Errorf("Namespace not found in JWT.") } if namespace == x.GalaxyNamespace { - // For galaxy namespace, we don't want to filter out the predicates. We only format the - // namespace to human readable form. - for _, group := range ms.Groups { - tablets := make(map[string]*pb.Tablet) - for tabletName, tablet := range group.Tablets { - tablet.Predicate = x.FormatNsAttr(tablet.Predicate) - tablets[x.FormatNsAttr(tabletName)] = tablet - } - group.Tablets = tablets - } + // For galaxy namespace, we don't want to filter out the predicates. return nil } for _, group := range ms.GetGroups() { diff --git a/posting/index.go b/posting/index.go index f3ac5a90df5..76b133d23f6 100644 --- a/posting/index.go +++ b/posting/index.go @@ -593,7 +593,7 @@ func (r *rebuilder) Run(ctx context.Context) error { glog.V(1).Infof( "Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n", - x.FormatNsAttr(r.attr), r.startTs, hex.Dump(r.prefix)) + r.attr, r.startTs, hex.Dump(r.prefix)) // Counter is used here to ensure that all keys are committed at different timestamp. // We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0. @@ -601,8 +601,7 @@ func (r *rebuilder) Run(ctx context.Context) error { tmpWriter := tmpDB.NewManagedWriteBatch() stream := pstore.NewStreamAt(r.startTs) - stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", - x.FormatNsAttr(r.attr)) + stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr) stream.Prefix = r.prefix stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { // We should return quickly if the context is no longer valid. @@ -664,21 +663,19 @@ func (r *rebuilder) Run(ctx context.Context) error { return err } glog.V(1).Infof("Rebuilding index for predicate %s: building temp index took: %v\n", - x.FormatNsAttr(r.attr), time.Since(start)) + r.attr, time.Since(start)) // Now we write all the created posting lists to disk. - glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", - x.FormatNsAttr(r.attr)) + glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", r.attr) start = time.Now() defer func() { glog.V(1).Infof("Rebuilding index for predicate %s: writing index took: %v\n", - x.FormatNsAttr(r.attr), time.Since(start)) + r.attr, time.Since(start)) }() writer := pstore.NewManagedWriteBatch() tmpStream := tmpDB.NewStreamAt(counter) - tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", - x.FormatNsAttr(r.attr)) + tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr) tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { l, err := ReadPostingList(key, itr) if err != nil { @@ -721,8 +718,7 @@ func (r *rebuilder) Run(ctx context.Context) error { if err := tmpStream.Orchestrate(ctx); err != nil { return err } - glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", - x.FormatNsAttr(r.attr)) + glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr) return writer.Flush() } diff --git a/posting/list_test.go b/posting/list_test.go index 776ced1e0ae..db89b7065b5 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -559,7 +559,7 @@ func TestAddMutation_mrjn2(t *testing.T) { } func TestAddMutation_gru(t *testing.T) { - key := x.DataKey("question.tag", 0x01) + key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) @@ -592,7 +592,7 @@ func TestAddMutation_gru(t *testing.T) { } func TestAddMutation_gru2(t *testing.T) { - key := x.DataKey("question.tag", 0x100) + key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) @@ -639,7 +639,7 @@ func TestAddMutation_gru2(t *testing.T) { func TestAddAndDelMutation(t *testing.T) { // Ensure each test uses unique key since we don't clear the postings // after each test - key := x.DataKey("dummy_key", 0x927) + key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) @@ -878,7 +878,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { defer setMaxListSize(maxListSize) maxListSize = 5000 - key := x.DataKey(uuid.New().String(), 1331) + key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 @@ -926,7 +926,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { defer setMaxListSize(maxListSize) maxListSize = 10000 - key := x.DataKey(uuid.New().String(), 1331) + key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 @@ -1040,7 +1040,7 @@ func TestBinSplit(t *testing.T) { defer func() { maxListSize = originalListSize }() - key := x.DataKey(uuid.New().String(), 1331) + key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) for i := 1; i <= size; i++ { @@ -1221,7 +1221,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { maxListSize = 5000 // Add entries to the maps. - key := x.DataKey(uuid.New().String(), 1331) + key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) for i := 1; i <= size; i++ { @@ -1360,7 +1360,7 @@ func TestRecursiveSplits(t *testing.T) { // Create a list that should be split recursively. size := int(1e5) - key := x.DataKey(uuid.New().String(), 1331) + key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 diff --git a/t/t.go b/t/t.go index b5bf4b01a10..8fdf7570c02 100644 --- a/t/t.go +++ b/t/t.go @@ -154,7 +154,7 @@ func outputLogs(prefix string) { } logCmd := exec.Command("docker", "logs", c.ID) out, err := logCmd.CombinedOutput() - fmt.Printf("Docker logs for %d is %s with error %+v ", c.ID, string(out), err) + fmt.Printf("Docker logs for %s is %s with error %+v ", c.ID, string(out), err) } for i := 0; i <= 3; i++ { printLogs("zero" + strconv.Itoa(i)) diff --git a/worker/groups.go b/worker/groups.go index d5969dbc8aa..0a656d52c48 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -488,7 +488,7 @@ func (g *groupi) sendTablet(tablet *pb.Tablet) (*pb.Tablet, error) { } if out.GroupId == groups().groupId() { - glog.Infof("Serving tablet for: %v\n", x.FormatNsAttr(tablet.GetPredicate())) + glog.Infof("Serving tablet for: %v\n", tablet.GetPredicate()) } return out, nil } diff --git a/x/keys.go b/x/keys.go index 418cba0c492..fa503cd3169 100644 --- a/x/keys.go +++ b/x/keys.go @@ -57,6 +57,8 @@ const ( IgnoreBytes = "1-8" // NamespaceOffset is the offset in badger key from which the next 8 bytes contain namespace. NamespaceOffset = 1 + // NsSeparator is the separator between between the namespace and attribute. + NsSeparator = string(byte(30)) ) func NamespaceToBytes(ns uint64) []byte { @@ -67,7 +69,7 @@ func NamespaceToBytes(ns uint64) []byte { // NamespaceAttr is used to generate attr from namespace. func NamespaceAttr(ns uint64, attr string) string { - return string(NamespaceToBytes(ns)) + attr + return nsToStr(ns) + NsSeparator + attr } func NamespaceAttrList(ns uint64, preds []string) []string { @@ -84,21 +86,25 @@ func GalaxyAttr(attr string) string { // ParseNamespaceAttr returns the namespace and attr from the given value. func ParseNamespaceAttr(attr string) (uint64, string) { - return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:] + splits := strings.SplitN(attr, NsSeparator, 2) + return strToNs(splits[0]), splits[1] } func ParseNamespaceBytes(attr string) ([]byte, string) { - return []byte(attr[:8]), attr[8:] + splits := strings.SplitN(attr, NsSeparator, 2) + ns := make([]byte, 8) + binary.BigEndian.PutUint64(ns, strToNs(splits[0])) + return ns, splits[1] } // ParseAttr returns the attr from the given value. func ParseAttr(attr string) string { - return attr[8:] + return strings.SplitN(attr, NsSeparator, 2)[1] } // ParseNamespace returns the namespace from the given value. func ParseNamespace(attr string) uint64 { - return binary.BigEndian.Uint64([]byte(attr[:8])) + return strToNs(strings.SplitN(attr, NsSeparator, 2)[0]) } func ParseAttrList(attrs []string) []string { @@ -109,13 +115,19 @@ func ParseAttrList(attrs []string) []string { return resp } -func IsReverseAttr(attr string) bool { - return attr[8] == '~' +func strToNs(s string) uint64 { + ns, err := strconv.ParseUint(s, 0, 64) + Check(err) + return ns } -func FormatNsAttr(attr string) string { - ns, attr := ParseNamespaceAttr(attr) - return strconv.FormatUint(ns, 10) + "-" + attr +func nsToStr(ns uint64) string { + return strconv.FormatUint(ns, 10) +} + +func IsReverseAttr(attr string) bool { + pred := strings.SplitN(attr, NsSeparator, 2)[1] + return pred[0] == '~' } func writeAttr(buf []byte, attr string) []byte { @@ -130,19 +142,18 @@ func writeAttr(buf []byte, attr string) []byte { // genKey creates the key and writes the initial bytes (type byte, length of attribute, // and the attribute itself). It leaves the rest of the key empty for further processing -// if necessary. -func generateKey(typeByte byte, attr string, totalLen int) []byte { - AssertTrue(totalLen >= 1+2+len(attr)) - - buf := make([]byte, totalLen) - buf[0] = typeByte +// if necessary. It also returns next index from where further processing should be done. +func generateKey(typeByte byte, attr string, extra int) ([]byte, int) { // Separate namespace and attribute from attr and write namespace in the first 8 bytes of key. namespace, attr := ParseNamespaceBytes(attr) + prefixLen := 1 + 8 + 2 + len(attr) // byteType + ns + len(pred) + pred + buf := make([]byte, prefixLen+extra) + buf[0] = typeByte AssertTrue(copy(buf[1:], namespace) == 8) rest := buf[9:] writeAttr(rest, attr) - return buf + return buf, prefixLen } // SchemaKey returns schema key for given attribute. Schema keys are stored @@ -153,7 +164,8 @@ func generateKey(typeByte byte, attr string, totalLen int) []byte { // byte 1-2: length of attr // next len(attr) bytes: value of attr func SchemaKey(attr string) []byte { - return generateKey(ByteSchema, attr, 1+2+len(attr)) + key, _ := generateKey(ByteSchema, attr, 0) + return key } // TypeKey returns type key for given type name. Type keys are stored separately @@ -164,7 +176,8 @@ func SchemaKey(attr string) []byte { // byte 1-2: length of typeName // next len(attr) bytes: value of attr (the type name) func TypeKey(attr string) []byte { - return generateKey(ByteType, attr, 1+2+len(attr)) + key, _ := generateKey(ByteType, attr, 0) + return key } // DataKey generates a data key with the given attribute and UID. @@ -178,9 +191,8 @@ func TypeKey(attr string) []byte { // next eight bytes (optional): if the key corresponds to a split list, the startUid of // the split stored in this key and the first byte will be sets to ByteSplit. func DataKey(attr string, uid uint64) []byte { - prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 8 - buf := generateKey(DefaultPrefix, attr, totalLen) + extra := 1 + 8 // ByteData + UID + buf, prefixLen := generateKey(DefaultPrefix, attr, extra) rest := buf[prefixLen:] rest[0] = ByteData @@ -201,9 +213,8 @@ func DataKey(attr string, uid uint64) []byte { // next eight bytes (optional): if the key corresponds to a split list, the startUid of // the split stored in this key. func ReverseKey(attr string, uid uint64) []byte { - prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 8 - buf := generateKey(DefaultPrefix, attr, totalLen) + extra := 1 + 8 // ByteReverse + UID + buf, prefixLen := generateKey(DefaultPrefix, attr, extra) rest := buf[prefixLen:] rest[0] = ByteReverse @@ -224,9 +235,8 @@ func ReverseKey(attr string, uid uint64) []byte { // next eight bytes (optional): if the key corresponds to a split list, the startUid of // the split stored in this key. func IndexKey(attr, term string) []byte { - prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + len(term) - buf := generateKey(DefaultPrefix, attr, totalLen) + extra := 1 + len(term) // ByteIndex + term + buf, prefixLen := generateKey(DefaultPrefix, attr, extra) rest := buf[prefixLen:] rest[0] = ByteIndex @@ -246,9 +256,8 @@ func IndexKey(attr, term string) []byte { // next byte: data type prefix (set to ByteCount or ByteCountRev) // next four bytes: value of count. func CountKey(attr string, count uint32, reverse bool) []byte { - prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 4 - buf := generateKey(DefaultPrefix, attr, totalLen) + extra := 1 + 4 // ByteCount + Count + buf, prefixLen := generateKey(DefaultPrefix, attr, extra) rest := buf[prefixLen:] if reverse { @@ -333,14 +342,9 @@ func (p ParsedKey) IsOfType(typ byte) bool { // SkipPredicate returns the first key after the keys corresponding to the predicate // of this key. Useful when iterating in the reverse order. func (p ParsedKey) SkipPredicate() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1) - buf[0] = p.bytePrefix - ns, attr := ParseNamespaceBytes(p.Attr) - AssertTrue(copy(buf[1:], ns) == 8) - rest := buf[9:] - k := writeAttr(rest, attr) - AssertTrue(len(k) == 1) - k[0] = 0xFF + buf, prefixLen := generateKey(p.bytePrefix, p.Attr, 1) + AssertTrue(len(buf[prefixLen:]) == 1) + buf[prefixLen] = 0xFF return buf } @@ -361,56 +365,33 @@ func (p ParsedKey) SkipType() []byte { // DataPrefix returns the prefix for data keys. func (p ParsedKey) DataPrefix() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1) - buf[0] = p.bytePrefix - ns, attr := ParseNamespaceBytes(p.Attr) - AssertTrue(copy(buf[1:], ns) == 8) - rest := buf[9:] - k := writeAttr(rest, attr) - AssertTrue(len(k) == 1) - k[0] = ByteData + buf, prefixLen := generateKey(p.bytePrefix, p.Attr, 1) + buf[prefixLen] = ByteData return buf } // IndexPrefix returns the prefix for index keys. func (p ParsedKey) IndexPrefix() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1) - buf[0] = DefaultPrefix - ns, attr := ParseNamespaceBytes(p.Attr) - AssertTrue(copy(buf[1:], ns) == 8) - rest := buf[9:] - k := writeAttr(rest, attr) - AssertTrue(len(k) == 1) - k[0] = ByteIndex + buf, prefixLen := generateKey(DefaultPrefix, p.Attr, 1) + buf[prefixLen] = ByteIndex return buf } // ReversePrefix returns the prefix for index keys. func (p ParsedKey) ReversePrefix() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1) - buf[0] = DefaultPrefix - ns, attr := ParseNamespaceBytes(p.Attr) - AssertTrue(copy(buf[1:], ns) == 8) - rest := buf[9:] - k := writeAttr(rest, attr) - AssertTrue(len(k) == 1) - k[0] = ByteReverse + buf, prefixLen := generateKey(DefaultPrefix, p.Attr, 1) + buf[prefixLen] = ByteReverse return buf } // CountPrefix returns the prefix for count keys. func (p ParsedKey) CountPrefix(reverse bool) []byte { - buf := make([]byte, 1+2+len(p.Attr)+1) - buf[0] = p.bytePrefix - ns, attr := ParseNamespaceBytes(p.Attr) - AssertTrue(copy(buf[1:], ns) == 8) - rest := buf[9:] - k := writeAttr(rest, attr) - AssertTrue(len(k) == 1) + buf, prefixLen := generateKey(DefaultPrefix, p.Attr, 1) + buf[prefixLen] = ByteReverse if reverse { - k[0] = ByteCountRev + buf[prefixLen] = ByteCountRev } else { - k[0] = ByteCount + buf[prefixLen] = ByteCount } return buf } @@ -496,12 +477,8 @@ func TypePrefix() []byte { // PredicatePrefix returns the prefix for all keys belonging to this predicate except schema key. func PredicatePrefix(predicate string) []byte { - buf := make([]byte, 1+2+len(predicate)) - buf[0] = DefaultPrefix - ns, predicate := ParseNamespaceBytes(predicate) - AssertTrue(copy(buf[1:], ns) == 8) - k := writeAttr(buf[9:], predicate) - AssertTrue(len(k) == 0) + buf, prefixLen := generateKey(DefaultPrefix, predicate, 0) + AssertTrue(len(buf) == prefixLen) return buf } @@ -556,7 +533,7 @@ func Parse(key []byte) (ParsedKey, error) { if len(k) < sz { return p, errors.Errorf("Invalid size %v for key %v", sz, key) } - p.Attr = string(namespace) + string(k[:sz]) + p.Attr = NamespaceAttr(binary.BigEndian.Uint64(namespace), string(k[:sz])) k = k[sz:] switch p.bytePrefix { From bc8883d165363122affcadb8696124824d3fcf6e Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 01:29:59 +0530 Subject: [PATCH 04/10] fix TestNamespace --- x/keys_test.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/x/keys_test.go b/x/keys_test.go index 50ceb8b83ef..5571054f914 100644 --- a/x/keys_test.go +++ b/x/keys_test.go @@ -29,8 +29,8 @@ func TestNameSpace(t *testing.T) { ns := uint64(133) attr := "name" nsAttr := NamespaceAttr(ns, attr) - require.Equal(t, 8+len(attr), len(nsAttr)) - parsedAttr := ParseAttr(nsAttr) + parsedNs, parsedAttr := ParseNamespaceAttr(nsAttr) + require.Equal(t, ns, parsedNs) require.Equal(t, attr, parsedAttr) } @@ -39,7 +39,7 @@ func TestDataKey(t *testing.T) { // key with uid = 0 is invalid uid = 0 - key := DataKey(NamespaceAttr(GalaxyNamespace, "bad uid"), uid) + key := DataKey(GalaxyAttr("bad uid"), uid) _, err := Parse(key) require.Error(t, err) @@ -47,7 +47,7 @@ func TestDataKey(t *testing.T) { // Use the uid to derive the attribute so it has variable length and the test // can verify that multiple sizes of attr work correctly. sattr := fmt.Sprintf("attr:%d", uid) - key := DataKey(NamespaceAttr(GalaxyNamespace, sattr), uid) + key := DataKey(GalaxyAttr(sattr), uid) pk, err := Parse(key) require.NoError(t, err) @@ -59,14 +59,14 @@ func TestDataKey(t *testing.T) { keys := make([]string, 0, 1024) for uid = 1024; uid >= 1; uid-- { - key := DataKey(NamespaceAttr(GalaxyNamespace, "testing.key"), uid) + key := DataKey(GalaxyAttr("testing.key"), uid) keys = append(keys, string(key)) } // Test that sorting is as expected. sort.Strings(keys) require.True(t, sort.StringsAreSorted(keys)) for i, key := range keys { - exp := DataKey(NamespaceAttr(GalaxyNamespace, "testing.key"), uint64(i+1)) + exp := DataKey(GalaxyAttr("testing.key"), uint64(i+1)) require.Equal(t, string(exp), key) } } @@ -76,7 +76,7 @@ func TestParseDataKeyWithStartUid(t *testing.T) { startUid := uint64(math.MaxUint64) for uid = 1; uid < 1001; uid++ { sattr := fmt.Sprintf("attr:%d", uid) - key := DataKey(NamespaceAttr(GalaxyNamespace, sattr), uid) + key := DataKey(GalaxyAttr(sattr), uid) key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) @@ -96,7 +96,7 @@ func TestIndexKey(t *testing.T) { sattr := fmt.Sprintf("attr:%d", uid) sterm := fmt.Sprintf("term:%d", uid) - key := IndexKey(NamespaceAttr(GalaxyNamespace, sattr), sterm) + key := IndexKey(GalaxyAttr(sattr), sterm) pk, err := Parse(key) require.NoError(t, err) @@ -113,7 +113,7 @@ func TestIndexKeyWithStartUid(t *testing.T) { sattr := fmt.Sprintf("attr:%d", uid) sterm := fmt.Sprintf("term:%d", uid) - key := IndexKey(NamespaceAttr(GalaxyNamespace, sattr), sterm) + key := IndexKey(GalaxyAttr(sattr), sterm) key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) @@ -132,7 +132,7 @@ func TestReverseKey(t *testing.T) { for uid = 1; uid < 1001; uid++ { sattr := fmt.Sprintf("attr:%d", uid) - key := ReverseKey(NamespaceAttr(GalaxyNamespace, sattr), uid) + key := ReverseKey(GalaxyAttr(sattr), uid) pk, err := Parse(key) require.NoError(t, err) @@ -148,7 +148,7 @@ func TestReverseKeyWithStartUid(t *testing.T) { for uid = 1; uid < 1001; uid++ { sattr := fmt.Sprintf("attr:%d", uid) - key := ReverseKey(NamespaceAttr(GalaxyNamespace, sattr), uid) + key := ReverseKey(GalaxyAttr(sattr), uid) key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) @@ -167,7 +167,7 @@ func TestCountKey(t *testing.T) { for count = 0; count < 1001; count++ { sattr := fmt.Sprintf("attr:%d", count) - key := CountKey(NamespaceAttr(GalaxyNamespace, sattr), count, true) + key := CountKey(GalaxyAttr(sattr), count, true) pk, err := Parse(key) require.NoError(t, err) @@ -183,7 +183,7 @@ func TestCountKeyWithStartUid(t *testing.T) { for count = 0; count < 1001; count++ { sattr := fmt.Sprintf("attr:%d", count) - key := CountKey(NamespaceAttr(GalaxyNamespace, sattr), count, true) + key := CountKey(GalaxyAttr(sattr), count, true) key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) @@ -202,7 +202,7 @@ func TestSchemaKey(t *testing.T) { for uid = 0; uid < 1001; uid++ { sattr := fmt.Sprintf("attr:%d", uid) - key := SchemaKey(NamespaceAttr(GalaxyNamespace, sattr)) + key := SchemaKey(GalaxyAttr(sattr)) pk, err := Parse(key) require.NoError(t, err) @@ -216,7 +216,7 @@ func TestTypeKey(t *testing.T) { for uid = 0; uid < 1001; uid++ { sattr := fmt.Sprintf("attr:%d", uid) - key := TypeKey(NamespaceAttr(GalaxyNamespace, sattr)) + key := TypeKey(GalaxyAttr(sattr)) pk, err := Parse(key) require.NoError(t, err) @@ -236,16 +236,16 @@ func TestBadStartUid(t *testing.T) { require.Error(t, err) } - key := DataKey(NamespaceAttr(GalaxyNamespace, "aa"), 1) + key := DataKey(GalaxyAttr("aa"), 1) testKey(key) - key = ReverseKey(NamespaceAttr(GalaxyNamespace, "aa"), 1) + key = ReverseKey(GalaxyAttr("aa"), 1) testKey(key) - key = CountKey(NamespaceAttr(GalaxyNamespace, "aa"), 0, false) + key = CountKey(GalaxyAttr("aa"), 0, false) testKey(key) - key = CountKey(NamespaceAttr(GalaxyNamespace, "aa"), 0, true) + key = CountKey(GalaxyAttr("aa"), 0, true) testKey(key) } @@ -271,7 +271,7 @@ func TestBadKeys(t *testing.T) { // key with uid = 0 is invalid uid := 0 - key = DataKey(NamespaceAttr(GalaxyNamespace, "bad uid"), uint64(uid)) + key = DataKey(GalaxyAttr("bad uid"), uint64(uid)) _, err = Parse(key) require.Error(t, err) } From 856e1bacd9ca05302cd275a52d049d61a4fb04a5 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 01:50:15 +0530 Subject: [PATCH 05/10] address comments --- x/keys.go | 20 ++++++++++---------- x/keys_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/x/keys.go b/x/keys.go index fa503cd3169..7260f78989f 100644 --- a/x/keys.go +++ b/x/keys.go @@ -58,7 +58,7 @@ const ( // NamespaceOffset is the offset in badger key from which the next 8 bytes contain namespace. NamespaceOffset = 1 // NsSeparator is the separator between between the namespace and attribute. - NsSeparator = string(byte(30)) + NsSeparator = "-" ) func NamespaceToBytes(ns uint64) []byte { @@ -69,7 +69,7 @@ func NamespaceToBytes(ns uint64) []byte { // NamespaceAttr is used to generate attr from namespace. func NamespaceAttr(ns uint64, attr string) string { - return nsToStr(ns) + NsSeparator + attr + return uintToStr(ns) + NsSeparator + attr } func NamespaceAttrList(ns uint64, preds []string) []string { @@ -87,13 +87,13 @@ func GalaxyAttr(attr string) string { // ParseNamespaceAttr returns the namespace and attr from the given value. func ParseNamespaceAttr(attr string) (uint64, string) { splits := strings.SplitN(attr, NsSeparator, 2) - return strToNs(splits[0]), splits[1] + return strToUint(splits[0]), splits[1] } func ParseNamespaceBytes(attr string) ([]byte, string) { splits := strings.SplitN(attr, NsSeparator, 2) ns := make([]byte, 8) - binary.BigEndian.PutUint64(ns, strToNs(splits[0])) + binary.BigEndian.PutUint64(ns, strToUint(splits[0])) return ns, splits[1] } @@ -104,7 +104,7 @@ func ParseAttr(attr string) string { // ParseNamespace returns the namespace from the given value. func ParseNamespace(attr string) uint64 { - return strToNs(strings.SplitN(attr, NsSeparator, 2)[0]) + return strToUint(strings.SplitN(attr, NsSeparator, 2)[0]) } func ParseAttrList(attrs []string) []string { @@ -115,14 +115,14 @@ func ParseAttrList(attrs []string) []string { return resp } -func strToNs(s string) uint64 { - ns, err := strconv.ParseUint(s, 0, 64) +// For consistency, use base16 to encode/decode the namespace. +func strToUint(s string) uint64 { + ns, err := strconv.ParseUint(s, 16, 64) Check(err) return ns } - -func nsToStr(ns uint64) string { - return strconv.FormatUint(ns, 10) +func uintToStr(ns uint64) string { + return strconv.FormatUint(ns, 16) } func IsReverseAttr(attr string) bool { diff --git a/x/keys_test.go b/x/keys_test.go index 5571054f914..646ef71a63f 100644 --- a/x/keys_test.go +++ b/x/keys_test.go @@ -17,6 +17,7 @@ package x import ( + "encoding/json" "fmt" "math" "sort" @@ -275,3 +276,31 @@ func TestBadKeys(t *testing.T) { _, err = Parse(key) require.Error(t, err) } + +func TestJsonMarshal(t *testing.T) { + type predicate struct { + Predicate string `json:"predicate,omitempty"` + } + + p := &predicate{Predicate: NamespaceAttr(129, "name")} + b, err := json.Marshal(p) + require.NoError(t, err) + + var p2 predicate + require.NoError(t, json.Unmarshal(b, &p2)) + ns, attr := ParseNamespaceAttr(p2.Predicate) + require.Equal(t, uint64(129), ns) + require.Equal(t, "name", attr) +} + +func TestNsSeparator(t *testing.T) { + uid := uint64(10) + pred := "name" + NsSeparator + "surname" + key := DataKey(GalaxyAttr(pred), uid) + pk, err := Parse(key) + require.NoError(t, err) + require.Equal(t, uid, pk.Uid) + ns, attr := ParseNamespaceAttr(pk.Attr) + require.Equal(t, GalaxyNamespace, ns) + require.Equal(t, pred, attr) +} From c4c6206bc094924401f8be455d9954548a15ab6c Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 12:33:26 +0530 Subject: [PATCH 06/10] Fix http tests --- dgraph/cmd/alpha/http_test.go | 2 +- dgraph/cmd/alpha/upsert_test.go | 2 +- dgraph/cmd/zero/oracle.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dgraph/cmd/alpha/http_test.go b/dgraph/cmd/alpha/http_test.go index 97638f25b0e..15b8a0bb809 100644 --- a/dgraph/cmd/alpha/http_test.go +++ b/dgraph/cmd/alpha/http_test.go @@ -428,7 +428,7 @@ func TestTransactionBasic(t *testing.T) { require.Equal(t, 2, len(mr.preds)) var parsedPreds []string for _, pred := range mr.preds { - p := strings.Split(pred, "-")[1] + p := strings.SplitN(pred, "-", 2)[1] parsedPreds = append(parsedPreds, x.ParseAttr(p)) } sort.Strings(parsedPreds) diff --git a/dgraph/cmd/alpha/upsert_test.go b/dgraph/cmd/alpha/upsert_test.go index a597250e7d6..f532f6a7d97 100644 --- a/dgraph/cmd/alpha/upsert_test.go +++ b/dgraph/cmd/alpha/upsert_test.go @@ -39,7 +39,7 @@ type QueryResult struct { func splitPreds(ps []string) []string { for i, p := range ps { - ps[i] = x.ParseAttr(strings.Split(p, "-")[1]) + ps[i] = x.ParseAttr(strings.SplitN(p, "-", 2)[1]) } return ps diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 4df2de2af37..5109e095973 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -369,7 +369,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { checkPreds := func() error { // Check if any of these tablets is being moved. If so, abort the transaction. for _, pkey := range src.Preds { - splits := strings.Split(pkey, "-") + splits := strings.SplitN(pkey, "-", 2) if len(splits) < 2 { return errors.Errorf("Unable to find group id in %s", pkey) } @@ -377,7 +377,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { if err != nil { return errors.Wrapf(err, "unable to parse group id from %s", pkey) } - pred := strings.Join(splits[1:], "-") + pred := splits[1] tablet := s.ServingTablet(pred) if tablet == nil { return errors.Errorf("Tablet for %s is nil", pred) From f064985944ee48492bc055abad8fa7160eb849d9 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 14:28:00 +0530 Subject: [PATCH 07/10] add update_manifest tool --- dgraph/cmd/root.go | 4 +- dgraph/cmd/update_manifest/run.go | 109 ++++++++++++++++++++++++++++++ ee/backup/run.go | 2 +- worker/backup_ee.go | 4 +- worker/backup_manifest.go | 6 +- 5 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 dgraph/cmd/update_manifest/run.go diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index b5d23e197c6..8001397a254 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -38,6 +38,7 @@ import ( "github.com/dgraph-io/dgraph/dgraph/cmd/increment" "github.com/dgraph-io/dgraph/dgraph/cmd/live" "github.com/dgraph-io/dgraph/dgraph/cmd/migrate" + "github.com/dgraph-io/dgraph/dgraph/cmd/update_manifest" "github.com/dgraph-io/dgraph/dgraph/cmd/version" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" "github.com/dgraph-io/dgraph/upgrade" @@ -83,7 +84,8 @@ var rootConf = viper.New() // subcommands initially contains all default sub-commands. var subcommands = []*x.SubCommand{ &bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, &version.Version, - &debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, &increment.Increment, + &debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, + &increment.Increment, &update_manifest.UpdateManifest, } func initCmds() { diff --git a/dgraph/cmd/update_manifest/run.go b/dgraph/cmd/update_manifest/run.go new file mode 100644 index 00000000000..5a49ab2303b --- /dev/null +++ b/dgraph/cmd/update_manifest/run.go @@ -0,0 +1,109 @@ +/* + * Copyright 2021 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package update_manifest + +import ( + "encoding/binary" + "log" + "net/url" + "os" + + "github.com/dgraph-io/dgraph/ee" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/worker" + "github.com/dgraph-io/dgraph/x" + "github.com/pkg/errors" + "github.com/spf13/cobra" +) + +var ( + logger = log.New(os.Stderr, "", 0) + // UpdateManifest is the sub-command invoked when running "dgraph update_manifest". + UpdateManifest x.SubCommand + quiet bool // enabling quiet mode would suppress the warning logs +) + +var opt struct { + location string + key []byte +} + +func init() { + UpdateManifest.Cmd = &cobra.Command{ + Use: "update_manifest", + Short: "Run the Dgraph update tool to update the manifest from v21.03 to latest.", + Run: func(cmd *cobra.Command, args []string) { + if err := run(); err != nil { + logger.Fatalf("%v\n", err) + } + }, + Annotations: map[string]string{"group": "tool"}, + } + UpdateManifest.EnvPrefix = "DGRAPH_UPDATE_MANIFEST" + UpdateManifest.Cmd.SetHelpTemplate(x.NonRootTemplate) + + flag := UpdateManifest.Cmd.Flags() + flag.StringVarP(&opt.location, "location", "l", "", + `Sets the location of the backup. Both file URIs and s3 are supported. + This command will take care of all the full + incremental backups present in the location.`) + ee.RegisterEncFlag(flag) +} + +func run() error { + keys, err := ee.GetKeys(UpdateManifest.Conf) + if err != nil { + return err + } + opt.key = keys.EncKey + uri, err := url.Parse(opt.location) + if err != nil { + return errors.Wrapf(err, "while parsing location") + } + handler, err := worker.NewUriHandler(uri, nil) + if err != nil { + return errors.Wrapf(err, "while creating uri handler") + } + masterManifest, err := worker.GetManifest(handler, uri) + if err != nil { + return errors.Wrapf(err, "while getting manifest") + } + + // Update the master manifest with the changes for drop operations and group predicates. + for _, manifest := range masterManifest.Manifests { + for gid, preds := range manifest.Groups { + parsedPreds := preds[:0] + for _, pred := range preds { + ns, attr := binary.BigEndian.Uint64([]byte(pred[:8])), pred[8:] + parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr)) + } + manifest.Groups[gid] = parsedPreds + } + for _, op := range manifest.DropOperations { + if op.DropOp == pb.DropOperation_ATTR { + nsattr := op.DropValue + ns, attr := binary.BigEndian.Uint64([]byte(nsattr[:8])), nsattr[8:] + op.DropValue = x.NamespaceAttr(ns, attr) + } + } + } + + // Rewrite the master manifest. + if err := worker.CreateManifest(handler, uri, masterManifest); err != nil { + return errors.Wrap(err, "Complete backup failed") + } + return nil +} diff --git a/ee/backup/run.go b/ee/backup/run.go index d42f0168b35..ebf032b9a0f 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -257,7 +257,7 @@ func runExportBackup() error { glog.Infof("Created temporary map directory: %s\n", mapDir) // TODO: Can probably make this procesing concurrent. - for gid, _ := range latestManifest.Groups { + for gid := range latestManifest.Groups { glog.Infof("Exporting group: %d", gid) req := &pb.RestoreRequest{ GroupId: gid, diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 04bd725baf2..61d8796fe65 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -554,7 +554,7 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro } manifest.Manifests = append(manifest.Manifests, m) - if err := createManifest(handler, uri, manifest); err != nil { + if err := CreateManifest(handler, uri, manifest); err != nil { return errors.Wrap(err, "Complete backup failed") } glog.Infof("Backup completed OK.") @@ -668,7 +668,7 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper // * DROP_NS;ns // So, accordingly construct the *pb.DropOperation. dropOp := &pb.DropOperation{} - dropInfo := strings.Split(string(val), ";") + dropInfo := strings.SplitN(string(val), ";", 2) if len(dropInfo) != 2 { return nil, errors.Errorf("Unexpected value: %s for dgraph.drop.op", val) } diff --git a/worker/backup_manifest.go b/worker/backup_manifest.go index e098e1bae46..1fcb5185ee5 100644 --- a/worker/backup_manifest.go +++ b/worker/backup_manifest.go @@ -99,7 +99,7 @@ func getFilteredManifests(h UriHandler, manifests []*Manifest, var validManifests []*Manifest for _, m := range manifests { missingFiles := false - for g, _ := range m.Groups { + for g := range m.Groups { path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g)) if !h.FileExists(path) { missingFiles = true @@ -178,7 +178,7 @@ func readManifest(h UriHandler, path string) (*Manifest, error) { func GetLatestManifest(h UriHandler, uri *url.URL) (*Manifest, error) { manifest, err := GetManifest(h, uri) if err != nil { - return &Manifest{}, errors.Wrap(err, "Fialed to get the manifest") + return &Manifest{}, errors.Wrap(err, "Failed to get the manifest") } if len(manifest.Manifests) == 0 { return &Manifest{}, nil @@ -210,7 +210,7 @@ func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) { return manifest, nil } -func createManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error { +func CreateManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error { var err error if !h.DirExists("./") { if err := h.CreateDir("./"); err != nil { From 10bb70effefd79ab4fe5a6f61960fb11c76764e1 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 20:14:52 +0530 Subject: [PATCH 08/10] handle unicode error more robustly --- dgraph/cmd/update_manifest/run.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/dgraph/cmd/update_manifest/run.go b/dgraph/cmd/update_manifest/run.go index 5a49ab2303b..1a650243649 100644 --- a/dgraph/cmd/update_manifest/run.go +++ b/dgraph/cmd/update_manifest/run.go @@ -21,6 +21,7 @@ import ( "log" "net/url" "os" + "strings" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/protos/pb" @@ -63,6 +64,17 @@ func init() { ee.RegisterEncFlag(flag) } +// Invalid bytes are replaced with the Unicode replacement rune. +// See https://golang.org/pkg/encoding/json/#Marshal +const replacementRune = rune('\ufffd') + +func parseNsAttr(attr string) (uint64, string, error) { + if strings.ContainsRune(attr, replacementRune) { + return 0, "", errors.New("replacement char found") + } + return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil +} + func run() error { keys, err := ee.GetKeys(UpdateManifest.Conf) if err != nil { @@ -87,15 +99,23 @@ func run() error { for gid, preds := range manifest.Groups { parsedPreds := preds[:0] for _, pred := range preds { - ns, attr := binary.BigEndian.Uint64([]byte(pred[:8])), pred[8:] + ns, attr, err := parseNsAttr(pred) + if err != nil { + logger.Printf("Unable to parse the pred: %v", pred) + continue + } parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr)) } manifest.Groups[gid] = parsedPreds } for _, op := range manifest.DropOperations { if op.DropOp == pb.DropOperation_ATTR { - nsattr := op.DropValue - ns, attr := binary.BigEndian.Uint64([]byte(nsattr[:8])), nsattr[8:] + ns, attr, err := parseNsAttr(op.DropValue) + if err != nil { + logger.Printf("Unable to parse the drop operation %+v pred: %v", + op, []byte(op.DropValue)) + continue + } op.DropValue = x.NamespaceAttr(ns, attr) } } From 84d034ebba563e3dd373f6d84ae817db0ea60cab Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 20:50:23 +0530 Subject: [PATCH 09/10] golint happy --- dgraph/cmd/root.go | 3 +-- dgraph/cmd/root_ee.go | 2 ++ dgraph/cmd/update_manifest/run.go | 10 ++++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index 8001397a254..e7fa77d5089 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -38,7 +38,6 @@ import ( "github.com/dgraph-io/dgraph/dgraph/cmd/increment" "github.com/dgraph-io/dgraph/dgraph/cmd/live" "github.com/dgraph-io/dgraph/dgraph/cmd/migrate" - "github.com/dgraph-io/dgraph/dgraph/cmd/update_manifest" "github.com/dgraph-io/dgraph/dgraph/cmd/version" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" "github.com/dgraph-io/dgraph/upgrade" @@ -85,7 +84,7 @@ var rootConf = viper.New() var subcommands = []*x.SubCommand{ &bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, &version.Version, &debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, - &increment.Increment, &update_manifest.UpdateManifest, + &increment.Increment, } func initCmds() { diff --git a/dgraph/cmd/root_ee.go b/dgraph/cmd/root_ee.go index 70c2a7d8b5f..c9399c1b820 100644 --- a/dgraph/cmd/root_ee.go +++ b/dgraph/cmd/root_ee.go @@ -13,6 +13,7 @@ package cmd import ( + "github.com/dgraph-io/dgraph/dgraph/cmd/updatemanifest" acl "github.com/dgraph-io/dgraph/ee/acl" "github.com/dgraph-io/dgraph/ee/audit" "github.com/dgraph-io/dgraph/ee/backup" @@ -25,5 +26,6 @@ func init() { &backup.ExportBackup, &acl.CmdAcl, &audit.CmdAudit, + &updatemanifest.UpdateManifest, ) } diff --git a/dgraph/cmd/update_manifest/run.go b/dgraph/cmd/update_manifest/run.go index 1a650243649..3f605d08547 100644 --- a/dgraph/cmd/update_manifest/run.go +++ b/dgraph/cmd/update_manifest/run.go @@ -1,3 +1,5 @@ +// +build !oss + /* * Copyright 2021 Dgraph Labs, Inc. and Contributors * @@ -14,7 +16,7 @@ * limitations under the License. */ -package update_manifest +package updatemanifest import ( "encoding/binary" @@ -35,7 +37,6 @@ var ( logger = log.New(os.Stderr, "", 0) // UpdateManifest is the sub-command invoked when running "dgraph update_manifest". UpdateManifest x.SubCommand - quiet bool // enabling quiet mode would suppress the warning logs ) var opt struct { @@ -122,8 +123,5 @@ func run() error { } // Rewrite the master manifest. - if err := worker.CreateManifest(handler, uri, masterManifest); err != nil { - return errors.Wrap(err, "Complete backup failed") - } - return nil + return errors.Wrap(worker.CreateManifest(handler, uri, masterManifest), "rewrite failed") } From fdfba66052d9578e19dea9fa7f3b8bd9f37f451b Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Thu, 13 May 2021 20:58:29 +0530 Subject: [PATCH 10/10] fix build --- dgraph/cmd/root_ee.go | 2 +- {dgraph/cmd/update_manifest => ee/updatemanifest}/run.go | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename {dgraph/cmd/update_manifest => ee/updatemanifest}/run.go (100%) diff --git a/dgraph/cmd/root_ee.go b/dgraph/cmd/root_ee.go index c9399c1b820..80f5591eca5 100644 --- a/dgraph/cmd/root_ee.go +++ b/dgraph/cmd/root_ee.go @@ -13,10 +13,10 @@ package cmd import ( - "github.com/dgraph-io/dgraph/dgraph/cmd/updatemanifest" acl "github.com/dgraph-io/dgraph/ee/acl" "github.com/dgraph-io/dgraph/ee/audit" "github.com/dgraph-io/dgraph/ee/backup" + "github.com/dgraph-io/dgraph/ee/updatemanifest" ) func init() { diff --git a/dgraph/cmd/update_manifest/run.go b/ee/updatemanifest/run.go similarity index 100% rename from dgraph/cmd/update_manifest/run.go rename to ee/updatemanifest/run.go