From 894672330e7cc48d31db4ddb2a22113f4046f897 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Mon, 9 Mar 2020 17:35:25 -0700 Subject: [PATCH 1/9] Initial commit. --- dgraph/cmd/debug/run.go | 2 +- posting/list.go | 59 ++++++++++++++++++++++++++++++++++++++++- posting/mvcc.go | 15 +++-------- 3 files changed, 62 insertions(+), 14 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index b705cea8483..29e8d822e14 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -121,7 +121,7 @@ func uidToVal(itr *badger.Iterator, prefix string) map[uint64]int { lastKey = append(lastKey[:0], item.Key()...) pk, err := x.Parse(item.Key()) x.Check(err) - if !pk.IsData() || !strings.HasPrefix(pk.Attr, prefix) { + if !pk.IsData() || !strings.HasPrefix(pk.Attr, prefix) || pk.HasStartUid { continue } if pk.IsSchema() { diff --git a/posting/list.go b/posting/list.go index 303879063ec..4b1b322a335 100644 --- a/posting/list.go +++ b/posting/list.go @@ -496,6 +496,10 @@ func (l *List) setMutation(startTs uint64, data []byte) { // return errStopIteration // to break iteration. // }) func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { + if l == nil { + return nil + } + l.RLock() defer l.RUnlock() return l.iterate(readTs, afterUid, f) @@ -661,6 +665,10 @@ loop: // IsEmpty returns true if there are no uids at the given timestamp after the given UID. func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { + if l == nil { + return true, nil + } + l.RLock() defer l.RUnlock() var count int @@ -689,6 +697,10 @@ func (l *List) length(readTs, afterUid uint64) int { // Length iterates over the mutation layer and counts number of elements. 
func (l *List) Length(readTs, afterUid uint64) int { + if l == nil { + return 0 + } + l.RLock() defer l.RUnlock() return l.length(readTs, afterUid) @@ -714,6 +726,10 @@ func (l *List) Length(readTs, afterUid uint64) int { // to be deleted, at which point the entire list will be marked for deletion. // As the list grows, existing parts might be split if they become too big. func (l *List) Rollup() ([]*bpb.KV, error) { + if l == nil { + return nil, nil + } + l.RLock() defer l.RUnlock() out, err := l.rollup(math.MaxUint64, true) @@ -749,6 +765,10 @@ func (l *List) Rollup() ([]*bpb.KV, error) { // SingleListRollup works like rollup but generates a single list with no splits. // It's used during backup so that each backed up posting list is stored in a single key. func (l *List) SingleListRollup() (*bpb.KV, error) { + if l == nil { + return nil, nil + } + l.RLock() defer l.RUnlock() @@ -906,6 +926,10 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { // ApproxLen returns an approximate count of the UIDs in the posting list. func (l *List) ApproxLen() int { + if l == nil { + return 0 + } + l.RLock() defer l.RUnlock() return len(l.mutationMap) + codec.ApproxLen(l.plist.Pack) @@ -915,6 +939,10 @@ func (l *List) ApproxLen() int { // We have to apply the filtering before applying (offset, count). // WARNING: Calling this function just to get UIDs is expensive func (l *List) Uids(opt ListOptions) (*pb.List, error) { + if l == nil { + return nil, nil + } + // Pre-assign length to make it faster. l.RLock() // Use approximate length for initial capacity. @@ -953,6 +981,10 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) { // Postings calls postFn with the postings that are common with // UIDs in the opt ListOptions. 
func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error { + if l == nil { + return nil + } + l.RLock() defer l.RUnlock() @@ -968,6 +1000,9 @@ func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error { // AllUntaggedValues returns all the values in the posting list with no language tag. func (l *List) AllUntaggedValues(readTs uint64) ([]types.Val, error) { + if l == nil { + return nil, nil + } l.RLock() defer l.RUnlock() @@ -1003,6 +1038,9 @@ func (l *List) allUntaggedFacets(readTs uint64) ([]*pb.Facets, error) { // AllValues returns all the values in the posting list. func (l *List) AllValues(readTs uint64) ([]types.Val, error) { + if l == nil { + return nil, nil + } l.RLock() defer l.RUnlock() @@ -1020,6 +1058,9 @@ func (l *List) AllValues(readTs uint64) ([]types.Val, error) { // GetLangTags finds the language tags of each posting in the list. func (l *List) GetLangTags(readTs uint64) ([]string, error) { + if l == nil { + return nil, nil + } l.RLock() defer l.RUnlock() @@ -1035,6 +1076,10 @@ func (l *List) GetLangTags(readTs uint64) ([]string, error) { // Value returns the default value from the posting list. The default value is // defined as the value without a language tag. func (l *List) Value(readTs uint64) (rval types.Val, rerr error) { + if l == nil { + return rval, nil + } + l.RLock() defer l.RUnlock() val, found, err := l.findValue(readTs, math.MaxUint64) @@ -1054,6 +1099,9 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) { // If list consists of one or more languages, first available value is returned. // If no language from the list matches the values, processing is the same as for empty list. func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) { + if l == nil { + return rval, nil + } l.RLock() // All public methods should acquire locks, while private ones should assert them. 
defer l.RUnlock() p, err := l.postingFor(readTs, langs) @@ -1066,6 +1114,9 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err // PostingFor returns the posting according to the preferred language list. func (l *List) PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error) { + if l == nil { + return nil, nil + } l.RLock() defer l.RUnlock() return l.postingFor(readTs, langs) @@ -1078,6 +1129,10 @@ func (l *List) postingFor(readTs uint64, langs []string) (p *pb.Posting, rerr er // ValueForTag returns the value in the posting list with the given language tag. func (l *List) ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error) { + if l == nil { + return rval, nil + } + l.RLock() defer l.RUnlock() p, err := l.postingForTag(readTs, tag) @@ -1191,7 +1246,9 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti // Facets gives facets for the posting representing value. func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, listType bool) ([]*pb.Facets, error) { - + if l == nil { + return nil, nil + } l.RLock() defer l.RUnlock() diff --git a/posting/mvcc.go b/posting/mvcc.go index d5aedeba218..237cb7fd6e1 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -35,7 +35,7 @@ var ( ErrTsTooOld = errors.Errorf("Transaction is too old") // ErrInvalidKey is returned when trying to read a posting list using // an invalid key (e.g the key to a single part of a larger multi-part list). - ErrInvalidKey = errors.Errorf("cannot read posting list from this key") + ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key") ) // ShouldAbort returns whether the transaction should be aborted. @@ -155,7 +155,8 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { if pk.HasStartUid { // Trying to read a single part of a multi part list. This type of list // should be read once using the canonical list (with startUid equal to zero). 
- return nil, ErrInvalidKey + // Return a nil list. + return nil, nil } l := new(List) @@ -184,16 +185,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } l.minTs = item.Version() - // If this list is a multi-part list, advance past the keys holding the parts. - if len(l.plist.GetSplits()) > 0 { - lastKey, err := x.GetSplitKey(key, math.MaxUint64) - if err != nil { - return nil, errors.Wrapf(err, - "while advancing past the end of multi-part list with key [%v]", key) - } - it.Seek(lastKey) - } - // No need to do Next here. The outer loop can take care of skipping // more versions of the same key. return l, nil From ffcffa5136a502c4191fc70a8c93163f8d99887d Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Tue, 10 Mar 2020 09:57:02 -0700 Subject: [PATCH 2/9] Revert debug tool change. --- dgraph/cmd/debug/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 29e8d822e14..b705cea8483 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -121,7 +121,7 @@ func uidToVal(itr *badger.Iterator, prefix string) map[uint64]int { lastKey = append(lastKey[:0], item.Key()...) pk, err := x.Parse(item.Key()) x.Check(err) - if !pk.IsData() || !strings.HasPrefix(pk.Attr, prefix) || pk.HasStartUid { + if !pk.IsData() || !strings.HasPrefix(pk.Attr, prefix) { continue } if pk.IsSchema() { From 5050ff5d01246a2e2a7eb4bd913ef6cc723e7539 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Tue, 10 Mar 2020 10:09:25 -0700 Subject: [PATCH 3/9] Add comment. --- posting/mvcc.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/posting/mvcc.go b/posting/mvcc.go index 237cb7fd6e1..90130e137e1 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -154,8 +154,11 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } if pk.HasStartUid { // Trying to read a single part of a multi part list. 
This type of list - // should be read once using the canonical list (with startUid equal to zero). - // Return a nil list. + // should be read using the main key because the information needed + // to access the whole list is stored there. + // The function returns a nil list instead. This is safe to do because all + // public methods of the List object are no-ops and the list is being already + // accessed via the main key in the places where this code is reached (e.g rollups). return nil, nil } From b45a4ba5246e1952a5f4ff81cc95a6b3ddbd1529 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 12 Mar 2020 15:53:55 -0700 Subject: [PATCH 4/9] Change keys.go to have different prefix for split keys. --- x/keys.go | 74 ++++++++++++++------------------------------------ x/keys_test.go | 2 +- 2 files changed, 21 insertions(+), 55 deletions(-) diff --git a/x/keys.go b/x/keys.go index 614f00416ab..102ebbad5a2 100644 --- a/x/keys.go +++ b/x/keys.go @@ -45,9 +45,8 @@ const ( DefaultPrefix = byte(0x00) byteSchema = byte(0x01) byteType = byte(0x02) - // ByteSplit is a constant to specify a given key corresponds to a posting list split - // into multiple parts. - ByteSplit = byte(0x01) + // byteSplit signals that the key stores an individual part of a multi-part list. + byteSplit = byte(0x04) // ByteUnused is a constant to specify keys which need to be discarded. ByteUnused = byte(0xff) ) @@ -101,27 +100,21 @@ func TypeKey(attr string) []byte { // DataKey generates a data key with the given attribute and UID. 
// The structure of a data key is as follows: // -// byte 0: key type prefix (set to DefaultPrefix) +// byte 0: key type prefix (set to DefaultPrefix or byteSplit if part of a multi-part list) // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteData) -// next byte: byte to determine if this key corresponds to a list that has been split -// into multiple parts // next eight bytes: value of uid // next eight bytes (optional): if the key corresponds to a split list, the startUid of -// the split stored in this key. +// the split stored in this key and the first byte will be sets to byteSplit. func DataKey(attr string, uid uint64) []byte { prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 1 + 8 + totalLen := prefixLen + 1 + 8 buf := generateKey(DefaultPrefix, attr, totalLen) rest := buf[prefixLen:] rest[0] = ByteData - // By default, this key does not correspond to a part of a split key. - rest = rest[1:] - rest[0] = 0 - rest = rest[1:] binary.BigEndian.PutUint64(rest, uid) return buf @@ -130,27 +123,21 @@ func DataKey(attr string, uid uint64) []byte { // ReverseKey generates a reverse key with the given attribute and UID. // The structure of a reverse key is as follows: // -// byte 0: key type prefix (set to DefaultPrefix) +// byte 0: key type prefix (set to DefaultPrefix or byteSplit if part of a multi-part list) // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteReverse) -// next byte: byte to determine if this key corresponds to a list that has been split -// into multiple parts // next eight bytes: value of uid // next eight bytes (optional): if the key corresponds to a split list, the startUid of // the split stored in this key. 
func ReverseKey(attr string, uid uint64) []byte { prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 1 + 8 + totalLen := prefixLen + 1 + 8 buf := generateKey(DefaultPrefix, attr, totalLen) rest := buf[prefixLen:] rest[0] = ByteReverse - // By default, this key does not correspond to a part of a split key. - rest = rest[1:] - rest[0] = 0 - rest = rest[1:] binary.BigEndian.PutUint64(rest, uid) return buf @@ -159,27 +146,21 @@ func ReverseKey(attr string, uid uint64) []byte { // IndexKey generates a index key with the given attribute and term. // The structure of an index key is as follows: // -// byte 0: key type prefix (set to DefaultPrefix) +// byte 0: key type prefix (set to DefaultPrefix or byteSplit if part of a multi-part list) // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteIndex) -// next byte: byte to determine if this key corresponds to a list that has been split -// into multiple parts // next len(term) bytes: value of term // next eight bytes (optional): if the key corresponds to a split list, the startUid of // the split stored in this key. func IndexKey(attr, term string) []byte { prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 1 + len(term) + totalLen := prefixLen + 1 + len(term) buf := generateKey(DefaultPrefix, attr, totalLen) rest := buf[prefixLen:] rest[0] = ByteIndex - // By default, this key does not correspond to a part of a split key. - rest = rest[1:] - rest[0] = 0 - rest = rest[1:] AssertTrue(len(term) == copy(rest, term)) return buf @@ -192,13 +173,10 @@ func IndexKey(attr, term string) []byte { // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteCount or ByteCountRev) -// next byte: byte to determine if this key corresponds to a list that has been split -// into multiple parts. Since count indexes only store one number, this value will -// always be zero. // next four bytes: value of count. 
func CountKey(attr string, count uint32, reverse bool) []byte { prefixLen := 1 + 2 + len(attr) - totalLen := prefixLen + 1 + 1 + 4 + totalLen := prefixLen + 1 + 4 buf := generateKey(DefaultPrefix, attr, totalLen) rest := buf[prefixLen:] @@ -208,10 +186,6 @@ func CountKey(attr string, count uint32, reverse bool) []byte { rest[0] = ByteCount } - // By default, this key does not correspond to a part of a split key. - rest = rest[1:] - rest[0] = 0 - rest = rest[1:] binary.BigEndian.PutUint32(rest, count) return buf @@ -231,12 +205,12 @@ type ParsedKey struct { // IsData returns whether the key is a data key. func (p ParsedKey) IsData() bool { - return p.bytePrefix == DefaultPrefix && p.byteType == ByteData + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteData } // IsReverse returns whether the key is a reverse key. func (p ParsedKey) IsReverse() bool { - return p.bytePrefix == DefaultPrefix && p.byteType == ByteReverse + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteReverse } // IsCountOrCountRev returns whether the key is a count or a count rev key. @@ -246,17 +220,17 @@ func (p ParsedKey) IsCountOrCountRev() bool { // IsCount returns whether the key is a count key. func (p ParsedKey) IsCount() bool { - return p.bytePrefix == DefaultPrefix && p.byteType == ByteCount + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteCount } // IsCountRev returns whether the key is a count rev key. func (p ParsedKey) IsCountRev() bool { - return p.bytePrefix == DefaultPrefix && p.byteType == ByteCountRev + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteCountRev } // IsIndex returns whether the key is an index key. 
func (p ParsedKey) IsIndex() bool { - return p.bytePrefix == DefaultPrefix && p.byteType == ByteIndex + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteIndex } // IsSchema returns whether the key is a schema key. @@ -451,16 +425,9 @@ func GetSplitKey(baseKey []byte, startUid uint64) ([]byte, error) { keyCopy := make([]byte, len(baseKey)+8) copy(keyCopy, baseKey) - p, err := Parse(baseKey) - if err != nil { - return nil, err - } - - index := 1 + 2 + len(p.Attr) + 1 - if index >= len(keyCopy) { - panic("Cannot write to key. Key is too small") - } - keyCopy[index] = ByteSplit + // Change the first byte (i.e the key prefix) to byteSplit to signal this is an + // individual part of a single list key. + keyCopy[0] = byteSplit binary.BigEndian.PutUint64(keyCopy[len(baseKey):], startUid) return keyCopy, nil @@ -476,6 +443,8 @@ func Parse(key []byte) (ParsedKey, error) { return p, nil } + p.HasStartUid = key[0] == byteSplit + sz := int(binary.BigEndian.Uint16(key[1:3])) k := key[3:] @@ -491,9 +460,6 @@ func Parse(key []byte) (ParsedKey, error) { p.byteType = k[0] k = k[1:] - p.HasStartUid = k[0] == ByteSplit - k = k[1:] - switch p.byteType { case ByteData, ByteReverse: if len(k) < 8 { diff --git a/x/keys_test.go b/x/keys_test.go index 6cadabd8808..1a3c38bf0c6 100644 --- a/x/keys_test.go +++ b/x/keys_test.go @@ -55,7 +55,7 @@ func TestDataKey(t *testing.T) { } } -func TestParseDataKeysWithStartUid(t *testing.T) { +func TestParseDataKeyWithStartUid(t *testing.T) { var uid uint64 startUid := uint64(math.MaxUint64) for uid = 0; uid < 1001; uid++ { From 8ed39df55485409e4b6baaa55fdbec0612f9ef44 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 12 Mar 2020 16:24:03 -0700 Subject: [PATCH 5/9] Fix build --- posting/index.go | 9 +++++---- x/keys.go | 28 ++++++++++++++-------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/posting/index.go b/posting/index.go index cb8b85e41f4..5085be736c8 100644 --- 
a/posting/index.go +++ b/posting/index.go @@ -485,7 +485,7 @@ func deleteTokensFor(attr, tokenizerName string, hasLang bool) error { // Also delete all the parts of any list that has been split into multiple parts. // Such keys have a different prefix (the last byte is set to 1). prefix = pk.IndexPrefix() - prefix[len(prefix)-1] = x.ByteSplit + prefix[0] = x.ByteSplit prefix = append(prefix, tokenizer.Identifier()) return pstore.DropPrefix(prefix) } @@ -500,7 +500,7 @@ func deleteReverseEdges(attr string) error { // Also delete all the parts of any list that has been split into multiple parts. // Such keys have a different prefix (the last byte is set to 1). prefix = pk.ReversePrefix() - prefix[len(prefix)-1] = x.ByteSplit + prefix[0] = x.ByteSplit return pstore.DropPrefix(prefix) } @@ -517,13 +517,14 @@ func deleteCountIndex(attr string) error { // Also delete all the parts of any list that has been split into multiple parts. // Such keys have a different prefix (the last byte is set to 1). prefix := pk.CountPrefix(false) - prefix[len(prefix)-1] = x.ByteSplit + prefix[0] = x.ByteSplit if err := pstore.DropPrefix(prefix); err != nil { return err } + // Delete parts for count-reverse index. prefix = pk.CountPrefix(true) - prefix[len(prefix)-1] = x.ByteSplit + prefix[0] = x.ByteSplit return pstore.DropPrefix(prefix) } diff --git a/x/keys.go b/x/keys.go index 102ebbad5a2..266097a3cbc 100644 --- a/x/keys.go +++ b/x/keys.go @@ -45,8 +45,8 @@ const ( DefaultPrefix = byte(0x00) byteSchema = byte(0x01) byteType = byte(0x02) - // byteSplit signals that the key stores an individual part of a multi-part list. - byteSplit = byte(0x04) + // ByteSplit signals that the key stores an individual part of a multi-part list. + ByteSplit = byte(0x04) // ByteUnused is a constant to specify keys which need to be discarded. ByteUnused = byte(0xff) ) @@ -100,13 +100,13 @@ func TypeKey(attr string) []byte { // DataKey generates a data key with the given attribute and UID. 
// The structure of a data key is as follows: // -// byte 0: key type prefix (set to DefaultPrefix or byteSplit if part of a multi-part list) +// byte 0: key type prefix (set to DefaultPrefix or ByteSplit if part of a multi-part list) // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteData) // next eight bytes: value of uid // next eight bytes (optional): if the key corresponds to a split list, the startUid of -// the split stored in this key and the first byte will be sets to byteSplit. +// the split stored in this key and the first byte will be sets to ByteSplit. func DataKey(attr string, uid uint64) []byte { prefixLen := 1 + 2 + len(attr) totalLen := prefixLen + 1 + 8 @@ -123,7 +123,7 @@ func DataKey(attr string, uid uint64) []byte { // ReverseKey generates a reverse key with the given attribute and UID. // The structure of a reverse key is as follows: // -// byte 0: key type prefix (set to DefaultPrefix or byteSplit if part of a multi-part list) +// byte 0: key type prefix (set to DefaultPrefix or ByteSplit if part of a multi-part list) // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteReverse) @@ -146,7 +146,7 @@ func ReverseKey(attr string, uid uint64) []byte { // IndexKey generates a index key with the given attribute and term. // The structure of an index key is as follows: // -// byte 0: key type prefix (set to DefaultPrefix or byteSplit if part of a multi-part list) +// byte 0: key type prefix (set to DefaultPrefix or ByteSplit if part of a multi-part list) // byte 1-2: length of attr // next len(attr) bytes: value of attr // next byte: data type prefix (set to ByteIndex) @@ -205,12 +205,12 @@ type ParsedKey struct { // IsData returns whether the key is a data key. 
func (p ParsedKey) IsData() bool { - return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteData + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == ByteSplit) && p.byteType == ByteData } // IsReverse returns whether the key is a reverse key. func (p ParsedKey) IsReverse() bool { - return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteReverse + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == ByteSplit) && p.byteType == ByteReverse } // IsCountOrCountRev returns whether the key is a count or a count rev key. @@ -220,17 +220,17 @@ func (p ParsedKey) IsCountOrCountRev() bool { // IsCount returns whether the key is a count key. func (p ParsedKey) IsCount() bool { - return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteCount + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == ByteSplit) && p.byteType == ByteCount } // IsCountRev returns whether the key is a count rev key. func (p ParsedKey) IsCountRev() bool { - return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteCountRev + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == ByteSplit) && p.byteType == ByteCountRev } // IsIndex returns whether the key is an index key. func (p ParsedKey) IsIndex() bool { - return (p.bytePrefix == DefaultPrefix || p.bytePrefix == byteSplit) && p.byteType == ByteIndex + return (p.bytePrefix == DefaultPrefix || p.bytePrefix == ByteSplit) && p.byteType == ByteIndex } // IsSchema returns whether the key is a schema key. @@ -425,9 +425,9 @@ func GetSplitKey(baseKey []byte, startUid uint64) ([]byte, error) { keyCopy := make([]byte, len(baseKey)+8) copy(keyCopy, baseKey) - // Change the first byte (i.e the key prefix) to byteSplit to signal this is an + // Change the first byte (i.e the key prefix) to ByteSplit to signal this is an // individual part of a single list key. 
- keyCopy[0] = byteSplit + keyCopy[0] = ByteSplit binary.BigEndian.PutUint64(keyCopy[len(baseKey):], startUid) return keyCopy, nil @@ -443,7 +443,7 @@ func Parse(key []byte) (ParsedKey, error) { return p, nil } - p.HasStartUid = key[0] == byteSplit + p.HasStartUid = key[0] == ByteSplit sz := int(binary.BigEndian.Uint16(key[1:3])) k := key[3:] From 1d20ba77d276ce3ce8ca1232c3948fec4a7918a7 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 12 Mar 2020 17:06:01 -0700 Subject: [PATCH 6/9] Fix test failures in posting. --- x/keys.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/x/keys.go b/x/keys.go index 266097a3cbc..89cf2ec8e34 100644 --- a/x/keys.go +++ b/x/keys.go @@ -287,53 +287,49 @@ func (p ParsedKey) SkipType() []byte { // DataPrefix returns the prefix for data keys. func (p ParsedKey) DataPrefix() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1+1) + buf := make([]byte, 1+2+len(p.Attr)+1) buf[0] = p.bytePrefix rest := buf[1:] k := writeAttr(rest, p.Attr) - AssertTrue(len(k) == 2) + AssertTrue(len(k) == 1) k[0] = ByteData - k[1] = 0 return buf } // IndexPrefix returns the prefix for index keys. func (p ParsedKey) IndexPrefix() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1+1) - buf[0] = p.bytePrefix + buf := make([]byte, 1+2+len(p.Attr)+1) + buf[0] = DefaultPrefix rest := buf[1:] k := writeAttr(rest, p.Attr) - AssertTrue(len(k) == 2) + AssertTrue(len(k) == 1) k[0] = ByteIndex - k[1] = 0 return buf } // ReversePrefix returns the prefix for index keys. func (p ParsedKey) ReversePrefix() []byte { - buf := make([]byte, 1+2+len(p.Attr)+1+1) - buf[0] = p.bytePrefix + buf := make([]byte, 1+2+len(p.Attr)+1) + buf[0] = DefaultPrefix rest := buf[1:] k := writeAttr(rest, p.Attr) - AssertTrue(len(k) == 2) + AssertTrue(len(k) == 1) k[0] = ByteReverse - k[1] = 0 return buf } // CountPrefix returns the prefix for count keys. 
func (p ParsedKey) CountPrefix(reverse bool) []byte { - buf := make([]byte, 1+2+len(p.Attr)+1+1) + buf := make([]byte, 1+2+len(p.Attr)+1) buf[0] = p.bytePrefix rest := buf[1:] k := writeAttr(rest, p.Attr) - AssertTrue(len(k) == 2) + AssertTrue(len(k) == 1) if reverse { k[0] = ByteCountRev } else { k[0] = ByteCount } - k[1] = 0 return buf } From 9a2cd0b13805cd86ffd5d303ddf5f1d2457f5600 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 12 Mar 2020 17:25:08 -0700 Subject: [PATCH 7/9] Only rollup keys with the default prefix. --- worker/draft.go | 1 + worker/export.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/worker/draft.go b/worker/draft.go index a42605f883a..ac4eea16358 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -1095,6 +1095,7 @@ func (n *node) rollupLists(readTs uint64) error { } stream := pstore.NewStreamAt(readTs) + stream.Prefix = []byte{x.DefaultPrefix} stream.LogPrefix = "Rolling up" stream.ChooseKey = func(item *badger.Item) bool { switch item.UserMeta() { diff --git a/worker/export.go b/worker/export.go index d9d4b97142e..d8721308a8b 100644 --- a/worker/export.go +++ b/worker/export.go @@ -438,6 +438,12 @@ func export(ctx context.Context, in *pb.ExportRequest) error { return false } + // Do not pick keys storing parts of a multi-part list. They will be read + // from the main key. + if pk.HasStartUid { + return false + } + // _predicate_ is deprecated but leaving this here so that users with a // binary with version >= 1.1 can export data from a version < 1.1 without // this internal data showing up. From 308d3741e8658d2759976b61abca27e49d113d1d Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 13 Mar 2020 13:28:15 -0700 Subject: [PATCH 8/9] Add comment. 
--- x/keys.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/keys.go b/x/keys.go index ac4bdf9db72..968aa156689 100644 --- a/x/keys.go +++ b/x/keys.go @@ -425,8 +425,8 @@ func GetSplitKey(baseKey []byte, startUid uint64) ([]byte, error) { // individual part of a single list key. keyCopy[0] = ByteSplit + // Append the start uid at the end of the key. binary.BigEndian.PutUint64(keyCopy[len(baseKey):], startUid) - return keyCopy, nil } From 45738bdd18626f4d1c1f3f307d8cac9232721db1 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Mon, 16 Mar 2020 16:01:09 -0700 Subject: [PATCH 9/9] Address review comments. --- posting/list.go | 62 ++------------------------------------------ posting/list_test.go | 2 +- posting/mvcc.go | 2 +- x/keys.go | 9 ++++--- x/keys_test.go | 10 +++---- 5 files changed, 15 insertions(+), 70 deletions(-) diff --git a/posting/list.go b/posting/list.go index c408359b9db..55ac35a3c46 100644 --- a/posting/list.go +++ b/posting/list.go @@ -540,10 +540,6 @@ func (l *List) setMutation(startTs uint64, data []byte) { // return errStopIteration // to break iteration. // }) func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { - if l == nil { - return nil - } - l.RLock() defer l.RUnlock() return l.iterate(readTs, afterUid, f) @@ -709,10 +705,6 @@ loop: // IsEmpty returns true if there are no uids at the given timestamp after the given UID. func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { - if l == nil { - return true, nil - } - l.RLock() defer l.RUnlock() var count int @@ -741,10 +733,6 @@ func (l *List) length(readTs, afterUid uint64) int { // Length iterates over the mutation layer and counts number of elements. 
func (l *List) Length(readTs, afterUid uint64) int { - if l == nil { - return 0 - } - l.RLock() defer l.RUnlock() return l.length(readTs, afterUid) @@ -770,10 +758,6 @@ func (l *List) Length(readTs, afterUid uint64) int { // to be deleted, at which point the entire list will be marked for deletion. // As the list grows, existing parts might be split if they become too big. func (l *List) Rollup() ([]*bpb.KV, error) { - if l == nil { - return nil, nil - } - l.RLock() defer l.RUnlock() out, err := l.rollup(math.MaxUint64, true) @@ -809,10 +793,6 @@ func (l *List) Rollup() ([]*bpb.KV, error) { // SingleListRollup works like rollup but generates a single list with no splits. // It's used during backup so that each backed up posting list is stored in a single key. func (l *List) SingleListRollup() (*bpb.KV, error) { - if l == nil { - return nil, nil - } - l.RLock() defer l.RUnlock() @@ -838,7 +818,7 @@ func (out *rollupOutput) marshalPostingListPart( baseKey []byte, startUid uint64, plist *pb.PostingList) (*bpb.KV, error) { kv := &bpb.KV{} kv.Version = out.newMinTs - key, err := x.GetSplitKey(baseKey, startUid) + key, err := x.SplitKey(baseKey, startUid) if err != nil { return nil, errors.Wrapf(err, "cannot generate split key for list with base key %s and start UID %d", @@ -970,10 +950,6 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { // ApproxLen returns an approximate count of the UIDs in the posting list. func (l *List) ApproxLen() int { - if l == nil { - return 0 - } - l.RLock() defer l.RUnlock() return len(l.mutationMap) + codec.ApproxLen(l.plist.Pack) @@ -983,10 +959,6 @@ func (l *List) ApproxLen() int { // We have to apply the filtering before applying (offset, count). // WARNING: Calling this function just to get UIDs is expensive func (l *List) Uids(opt ListOptions) (*pb.List, error) { - if l == nil { - return nil, nil - } - // Pre-assign length to make it faster. l.RLock() // Use approximate length for initial capacity. 
@@ -1025,10 +997,6 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) { // Postings calls postFn with the postings that are common with // UIDs in the opt ListOptions. func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error { - if l == nil { - return nil - } - l.RLock() defer l.RUnlock() @@ -1044,9 +1012,6 @@ func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error { // AllUntaggedValues returns all the values in the posting list with no language tag. func (l *List) AllUntaggedValues(readTs uint64) ([]types.Val, error) { - if l == nil { - return nil, nil - } l.RLock() defer l.RUnlock() @@ -1082,9 +1047,6 @@ func (l *List) allUntaggedFacets(readTs uint64) ([]*pb.Facets, error) { // AllValues returns all the values in the posting list. func (l *List) AllValues(readTs uint64) ([]types.Val, error) { - if l == nil { - return nil, nil - } l.RLock() defer l.RUnlock() @@ -1102,9 +1064,6 @@ func (l *List) AllValues(readTs uint64) ([]types.Val, error) { // GetLangTags finds the language tags of each posting in the list. func (l *List) GetLangTags(readTs uint64) ([]string, error) { - if l == nil { - return nil, nil - } l.RLock() defer l.RUnlock() @@ -1120,10 +1079,6 @@ func (l *List) GetLangTags(readTs uint64) ([]string, error) { // Value returns the default value from the posting list. The default value is // defined as the value without a language tag. func (l *List) Value(readTs uint64) (rval types.Val, rerr error) { - if l == nil { - return rval, nil - } - l.RLock() defer l.RUnlock() val, found, err := l.findValue(readTs, math.MaxUint64) @@ -1143,9 +1098,6 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) { // If list consists of one or more languages, first available value is returned. // If no language from the list matches the values, processing is the same as for empty list. 
func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) { - if l == nil { - return rval, nil - } l.RLock() // All public methods should acquire locks, while private ones should assert them. defer l.RUnlock() p, err := l.postingFor(readTs, langs) @@ -1158,9 +1110,6 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err // PostingFor returns the posting according to the preferred language list. func (l *List) PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error) { - if l == nil { - return nil, nil - } l.RLock() defer l.RUnlock() return l.postingFor(readTs, langs) @@ -1173,10 +1122,6 @@ func (l *List) postingFor(readTs uint64, langs []string) (p *pb.Posting, rerr er // ValueForTag returns the value in the posting list with the given language tag. func (l *List) ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error) { - if l == nil { - return rval, nil - } - l.RLock() defer l.RUnlock() p, err := l.postingForTag(readTs, tag) @@ -1290,9 +1235,6 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti // Facets gives facets for the posting representing value. 
func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, listType bool) ([]*pb.Facets, error) { - if l == nil { - return nil, nil - } l.RLock() defer l.RUnlock() @@ -1318,7 +1260,7 @@ func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, } func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) { - key, err := x.GetSplitKey(l.key, startUid) + key, err := x.SplitKey(l.key, startUid) if err != nil { return nil, errors.Wrapf(err, "cannot generate key for list with base key %s and start UID %d", diff --git a/posting/list_test.go b/posting/list_test.go index f8a22dc12ec..56ebb2f86c0 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -1065,7 +1065,7 @@ func TestMultiPartListMarshal(t *testing.T) { require.Equal(t, key, kvs[0].Key) for i, startUid := range ol.plist.Splits { - partKey, err := x.GetSplitKey(key, startUid) + partKey, err := x.SplitKey(key, startUid) require.NoError(t, err) require.Equal(t, partKey, kvs[i+1].Key) part, err := ol.readListPart(startUid) diff --git a/posting/mvcc.go b/posting/mvcc.go index 90130e137e1..ab4070dff39 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -159,7 +159,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { // The function returns a nil list instead. This is safe to do because all // public methods of the List object are no-ops and the list is being already // accessed via the main key in the places where this code is reached (e.g rollups). 
- return nil, nil + return nil, ErrInvalidKey } l := new(List) diff --git a/x/keys.go b/x/keys.go index 968aa156689..74586ff27ae 100644 --- a/x/keys.go +++ b/x/keys.go @@ -387,7 +387,7 @@ func FromBackupKey(backupKey *pb.BackupKey) []byte { if backupKey.StartUid > 0 { var err error - key, err = GetSplitKey(key, backupKey.StartUid) + key, err = SplitKey(key, backupKey.StartUid) Check(err) } return key @@ -416,11 +416,14 @@ func PredicatePrefix(predicate string) []byte { return buf } -// GetSplitKey takes a key baseKey and generates the key of the list split that starts at startUid. -func GetSplitKey(baseKey []byte, startUid uint64) ([]byte, error) { +// SplitKey takes a key baseKey and generates the key of the list split that starts at startUid. +func SplitKey(baseKey []byte, startUid uint64) ([]byte, error) { keyCopy := make([]byte, len(baseKey)+8) copy(keyCopy, baseKey) + if keyCopy[0] != DefaultPrefix { + return nil, errors.Errorf("only keys with default prefix can have a split key") + } // Change the first byte (i.e the key prefix) to ByteSplit to signal this is an // individual part of a single list key. 
keyCopy[0] = ByteSplit diff --git a/x/keys_test.go b/x/keys_test.go index 1a3c38bf0c6..907853f13a2 100644 --- a/x/keys_test.go +++ b/x/keys_test.go @@ -61,7 +61,7 @@ func TestParseDataKeyWithStartUid(t *testing.T) { for uid = 0; uid < 1001; uid++ { sattr := fmt.Sprintf("attr:%d", uid) key := DataKey(sattr, uid) - key, err := GetSplitKey(key, startUid) + key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) require.NoError(t, err) @@ -98,7 +98,7 @@ func TestIndexKeyWithStartUid(t *testing.T) { sterm := fmt.Sprintf("term:%d", uid) key := IndexKey(sattr, sterm) - key, err := GetSplitKey(key, startUid) + key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) require.NoError(t, err) @@ -133,7 +133,7 @@ func TestReverseKeyWithStartUid(t *testing.T) { sattr := fmt.Sprintf("attr:%d", uid) key := ReverseKey(sattr, uid) - key, err := GetSplitKey(key, startUid) + key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestCountKeyWithStartUid(t *testing.T) { sattr := fmt.Sprintf("attr:%d", count) key := CountKey(sattr, count, true) - key, err := GetSplitKey(key, startUid) + key, err := SplitKey(key, startUid) require.NoError(t, err) pk, err := Parse(key) require.NoError(t, err) @@ -211,7 +211,7 @@ func TestTypeKey(t *testing.T) { func TestBadStartUid(t *testing.T) { testKey := func(key []byte) { - key, err := GetSplitKey(key, 10) + key, err := SplitKey(key, 10) require.NoError(t, err) _, err = Parse(key) require.NoError(t, err)