From 1017c4ed1f9a8521efd318665642408b6280609e Mon Sep 17 00:00:00 2001
From: Manish R Jain
Date: Tue, 4 May 2021 21:05:19 -0700
Subject: [PATCH] opt(txn commits): Optimize txns by passing Skiplists to Badger (#7777)

In this PR, we use the concurrent mutations system introduced in #7694 to generate Skiplists directly from a transaction, even before it is committed. When a transaction does get committed, we replace the key timestamps with the commit timestamp, in place within the Skiplist. When multiple transactions are committed together, their Skiplists are merged into one bigger Skiplist during the serial commit. This Skiplist is then handed over to Badger, bypassing its value log and WAL system. Instead, when Badger persists the Skiplist into L0 tables, Dgraph gets a callback, which it uses to decide when it is safe to take a Raft snapshot / checkpoint. So, we no longer need Badger's WAL. A sketch of this commit path is included after the list of notable changes below.

Furthermore, the live loader is optimized to deal better with conflicting transactions, retrying them as frequently as possible instead of only towards the end. The updated output from the live loader shows that stragglers are handled much better; they cause almost no delay with this PR.

There's also a bug fix in this PR: the Raft checkpointing code was not being run as often as it should.

```
Master:
[18:48:34-0700] Elapsed: 08m45s Txns: 21222 N-Quads: 21,221,870 N-Quads/s: 9,033 Aborts: 0
Number of TXs run            : 21240
Number of N-Quads processed  : 21239870
Time spent                   : 8m49.039284385s
N-Quads processed per second : 40150

This PR:
Number of TXs run            : 21240
Number of N-Quads processed  : 21239870
Time spent                   : 6m56.641399434s
N-Quads processed per second : 51057
```

Notable Changes:

* Have each mutation create its own skiplist. Merge them during commit.
* Use skiplist Builder.
* Optimize mutations as well.
* Optimize live loader to better deal with conflicted requests.
* Make a callback delete txns from Oracle.
* Bug fix: Calculate raft checkpoint frequently.
* Clarify snapshot rules.
* Incremental rollups use skiplists too.
* Stop rollups during drop operations.
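To make the commit flow above concrete, here is a minimal Go sketch assembled only from the Badger APIs this patch relies on (skl.NewBuilder, y.KeyWithTs, y.SetKeyTs, Skiplist.NewUniIterator, table.NewMergeIterator, DB.HandoverSkiplist). It is a sketch, not the PR's code: db, sortedKeys, deltas, commitTs and bitDeltaPosting are illustrative stand-ins for pstore, the txn cache and posting.BitDeltaPosting.

```
// Package sketch illustrates the commit path described above: build a Skiplist
// per transaction before commit, stamp commit timestamps in place, merge the
// Skiplists, and hand the result to Badger without going through its WAL.
package sketch

import (
	"math"

	"github.com/dgraph-io/badger/v3"
	"github.com/dgraph-io/badger/v3/skl"
	"github.com/dgraph-io/badger/v3/table"
	"github.com/dgraph-io/badger/v3/y"
)

// Stand-in for posting.BitDeltaPosting; the value is assumed for this sketch.
const bitDeltaPosting byte = 0x04

// toSkiplist builds a Skiplist from a transaction's deltas before commit.
// Keys are stamped with MaxUint64 because the commit ts is not known yet.
func toSkiplist(sortedKeys []string, deltas map[string][]byte) *skl.Skiplist {
	b := skl.NewBuilder(1 << 10)
	for _, k := range sortedKeys { // Builder expects keys in sorted order.
		b.Add(y.KeyWithTs([]byte(k), math.MaxUint64),
			y.ValueStruct{Value: deltas[k], UserMeta: bitDeltaPosting})
	}
	return b.Skiplist()
}

// commit stamps each txn's Skiplist with its commit ts in place, merges them
// into one bigger Skiplist, and hands it to Badger. onFlush runs once Badger
// has persisted the Skiplist to L0, which is when a Raft checkpoint is safe.
func commit(db *badger.DB, sls []*skl.Skiplist, commitTs []uint64, onFlush func()) error {
	var itrs []y.Iterator
	var sz int64
	for i, sl := range sls {
		it := sl.NewIterator()
		for it.SeekToFirst(); it.Valid(); it.Next() {
			// Rewriting the ts does not change key ordering within the Skiplist.
			y.SetKeyTs(it.Key(), commitTs[i])
		}
		it.Close()
		sz += sl.MemSize()
		itrs = append(itrs, sl.NewUniIterator(false))
	}

	// Merge all per-txn Skiplists into a single Skiplist.
	mi := table.NewMergeIterator(itrs, false)
	b := skl.NewBuilder(int64(float64(sz) * 1.1))
	for mi.Rewind(); mi.Valid(); mi.Next() {
		b.Add(mi.Key(), mi.Value())
	}

	// Hand over to Badger, bypassing its value log and WAL.
	return db.HandoverSkiplist(b.Skiplist(), onFlush)
}
```

The important design point is the callback: HandoverSkiplist only invokes it after the Skiplist has reached L0, so snapshot and checkpoint tracking only considers transactions that have actually been persisted.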
Co-authored-by: Ahsan Barkati --- dgraph/cmd/alpha/http_test.go | 33 +++-- dgraph/cmd/alpha/reindex_test.go | 20 ++- dgraph/cmd/live/batch.go | 98 +++++++++----- dgraph/cmd/live/run.go | 2 +- dgraph/main.go | 19 ++- edgraph/server.go | 11 +- ee/acl/acl_curl_test.go | 2 +- ee/acl/acl_test.go | 116 +++++++++-------- go.mod | 4 +- go.sum | 9 +- graphql/e2e/common/common.go | 1 + posting/index.go | 2 +- posting/index_test.go | 20 ++- posting/list_test.go | 4 +- posting/mvcc.go | 184 +++++++++++++++++++-------- posting/oracle.go | 38 +++++- query/common_test.go | 10 +- systest/queries_test.go | 9 +- t/t.go | 12 +- testutil/client.go | 7 +- worker/draft.go | 211 +++++++++++++++++++++++-------- worker/worker_test.go | 20 ++- 22 files changed, 591 insertions(+), 241 deletions(-) diff --git a/dgraph/cmd/alpha/http_test.go b/dgraph/cmd/alpha/http_test.go index 97638f25b0e..fa1dc20f40c 100644 --- a/dgraph/cmd/alpha/http_test.go +++ b/dgraph/cmd/alpha/http_test.go @@ -322,12 +322,16 @@ func runRequest(req *http.Request) (*x.QueryResWithData, []byte, *http.Response, func runWithRetriesForResp(method, contentType, url string, body string) ( *x.QueryResWithData, []byte, *http.Response, error) { +label: req, err := createRequest(method, contentType, url, body) if err != nil { return nil, nil, nil, err } - qr, respBody, resp, err := runRequest(req) + if err != nil && strings.Contains(err.Error(), "Please retry operation") { + time.Sleep(time.Second) + goto label + } if err != nil && strings.Contains(err.Error(), "Token is expired") { err = token.refreshToken() if err != nil { @@ -619,7 +623,13 @@ func TestAlterSanity(t *testing.T) { `{"drop_all":true}`} for _, op := range ops { + label: qr, _, err := runWithRetries("PUT", "", addr+"/alter", op) + if err != nil && strings.Contains(err.Error(), "Please retry") { + t.Logf("Got error: %v. 
Retrying...", err) + time.Sleep(time.Second) + goto label + } require.NoError(t, err) require.Len(t, qr.Errors, 0) } @@ -886,13 +896,20 @@ func TestDrainingMode(t *testing.T) { require.NoError(t, err, "Got error while running mutation: %v", err) } - err = alterSchema(`name: string @index(term) .`) - if expectErr { - require.True(t, err != nil && strings.Contains(err.Error(), "the server is in draining mode")) - } else { - require.NoError(t, err, "Got error while running alter: %v", err) - } - + err = x.RetryUntilSuccess(3, time.Second, func() error { + err := alterSchema(`name: string @index(term) .`) + if expectErr { + if err == nil { + return errors.New("expected error") + } + if err != nil && strings.Contains(err.Error(), "server is in draining mode") { + return nil + } + return err + } + return err + }) + require.NoError(t, err, "Got error while running alter: %v", err) } token := testutil.GrootHttpLogin(addr + "/admin") diff --git a/dgraph/cmd/alpha/reindex_test.go b/dgraph/cmd/alpha/reindex_test.go index ddaac4d8718..95a07637af7 100644 --- a/dgraph/cmd/alpha/reindex_test.go +++ b/dgraph/cmd/alpha/reindex_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/require" ) @@ -39,7 +40,10 @@ func TestReindexTerm(t *testing.T) { require.NoError(t, err) // perform re-indexing - require.NoError(t, alterSchema(`name: string @index(term) .`)) + err = x.RetryUntilSuccess(3, time.Second, func() error { + return alterSchema(`name: string @index(term) .`) + }) + require.NoError(t, err) q1 := `{ q(func: anyofterms(name, "bc")) { @@ -67,8 +71,11 @@ func TestReindexLang(t *testing.T) { _, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true}) require.NoError(t, err) - // reindex - require.NoError(t, alterSchema(`name: string @lang @index(exact) .`)) + // perform re-indexing + err = x.RetryUntilSuccess(3, time.Second, func() error { + return alterSchema(`name: string @lang @index(exact) .`) + }) + require.NoError(t, err) q1 := `{ q(func: eq(name@en, "Runtime")) { @@ -141,8 +148,11 @@ func TestReindexReverseCount(t *testing.T) { _, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true}) require.NoError(t, err) - // reindex - require.NoError(t, alterSchema(`value: [uid] @count @reverse .`)) + // perform re-indexing + err = x.RetryUntilSuccess(3, time.Second, func() error { + return alterSchema(`value: [uid] @count @reverse .`) + }) + require.NoError(t, err) q1 := `{ q(func: eq(count(~value), "3")) { diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index 6b0b4f76166..86f1c0f080b 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -74,14 +74,12 @@ type loader struct { retryRequestsWg sync.WaitGroup // Miscellaneous information to print counters. - // Num of N-Quads sent - nquads uint64 - // Num of txns sent - txns uint64 - // Num of aborts - aborts uint64 - // To get time elapsed - start time.Time + nquads uint64 // Num of N-Quads sent + txns uint64 // Num of txns sent + aborts uint64 // Num of aborts + start time.Time // To get time elapsed + inflight int32 // Number of inflight requests. + conc int32 // Number of request makers. 
conflicts map[uint64]struct{} uidsLock sync.RWMutex @@ -165,6 +163,7 @@ func (l *loader) infinitelyRetry(req *request) { } func (l *loader) mutate(req *request) error { + atomic.AddInt32(&l.inflight, 1) txn := l.dc.NewTxn() req.CommitNow = true request := &api.Request{ @@ -172,6 +171,7 @@ func (l *loader) mutate(req *request) error { Mutations: []*api.Mutation{req.Mutation}, } _, err := txn.Do(l.opts.Ctx, request) + atomic.AddInt32(&l.inflight, -1) return err } @@ -383,39 +383,69 @@ func (l *loader) deregister(req *request) { // makeRequests can receive requests from batchNquads or directly from BatchSetWithMark. // It doesn't need to batch the requests anymore. Batching is already done for it by the // caller functions. -func (l *loader) makeRequests() { +func (l *loader) makeRequests(id int) { defer l.requestsWg.Done() + atomic.AddInt32(&l.conc, 1) + defer atomic.AddInt32(&l.conc, -1) buffer := make([]*request, 0, l.opts.bufferSize) - drain := func(maxSize int) { - for len(buffer) > maxSize { - i := 0 - for _, req := range buffer { - // If there is no conflict in req, we will use it - // and then it would shift all the other reqs in buffer - if !l.addConflictKeys(req) { - buffer[i] = req - i++ - continue - } - // Req will no longer be part of a buffer - l.request(req) + var loops int + drain := func() { + i := 0 + for _, req := range buffer { + loops++ + // If there is no conflict in req, we will use it + // and then it would shift all the other reqs in buffer + if !l.addConflictKeys(req) { + buffer[i] = req + i++ + continue } - buffer = buffer[:i] + // Req will no longer be part of a buffer + l.request(req) } + buffer = buffer[:i] } - for req := range l.reqs { - req.conflicts = l.conflictKeysForReq(req) - if l.addConflictKeys(req) { - l.request(req) - } else { - buffer = append(buffer, req) + t := time.NewTicker(5 * time.Second) + defer t.Stop() + +outer: + for { + select { + case req, ok := <-l.reqs: + if !ok { + break outer + } + req.conflicts = l.conflictKeysForReq(req) + if l.addConflictKeys(req) { + l.request(req) + } else { + buffer = append(buffer, req) + } + + case <-t.C: + for { + drain() + if len(buffer) < l.opts.bufferSize { + break + } + time.Sleep(100 * time.Millisecond) + } } - drain(l.opts.bufferSize - 1) } - drain(0) + for len(buffer) > 0 { + select { + case <-t.C: + fmt.Printf("[%2d] Draining. 
len(buffer): %d\n", id, len(buffer)) + default: + } + + drain() + time.Sleep(100 * time.Millisecond) + } + fmt.Printf("[%2d] Looped %d times over buffered requests.\n", id, loops) } func (l *loader) printCounters() { @@ -429,9 +459,11 @@ func (l *loader) printCounters() { r.Capture(c.Nquads) elapsed := time.Since(start).Round(time.Second) timestamp := time.Now().Format("15:04:05Z0700") - fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s Aborts: %d\n", + fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+ + " Inflight: %2d/%2d Aborts: %d\n", timestamp, x.FixedDuration(elapsed), c.TxnsDone, - humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())), c.Aborts) + humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())), + atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), c.Aborts) } } diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 5d95eb6372c..089f5f37f81 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -648,7 +648,7 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader l.requestsWg.Add(opts.Pending) for i := 0; i < opts.Pending; i++ { - go l.makeRequests() + go l.makeRequests(i) } rand.Seed(time.Now().Unix()) diff --git a/dgraph/main.go b/dgraph/main.go index a472b913d5c..611595d34cc 100644 --- a/dgraph/main.go +++ b/dgraph/main.go @@ -34,7 +34,13 @@ func main() { // benchmark notes are located in badger-bench/randread. runtime.GOMAXPROCS(128) - absDiff := func(a, b uint64) uint64 { + absU := func(a, b uint64) uint64 { + if a > b { + return a - b + } + return b - a + } + abs := func(a, b int) int { if a > b { return a - b } @@ -53,11 +59,12 @@ func main() { var js z.MemStats var lastAlloc uint64 + var numGo int for range ticker.C { // Read Jemalloc stats first. Print if there's a big difference. z.ReadMemStats(&js) - if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 { + if diff := absU(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 { glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+ " Resident: %s Retained: %s\n", humanize.IBytes(uint64(z.NumAllocBytes())), @@ -69,7 +76,13 @@ func main() { } runtime.ReadMemStats(&ms) - diff := absDiff(ms.HeapAlloc, lastMs.HeapAlloc) + diff := absU(ms.HeapAlloc, lastMs.HeapAlloc) + + curGo := runtime.NumGoroutine() + if diff := abs(curGo, numGo); diff >= 64 { + glog.V(2).Infof("Num goroutines: %d\n", curGo) + numGo = curGo + } switch { case ms.NumGC > lastNumGC: diff --git a/edgraph/server.go b/edgraph/server.go index f25216e6faf..c45cac8efa5 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -538,9 +538,16 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er // TODO: Maybe add some checks about the schema. m.Schema = result.Preds m.Types = result.Types - _, err = query.ApplyMutations(ctx, m) + for i := 0; i < 3; i++ { + _, err = query.ApplyMutations(ctx, m) + if err != nil && strings.Contains(err.Error(), "Please retry operation") { + time.Sleep(time.Second) + continue + } + break + } if err != nil { - return empty, err + return empty, errors.Wrapf(err, "During ApplyMutations") } // wait for indexing to complete or context to be canceled. 
diff --git a/ee/acl/acl_curl_test.go b/ee/acl/acl_curl_test.go index c4723027db4..dcffc1a7fce 100644 --- a/ee/acl/acl_curl_test.go +++ b/ee/acl/acl_curl_test.go @@ -40,7 +40,7 @@ func TestCurlAuthorization(t *testing.T) { // test query through curl token, err := testutil.HttpLogin(&testutil.LoginParams{ Endpoint: adminEndpoint, - UserID: userid, + UserID: commonUserId, Passwd: userpassword, Namespace: x.GalaxyNamespace, }) diff --git a/ee/acl/acl_test.go b/ee/acl/acl_test.go index 147915c4689..5bf44e43a63 100644 --- a/ee/acl/acl_test.go +++ b/ee/acl/acl_test.go @@ -17,6 +17,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand" "os" "strconv" "strings" @@ -32,7 +33,7 @@ import ( ) var ( - userid = "alice" + commonUserId = "alice" userpassword = "simplepassword" ) @@ -230,16 +231,16 @@ func TestCreateAndDeleteUsers(t *testing.T) { // adding the user again should fail token := testutil.GrootHttpLogin(adminEndpoint) - resp := createUser(t, token, userid, userpassword) + resp := createUser(t, token, commonUserId, userpassword) require.Equal(t, 1, len(resp.Errors)) require.Equal(t, "couldn't rewrite mutation addUser because failed to rewrite mutation payload because id"+ " alice already exists for field name inside type User", resp.Errors[0].Message) checkUserCount(t, resp.Data, 0) // delete the user - _ = deleteUser(t, token, userid, true) + _ = deleteUser(t, token, commonUserId, true) - resp = createUser(t, token, userid, userpassword) + resp = createUser(t, token, commonUserId, userpassword) resp.RequireNoGraphQLErrors(t) // now we should be able to create the user again checkUserCount(t, resp.Data, 1) @@ -248,11 +249,11 @@ func TestCreateAndDeleteUsers(t *testing.T) { func resetUser(t *testing.T) { token := testutil.GrootHttpLogin(adminEndpoint) // clean up the user to allow repeated running of this test - deleteUserResp := deleteUser(t, token, userid, false) + deleteUserResp := deleteUser(t, token, commonUserId, false) deleteUserResp.RequireNoGraphQLErrors(t) glog.Infof("deleted user") - resp := createUser(t, token, userid, userpassword) + resp := createUser(t, token, commonUserId, userpassword) resp.RequireNoGraphQLErrors(t) checkUserCount(t, resp.Data, 1) glog.Infof("created user") @@ -353,8 +354,8 @@ const expireJwtSleep = 21 * time.Second func testAuthorization(t *testing.T, dg *dgo.Dgraph) { createAccountAndData(t, dg) ctx := context.Background() - if err := dg.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace); err != nil { - t.Fatalf("unable to login using the account %v", userid) + if err := dg.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace); err != nil { + t.Fatalf("unable to login using the account %v", commonUserId) } // initially the query should return empty result, mutate and alter @@ -818,7 +819,7 @@ func createGroupAndAcls(t *testing.T, group string, addUserToGroup bool) { // add the user to the group if addUserToGroup { - addToGroup(t, token, userid, group) + addToGroup(t, token, commonUserId, group) } rules := []rule{ @@ -855,7 +856,7 @@ func TestPredicatePermission(t *testing.T) { } createAccountAndData(t, dg) ctx := context.Background() - err = dg.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = dg.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err, "Logging in with the current password should have succeeded") // Schema query is allowed to all logged in users. 
@@ -928,7 +929,7 @@ func TestUnauthorizedDeletion(t *testing.T) { require.NoError(t, err, "login failed") createGroup(t, token, devGroup) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) txn := dg.NewTxn() mutation := &api.Mutation{ @@ -947,7 +948,7 @@ func TestUnauthorizedDeletion(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) _, err = deleteUsingNQuad(userClient, "<"+nodeUID+">", "<"+unAuthPred+">", "*") @@ -1095,7 +1096,7 @@ func TestQueryRemoveUnauthorizedPred(t *testing.T) { }) require.NoError(t, err, "login failed") createGroup(t, token, devGroup) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) txn := dg.NewTxn() mutation := &api.Mutation{ @@ -1121,7 +1122,7 @@ func TestQueryRemoveUnauthorizedPred(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) tests := []struct { @@ -1247,7 +1248,7 @@ func TestExpandQueryWithACLPermissions(t *testing.T) { createGroup(t, token, sreGroup) addRulesToGroup(t, token, sreGroup, []rule{{"age", Read.Code}, {"name", Write.Code}}) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) txn := dg.NewTxn() mutation := &api.Mutation{ @@ -1278,7 +1279,7 @@ func TestExpandQueryWithACLPermissions(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) // Query via user when user has no permissions @@ -1313,7 +1314,7 @@ func TestExpandQueryWithACLPermissions(t *testing.T) { }) require.NoError(t, err, "login failed") // Add alice to sre group which has read access to and write access to - addToGroup(t, token, userid, sreGroup) + addToGroup(t, token, commonUserId, sreGroup) time.Sleep(defaultTimeToSleep) resp, err = userClient.NewReadOnlyTxn().Query(ctx, query) @@ -1374,7 +1375,7 @@ func TestDeleteQueryWithACLPermissions(t *testing.T) { createGroup(t, token, devGroup) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) txn := dg.NewTxn() mutation := &api.Mutation{ @@ -1411,7 +1412,7 @@ func TestDeleteQueryWithACLPermissions(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) // delete S * * (user now has permission to name and age) @@ -1490,7 +1491,7 @@ func TestValQueryWithACLPermissions(t *testing.T) { // createGroup(t, accessJwt, sreGroup) // addRulesToGroup(t, accessJwt, sreGroup, []rule{{"age", Read.Code}, {"name", Write.Code}}) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) txn := dg.NewTxn() mutation := &api.Mutation{ @@ -1625,7 +1626,7 @@ func TestValQueryWithACLPermissions(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, 
x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) // Query via user when user has no permissions @@ -1694,7 +1695,7 @@ func TestNewACLPredicates(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) queryTests := []struct { @@ -1812,7 +1813,7 @@ func TestDeleteRule(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) queryName := "{me(func: has(name)) {name}}" @@ -1876,7 +1877,7 @@ func addDataAndRules(ctx context.Context, t *testing.T, dg *dgo.Dgraph) map[stri userid as var(func: eq(dgraph.xid, "%s")) gid as var(func: eq(dgraph.type, "dgraph.type.Group")) @filter(eq(dgraph.xid, "dev") OR eq(dgraph.xid, "dev-a")) - }`, userid) + }`, commonUserId) addAliceToGroups := &api.NQuad{ Subject: "uid(userid)", Predicate: "dgraph.user.group", @@ -1936,7 +1937,7 @@ func TestQueryUserInfo(t *testing.T) { token, err := testutil.HttpLogin(&testutil.LoginParams{ Endpoint: adminEndpoint, - UserID: userid, + UserID: commonUserId, Passwd: userpassword, Namespace: x.GalaxyNamespace, }) @@ -2022,7 +2023,7 @@ func TestQueryUserInfo(t *testing.T) { userClient, err := testutil.DgraphClient(testutil.SockAddr) require.NoError(t, err) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) resp, err := userClient.NewReadOnlyTxn().Query(ctx, query) @@ -2113,7 +2114,7 @@ func TestQueriesForNonGuardianUserWithoutGroup(t *testing.T) { token, err := testutil.HttpLogin(&testutil.LoginParams{ Endpoint: adminEndpoint, - UserID: userid, + UserID: commonUserId, Passwd: userpassword, Namespace: x.GalaxyNamespace, }) @@ -2340,7 +2341,7 @@ func TestSchemaQueryWithACL(t *testing.T) { // the other user should be able to view only the part of schema for which it has read access dg, err = testutil.DgraphClient(testutil.SockAddr) require.NoError(t, err) - require.NoError(t, dg.LoginIntoNamespace(context.Background(), userid, userpassword, x.GalaxyNamespace)) + require.NoError(t, dg.LoginIntoNamespace(context.Background(), commonUserId, userpassword, x.GalaxyNamespace)) resp, err = dg.NewReadOnlyTxn().Query(context.Background(), schemaQuery) require.NoError(t, err) require.JSONEq(t, aliceSchema, string(resp.GetJson())) @@ -2362,7 +2363,7 @@ func TestDeleteUserShouldDeleteUserFromGroup(t *testing.T) { }) require.NoError(t, err, "login failed") - _ = deleteUser(t, token, userid, true) + _ = deleteUser(t, token, commonUserId, true) gqlQuery := ` query { @@ -2592,7 +2593,7 @@ func assertNonGuardianFailure(t *testing.T, queryName string, respIsNull bool, token, err := testutil.HttpLogin(&testutil.LoginParams{ Endpoint: adminEndpoint, - UserID: userid, + UserID: commonUserId, Passwd: userpassword, Namespace: x.GalaxyNamespace, }) @@ -2602,7 +2603,7 @@ func assertNonGuardianFailure(t *testing.T, queryName string, respIsNull bool, require.Len(t, resp.Errors, 1) require.Contains(t, resp.Errors[0].Message, fmt.Sprintf("rpc error: code = PermissionDenied desc = Only guardians are allowed access."+ - " 
User '%s' is not a member of guardians group.", userid)) + " User '%s' is not a member of guardians group.", commonUserId)) if len(resp.Data) != 0 { queryVal := "null" if !respIsNull { @@ -2950,7 +2951,7 @@ func TestAllowUIDAccess(t *testing.T) { }) require.NoError(t, err, "login failed") createGroup(t, token, devGroup) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) require.NoError(t, testutil.AssignUids(101)) mutation := &api.Mutation{ @@ -2969,7 +2970,7 @@ func TestAllowUIDAccess(t *testing.T) { require.NoError(t, err) time.Sleep(defaultTimeToSleep) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) uidQuery := ` @@ -2987,7 +2988,8 @@ func TestAllowUIDAccess(t *testing.T) { } func TestAddNewPredicate(t *testing.T) { - ctx, _ := context.WithTimeout(context.Background(), 100*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() dg, err := testutil.DgraphClientWithGroot(testutil.SockAddr) require.NoError(t, err) @@ -2995,30 +2997,40 @@ func TestAddNewPredicate(t *testing.T) { testutil.DropAll(t, dg) resetUser(t) + id := fmt.Sprintf("%02d", rand.Intn(100)) + userId, newPred := "alice"+id, "newpred"+id + + t.Logf("Creating user: %s\n", userId) + token := testutil.GrootHttpLogin(adminEndpoint) + resp := createUser(t, token, userId, userpassword) + resp.RequireNoGraphQLErrors(t) + userClient, err := testutil.DgraphClient(testutil.SockAddr) require.NoError(t, err) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, userId, userpassword, x.GalaxyNamespace) require.NoError(t, err) + t.Logf("Will create new predicate: %s for user: %s\n", newPred, userId) + // Alice doesn't have access to create new predicate. err = userClient.Alter(ctx, &api.Operation{ - Schema: `newpred: string .`, + Schema: newPred + ": string .", }) require.Error(t, err, "User can't create new predicate. Alter should have returned error.") - token, err := testutil.HttpLogin(&testutil.LoginParams{ - Endpoint: adminEndpoint, - UserID: "groot", - Passwd: "password", - Namespace: x.GalaxyNamespace, - }) - require.NoError(t, err, "login failed") - addToGroup(t, token, userid, "guardians") - time.Sleep(expireJwtSleep) + addToGroup(t, token, userId, "guardians") + + // Login again to refresh our token. + err = userClient.LoginIntoNamespace(ctx, userId, userpassword, x.GalaxyNamespace) + require.NoError(t, err) // Alice is a guardian now, it can create new predicate. - err = userClient.Alter(ctx, &api.Operation{ - Schema: `newpred: string .`, + err = x.RetryUntilSuccess(60, time.Second, func() error { + err := userClient.Alter(ctx, &api.Operation{ + Schema: newPred + ": string .", + }) + t.Logf("While creating new predicate: %s, got error: %v\n", newPred, err) + return err }) require.NoError(t, err, "User is a guardian. 
Alter should have succeeded.") } @@ -3169,9 +3181,9 @@ func TestMutationWithValueVar(t *testing.T) { Passwd: "password", }) require.NoError(t, err) - createUser(t, token, userid, userpassword) + createUser(t, token, commonUserId, userpassword) createGroup(t, token, devGroup) - addToGroup(t, token, userid, devGroup) + addToGroup(t, token, commonUserId, devGroup) addRulesToGroup(t, token, devGroup, []rule{ { Predicate: "name", @@ -3207,7 +3219,7 @@ func TestMutationWithValueVar(t *testing.T) { userClient, err := testutil.DgraphClient(testutil.SockAddr) require.NoError(t, err) - err = userClient.LoginIntoNamespace(ctx, userid, userpassword, x.GalaxyNamespace) + err = userClient.LoginIntoNamespace(ctx, commonUserId, userpassword, x.GalaxyNamespace) require.NoError(t, err) _, err = userClient.NewTxn().Do(ctx, &api.Request{ @@ -3248,13 +3260,13 @@ func TestFailedLogin(t *testing.T) { require.NoError(t, err) // User is not present - err = client.LoginIntoNamespace(ctx, userid, "simplepassword", x.GalaxyNamespace) + err = client.LoginIntoNamespace(ctx, commonUserId, "simplepassword", x.GalaxyNamespace) require.Error(t, err) require.Contains(t, err.Error(), x.ErrorInvalidLogin.Error()) resetUser(t) // User is present - err = client.LoginIntoNamespace(ctx, userid, "randomstring", x.GalaxyNamespace) + err = client.LoginIntoNamespace(ctx, commonUserId, "randomstring", x.GalaxyNamespace) require.Error(t, err) require.Contains(t, err.Error(), x.ErrorInvalidLogin.Error()) } diff --git a/go.mod b/go.mod index 9f8a5217b53..cf9cf499bc9 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.12 // replace github.com/dgraph-io/badger/v3 => /home/mrjn/go/src/github.com/dgraph-io/badger // replace github.com/dgraph-io/ristretto => /home/mrjn/go/src/github.com/dgraph-io/ristretto - // replace github.com/dgraph-io/roaring => /home/mrjn/go/src/github.com/dgraph-io/roaring require ( @@ -12,14 +11,13 @@ require ( contrib.go.opencensus.io/exporter/prometheus v0.1.0 github.com/DataDog/datadog-go v0.0.0-20190425163447-40bafcb5f6c1 // indirect github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20190503082300-0f32ad59ab08 - github.com/DataDog/zstd v1.4.5 // indirect github.com/Masterminds/semver/v3 v3.1.0 github.com/Microsoft/go-winio v0.4.15 // indirect github.com/OneOfOne/xxhash v1.2.5 // indirect github.com/Shopify/sarama v1.27.2 github.com/blevesearch/bleve v1.0.13 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a + github.com/dgraph-io/badger/v3 v3.0.0-20210428203904-e4002b7758c8 github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987 github.com/dgraph-io/gqlgen v0.13.2 github.com/dgraph-io/gqlparser/v2 v2.2.0 diff --git a/go.sum b/go.sum index 31a40a898a8..aa2d3e3a099 100644 --- a/go.sum +++ b/go.sum @@ -28,9 +28,8 @@ github.com/DataDog/datadog-go v0.0.0-20190425163447-40bafcb5f6c1 h1:fSu93OUqfEko github.com/DataDog/datadog-go v0.0.0-20190425163447-40bafcb5f6c1/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20190503082300-0f32ad59ab08 h1:5btKvK+N+FpW0EEgvxq7LWcUEwIRLsL4IwIo0u+Qlhs= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20190503082300-0f32ad59ab08/go.mod h1:gMGUEe16aZh0QN941HgDjwrdjU4iTthPoz2/AtDRADE= -github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= -github.com/DataDog/zstd v1.4.5/go.mod 
h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba h1:3qB2yylqW3kVPr9QoPZtTJOXsJOUdNWT2CrZcifhs5g= +github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Masterminds/semver/v3 v3.1.0 h1:Y2lUDsFKVRSYGojLJ1yLxSXdMmMYTYls0rCvoqmMUQk= @@ -121,8 +120,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a h1:KUJzMbhVSuSDkXXkV0yI1Uj/hGNOGTUEc0dbusDixas= -github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE= +github.com/dgraph-io/badger/v3 v3.0.0-20210428203904-e4002b7758c8 h1:w7sKwba8ZssNrGmMZAc0rfNfVevem6PUs+3itgzB9ww= +github.com/dgraph-io/badger/v3 v3.0.0-20210428203904-e4002b7758c8/go.mod h1:yxVCGRWH2KUhO8ywX5MT2jR4kgjaCJkVEWENdVAw2eM= github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987 h1:5aN6H88a2q3HkO8vSZxDlgjEpJf4Fz8rfy+/Wzx2uAc= github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= diff --git a/graphql/e2e/common/common.go b/graphql/e2e/common/common.go index 40aeea213be..7c206ebb0bb 100644 --- a/graphql/e2e/common/common.go +++ b/graphql/e2e/common/common.go @@ -59,6 +59,7 @@ var ( "is already running", "retry again, server is not ready", // given by Dgraph while applying the snapshot "Unavailable: Server not ready", // given by GraphQL layer, during init on admin server + "Please retry operation", } retryableCreateNamespaceErrors = append(retryableUpdateGQLSchemaErrors, diff --git a/posting/index.go b/posting/index.go index 25dad3d216b..c4910ca25f7 100644 --- a/posting/index.go +++ b/posting/index.go @@ -616,7 +616,7 @@ func (r *rebuilder) Run(ctx context.Context) error { } // Convert data into deltas. - txn.Update() + txn.Update(ctx) // txn.cache.Lock() is not required because we are the only one making changes to txn. 
kvs := make([]*bpb.KV, 0, len(txn.cache.deltas)) diff --git a/posting/index_test.go b/posting/index_test.go index 7e7e1bf1477..86870d70140 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -20,10 +20,12 @@ import ( "bytes" "context" "math" + "sync" "testing" "time" "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/y" "github.com/stretchr/testify/require" "github.com/dgraph-io/dgraph/protos/pb" @@ -157,10 +159,20 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32, require.NoError(t, err) } - txn.Update() - writer := NewTxnWriter(pstore) - require.NoError(t, txn.CommitToDisk(writer, commitTs)) - require.NoError(t, writer.Flush()) + txn.Update(context.Background()) + sl := txn.Skiplist() + + itr := sl.NewUniIterator(false) + itr.Rewind() + for itr.Valid() { + y.SetKeyTs(itr.Key(), commitTs) + itr.Next() + } + + var wg sync.WaitGroup + wg.Add(1) + pstore.HandoverSkiplist(sl, wg.Done) + wg.Wait() } const schemaVal = ` diff --git a/posting/list_test.go b/posting/list_test.go index 7b9ad2f3fa9..f1e0c549b3a 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -982,7 +982,7 @@ func TestLargePlistSplit(t *testing.T) { key := x.DataKey(uuid.New().String(), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) - b := make([]byte, 30<<20) + b := make([]byte, 5<<20) rand.Read(b) for i := 1; i <= 2; i++ { edge := &pb.DirectedEdge{ @@ -999,7 +999,7 @@ func TestLargePlistSplit(t *testing.T) { ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) - b = make([]byte, 10<<20) + b = make([]byte, 5<<20) rand.Read(b) for i := 0; i < 63; i++ { edge := &pb.DirectedEdge{ diff --git a/posting/mvcc.go b/posting/mvcc.go index dbfab9957f9..663d05fa384 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -18,15 +18,18 @@ package posting import ( "bytes" + "encoding/binary" "encoding/hex" "math" + "sort" "strconv" "sync" "sync/atomic" "time" "github.com/dgraph-io/badger/v3" - bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/badger/v3/skl" + "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/dgo/v210/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -78,7 +81,7 @@ func init() { } // rollUpKey takes the given key's posting lists, rolls it up and writes back to badger -func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { +func (ir *incrRollupi) rollUpKey(sl *skl.Skiplist, key []byte) error { l, err := GetNoStore(key, math.MaxUint64) if err != nil { return err @@ -97,7 +100,23 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { glog.V(2).Infof("Rolled up %d keys", count) } } - return writer.Write(&bpb.KVList{Kv: kvs}) + + for _, kv := range kvs { + vs := y.ValueStruct{ + Value: kv.Value, + } + if len(kv.UserMeta) > 0 { + vs.UserMeta = kv.UserMeta[0] + } + switch vs.UserMeta { + case BitCompletePosting, BitEmptyPosting: + vs.Meta = badger.BitDiscardEarlierVersions + default: + } + sl.Put(y.KeyWithTs(kv.Key, kv.Version), vs) + } + + return nil } // TODO: When the opRollup is not running the keys from keysPool of ir are dropped. Figure out some @@ -124,34 +143,52 @@ func (ir *incrRollupi) addKeyToBatch(key []byte, priority int) { func (ir *incrRollupi) Process(closer *z.Closer) { defer closer.Done() - writer := NewTxnWriter(pstore) - defer writer.Flush() - m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map. 
+ limiter := time.NewTicker(time.Millisecond) defer limiter.Stop() + cleanupTick := time.NewTicker(5 * time.Minute) defer cleanupTick.Stop() - forceRollupTick := time.NewTicker(500 * time.Millisecond) - defer forceRollupTick.Stop() + baseTick := time.NewTicker(500 * time.Millisecond) + defer baseTick.Stop() + + const initSize = 1 << 20 + sl := skl.NewGrowingSkiplist(initSize) + + handover := func() { + if sl.Empty() { + return + } + if err := x.RetryUntilSuccess(3600, time.Second, func() error { + return pstore.HandoverSkiplist(sl, nil) + }); err != nil { + glog.Errorf("Rollup handover skiplist returned error: %v\n", err) + } + // If we have an error, the skiplist might not be safe to use still. So, + // just create a new one always. + sl = skl.NewGrowingSkiplist(initSize) + } doRollup := func(batch *[][]byte, priority int) { currTs := time.Now().Unix() for _, key := range *batch { hash := z.MemHash(key) - if elem := m[hash]; currTs-elem >= 10 { - // Key not present or Key present but last roll up was more than 10 sec ago. - // Add/Update map and rollup. - m[hash] = currTs - if err := ir.rollUpKey(writer, key); err != nil { - glog.Warningf("Error %v rolling up key %v\n", err, key) - } + if elem := m[hash]; currTs-elem < 10 { + continue + } + // Key not present or Key present but last roll up was more than 10 sec ago. + // Add/Update map and rollup. + m[hash] = currTs + if err := ir.rollUpKey(sl, key); err != nil { + glog.Warningf("Error %v rolling up key %v\n", err, key) } } *batch = (*batch)[:0] ir.priorityKeys[priority].keysPool.Put(batch) } + var ticks int for { select { case <-closer.HasBeenClosed(): @@ -164,13 +201,17 @@ func (ir *incrRollupi) Process(closer *z.Closer) { delete(m, hash) } } - case <-forceRollupTick.C: + case <-baseTick.C: batch := ir.priorityKeys[0].keysPool.Get().(*[][]byte) if len(*batch) > 0 { doRollup(batch, 0) } else { ir.priorityKeys[0].keysPool.Put(batch) } + ticks++ + if ticks%4 == 0 { // base tick is every 500ms. This is 2s. + handover() + } case batch := <-ir.priorityKeys[0].keysCh: doRollup(batch, 0) // We don't need a limiter here as we don't expect to call this function frequently. @@ -228,15 +269,10 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { ctx.Keys = x.Unique(ctx.Keys) txn.Unlock() - txn.Update() txn.cache.fillPreds(ctx, gid) } -// CommitToDisk commits a transaction to disk. -// This function only stores deltas to the commit timestamps. It does not try to generate a state. -// State generation is done via rollups, which happen when a snapshot is created. -// Don't call this for schema mutations. Directly commit them. -func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { +func (txn *Txn) ToBuffer(buf *z.Buffer, commitTs uint64) error { if commitTs == 0 { return nil } @@ -258,39 +294,81 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { } }() - var idx int - for idx < len(keys) { - // writer.update can return early from the loop in case we encounter badger.ErrTxnTooBig. On - // that error, writer.update would still commit the transaction and return any error. If - // nil, we continue to process the remaining keys. - err := writer.update(commitTs, func(btxn *badger.Txn) error { - for ; idx < len(keys); idx++ { - key := keys[idx] - data := cache.deltas[key] - if len(data) == 0 { - continue - } - if ts := cache.maxVersions[key]; ts >= commitTs { - // Skip write because we already have a write at a higher ts. - // Logging here can cause a lot of output when doing Raft log replay. 
So, let's - // not output anything here. - continue - } - err := btxn.SetEntry(&badger.Entry{ - Key: []byte(key), - Value: data, - UserMeta: BitDeltaPosting, - }) - if err != nil { - return err - } - } - return nil - }) - if err != nil { - return err + for _, key := range keys { + k := []byte(key) + data := cache.deltas[key] + if len(data) == 0 { + continue + } + + if err := badger.ValidEntry(pstore, k, data); err != nil { + glog.Errorf("Invalid Entry. len(key): %d len(val): %d\n", len(k), len(data)) + continue + } + if ts := cache.maxVersions[key]; ts >= commitTs { + // Skip write because we already have a write at a higher ts. + // Logging here can cause a lot of output when doing Raft log replay. So, let's + // not output anything here. + continue + } + + key := y.KeyWithTs(k, commitTs) + val := y.ValueStruct{ + Value: data, + UserMeta: BitDeltaPosting, + } + + dst := buf.SliceAllocate(2 + len(key) + int(val.EncodedSize())) + binary.BigEndian.PutUint16(dst[:2], uint16(len(key))) + x.AssertTrue(len(key) == copy(dst[2:], key)) + x.AssertTrue(uint32(len(dst)-2-len(key)) == val.Encode(dst[2+len(key):])) + } + return nil +} + +// ToSkiplist replaces CommitToDisk. ToSkiplist creates a Badger usable Skiplist from the Txn, so +// it can be passed over to Badger after commit. This only stores deltas to the commit timestamps. +// It does not try to generate a state. State generation is done via rollups, which happen when a +// snapshot is created. Don't call this for schema mutations. Directly commit them. +func (txn *Txn) ToSkiplist() error { + cache := txn.cache + cache.Lock() + defer cache.Unlock() + + var keys []string + for key := range cache.deltas { + keys = append(keys, key) + } + sort.Strings(keys) + + // defer func() { + // Add these keys to be rolled up after we're done writing. This is the right place for them + // to be rolled up, because we just pushed these deltas over to Badger. + // TODO: This is no longer the right place. Figure out a new place for these keys. + // for _, key := range keys { + // IncrRollup.addKeyToBatch([]byte(key), 1) + // } + // }() + + b := skl.NewBuilder(1 << 10) + for _, key := range keys { + k := []byte(key) + data := cache.deltas[key] + if len(data) == 0 { + continue } + + if err := badger.ValidEntry(pstore, k, data); err != nil { + glog.Errorf("Invalid Entry. len(key): %d len(val): %d\n", len(k), len(data)) + continue + } + b.Add(y.KeyWithTs(k, math.MaxUint64), + y.ValueStruct{ + Value: data, + UserMeta: BitDeltaPosting, + }) } + txn.sl = b.Skiplist() return nil } diff --git a/posting/oracle.go b/posting/oracle.go index eaf3cd91d16..a55f0813af4 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -23,10 +23,12 @@ import ( "sync/atomic" "time" + "github.com/dgraph-io/badger/v3/skl" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" ostats "go.opencensus.io/stats" + otrace "go.opencensus.io/trace" ) var o *oracle @@ -67,6 +69,9 @@ type Txn struct { cache *LocalCache // This pointer does not get modified. ErrCh chan error + + slWait sync.WaitGroup + sl *skl.Skiplist } // NewTxn returns a new Txn instance. @@ -89,9 +94,29 @@ func (txn *Txn) GetFromDelta(key []byte) (*List, error) { return txn.cache.GetFromDelta(key) } +func (txn *Txn) Skiplist() *skl.Skiplist { + txn.slWait.Wait() + return txn.sl +} + // Update calls UpdateDeltasAndDiscardLists on the local cache. 
-func (txn *Txn) Update() { +func (txn *Txn) Update(ctx context.Context) { + txn.Lock() + defer txn.Unlock() txn.cache.UpdateDeltasAndDiscardLists() + + // If we already have a pending Update, then wait for it to be done first. So it does not end up + // overwriting the skiplist that we generate here. + txn.slWait.Wait() + txn.slWait.Add(1) + go func() { + if err := txn.ToSkiplist(); err != nil { + glog.Errorf("While creating skiplist: %v\n", err) + } + span := otrace.FromContext(ctx) + span.Annotate(nil, "ToSkiplist done") + txn.slWait.Done() + }() } // Store is used by tests. @@ -242,6 +267,14 @@ func (o *oracle) WaitForTs(ctx context.Context, startTs uint64) error { } } +func (o *oracle) DeleteTxns(delta *pb.OracleDelta) { + o.Lock() + for _, txn := range delta.Txns { + delete(o.pendingTxns, txn.StartTs) + } + o.Unlock() +} + func (o *oracle) ProcessDelta(delta *pb.OracleDelta) { if glog.V(3) { glog.Infof("ProcessDelta: Max Assigned: %d", delta.MaxAssigned) @@ -257,9 +290,6 @@ func (o *oracle) ProcessDelta(delta *pb.OracleDelta) { o.Lock() defer o.Unlock() - for _, txn := range delta.Txns { - delete(o.pendingTxns, txn.StartTs) - } curMax := o.MaxAssigned() if delta.MaxAssigned < curMax { return diff --git a/query/common_test.go b/query/common_test.go index 1647d0d97c8..e3c270fc546 100644 --- a/query/common_test.go +++ b/query/common_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "testing" "time" @@ -44,9 +45,12 @@ func setSchema(schema string) { } func dropPredicate(pred string) { - err := client.Alter(context.Background(), &api.Operation{ - DropAttr: pred, - }) +alter: + err := client.Alter(context.Background(), &api.Operation{DropAttr: pred}) + if err != nil && strings.Contains(err.Error(), "Please retry operation") { + time.Sleep(1 * time.Second) + goto alter + } if err != nil { panic(fmt.Sprintf("Could not drop predicate. 
Got error %v", err.Error())) } diff --git a/systest/queries_test.go b/systest/queries_test.go index 98be3aee72c..d0700dce193 100644 --- a/systest/queries_test.go +++ b/systest/queries_test.go @@ -24,10 +24,12 @@ import ( "io/ioutil" "net/http" "testing" + "time" "github.com/dgraph-io/dgo/v210" "github.com/dgraph-io/dgo/v210/protos/api" "github.com/dgraph-io/dgraph/testutil" + "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/require" ) @@ -1361,8 +1363,11 @@ func EqWithAlteredIndexOrder(t *testing.T, c *dgo.Dgraph) { testutil.CompareJSON(t, expectedResult, string(resp.Json)) // now, let's set the schema with trigram before term - op = &api.Operation{Schema: `name: string @index(trigram, term) .`} - require.NoError(t, c.Alter(ctx, op)) + err = x.RetryUntilSuccess(1, time.Second, func() error { + op = &api.Operation{Schema: `name: string @index(trigram, term) .`} + return c.Alter(ctx, op) + }) + require.NoError(t, err) // querying with eq should still work resp, err = c.NewReadOnlyTxn().Query(ctx, q) diff --git a/t/t.go b/t/t.go index 61fd047a54b..88ab14c603f 100644 --- a/t/t.go +++ b/t/t.go @@ -146,6 +146,8 @@ func detectRace(prefix string) bool { } func outputLogs(prefix string) { + f, err := ioutil.TempFile(".", prefix+"*.log") + x.Check(err) printLogs := func(container string) { in := testutil.GetContainerInstance(prefix, container) c := in.GetContainer() @@ -154,7 +156,9 @@ func outputLogs(prefix string) { } logCmd := exec.Command("docker", "logs", c.ID) out, err := logCmd.CombinedOutput() - fmt.Printf("Docker logs for %d is %s with error %+v ", c.ID, string(out), err) + x.Check(err) + f.Write(out) + // fmt.Printf("Docker logs for %s is %s with error %+v ", c.ID, string(out), err) } for i := 0; i <= 3; i++ { printLogs("zero" + strconv.Itoa(i)) @@ -163,6 +167,11 @@ func outputLogs(prefix string) { for i := 0; i <= 6; i++ { printLogs("alpha" + strconv.Itoa(i)) } + f.Sync() + f.Close() + s := fmt.Sprintf("---> LOGS for %s written to %s .\n", prefix, f.Name()) + _, err = oc.Write([]byte(s)) + x.Check(err) } func stopCluster(composeFile, prefix string, wg *sync.WaitGroup, err error) { @@ -315,6 +324,7 @@ func runTests(taskCh chan task, closer *z.Closer) error { } start() if err = runTestsFor(ctx, task.pkg.ID, prefix); err != nil { + // fmt.Printf("ERROR for package: %s. 
Err: %v\n", task.pkg.ID, err) return err } } else { diff --git a/testutil/client.go b/testutil/client.go index 90e70acd10f..7e415b04b54 100644 --- a/testutil/client.go +++ b/testutil/client.go @@ -323,17 +323,17 @@ func HttpLogin(params *LoginParams) (*HttpToken, error) { data, found := outputJson["data"].(map[string]interface{}) if !found { - return nil, errors.Wrapf(err, "data entry found in the output") + return nil, errors.Errorf("data entry not found in the output: %s\n", respBody) } l, found := data["login"].(map[string]interface{}) if !found { - return nil, errors.Wrapf(err, "data entry found in the output") + return nil, errors.Errorf("login entry not found in the output: %s\n", respBody) } response, found := l["response"].(map[string]interface{}) if !found { - return nil, errors.Wrapf(err, "data entry found in the output") + return nil, errors.Errorf("response entry not found in the output: %s\n", respBody) } newAccessJwt, found := response["accessJWT"].(string) @@ -374,6 +374,7 @@ func GrootHttpLoginNamespace(endpoint string, namespace uint64) *HttpToken { Namespace: namespace, }) x.Check(err) + x.AssertTrue(token != nil) return token } diff --git a/worker/draft.go b/worker/draft.go index 24a6eac346d..e683fa69b01 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -42,6 +42,9 @@ import ( "github.com/dgraph-io/badger/v3" bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/badger/v3/skl" + "github.com/dgraph-io/badger/v3/table" + "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" @@ -245,6 +248,16 @@ func (n *node) startTask(id op) (*z.Closer, error) { return closer, nil } +func (n *node) stopTask(id op) { + n.opsLock.Lock() + closer, ok := n.ops[id] + n.opsLock.Unlock() + if !ok { + return + } + closer.SignalAndWait() +} + func (n *node) waitForTask(id op) { n.opsLock.Lock() closer, ok := n.ops[id] @@ -379,7 +392,7 @@ func (n *node) mutationWorker(workerId int) { txn := posting.Oracle().GetTxn(p.Mutations.StartTs) x.AssertTruef(txn != nil, "Unable to find txn with start ts: %d", p.Mutations.StartTs) - txn.ErrCh <- n.processMutations(ctx, p.Mutations, txn) + txn.ErrCh <- n.concMutations(ctx, p.Mutations, txn) close(txn.ErrCh) } @@ -396,7 +409,7 @@ func (n *node) mutationWorker(workerId int) { } } -func (n *node) processMutations(ctx context.Context, m *pb.Mutations, txn *posting.Txn) error { +func (n *node) concMutations(ctx context.Context, m *pb.Mutations, txn *posting.Txn) error { // It is possible that the user gives us multiple versions of the same edge, one with no facets // and another with facets. In that case, use stable sort to maintain the ordering given to us // by the user. @@ -417,7 +430,10 @@ func (n *node) processMutations(ctx context.Context, m *pb.Mutations, txn *posti return x.ErrConflict } // Discard the posting lists from cache to release memory at the end. - defer txn.Update() + defer func() { + txn.Update(ctx) + span.Annotate(nil, "update done") + }() // Update the applied index that we are seeing. if txn.AppliedIndexSeen == 0 { @@ -490,6 +506,11 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr if proposal.Mutations.DropOp == pb.Mutations_DATA { // Ensures nothing get written to disk due to commit proposals. n.keysWritten.rejectBeforeIndex = proposal.Index + + // Stop rollups, otherwise we might end up overwriting some new data. 
+ n.stopTask(opRollup) + defer n.startTask(opRollup) + posting.Oracle().ResetTxns() if err := posting.DeleteData(); err != nil { return err @@ -503,6 +524,11 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr if proposal.Mutations.DropOp == pb.Mutations_ALL { // Ensures nothing get written to disk due to commit proposals. n.keysWritten.rejectBeforeIndex = proposal.Index + + // Stop rollups, otherwise we might end up overwriting some new data. + n.stopTask(opRollup) + defer n.startTask(opRollup) + posting.Oracle().ResetTxns() schema.State().DeleteAll() @@ -649,7 +675,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr // If we have an error, re-run this. span.Annotatef(nil, "Re-running mutation from applyCh. Runs: %d", runs) - return n.processMutations(ctx, m, txn) + return n.concMutations(ctx, m, txn) } func (n *node) applyCommitted(proposal *pb.Proposal) error { @@ -986,50 +1012,95 @@ func (n *node) processApplyCh() { } } -// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused. -func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { - // First let's commit all mutations to disk. - writer := posting.NewTxnWriter(pstore) - toDisk := func(start, commit uint64) { - txn := posting.Oracle().GetTxn(start) +func (n *node) commitOrAbort(_ uint64, delta *pb.OracleDelta) error { + _, span := otrace.StartSpan(context.Background(), "node.commitOrAbort") + defer span.End() + + span.Annotate(nil, "Start") + start := time.Now() + var numKeys int + + itrStart := time.Now() + var itrs []y.Iterator + var txns []*posting.Txn + var sz int64 + for _, status := range delta.Txns { + txn := posting.Oracle().GetTxn(status.StartTs) if txn == nil { - return + continue } - txn.Update() - - n.keysWritten.totalKeys += len(txn.Deltas()) - ostats.Record(n.ctx, x.NumEdges.M(int64(len(txn.Deltas())))) for k := range txn.Deltas() { - n.keysWritten.keyCommitTs[z.MemHashString(k)] = commit - } - err := x.RetryUntilSuccess(int(x.Config.MaxRetries), - 10*time.Millisecond, func() error { - err := txn.CommitToDisk(writer, commit) - if err == badger.ErrBannedKey { - glog.Errorf("Error while writing to banned namespace.") - return nil - } - return err - }) + n.keysWritten.keyCommitTs[z.MemHashString(k)] = status.CommitTs + } + n.keysWritten.totalKeys += len(txn.Deltas()) + numKeys += len(txn.Deltas()) + if len(txn.Deltas()) == 0 { + continue + } + txns = append(txns, txn) - if err != nil { - glog.Errorf("Error while applying txn status to disk (%d -> %d): %v", - start, commit, err) + sz += txn.Skiplist().MemSize() + // Iterate to set the commit timestamp for all keys. + // Skiplist can block if the conversion to Skiplist isn't done yet. + itr := txn.Skiplist().NewIterator() + itr.SeekToFirst() + for itr.Valid() { + key := itr.Key() + // We don't expect the ordering of the keys to change due to setting their commit + // timestamps. Each key in the skiplist should be unique already. + y.SetKeyTs(key, status.CommitTs) + itr.Next() } - } + itr.Close() - for _, status := range delta.Txns { - toDisk(status.StartTs, status.CommitTs) + itrs = append(itrs, txn.Skiplist().NewUniIterator(false)) } - if err := writer.Flush(); err != nil { - return errors.Wrapf(err, "while flushing to disk") + span.Annotatef(nil, "Num keys: %d Itr: %s\n", numKeys, time.Since(itrStart)) + ostats.Record(n.ctx, x.NumEdges.M(int64(numKeys))) + + // This would be used for callback via Badger when skiplist is pushed to + // disk. 
+ deleteTxns := func() { + posting.Oracle().DeleteTxns(delta) } - if x.WorkerConfig.HardSync { - if err := pstore.Sync(); err != nil { - glog.Errorf("Error while calling Sync while commitOrAbort: %v", err) + + if len(itrs) == 0 { + deleteTxns() + + } else { + sn := time.Now() + mi := table.NewMergeIterator(itrs, false) + mi.Rewind() + + var keys int + b := skl.NewBuilder(int64(float64(sz) * 1.1)) + for mi.Valid() { + b.Add(mi.Key(), mi.Value()) + keys++ + mi.Next() + } + span.Annotatef(nil, "Iterating and skiplist over %d keys took: %s", keys, time.Since(sn)) + err := x.RetryUntilSuccess(3600, time.Second, func() error { + if numKeys == 0 { + return nil + } + // We do the pending txn deletion in the callback, so that our snapshot and checkpoint + // tracking would only consider the txns which have been successfully pushed to disk. + return pstore.HandoverSkiplist(b.Skiplist(), deleteTxns) + }) + if err != nil { + glog.Errorf("while handing over skiplist: %v\n", err) } + span.Annotatef(nil, "Handover skiplist done for %d txns, %d keys", len(delta.Txns), numKeys) } + ms := x.SinceMs(start) + tags := []tag.Mutator{tag.Upsert(x.KeyMethod, "apply.toDisk")} + x.Check(ostats.RecordWithTags(context.Background(), tags, x.LatencyMs.M(ms))) + + // Before, we used to call pstore.Sync() here. We don't need to do that + // anymore because we're not using Badger's WAL. + g := groups() if delta.GroupChecksums != nil && delta.GroupChecksums[g.groupId()] > 0 { atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()]) @@ -1041,9 +1112,11 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { txn.RemoveCachedKeys() } posting.WaitForCache() + span.Annotate(nil, "cache keys removed") // Now advance Oracle(), so we can service waiting reads. posting.Oracle().ProcessDelta(delta) + span.Annotate(nil, "process delta done") return nil } @@ -1217,7 +1290,8 @@ func (n *node) updateRaftProgress() error { atomic.StoreUint64(&n.checkpointTs, snap.ReadTs) n.Store.SetUint(raftwal.CheckpointIndex, snap.GetIndex()) - glog.V(2).Infof("[%#x] Set Raft progress to index: %d, ts: %d.", n.Id, snap.Index, snap.ReadTs) + glog.V(2).Infof("[%#x] Set Raft checkpoint to index: %d, ts: %d.", + n.Id, snap.Index, snap.ReadTs) return nil } @@ -1225,11 +1299,33 @@ func (n *node) checkpointAndClose(done chan struct{}) { snapshotAfterEntries := x.WorkerConfig.Raft.GetUint64("snapshot-after-entries") x.AssertTruef(snapshotAfterEntries > 10, "raft.snapshot-after must be a number greater than 10") - snapshotFrequency := x.WorkerConfig.Raft.GetDuration("snapshot-after-duration") - slowTicker := time.NewTicker(snapshotFrequency) + slowTicker := time.NewTicker(time.Minute) defer slowTicker.Stop() + exceededSnapshotByEntries := func() bool { + if snapshotAfterEntries == 0 { + // If snapshot-after isn't set, return true always. + return true + } + chk, err := n.Store.Checkpoint() + if err != nil { + glog.Errorf("While reading checkpoint: %v", err) + return false + } + first, err := n.Store.FirstIndex() + if err != nil { + glog.Errorf("While reading first index: %v", err) + return false + } + // If we're over snapshotAfterEntries, calculate would be true. 
+ glog.V(3).Infof("Evaluating snapshot first:%d chk:%d (chk-first:%d) "+ + "snapshotAfterEntries:%d", first, chk, chk-first, + snapshotAfterEntries) + return chk-first > snapshotAfterEntries + } + lastSnapshotTime := time.Now() + snapshotFrequency := x.WorkerConfig.Raft.GetDuration("snapshot-after-duration") for { select { case <-slowTicker.C: @@ -1252,18 +1348,31 @@ func (n *node) checkpointAndClose(done chan struct{}) { continue } - // If we don't have a snapshot, or if there are too many log files in Raft, - // calculate a new snapshot. + // calculate would be true if: + // - snapshot is empty. + // - we have more than 4 log files in Raft WAL. + // - snapshot frequency is zero and exceeding threshold entries. + // - we have exceeded the threshold time since last snapshot and exceeding threshold + // entries. + // + // Note: In case we're exceeding threshold entries, but have not exceeded the + // threshold time since last snapshot, calculate would be false. calculate := raft.IsEmptySnap(snap) || n.Store.NumLogFiles() > 4 + if snapshotFrequency == 0 { + calculate = calculate || exceededSnapshotByEntries() + + } else if time.Since(lastSnapshotTime) > snapshotFrequency { + // If we haven't taken a snapshot since snapshotFrequency, calculate would + // follow snapshot entries. + calculate = calculate || exceededSnapshotByEntries() + } - // Only take snapshot if both snapshotFrequency and - // snapshotAfterEntries requirements are met. If set to 0, - // we consider duration condition to be disabled. - if snapshotFrequency == 0 || time.Since(lastSnapshotTime) > snapshotFrequency { - if chk, err := n.Store.Checkpoint(); err == nil { - if first, err := n.Store.FirstIndex(); err == nil { - // Save some cycles by only calculating snapshot if the checkpoint - // has gone quite a bit further than the first index. + // Check if we're snapshotAfterEntries away from the FirstIndex based off the last + // checkpoint. This is a cheap operation. We're not iterating over the logs. + if chk, err := n.Store.Checkpoint(); err == nil { + if first, err := n.Store.FirstIndex(); err == nil { + // If we're over snapshotAfterEntries, calculate would be true. 
+ if snapshotAfterEntries > 0 { calculate = calculate || chk >= first+snapshotAfterEntries glog.V(3).Infof("Evaluating snapshot first:%d chk:%d (chk-first:%d) "+ "snapshotAfterEntries:%d snap:%v", first, chk, chk-first, @@ -1345,7 +1454,7 @@ func (n *node) Run() { if err != nil { glog.Errorf("While trying to find raft progress: %v", err) } else { - glog.Infof("Found Raft progress: %d", applied) + glog.Infof("Found Raft checkpoint: %d", applied) } var timer x.Timer diff --git a/worker/worker_test.go b/worker/worker_test.go index 1de0a1e7d7c..caaa3999433 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -23,12 +23,14 @@ import ( "math" "os" "strings" + "sync" "sync/atomic" "testing" "github.com/stretchr/testify/require" "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/dgo/v210" "github.com/dgraph-io/dgo/v210/protos/api" @@ -60,10 +62,20 @@ func commitTransaction(t *testing.T, edge *pb.DirectedEdge, l *posting.List) { commit := commitTs(startTs) - txn.Update() - writer := posting.NewTxnWriter(pstore) - require.NoError(t, txn.CommitToDisk(writer, commit)) - require.NoError(t, writer.Flush()) + txn.Update(context.Background()) + sl := txn.Skiplist() + + itr := sl.NewUniIterator(false) + itr.Rewind() + for itr.Valid() { + y.SetKeyTs(itr.Key(), commit) + itr.Next() + } + + var wg sync.WaitGroup + wg.Add(1) + pstore.HandoverSkiplist(sl, wg.Done) + wg.Wait() } func timestamp() uint64 {