Skip to content

Commit

Permalink
added sharded map
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Oct 24, 2024
1 parent c14105a commit 8e4e205
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 113 deletions.
1 change: 1 addition & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
EncodingNs: uint64(l.Json.Nanoseconds()),
TotalNs: uint64((time.Since(l.Start)).Nanoseconds()),
}
//fmt.Println("====Query Resp", qc.req.Query, qc.req.StartTs, qc.req, string(resp.Json))
return resp, gqlErrs
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/dgraph-io/dgraph/v24

replace github.com/dgraph-io/ristretto => /home/harshil/Projects/ristretto/

go 1.22.6

require (
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ github.com/dgraph-io/gqlparser/v2 v2.2.2 h1:CnxXOKL4EPguKqcGV/z4u4VoW5izUkOTIsNM
github.com/dgraph-io/gqlparser/v2 v2.2.2/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v1.0.0 h1:SYG07bONKMlFDUYu5pEu3DGAh8c2OFNzKm6G9J4Si84=
github.com/dgraph-io/ristretto v1.0.0/go.mod h1:jTi2FiYEhQ1NsMmA7DeBykizjOuY88NhKBkepyu1jPc=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down
21 changes: 19 additions & 2 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,28 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, tok
return err
}

x.AssertTrue(plist != nil)
if err = plist.addMutation(ctx, txn, edge); err != nil {
return err
}
ostats.Record(ctx, x.NumEdges.M(1))

//mpost := NewPosting(edge)
//mpost.StartTs = txn.StartTs
//if mpost.PostingType != pb.Posting_REF {
// edge.ValueId = fingerprintEdge(edge)
// mpost.Uid = edge.ValueId
//}

////fmt.Println("ADDING MUTATION", plist.mutationMap, key, edge)
//txn.addConflictKey(indexConflicKey(key, edge))

//plist.Lock()
//defer plist.Unlock()
//if err != plist.updateMutationLayer(mpost, false) {
// return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
// hex.EncodeToString(plist.key), mpost)
//}

//ostats.Record(ctx, x.NumEdges.M(1))
return nil
}

Expand Down
1 change: 1 addition & 0 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
}

txn.Update()
txn.UpdateCachedKeys(commitTs)
writer := NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
Expand Down
22 changes: 21 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,17 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er
return l.addMutationInternal(ctx, txn, t)
}

func indexConflicKey(key []byte, t *pb.DirectedEdge) uint64 {
getKey := func(key []byte, uid uint64) uint64 {
// Instead of creating a string first and then doing a fingerprint, let's do a fingerprint
// here to save memory allocations.
// Not entirely sure about effect on collision chances due to this simple XOR with uid.
return farm.Fingerprint64(key) ^ uid
}

return getKey(key, t.ValueId)
}

func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
getKey := func(key []byte, uid uint64) uint64 {
// Instead of creating a string first and then doing a fingerprint, let's do a fingerprint
Expand Down Expand Up @@ -889,7 +900,6 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.mutationMap = newMutableLayer()
}
l.mutationMap.setCurrentEntries(startTs, pl)

if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
Expand Down Expand Up @@ -920,6 +930,7 @@ func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
// If greater than zero, this timestamp must thus be greater than l.minTs.
func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) {
// This function would return zero ts for entries above readTs.
// either way, effective ts is returning ts.
effective := func(start, commit uint64) uint64 {
if commit > 0 && commit <= readTs {
// Has been committed and below the readTs.
Expand Down Expand Up @@ -959,6 +970,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e

// mposts is the list of mutable postings
deleteBelowTs, mposts := l.pickPostings(readTs)
//fmt.Println(mposts, deleteBelowTs, l.plist)
if readTs < l.minTs {
return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
}
Expand Down Expand Up @@ -1116,6 +1128,7 @@ func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb
var count int
var found bool
var post *pb.Posting

err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
if p.Uid == uid {
post = p
Expand Down Expand Up @@ -1259,6 +1272,7 @@ func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) {
return bytes.Compare(kvs[i].Key, kvs[j].Key) <= 0
})

//fmt.Println("ROLLING UP", l.key, out.plist, kv.Version)
x.PrintRollup(out.plist, out.parts, l.key, kv.Version)
x.VerifyPostingSplits(kvs, out.plist, out.parts, l.key)
return kvs, nil
Expand Down Expand Up @@ -1678,8 +1692,10 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
if l.mutationMap == nil {
// If mutation map is empty, check if there is some data, and return it.
if l.plist != nil && len(l.plist.Postings) > 0 {
//fmt.Println("nil map plist")
return l.plist
}
//fmt.Println("nil map nil")
return nil
}

Expand All @@ -1691,6 +1707,7 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
// If maxTs < readTs then we need to read maxTs
if l.maxTs <= readTs {
if mutation := l.mutationMap.get(l.maxTs); mutation != nil {
//fmt.Println("mutation", mutation)
return mutation
}
}
Expand All @@ -1706,11 +1723,13 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
}
}, readTs)
if mutation != nil {
//fmt.Println("iterate", mutation)
return mutation
}

// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
// means we need to return l.plist
//fmt.Println("got nothing", l.plist, l.plist != nil)
return l.plist
}

Expand Down Expand Up @@ -1872,6 +1891,7 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti
// Iterate starts iterating after the given argument, so we pass UID - 1
// TODO Find what happens when uid = math.MaxUint64
searchFurther, pos := l.mutationMap.findPosting(readTs, uid)
//fmt.Println("FIND POSTING", readTs, "key", l.key, "mutationMap:", l.mutationMap, "plist:", l.plist, uid, searchFurther, pos)
if pos != nil {
return true, pos, nil
}
Expand Down
6 changes: 6 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func TestGetSinglePosting(t *testing.T) {

res, err := l.StaticValue(1)
require.NoError(t, err)
//fmt.Println(res, res == nil)
require.Equal(t, res == nil, true)

l.plist = create_pl(1, 1)
Expand Down Expand Up @@ -225,6 +226,7 @@ func TestAddMutation(t *testing.T) {

func getFirst(t *testing.T, l *List, readTs uint64) (res pb.Posting) {
require.NoError(t, l.Iterate(readTs, 0, func(p *pb.Posting) error {
//fmt.Println("INSIDE ITERATE", p)
res = *p
return ErrStopIteration
}))
Expand All @@ -233,6 +235,7 @@ func getFirst(t *testing.T, l *List, readTs uint64) (res pb.Posting) {

func checkValue(t *testing.T, ol *List, val string, readTs uint64) {
p := getFirst(t, ol, readTs)
//fmt.Println("HERE", val, string(p.Value), p, ol, p.Uid)
require.Equal(t, uint64(math.MaxUint64), p.Uid) // Cast to prevent overflow.
require.EqualValues(t, val, p.Value)
}
Expand Down Expand Up @@ -532,6 +535,8 @@ func TestReadSingleValue(t *testing.T) {
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
// Delete item from global cache before reading, as we are not updating the cache in the test
memoryLayer.Del(z.MemHash(key))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
Expand All @@ -541,6 +546,7 @@ func TestReadSingleValue(t *testing.T) {
j = ol.minTs
}
for ; j < i+6; j++ {
ResetCache()
tx := NewTxn(j)
k, err := tx.cache.GetSinglePosting(key)
require.NoError(t, err)
Expand Down
12 changes: 9 additions & 3 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro
func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
value := pl.findStaticValue(vc.delegate.startTs)

if value == nil {
if value == nil || len(value.Postings) == 0 {
return nil, ErrNoValue
}

Expand Down Expand Up @@ -312,8 +312,9 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
}
} else {
pl = &List{
key: key,
plist: new(pb.PostingList),
key: key,
plist: new(pb.PostingList),
mutationMap: newMutableLayer(),
}
}

Expand All @@ -336,6 +337,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
//fmt.Println("GETTING FROM DELTAS")
err := pl.Unmarshal(delta)
lc.RUnlock()
return pl, err
Expand All @@ -356,6 +358,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
// If both pl and err are empty, that means that there was no data in local cache, hence we should
// read the data from badger.
if pl != nil || err != nil {
//fmt.Println("GETTING POSTING1", lc.startTs, pl)
return pl, err
}

Expand All @@ -372,6 +375,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
return pl.Unmarshal(val)
})

//fmt.Println("GETTING POSTING FROM BADGER", lc.startTs, pl)
return pl, err
}

Expand All @@ -395,6 +399,8 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
}
}
pl.Postings = pl.Postings[:idx]
//pk, _ := x.Parse([]byte(key))
//fmt.Println("====Getting single posting", lc.startTs, pk, pl.Postings)
return pl, nil
}

Expand Down
Loading

0 comments on commit 8e4e205

Please sign in to comment.