Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change split keys to have a different prefix. #4908

Merged
merged 10 commits into from
Mar 16, 2020
59 changes: 58 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ func (l *List) setMutation(startTs uint64, data []byte) {
// return errStopIteration // to break iteration.
// })
func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error {
if l == nil {
return nil
}

l.RLock()
defer l.RUnlock()
return l.iterate(readTs, afterUid, f)
Expand Down Expand Up @@ -661,6 +665,10 @@ loop:

// IsEmpty returns true if there are no uids at the given timestamp after the given UID.
func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
if l == nil {
return true, nil
}

l.RLock()
defer l.RUnlock()
var count int
Expand Down Expand Up @@ -689,6 +697,10 @@ func (l *List) length(readTs, afterUid uint64) int {

// Length iterates over the mutation layer and counts number of elements.
func (l *List) Length(readTs, afterUid uint64) int {
if l == nil {
return 0
}

l.RLock()
defer l.RUnlock()
return l.length(readTs, afterUid)
Expand All @@ -714,6 +726,10 @@ func (l *List) Length(readTs, afterUid uint64) int {
// to be deleted, at which point the entire list will be marked for deletion.
// As the list grows, existing parts might be split if they become too big.
func (l *List) Rollup() ([]*bpb.KV, error) {
if l == nil {
return nil, nil
}

l.RLock()
defer l.RUnlock()
out, err := l.rollup(math.MaxUint64, true)
Expand Down Expand Up @@ -749,6 +765,10 @@ func (l *List) Rollup() ([]*bpb.KV, error) {
// SingleListRollup works like rollup but generates a single list with no splits.
// It's used during backup so that each backed up posting list is stored in a single key.
func (l *List) SingleListRollup() (*bpb.KV, error) {
if l == nil {
return nil, nil
}

l.RLock()
defer l.RUnlock()

Expand Down Expand Up @@ -906,6 +926,10 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {

// ApproxLen returns an approximate count of the UIDs in the posting list.
func (l *List) ApproxLen() int {
if l == nil {
return 0
}

l.RLock()
defer l.RUnlock()
return len(l.mutationMap) + codec.ApproxLen(l.plist.Pack)
Expand All @@ -915,6 +939,10 @@ func (l *List) ApproxLen() int {
// We have to apply the filtering before applying (offset, count).
// WARNING: Calling this function just to get UIDs is expensive
func (l *List) Uids(opt ListOptions) (*pb.List, error) {
if l == nil {
return nil, nil
}

// Pre-assign length to make it faster.
l.RLock()
// Use approximate length for initial capacity.
Expand Down Expand Up @@ -953,6 +981,10 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
// Postings calls postFn with the postings that are common with
// UIDs in the opt ListOptions.
func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error {
if l == nil {
return nil
}

l.RLock()
defer l.RUnlock()

Expand All @@ -968,6 +1000,9 @@ func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error {

// AllUntaggedValues returns all the values in the posting list with no language tag.
func (l *List) AllUntaggedValues(readTs uint64) ([]types.Val, error) {
if l == nil {
return nil, nil
}
l.RLock()
defer l.RUnlock()

Expand Down Expand Up @@ -1003,6 +1038,9 @@ func (l *List) allUntaggedFacets(readTs uint64) ([]*pb.Facets, error) {

// AllValues returns all the values in the posting list.
func (l *List) AllValues(readTs uint64) ([]types.Val, error) {
if l == nil {
return nil, nil
}
l.RLock()
defer l.RUnlock()

Expand All @@ -1020,6 +1058,9 @@ func (l *List) AllValues(readTs uint64) ([]types.Val, error) {

// GetLangTags finds the language tags of each posting in the list.
func (l *List) GetLangTags(readTs uint64) ([]string, error) {
if l == nil {
return nil, nil
}
l.RLock()
defer l.RUnlock()

Expand All @@ -1035,6 +1076,10 @@ func (l *List) GetLangTags(readTs uint64) ([]string, error) {
// Value returns the default value from the posting list. The default value is
// defined as the value without a language tag.
func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
if l == nil {
return rval, nil
}

l.RLock()
defer l.RUnlock()
val, found, err := l.findValue(readTs, math.MaxUint64)
Expand All @@ -1054,6 +1099,9 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
// If list consists of one or more languages, first available value is returned.
// If no language from the list matches the values, processing is the same as for empty list.
func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
if l == nil {
return rval, nil
}
l.RLock() // All public methods should acquire locks, while private ones should assert them.
defer l.RUnlock()
p, err := l.postingFor(readTs, langs)
Expand All @@ -1066,6 +1114,9 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err

// PostingFor returns the posting according to the preferred language list.
func (l *List) PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error) {
if l == nil {
return nil, nil
}
l.RLock()
defer l.RUnlock()
return l.postingFor(readTs, langs)
Expand All @@ -1078,6 +1129,10 @@ func (l *List) postingFor(readTs uint64, langs []string) (p *pb.Posting, rerr er

// ValueForTag returns the value in the posting list with the given language tag.
func (l *List) ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error) {
if l == nil {
return rval, nil
}

l.RLock()
defer l.RUnlock()
p, err := l.postingForTag(readTs, tag)
Expand Down Expand Up @@ -1191,7 +1246,9 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti
// Facets gives facets for the posting representing value.
func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string,
listType bool) ([]*pb.Facets, error) {

if l == nil {
return nil, nil
}
l.RLock()
defer l.RUnlock()

Expand Down
20 changes: 7 additions & 13 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
ErrTsTooOld = errors.Errorf("Transaction is too old")
// ErrInvalidKey is returned when trying to read a posting list using
// an invalid key (e.g the key to a single part of a larger multi-part list).
ErrInvalidKey = errors.Errorf("cannot read posting list from this key")
ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key")
)

// ShouldAbort returns whether the transaction should be aborted.
Expand Down Expand Up @@ -154,8 +154,12 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
if pk.HasStartUid {
// Trying to read a single part of a multi part list. This type of list
// should be read once using the canonical list (with startUid equal to zero).
return nil, ErrInvalidKey
// should be read using using the main key because the information needed
// to access the whole list is stored there.
// The function returns a nil list instead. This is safe to do because all
// public methods of the List object are no-ops and the list is being already
// accessed via the main key in the places where this code is reached (e.g rollups).
return nil, nil
}

l := new(List)
Expand Down Expand Up @@ -184,16 +188,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
l.minTs = item.Version()

// If this list is a multi-part list, advance past the keys holding the parts.
if len(l.plist.GetSplits()) > 0 {
lastKey, err := x.GetSplitKey(key, math.MaxUint64)
if err != nil {
return nil, errors.Wrapf(err,
"while advancing past the end of multi-part list with key [%v]", key)
}
it.Seek(lastKey)
}

// No need to do Next here. The outer loop can take care of skipping
// more versions of the same key.
return l, nil
Expand Down