Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(txn commits): Optimize txns by passing Skiplists to Badger #7777

Merged
merged 22 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,16 @@ func runRequest(req *http.Request) (*x.QueryResWithData, []byte, *http.Response,
func runWithRetriesForResp(method, contentType, url string, body string) (
*x.QueryResWithData, []byte, *http.Response, error) {

label:
req, err := createRequest(method, contentType, url, body)
if err != nil {
return nil, nil, nil, err
}

qr, respBody, resp, err := runRequest(req)
if err != nil && strings.Contains(err.Error(), "Please retry operation") {
time.Sleep(time.Second)
goto label
}
if err != nil && strings.Contains(err.Error(), "Token is expired") {
err = token.refreshToken()
if err != nil {
Expand Down Expand Up @@ -619,7 +623,13 @@ func TestAlterSanity(t *testing.T) {
`{"drop_all":true}`}

for _, op := range ops {
label:
qr, _, err := runWithRetries("PUT", "", addr+"/alter", op)
if err != nil && strings.Contains(err.Error(), "Please retry") {
t.Logf("Got error: %v. Retrying...", err)
time.Sleep(time.Second)
goto label
}
require.NoError(t, err)
require.Len(t, qr.Errors, 0)
}
Expand Down Expand Up @@ -886,13 +896,20 @@ func TestDrainingMode(t *testing.T) {
require.NoError(t, err, "Got error while running mutation: %v", err)
}

err = alterSchema(`name: string @index(term) .`)
if expectErr {
require.True(t, err != nil && strings.Contains(err.Error(), "the server is in draining mode"))
} else {
require.NoError(t, err, "Got error while running alter: %v", err)
}

err = x.RetryUntilSuccess(3, time.Second, func() error {
err := alterSchema(`name: string @index(term) .`)
if expectErr {
if err == nil {
return errors.New("expected error")
}
if err != nil && strings.Contains(err.Error(), "server is in draining mode") {
return nil
}
return err
}
return err
})
require.NoError(t, err, "Got error while running alter: %v", err)
}

token := testutil.GrootHttpLogin(addr + "/admin")
Expand Down
20 changes: 15 additions & 5 deletions dgraph/cmd/alpha/reindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)

Expand All @@ -39,7 +40,10 @@ func TestReindexTerm(t *testing.T) {
require.NoError(t, err)

// perform re-indexing
require.NoError(t, alterSchema(`name: string @index(term) .`))
err = x.RetryUntilSuccess(3, time.Second, func() error {
return alterSchema(`name: string @index(term) .`)
})
require.NoError(t, err)

q1 := `{
q(func: anyofterms(name, "bc")) {
Expand Down Expand Up @@ -67,8 +71,11 @@ func TestReindexLang(t *testing.T) {
_, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true})
require.NoError(t, err)

// reindex
require.NoError(t, alterSchema(`name: string @lang @index(exact) .`))
// perform re-indexing
err = x.RetryUntilSuccess(3, time.Second, func() error {
return alterSchema(`name: string @lang @index(exact) .`)
})
require.NoError(t, err)

q1 := `{
q(func: eq(name@en, "Runtime")) {
Expand Down Expand Up @@ -141,8 +148,11 @@ func TestReindexReverseCount(t *testing.T) {
_, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true})
require.NoError(t, err)

// reindex
require.NoError(t, alterSchema(`value: [uid] @count @reverse .`))
// perform re-indexing
err = x.RetryUntilSuccess(3, time.Second, func() error {
return alterSchema(`value: [uid] @count @reverse .`)
})
require.NoError(t, err)

q1 := `{
q(func: eq(count(~value), "3")) {
Expand Down
98 changes: 65 additions & 33 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ type loader struct {
retryRequestsWg sync.WaitGroup

// Miscellaneous information to print counters.
// Num of N-Quads sent
nquads uint64
// Num of txns sent
txns uint64
// Num of aborts
aborts uint64
// To get time elapsed
start time.Time
nquads uint64 // Num of N-Quads sent
txns uint64 // Num of txns sent
aborts uint64 // Num of aborts
start time.Time // To get time elapsed
inflight int32 // Number of inflight requests.
conc int32 // Number of request makers.

conflicts map[uint64]struct{}
uidsLock sync.RWMutex
Expand Down Expand Up @@ -165,13 +163,15 @@ func (l *loader) infinitelyRetry(req *request) {
}

func (l *loader) mutate(req *request) error {
atomic.AddInt32(&l.inflight, 1)
txn := l.dc.NewTxn()
req.CommitNow = true
request := &api.Request{
CommitNow: true,
Mutations: []*api.Mutation{req.Mutation},
}
_, err := txn.Do(l.opts.Ctx, request)
atomic.AddInt32(&l.inflight, -1)
return err
}

Expand Down Expand Up @@ -383,39 +383,69 @@ func (l *loader) deregister(req *request) {
// makeRequests can receive requests from batchNquads or directly from BatchSetWithMark.
// It doesn't need to batch the requests anymore. Batching is already done for it by the
// caller functions.
func (l *loader) makeRequests() {
func (l *loader) makeRequests(id int) {
defer l.requestsWg.Done()
atomic.AddInt32(&l.conc, 1)
defer atomic.AddInt32(&l.conc, -1)

buffer := make([]*request, 0, l.opts.bufferSize)
drain := func(maxSize int) {
for len(buffer) > maxSize {
i := 0
for _, req := range buffer {
// If there is no conflict in req, we will use it
// and then it would shift all the other reqs in buffer
if !l.addConflictKeys(req) {
buffer[i] = req
i++
continue
}
// Req will no longer be part of a buffer
l.request(req)
var loops int
drain := func() {
i := 0
for _, req := range buffer {
loops++
// If there is no conflict in req, we will use it
// and then it would shift all the other reqs in buffer
if !l.addConflictKeys(req) {
buffer[i] = req
i++
continue
}
buffer = buffer[:i]
// Req will no longer be part of a buffer
l.request(req)
}
buffer = buffer[:i]
}

for req := range l.reqs {
req.conflicts = l.conflictKeysForReq(req)
if l.addConflictKeys(req) {
l.request(req)
} else {
buffer = append(buffer, req)
t := time.NewTicker(5 * time.Second)
defer t.Stop()

outer:
for {
select {
case req, ok := <-l.reqs:
if !ok {
break outer
}
req.conflicts = l.conflictKeysForReq(req)
if l.addConflictKeys(req) {
l.request(req)
} else {
buffer = append(buffer, req)
}

case <-t.C:
for {
drain()
if len(buffer) < l.opts.bufferSize {
break
}
time.Sleep(100 * time.Millisecond)
}
}
drain(l.opts.bufferSize - 1)
}

drain(0)
for len(buffer) > 0 {
select {
case <-t.C:
fmt.Printf("[%2d] Draining. len(buffer): %d\n", id, len(buffer))
default:
}

drain()
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("[%2d] Looped %d times over buffered requests.\n", id, loops)
}

func (l *loader) printCounters() {
Expand All @@ -429,9 +459,11 @@ func (l *loader) printCounters() {
r.Capture(c.Nquads)
elapsed := time.Since(start).Round(time.Second)
timestamp := time.Now().Format("15:04:05Z0700")
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s Aborts: %d\n",
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+
" Inflight: %2d/%2d Aborts: %d\n",
timestamp, x.FixedDuration(elapsed), c.TxnsDone,
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())), c.Aborts)
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())),
atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), c.Aborts)
}
}

Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader

l.requestsWg.Add(opts.Pending)
for i := 0; i < opts.Pending; i++ {
go l.makeRequests()
go l.makeRequests(i)
}

rand.Seed(time.Now().Unix())
Expand Down
19 changes: 16 additions & 3 deletions dgraph/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ func main() {
// benchmark notes are located in badger-bench/randread.
runtime.GOMAXPROCS(128)

absDiff := func(a, b uint64) uint64 {
absU := func(a, b uint64) uint64 {
if a > b {
return a - b
}
return b - a
}
abs := func(a, b int) int {
if a > b {
return a - b
}
Expand All @@ -53,11 +59,12 @@ func main() {

var js z.MemStats
var lastAlloc uint64
var numGo int

for range ticker.C {
// Read Jemalloc stats first. Print if there's a big difference.
z.ReadMemStats(&js)
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
if diff := absU(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
" Resident: %s Retained: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())),
Expand All @@ -69,7 +76,13 @@ func main() {
}

runtime.ReadMemStats(&ms)
diff := absDiff(ms.HeapAlloc, lastMs.HeapAlloc)
diff := absU(ms.HeapAlloc, lastMs.HeapAlloc)

curGo := runtime.NumGoroutine()
if diff := abs(curGo, numGo); diff >= 64 {
glog.V(2).Infof("Num goroutines: %d\n", curGo)
numGo = curGo
}

switch {
case ms.NumGC > lastNumGC:
Expand Down
11 changes: 9 additions & 2 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,16 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
// TODO: Maybe add some checks about the schema.
m.Schema = result.Preds
m.Types = result.Types
_, err = query.ApplyMutations(ctx, m)
for i := 0; i < 3; i++ {
_, err = query.ApplyMutations(ctx, m)
if err != nil && strings.Contains(err.Error(), "Please retry operation") {
time.Sleep(time.Second)
continue
}
break
}
if err != nil {
return empty, err
return empty, errors.Wrapf(err, "During ApplyMutations")
}

// wait for indexing to complete or context to be canceled.
Expand Down
2 changes: 1 addition & 1 deletion ee/acl/acl_curl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestCurlAuthorization(t *testing.T) {
// test query through curl
token, err := testutil.HttpLogin(&testutil.LoginParams{
Endpoint: adminEndpoint,
UserID: userid,
UserID: commonUserId,
Passwd: userpassword,
Namespace: x.GalaxyNamespace,
})
Expand Down
Loading