opt(txn commits): Optimize txns by passing Skiplists to Badger (#7777)

In this PR, we use the concurrent mutations system introduced in #7694 to generate Skiplists directly from a transaction, even before it is committed.

When a transaction does get committed, we replace the key timestamps with the commit timestamp in-place within the Skiplist. If multiple transactions are being committed, we merge them into one bigger Skiplist during the serial commit.
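
The snippet below is a minimal, self-contained sketch of that flow. The `entry`, `txnList`, and `mergeLists` names are illustrative stand-ins, not Dgraph's or Badger's actual Skiplist types: each transaction buffers its writes separately, the commit path restamps timestamps in place, and the serial committer merges the committed lists into one.

```go
// Conceptual sketch only: stand-in types, not the real Dgraph/Badger Skiplist.
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	key   string
	ts    uint64 // start ts while the txn is open; rewritten at commit time
	value string
}

// txnList stands in for a per-transaction Skiplist.
type txnList struct{ entries []entry }

func (t *txnList) put(key string, ts uint64, val string) {
	t.entries = append(t.entries, entry{key, ts, val})
}

// restamp rewrites timestamps in place, mirroring how the commit path
// replaces key timestamps with the commit timestamp inside the Skiplist.
func (t *txnList) restamp(commitTs uint64) {
	for i := range t.entries {
		t.entries[i].ts = commitTs
	}
}

// mergeLists combines several committed per-txn lists into one bigger,
// sorted list that can be handed to the storage layer in one shot.
func mergeLists(lists ...*txnList) []entry {
	var out []entry
	for _, l := range lists {
		out = append(out, l.entries...)
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].key != out[j].key {
			return out[i].key < out[j].key
		}
		return out[i].ts < out[j].ts
	})
	return out
}

func main() {
	a, b := &txnList{}, &txnList{}
	a.put("name", 10, "alice") // buffered at the txn's start ts
	b.put("age", 11, "30")
	a.restamp(15) // txn a commits at ts=15
	b.restamp(16) // txn b commits at ts=16
	fmt.Println(mergeLists(a, b))
}
```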

This Skiplist is then handed over to Badger, bypassing its value log and WAL. When Badger persists the Skiplist into L0 tables, Dgraph gets a callback, which it uses to decide when it is safe to take a Raft snapshot / checkpoint. So, we no longer need Badger's WAL.
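
Here is a rough sketch of the handover-plus-callback idea; the `store` interface, `fakeStore`, and `alpha` types below are hypothetical stand-ins, not Badger's real API. The point is that the Raft checkpoint only advances from the persistence callback, which is what makes a write-ahead log on the storage side unnecessary.

```go
// Conceptual sketch only: hypothetical handover API, not Badger's actual one.
package main

import (
	"fmt"
	"sync/atomic"
)

type store interface {
	// Handover takes ownership of an in-memory batch and calls done once it
	// has been persisted to L0 tables.
	Handover(batch []byte, done func())
}

type fakeStore struct{}

func (fakeStore) Handover(batch []byte, done func()) {
	// Pretend the batch was flushed to disk, then notify the caller.
	done()
}

type alpha struct {
	db         store
	checkpoint uint64 // highest Raft index known to be durable
}

// applyCommitted hands a committed batch to the store and bumps the Raft
// checkpoint only from the persistence callback.
func (a *alpha) applyCommitted(raftIndex uint64, batch []byte) {
	a.db.Handover(batch, func() {
		atomic.StoreUint64(&a.checkpoint, raftIndex)
	})
}

func main() {
	a := &alpha{db: fakeStore{}}
	a.applyCommitted(42, []byte("merged skiplist"))
	fmt.Println("safe to snapshot up to Raft index:", atomic.LoadUint64(&a.checkpoint))
}
```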

Furthermore, the live loader is optimized to deal better with conflicting transactions, retrying them as frequently as possible instead of only towards the end. The updated live loader output shows that stragglers are handled much better: with this PR they cause almost no delays.
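
Below is a minimal sketch of the conflict gating the live loader relies on, using simplified types (`request` and `gate` are illustrative, not the loader's actual structs): a request whose conflict keys overlap an in-flight request is buffered and retried later, rather than being held back until the end of the load.

```go
// Conceptual sketch only: simplified stand-ins for the live loader's structs.
package main

import "fmt"

type request struct {
	id        int
	conflicts []uint64
}

type gate struct {
	inflight map[uint64]struct{}
}

// tryAcquire reserves the request's conflict keys; it fails if any key is
// already held by an in-flight request.
func (g *gate) tryAcquire(r *request) bool {
	for _, k := range r.conflicts {
		if _, busy := g.inflight[k]; busy {
			return false
		}
	}
	for _, k := range r.conflicts {
		g.inflight[k] = struct{}{}
	}
	return true
}

// release frees the conflict keys once the request has been sent.
func (g *gate) release(r *request) {
	for _, k := range r.conflicts {
		delete(g.inflight, k)
	}
}

func main() {
	g := &gate{inflight: map[uint64]struct{}{}}
	a := &request{id: 1, conflicts: []uint64{7}}
	b := &request{id: 2, conflicts: []uint64{7}}

	fmt.Println("a admitted:", g.tryAcquire(a)) // true
	fmt.Println("b admitted:", g.tryAcquire(b)) // false: conflicts with a, so buffer it
	g.release(a)
	fmt.Println("b retried:", g.tryAcquire(b)) // true once a has finished
}
```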

This PR also fixes a bug: the Raft checkpointing code was not being run as often as it should be.

```
Master:
[18:48:34-0700] Elapsed: 08m45s Txns: 21222 N-Quads: 21,221,870 N-Quads/s: 9,033 Aborts: 0
Number of TXs run            : 21240
Number of N-Quads processed  : 21239870
Time spent                   : 8m49.039284385s
N-Quads processed per second : 40150

This PR:
Number of TXs run            : 21240
Number of N-Quads processed  : 21239870
Time spent                   : 6m56.641399434s
N-Quads processed per second : 51057
```
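
That works out to roughly 27% higher throughput (51,057 vs 40,150 N-Quads/s) and about 1m52s (~21%) less wall-clock time on the same 21.2M N-Quad load.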

Notable Changes:
* Have each mutation create its own Skiplist; merge them during commit.
* Use the Skiplist Builder.
* Optimize mutations as well.
* Optimize the live loader to better deal with conflicted requests.
* Make a callback delete txns from Oracle.
* Bug fix: calculate the Raft checkpoint frequently.
* Clarify snapshot rules.
* Incremental rollups use Skiplists too.
* Stop rollups during drop operations.

Co-authored-by: Ahsan Barkati <ahsanbarkati@gmail.com>
manishrjain and ahsanbarkati authored May 5, 2021
1 parent 60bec16 commit 1017c4e
Showing 22 changed files with 591 additions and 241 deletions.
33 changes: 25 additions & 8 deletions dgraph/cmd/alpha/http_test.go
@@ -322,12 +322,16 @@ func runRequest(req *http.Request) (*x.QueryResWithData, []byte, *http.Response,
func runWithRetriesForResp(method, contentType, url string, body string) (
*x.QueryResWithData, []byte, *http.Response, error) {

label:
req, err := createRequest(method, contentType, url, body)
if err != nil {
return nil, nil, nil, err
}

qr, respBody, resp, err := runRequest(req)
if err != nil && strings.Contains(err.Error(), "Please retry operation") {
time.Sleep(time.Second)
goto label
}
if err != nil && strings.Contains(err.Error(), "Token is expired") {
err = token.refreshToken()
if err != nil {
@@ -619,7 +623,13 @@ func TestAlterSanity(t *testing.T) {
`{"drop_all":true}`}

for _, op := range ops {
label:
qr, _, err := runWithRetries("PUT", "", addr+"/alter", op)
if err != nil && strings.Contains(err.Error(), "Please retry") {
t.Logf("Got error: %v. Retrying...", err)
time.Sleep(time.Second)
goto label
}
require.NoError(t, err)
require.Len(t, qr.Errors, 0)
}
@@ -886,13 +896,20 @@ func TestDrainingMode(t *testing.T) {
require.NoError(t, err, "Got error while running mutation: %v", err)
}

err = alterSchema(`name: string @index(term) .`)
if expectErr {
require.True(t, err != nil && strings.Contains(err.Error(), "the server is in draining mode"))
} else {
require.NoError(t, err, "Got error while running alter: %v", err)
}

err = x.RetryUntilSuccess(3, time.Second, func() error {
err := alterSchema(`name: string @index(term) .`)
if expectErr {
if err == nil {
return errors.New("expected error")
}
if err != nil && strings.Contains(err.Error(), "server is in draining mode") {
return nil
}
return err
}
return err
})
require.NoError(t, err, "Got error while running alter: %v", err)
}

token := testutil.GrootHttpLogin(addr + "/admin")
20 changes: 15 additions & 5 deletions dgraph/cmd/alpha/reindex_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)

@@ -39,7 +40,10 @@ func TestReindexTerm(t *testing.T) {
require.NoError(t, err)

// perform re-indexing
require.NoError(t, alterSchema(`name: string @index(term) .`))
err = x.RetryUntilSuccess(3, time.Second, func() error {
return alterSchema(`name: string @index(term) .`)
})
require.NoError(t, err)

q1 := `{
q(func: anyofterms(name, "bc")) {
@@ -67,8 +71,11 @@ func TestReindexLang(t *testing.T) {
_, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true})
require.NoError(t, err)

// reindex
require.NoError(t, alterSchema(`name: string @lang @index(exact) .`))
// perform re-indexing
err = x.RetryUntilSuccess(3, time.Second, func() error {
return alterSchema(`name: string @lang @index(exact) .`)
})
require.NoError(t, err)

q1 := `{
q(func: eq(name@en, "Runtime")) {
@@ -141,8 +148,11 @@ func TestReindexReverseCount(t *testing.T) {
_, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true})
require.NoError(t, err)

// reindex
require.NoError(t, alterSchema(`value: [uid] @count @reverse .`))
// perform re-indexing
err = x.RetryUntilSuccess(3, time.Second, func() error {
return alterSchema(`value: [uid] @count @reverse .`)
})
require.NoError(t, err)

q1 := `{
q(func: eq(count(~value), "3")) {
98 changes: 65 additions & 33 deletions dgraph/cmd/live/batch.go
@@ -74,14 +74,12 @@ type loader struct {
retryRequestsWg sync.WaitGroup

// Miscellaneous information to print counters.
// Num of N-Quads sent
nquads uint64
// Num of txns sent
txns uint64
// Num of aborts
aborts uint64
// To get time elapsed
start time.Time
nquads uint64 // Num of N-Quads sent
txns uint64 // Num of txns sent
aborts uint64 // Num of aborts
start time.Time // To get time elapsed
inflight int32 // Number of inflight requests.
conc int32 // Number of request makers.

conflicts map[uint64]struct{}
uidsLock sync.RWMutex
@@ -165,13 +163,15 @@ func (l *loader) infinitelyRetry(req *request) {
}

func (l *loader) mutate(req *request) error {
atomic.AddInt32(&l.inflight, 1)
txn := l.dc.NewTxn()
req.CommitNow = true
request := &api.Request{
CommitNow: true,
Mutations: []*api.Mutation{req.Mutation},
}
_, err := txn.Do(l.opts.Ctx, request)
atomic.AddInt32(&l.inflight, -1)
return err
}

@@ -383,39 +383,69 @@ func (l *loader) deregister(req *request) {
// makeRequests can receive requests from batchNquads or directly from BatchSetWithMark.
// It doesn't need to batch the requests anymore. Batching is already done for it by the
// caller functions.
func (l *loader) makeRequests() {
func (l *loader) makeRequests(id int) {
defer l.requestsWg.Done()
atomic.AddInt32(&l.conc, 1)
defer atomic.AddInt32(&l.conc, -1)

buffer := make([]*request, 0, l.opts.bufferSize)
drain := func(maxSize int) {
for len(buffer) > maxSize {
i := 0
for _, req := range buffer {
// If there is no conflict in req, we will use it
// and then it would shift all the other reqs in buffer
if !l.addConflictKeys(req) {
buffer[i] = req
i++
continue
}
// Req will no longer be part of a buffer
l.request(req)
var loops int
drain := func() {
i := 0
for _, req := range buffer {
loops++
// If there is no conflict in req, we will use it
// and then it would shift all the other reqs in buffer
if !l.addConflictKeys(req) {
buffer[i] = req
i++
continue
}
buffer = buffer[:i]
// Req will no longer be part of a buffer
l.request(req)
}
buffer = buffer[:i]
}

for req := range l.reqs {
req.conflicts = l.conflictKeysForReq(req)
if l.addConflictKeys(req) {
l.request(req)
} else {
buffer = append(buffer, req)
t := time.NewTicker(5 * time.Second)
defer t.Stop()

outer:
for {
select {
case req, ok := <-l.reqs:
if !ok {
break outer
}
req.conflicts = l.conflictKeysForReq(req)
if l.addConflictKeys(req) {
l.request(req)
} else {
buffer = append(buffer, req)
}

case <-t.C:
for {
drain()
if len(buffer) < l.opts.bufferSize {
break
}
time.Sleep(100 * time.Millisecond)
}
}
drain(l.opts.bufferSize - 1)
}

drain(0)
for len(buffer) > 0 {
select {
case <-t.C:
fmt.Printf("[%2d] Draining. len(buffer): %d\n", id, len(buffer))
default:
}

drain()
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("[%2d] Looped %d times over buffered requests.\n", id, loops)
}

func (l *loader) printCounters() {
@@ -429,9 +459,11 @@ func (l *loader) printCounters() {
r.Capture(c.Nquads)
elapsed := time.Since(start).Round(time.Second)
timestamp := time.Now().Format("15:04:05Z0700")
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s Aborts: %d\n",
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+
" Inflight: %2d/%2d Aborts: %d\n",
timestamp, x.FixedDuration(elapsed), c.TxnsDone,
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())), c.Aborts)
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())),
atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), c.Aborts)
}
}

2 changes: 1 addition & 1 deletion dgraph/cmd/live/run.go
@@ -648,7 +648,7 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader

l.requestsWg.Add(opts.Pending)
for i := 0; i < opts.Pending; i++ {
go l.makeRequests()
go l.makeRequests(i)
}

rand.Seed(time.Now().Unix())
19 changes: 16 additions & 3 deletions dgraph/main.go
@@ -34,7 +34,13 @@ func main() {
// benchmark notes are located in badger-bench/randread.
runtime.GOMAXPROCS(128)

absDiff := func(a, b uint64) uint64 {
absU := func(a, b uint64) uint64 {
if a > b {
return a - b
}
return b - a
}
abs := func(a, b int) int {
if a > b {
return a - b
}
@@ -53,11 +59,12 @@

var js z.MemStats
var lastAlloc uint64
var numGo int

for range ticker.C {
// Read Jemalloc stats first. Print if there's a big difference.
z.ReadMemStats(&js)
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
if diff := absU(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
" Resident: %s Retained: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())),
@@ -69,7 +76,13 @@
}

runtime.ReadMemStats(&ms)
diff := absDiff(ms.HeapAlloc, lastMs.HeapAlloc)
diff := absU(ms.HeapAlloc, lastMs.HeapAlloc)

curGo := runtime.NumGoroutine()
if diff := abs(curGo, numGo); diff >= 64 {
glog.V(2).Infof("Num goroutines: %d\n", curGo)
numGo = curGo
}

switch {
case ms.NumGC > lastNumGC:
11 changes: 9 additions & 2 deletions edgraph/server.go
@@ -538,9 +538,16 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
// TODO: Maybe add some checks about the schema.
m.Schema = result.Preds
m.Types = result.Types
_, err = query.ApplyMutations(ctx, m)
for i := 0; i < 3; i++ {
_, err = query.ApplyMutations(ctx, m)
if err != nil && strings.Contains(err.Error(), "Please retry operation") {
time.Sleep(time.Second)
continue
}
break
}
if err != nil {
return empty, err
return empty, errors.Wrapf(err, "During ApplyMutations")
}

// wait for indexing to complete or context to be canceled.
2 changes: 1 addition & 1 deletion ee/acl/acl_curl_test.go
@@ -40,7 +40,7 @@ func TestCurlAuthorization(t *testing.T) {
// test query through curl
token, err := testutil.HttpLogin(&testutil.LoginParams{
Endpoint: adminEndpoint,
UserID: userid,
UserID: commonUserId,
Passwd: userpassword,
Namespace: x.GalaxyNamespace,
})