Commit 51c2d8f
Merge branch 'master' into martinmr/bulk-loader-splits
martinmr committed Mar 11, 2020
2 parents 7ba337b + 5fcb59b commit 51c2d8f
Showing 65 changed files with 2,449 additions and 401 deletions.
3 changes: 2 additions & 1 deletion chunker/chunk.go
@@ -84,7 +84,8 @@ func NewChunker(inputFormat InputFormat, batchSize int) Chunker {
nqs: NewNQuadBuffer(batchSize),
}
default:
panic("unknown input format")
x.Panic(errors.New("unknown input format"))
return nil
}
}

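The change above swaps a bare panic for x.Panic(errors.New(...)), which fits the Sentry wiring this commit adds in dgraph/cmd/alpha/run.go (the commented example there notes that x.Panic sends crash events). The trailing return nil is needed because x.Panic is an ordinary function call, so the compiler cannot tell that the default branch never returns. A minimal sketch of a helper in that spirit; the real x.Panic lives in dgraph's x package and may differ:

// Sketch only: report the error to whatever crash tracker is configured,
// then panic. reportCrash is a made-up stand-in, not a real x-package hook.
package x

import "log"

var reportCrash = func(err error) { log.Printf("captured crash: %v", err) }

// Panic reports err and then panics with it.
func Panic(err error) {
	if err == nil {
		return
	}
	reportCrash(err)
	panic(err)
}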
5 changes: 3 additions & 2 deletions chunker/json_parser_test.go
@@ -87,6 +87,7 @@ func (exp *Experiment) verify() {
require.NoError(exp.t, dg.Alter(ctx, &api.Operation{DropAll: true}), "drop all failed")
require.NoError(exp.t, dg.Alter(ctx, &api.Operation{Schema: exp.schema}),
"schema change failed")
require.NoError(exp.t, testutil.WaitForAlter(ctx, dg, exp.schema))

_, err = dg.NewTxn().Mutate(ctx,
&api.Mutation{Set: exp.nqs, CommitNow: true})
@@ -134,14 +135,14 @@ func TestNquadsFromJson1(t *testing.T) {
name
age
married
address
address
}}`,
expected: `{"alice": [
{"name": "Alice",
"age": 26,
"married": true,
"address": {"coordinates": [2,1.1], "type": "Point"}}
]}
]}
`}
exp.verify()
}
11 changes: 6 additions & 5 deletions codec/benchmark/benchmark.go
@@ -25,6 +25,7 @@ package main
import (
"compress/gzip"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
@@ -59,28 +60,28 @@ const (
func read(filename string) []int {
f, err := os.Open(filename)
if err != nil {
panic(err)
x.Panic(err)
}
defer f.Close()

fgzip, err := gzip.NewReader(f)
if err != nil {
panic(err)
x.Panic(err)
}
defer fgzip.Close()

buf := make([]byte, 4)
_, err = fgzip.Read(buf)
if err != nil && err != io.EOF {
panic(err)
x.Panic(err)
}
ndata := binary.LittleEndian.Uint32(buf)

data := make([]int, ndata)
for i := range data {
_, err = fgzip.Read(buf)
if err != nil && err != io.EOF {
panic(err)
x.Panic(err)
}

data[i] = int(binary.LittleEndian.Uint32(buf))
@@ -176,7 +177,7 @@ func fmtBenchmark(name string, speed int) {
func main() {
data := read("clustered1M.bin.gz")
if !sort.IsSorted(sort.IntSlice(data)) {
panic("test data must be sorted")
x.Panic(errors.New("test data must be sorted"))
}

chunks64 := chunkify64(data)
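For context on the test data these benchmarks load: read expects a gzip stream whose payload is a little-endian uint32 count followed by that many uint32 values, and main requires the decoded slice to be sorted. A small sketch of a writer that produces a compatible file (file name and sample values are made up):

// Sketch: produce a file in the layout benchmark.go's read() expects,
// a gzip stream whose payload is a little-endian uint32 count followed
// by that many uint32 values.
package main

import (
	"compress/gzip"
	"encoding/binary"
	"log"
	"os"
)

func main() {
	data := []int{1, 2, 3, 5, 8, 13} // must be sorted, as main() asserts

	f, err := os.Create("example.bin.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	zw := gzip.NewWriter(f)
	buf := make([]byte, 4)
	binary.LittleEndian.PutUint32(buf, uint32(len(data)))
	if _, err := zw.Write(buf); err != nil {
		log.Fatal(err)
	}
	for _, v := range data {
		binary.LittleEndian.PutUint32(buf, uint32(v))
		if _, err := zw.Write(buf); err != nil {
			log.Fatal(err)
		}
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
}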
3 changes: 2 additions & 1 deletion contrib/integration/swap/main.go
@@ -32,6 +32,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
)

var (
@@ -367,7 +368,7 @@ func checkInvariants(c *dgo.Dgraph, uids []string, sentences []string) error {
sort.Strings(gotUids)
sort.Strings(uids)
if !reflect.DeepEqual(gotUids, uids) {
panic(fmt.Sprintf(`query: %s\n
x.Panic(errors.Errorf(`query: %s\n
Uids in index for %q didn't match
calculated: %v. Len: %d
got: %v
14 changes: 9 additions & 5 deletions contrib/integration/testtxn/main_test.go
@@ -371,6 +371,9 @@ func TestIgnoreIndexConflict(t *testing.T) {
if err := s.dg.Alter(context.Background(), op); err != nil {
log.Fatal(err)
}
if err := testutil.WaitForAlter(context.Background(), s.dg, op.Schema); err != nil {
log.Fatal(err)
}

txn := s.dg.NewTxn()
mu := &api.Mutation{}
@@ -424,9 +427,11 @@ func TestReadIndexKeySameTxn(t *testing.T) {
if err := s.dg.Alter(context.Background(), op); err != nil {
log.Fatal(err)
}
if err := testutil.WaitForAlter(context.Background(), s.dg, op.Schema); err != nil {
log.Fatal(err)
}

txn := s.dg.NewTxn()

mu := &api.Mutation{
CommitNow: true,
SetJson: []byte(`{"name": "Manish"}`),
@@ -933,8 +938,7 @@ func TestTxnDiscardBeforeCommit(t *testing.T) {
}

func alterSchema(dg *dgo.Dgraph, schema string) {
op := api.Operation{}
op.Schema = schema
err := dg.Alter(ctxb, &op)
x.Check(err)
op := api.Operation{Schema: schema}
x.Check(dg.Alter(ctxb, &op))
x.Check(testutil.WaitForAlter(ctxb, dg, schema))
}
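alterSchema and the test changes above pair every Alter with testutil.WaitForAlter, which blocks until the altered schema is actually live, useful now that index builds can run in the background. The helper's implementation is not part of this diff; a simplified sketch of the idea on top of the dgo client, polling a schema {} query until the predicate shows up (predicate-name matching only, no index comparison), might look like:

// Sketch: poll Dgraph's schema until a predicate appears, as a simplified
// stand-in for testutil.WaitForAlter, whose real implementation is not
// shown in this diff.
package waitsketch

import (
	"context"
	"encoding/json"
	"time"

	"github.com/dgraph-io/dgo/v2"
)

// waitForPredicate blocks until a `schema {}` query reports the given
// predicate, or the context is cancelled.
func waitForPredicate(ctx context.Context, dg *dgo.Dgraph, pred string) error {
	for {
		resp, err := dg.NewReadOnlyTxn().Query(ctx, `schema {}`)
		if err != nil {
			return err
		}

		var result struct {
			Schema []struct {
				Predicate string `json:"predicate"`
			} `json:"schema"`
		}
		if err := json.Unmarshal(resp.Json, &result); err != nil {
			return err
		}

		for _, s := range result.Schema {
			if s.Predicate == pred {
				return nil
			}
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}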
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
@@ -57,7 +57,7 @@ func runGzipWithRetry(contentType, url string, buf io.Reader, gzReq, gzResp bool
*http.Response, error) {

client := &http.Client{}
numRetries := 2
numRetries := 3

var resp *http.Response
var err error
53 changes: 53 additions & 0 deletions dgraph/cmd/alpha/reindex_test.go
@@ -17,7 +17,9 @@
package alpha

import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)
@@ -185,3 +187,54 @@ func TestReindexReverseCount(t *testing.T) {
}
}`, res)
}

func checkSchema(t *testing.T, query, key string) {
for i := 0; i < 10; i++ {
res, _, err := queryWithTs(query, "application/graphql+-", "", 0)
require.NoError(t, err)
if strings.Contains(res, key) {
return
}
time.Sleep(100 * time.Millisecond)

if i == 9 {
t.Fatalf("expected %v, got schema: %v", key, res)
}
}
}

func TestBgIndexSchemaReverse(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: [uid] .`))
checkSchema(t, q1, "list")
require.NoError(t, alterSchema(`value: [uid] @count @reverse .`))
checkSchema(t, q1, "reverse")
}

func TestBgIndexSchemaTokenizers(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: string @index(fulltext, hash) .`))
checkSchema(t, q1, "fulltext")
require.NoError(t, alterSchema(`value: string @index(term, hash) @upsert .`))
checkSchema(t, q1, "term")
}

func TestBgIndexSchemaCount(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: [uid] @count .`))
checkSchema(t, q1, "count")
require.NoError(t, alterSchema(`value: [uid] @reverse .`))
checkSchema(t, q1, "reverse")
}

func TestBgIndexSchemaReverseAndCount(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: [uid] @reverse .`))
checkSchema(t, q1, "reverse")
require.NoError(t, alterSchema(`value: [uid] @count .`))
checkSchema(t, q1, "count")
}
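checkSchema above works because the schema response spells out each directive by name once the background build has finished, so a plain substring check is enough. Purely for illustration, roughly what schema(pred: [value]) {} returns after value: [uid] @count @reverse . is applied (exact fields vary by Dgraph version), together with the same Contains test the helper uses:

// Illustrative only: an approximate /query schema payload and the
// substring test checkSchema applies to it.
package schemasketch

import "strings"

const sampleResp = `{"data":{"schema":[
  {"predicate":"value","type":"uid","list":true,"count":true,"reverse":true}
]}}`

func hasDirective(resp, key string) bool {
	return strings.Contains(resp, key)
}

var _ = hasDirective(sampleResp, "reverse") // true once the index build is done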
10 changes: 9 additions & 1 deletion dgraph/cmd/alpha/run.go
@@ -592,8 +592,16 @@ func run() {
x.Config.QueryEdgeLimit = cast.ToUint64(Alpha.Conf.GetString("query_edge_limit"))
x.Config.NormalizeNodeLimit = cast.ToInt(Alpha.Conf.GetString("normalize_node_limit"))

x.PrintVersion()
x.InitSentry(enc.EeBuild)
defer x.FlushSentry()
x.ConfigureSentryScope("alpha")
x.WrapPanics()

// Simulate a Sentry exception or panic event as shown below.
// x.CaptureSentryException(errors.New("alpha exception"))
// x.Panic(errors.New("alpha manual panic will send 2 events"))

x.PrintVersion()
glog.Infof("x.Config: %+v", x.Config)
glog.Infof("x.WorkerConfig: %+v", x.WorkerConfig)
glog.Infof("worker.Config: %+v", worker.Config)
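The ordering above matters: the Sentry client is initialized and its flush deferred before anything that can panic, the scope is tagged with the subcommand name, and x.WrapPanics routes panics into the reporter. The sketch below illustrates the underlying Go mechanics with plain stand-ins (none of these functions are the real x helpers): a deferred flush still runs while a re-raised panic unwinds, so the captured event gets out before the crash.

// Sketch with stand-ins: why "init reporter, defer flush, wrap panics"
// gets crash events out even though the process still dies.
package main

import (
	"fmt"
	"os"
)

var pending []string // buffered "events" awaiting flush

func capture(msg string) { pending = append(pending, msg) }

func flush() { fmt.Fprintln(os.Stderr, "flushed:", pending) }

// wrapPanics records the panic value and re-raises it; the deferred
// flush below still runs during the second unwind.
func wrapPanics() {
	if r := recover(); r != nil {
		capture(fmt.Sprint(r))
		panic(r)
	}
}

func main() {
	defer flush()      // runs on normal exit and during panic unwind
	defer wrapPanics() // deferred last, so it runs first on a panic

	panic("simulated crash") // stands in for x.Panic(errors.New(...))
}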
62 changes: 58 additions & 4 deletions dgraph/cmd/alpha/run_test.go
@@ -107,10 +107,22 @@ func runJSONMutation(m string) error {
}

func alterSchema(s string) error {
_, _, err := runWithRetries("PUT", "", addr+"/alter", s)
if err != nil {
return errors.Wrapf(err, "while running request with retries")
for {
_, _, err := runWithRetries("PUT", "", addr+"/alter", s)
if err != nil && strings.Contains(err.Error(), "is already being modified") {
time.Sleep(time.Second)
continue
} else if err != nil {
return errors.Wrapf(err, "while running request with retries")
} else {
break
}
}

if err := waitForAlter(s); err != nil {
return errors.Wrapf(err, "while waiting for alter to complete")
}

return nil
}

@@ -124,6 +136,48 @@ func alterSchemaWithRetry(s string) error {
return err
}

// waitForAlter waits for the alter operation to complete.
func waitForAlter(s string) error {
ps, err := schema.Parse(s)
if err != nil {
return err
}

for {
resp, _, err := queryWithTs("schema{}", "application/graphql+-", "false", 0)
if err != nil {
return err
}

var result struct {
Data struct {
Schema []*pb.SchemaNode
}
}
if err := json.Unmarshal([]byte(resp), &result); err != nil {
return err
}

actual := make(map[string]*pb.SchemaNode)
for _, rs := range result.Data.Schema {
actual[rs.Predicate] = rs
}

done := true
for _, su := range ps.Preds {
if n, ok := actual[su.Predicate]; !ok || !testutil.SameIndexes(su, n) {
done = false
break
}
}
if done {
return nil
}

time.Sleep(time.Second)
}
}

func dropAll() error {
op := `{"drop_all": true}`
_, _, err := runWithRetries("PUT", "", addr+"/alter", op)
@@ -405,7 +459,7 @@ func TestSchemaMutationUidError1(t *testing.T) {
var s2 = `
friend: uid .
`
require.Error(t, alterSchemaWithRetry(s2))
require.Error(t, alterSchema(s2))
}

// add index
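waitForAlter above keeps polling the schema output until every predicate parsed from the requested schema matches what the server reports, delegating the per-predicate comparison to testutil.SameIndexes. That helper is not shown in this diff; the sketch below captures the comparison idea with simplified stand-in types rather than the real pb.SchemaUpdate and pb.SchemaNode protos:

// Sketch with stand-in types: decide whether the index state a schema
// alter asked for matches what the server currently advertises.
package schemasketch

import "sort"

type indexState struct {
	Tokenizers []string
	Reverse    bool
	Count      bool
	List       bool
}

func sameIndexes(want, got indexState) bool {
	if want.Reverse != got.Reverse || want.Count != got.Count || want.List != got.List {
		return false
	}
	if len(want.Tokenizers) != len(got.Tokenizers) {
		return false
	}
	// Compare tokenizer sets order-independently, without mutating inputs.
	w := append([]string(nil), want.Tokenizers...)
	g := append([]string(nil), got.Tokenizers...)
	sort.Strings(w)
	sort.Strings(g)
	for i := range w {
		if w[i] != g[i] {
			return false
		}
	}
	return true
}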
2 changes: 0 additions & 2 deletions dgraph/cmd/alpha/upsert_test.go
@@ -1613,8 +1613,6 @@ func TestUpsertWithValueVar(t *testing.T) {
require.NoError(t, alterSchema(`amount: int .`))
res, err := mutationWithTs(`{ set { _:p <amount> "0" . } }`, "application/rdf", false, true, 0)
require.NoError(t, err)
b, _ := json.MarshalIndent(res, "", " ")
fmt.Printf("%s\n", b)

const (
// this upsert block increments the value of the counter by one
2 changes: 1 addition & 1 deletion dgraph/cmd/counter/increment.go
@@ -98,7 +98,7 @@ func queryCounter(ctx context.Context, txn *dgo.Txn, pred string) (Counter, erro
case 1:
counter = m["q"][0]
default:
panic(fmt.Sprintf("Invalid response: %q", resp.Json))
x.Panic(errors.Errorf("Invalid response: %q", resp.Json))
}
span.Annotatef(nil, "Found counter: %+v", counter)
counter.startTs = resp.GetTxn().GetStartTs()
2 changes: 1 addition & 1 deletion dgraph/cmd/live/load-json/load_test.go
@@ -26,9 +26,9 @@ import (
"strings"
"testing"

"github.com/dgraph-io/dgo/v2"
"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/x"
)
7 changes: 6 additions & 1 deletion dgraph/cmd/live/run.go
@@ -43,6 +43,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
@@ -194,7 +195,11 @@ func processSchemaFile(ctx context.Context, file string, dgraphClient *dgo.Dgrap

op := &api.Operation{}
op.Schema = string(b)
return dgraphClient.Alter(ctx, op)
if err := dgraphClient.Alter(ctx, op); err != nil {
return err
}
// TODO(Aman): avoid using functions from testutil.
return testutil.WaitForAlter(ctx, dgraphClient, op.Schema)
}

func (l *loader) uid(val string) string {
3 changes: 2 additions & 1 deletion dgraph/cmd/migrate/utils.go
@@ -24,6 +24,7 @@ import (
"reflect"
"strings"

"github.com/dgraph-io/dgraph/x"
"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
@@ -119,7 +120,7 @@ func getColumnValues(columns []string, dataTypes []dataType,
case datetimeType:
valuePtrs = append(valuePtrs, new(mysql.NullTime))
default:
panic(fmt.Sprintf("detected unsupported type %s on column %s",
x.Panic(errors.Errorf("detected unsupported type %s on column %s",
dataTypes[i], columns[i]))
}
}
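A small aside on the errors.Errorf swap above: github.com/pkg/errors formats the message like fmt.Sprintf but returns an error carrying a stack trace, which is more useful to hand to x.Panic than the plain string the old panic(fmt.Sprintf(...)) built. For illustration (the column and type names here are made up):

// Illustration: errors.Errorf yields an error with a recorded stack
// trace; %+v prints it, %v prints just the message.
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	err := errors.Errorf("detected unsupported type %s on column %s", "GEOMETRY", "location")
	fmt.Printf("%v\n", err)  // message only
	fmt.Printf("%+v\n", err) // message plus captured stack trace
}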