diff --git a/systest/ludicrous/1predicate-c.schema b/systest/ludicrous/1predicate-c.schema
new file mode 100644
index 00000000000..327e5062506
--- /dev/null
+++ b/systest/ludicrous/1predicate-c.schema
@@ -0,0 +1 @@
+pred: [int] @count .
diff --git a/systest/ludicrous/1predicate.schema b/systest/ludicrous/1predicate.schema
new file mode 100644
index 00000000000..113528a3df7
--- /dev/null
+++ b/systest/ludicrous/1predicate.schema
@@ -0,0 +1 @@
+pred: [int] .
diff --git a/systest/ludicrous/docker-compose.yml b/systest/ludicrous/docker-compose.yml
new file mode 100644
index 00000000000..c0b8663ba96
--- /dev/null
+++ b/systest/ludicrous/docker-compose.yml
@@ -0,0 +1,78 @@
+# Auto-generated with: [./compose -a 3 -z 1 -r 1 -d data -w]
+#
+version: "3.5"
+services:
+  alpha1:
+    image: dgraph/dgraph:latest
+    container_name: alpha1
+    working_dir: /data/alpha1
+    labels:
+      cluster: test
+    ports:
+    - 8180:8180
+    - 9180:9180
+    volumes:
+    - type: bind
+      source: $GOPATH/bin
+      target: /gobin
+      read_only: true
+    - data:/data
+    command: /gobin/dgraph alpha -o 100 --my=alpha1:7180 --lru_mb=1024 --zero=zero1:5180
+      --logtostderr -v=2 --idx=1 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --ludicrous_mode
+  alpha2:
+    image: dgraph/dgraph:latest
+    container_name: alpha2
+    working_dir: /data/alpha2
+    depends_on:
+    - alpha1
+    labels:
+      cluster: test
+    ports:
+    - 8182:8182
+    - 9182:9182
+    volumes:
+    - type: bind
+      source: $GOPATH/bin
+      target: /gobin
+      read_only: true
+    - data:/data
+    command: /gobin/dgraph alpha -o 102 --my=alpha2:7182 --lru_mb=1024 --zero=zero1:5180
+      --logtostderr -v=2 --idx=2 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --ludicrous_mode
+  alpha3:
+    image: dgraph/dgraph:latest
+    container_name: alpha3
+    working_dir: /data/alpha3
+    depends_on:
+    - alpha2
+    labels:
+      cluster: test
+    ports:
+    - 8183:8183
+    - 9183:9183
+    volumes:
+    - type: bind
+      source: $GOPATH/bin
+      target: /gobin
+      read_only: true
+    - data:/data
+    command: /gobin/dgraph alpha -o 103 --my=alpha3:7183 --lru_mb=1024 --zero=zero1:5180
+      --logtostderr -v=2 --idx=3 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --ludicrous_mode
+  zero1:
+    image: dgraph/dgraph:latest
+    container_name: zero1
+    working_dir: /data/zero1
+    labels:
+      cluster: test
+    ports:
+    - 5180:5180
+    - 6180:6180
+    volumes:
+    - type: bind
+      source: $GOPATH/bin
+      target: /gobin
+      read_only: true
+    - data:/data
+    command: /gobin/dgraph zero -o 100 --idx=1 --my=zero1:5180 --replicas=1 --logtostderr
+      -v=2 --bindall --ludicrous_mode
+volumes:
+  data:
diff --git a/systest/ludicrous/queries/query-001 b/systest/ludicrous/queries/query-001
new file mode 100644
index 00000000000..e33d6c3a996
--- /dev/null
+++ b/systest/ludicrous/queries/query-001
@@ -0,0 +1,13 @@
+{
+  result(func: gt(count(pred), 5)) {
+    count(pred)
+  }
+}
+---
+{
+  "result": [
+    {
+      "count(pred)": 30000
+    }
+  ]
+}
diff --git a/systest/ludicrous/run_test.go b/systest/ludicrous/run_test.go
new file mode 100644
index 00000000000..fbbb8f964d4
--- /dev/null
+++ b/systest/ludicrous/run_test.go
@@ -0,0 +1,104 @@
+// +build standalone
+
+/*
+ * Copyright 2019 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"flag"
+	"io/ioutil"
+	"path"
+	"runtime"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/dgraph/chunker"
+	"github.com/dgraph-io/dgraph/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+// JSON output can be hundreds of lines and diffs can scroll off the terminal before you
+// can look at them. This option allows saving the JSON to a specified directory instead
+// for easier reviewing after the test completes.
+var savedir = flag.String("savedir", "",
+	"directory to save json from test failures in")
+var quiet = flag.Bool("quiet", false,
+	"just output whether json differs, not a diff")
+
+func TestQueries(t *testing.T) {
+	_, thisFile, _, _ := runtime.Caller(0)
+	queryDir := path.Join(path.Dir(thisFile), "queries")
+
+	// For this test we DON'T want to start with an empty database.
+	dg, err := testutil.DgraphClient(testutil.SockAddr)
+	if err != nil {
+		t.Fatalf("Error while getting a dgraph client: %v", err)
+	}
+
+	files, err := ioutil.ReadDir(queryDir)
+	if err != nil {
+		t.Fatalf("Error reading directory: %s", err.Error())
+	}
+
+	savepath := ""
+	diffs := 0
+	for _, file := range files {
+		if !strings.HasPrefix(file.Name(), "query-") {
+			continue
+		}
+		t.Run(file.Name(), func(t *testing.T) {
+			filename := path.Join(queryDir, file.Name())
+			reader, cleanup := chunker.FileReader(filename, nil)
+			// Deferred so cleanup still runs when a Fatal below ends the subtest early.
+			defer cleanup()
+			bytes, err := ioutil.ReadAll(reader)
+			if err != nil {
+				t.Fatalf("Error reading file: %s", err.Error())
+			}
+			contents := string(bytes[:])
+
+			// The test query and expected result are separated by a delimiter.
+			bodies := strings.SplitN(contents, "\n---\n", 2)
+			require.Len(t, bodies, 2, "missing '---' delimiter in %s", filename)
+
+			// If a query takes too long to run, it probably means dgraph is stuck and there's
+			// no point in waiting longer or trying more tests.
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+			resp, err := dg.NewTxn().Query(ctx, bodies[0])
+			cancel()
+			if ctx.Err() == context.DeadlineExceeded {
+				t.Fatal("aborting test due to query timeout")
+			}
+			require.NoError(t, err)
+
+			t.Logf("running %s", file.Name())
+			if *savedir != "" {
+				savepath = path.Join(*savedir, file.Name())
+			}
+
+			if !testutil.EqualJSON(t, bodies[1], string(resp.GetJson()), savepath, *quiet) {
+				diffs++
+			}
+		})
+	}
+
+	if *savedir != "" && diffs > 0 {
+		t.Logf("test json saved in directory: %s", *savedir)
+	}
+}
diff --git a/systest/ludicrous/test.sh b/systest/ludicrous/test.sh
new file mode 100755
index 00000000000..b0edb84cfb0
--- /dev/null
+++ b/systest/ludicrous/test.sh
@@ -0,0 +1,142 @@
+#!/bin/bash
+
+set -e
+readonly ME=${0##*/}
+readonly SRCDIR=$(dirname "$0")
+
+SCHEMA_FILE="1predicate.schema"      # pred: [int] .
+SCHEMA_C_FILE="1predicate-c.schema"  # pred: [int] @count .
+DATA_FILE="1predicate.rdf.gz"
+
+function Info {
+    echo -e "INFO: $*"
+}
+
+function DockerCompose {
+    docker-compose -p dgraph "$@"
+}
+function DgraphLive {
+    dgraph live --ludicrous_mode "$@"
+}
+
+HELP= CLEANUP= SAVEDIR= LOAD_ONLY= QUIET= FOUND_DIFFS=0
+
+ARGS=$(/usr/bin/getopt -n$ME -o"h" -l"help,cleanup:,savedir:,load-only,quiet" -- "$@") || exit 1
+eval set -- "$ARGS"
+while true; do
+    case "$1" in
+        -h|--help)      HELP=yes;              ;;
+        --cleanup)      CLEANUP=${2,,}; shift  ;;
+        --savedir)      SAVEDIR=$2; shift      ;; # paths are case-sensitive, keep as given
+        --load-only)    LOAD_ONLY=yes          ;;
+        --quiet)        QUIET=yes              ;;
+        --)             shift; break           ;;
+    esac
+    shift
+done
+
+if [[ $HELP ]]; then
+    cat <<EOF
+usage: $ME [-h|--help] [--cleanup=<all|servers|none>] [--savedir=path] [--load-only] [--quiet]
+
+options:
+
+    --cleanup       all = take down containers and data volume (default)
+                    servers = take down dgraph zero and alpha but leave data volume up
+                    none = leave up containers and data volume
+    --savedir=path  specify a directory to save test failure json in
+                    for easier post-test review
+    --load-only     load data but do not run tests
+    --quiet         just report which queries differ, without a diff
+EOF
+    exit 0
+fi
+
+
+# default to cleaning up both services and volume
+if [[ -z $CLEANUP ]]; then
+    CLEANUP=all
+elif [[ $CLEANUP != all && $CLEANUP != servers && $CLEANUP != none ]]; then
+    echo >&2 "$ME: cleanup must be 'all' or 'servers' or 'none'"
+    exit 1
+fi
+
+# default to quiet mode if diffs are being saved in a directory
+if [[ -n $SAVEDIR ]]; then
+    QUIET=yes
+fi
+
+Info "entering directory $SRCDIR"
+cd "$SRCDIR"
+
+Info "removing old data (if any)"
+DockerCompose down -v --remove-orphans
+
+Info "bringing up zero container"
+DockerCompose up -d --remove-orphans --force-recreate zero1
+
+Info "waiting for zero to become leader"
+DockerCompose logs -f zero1 | grep -q -m1 "I've become the leader"
+
+Info "bringing up alpha container"
+DockerCompose up -d --force-recreate alpha1 alpha2 alpha3
+
+Info "waiting for alpha to be ready"
+DockerCompose logs -f alpha1 | grep -q -m1 "Server is ready"
+# after the server prints the log "Server is ready", it may be still loading data from badger
+Info "sleeping for 10 seconds for the server to be ready"
+sleep 10
+
+for i in {1..10}; do sleep 1; curl 'http://localhost:8180/alter' --data-binary "@$SCHEMA_C_FILE"; echo "schema-c"$i; done &
+for i in {1..10}; do sleep 1; curl 'http://localhost:8180/alter' --data-binary "@$SCHEMA_FILE"; echo "schema"$i; done &
+
+Info "live loading data set"
+DgraphLive --schema=$SCHEMA_C_FILE --files=$DATA_FILE --format=rdf --zero=:5180 --alpha=:9180 --logtostderr --batch=1 &
+
+wait;
+
+sleep 10
+
+
+if [[ $LOAD_ONLY ]]; then
+    Info "exiting after data load"
+    exit 0
+fi
+
+# replace variables if set with the corresponding option
+SAVEDIR=${SAVEDIR:+-savedir=$SAVEDIR}
+QUIET=${QUIET:+-quiet}
+
+Info "running benchmarks/regression queries"
+
+if [[ -n "$TEAMCITY_VERSION" ]]; then
+    # Make TeamCity aware of Go tests
+    export GOFLAGS="-json"
+fi
+go test -v -tags standalone $SAVEDIR $QUIET || FOUND_DIFFS=1
+
+if [[ $FOUND_DIFFS -eq 0 ]]; then
+    Info "no diffs found in query results"
+else
+    Info "Cluster logs for alpha1"
+    docker logs alpha1
+    Info "Cluster logs for alpha2"
+    docker logs alpha2
+    Info "Cluster logs for alpha3"
+    docker logs alpha3
+    Info "Cluster logs for zero1"
+    docker logs zero1
+    Info "found some diffs in query results"
+fi
+
+if [[ $CLEANUP == all ]]; then
+    Info "bringing down zero and alpha and data volumes"
+    DockerCompose down -v
+elif [[ $CLEANUP == none ]]; then
+    Info "leaving up zero and alpha"
+else
+    Info "bringing down zero and alpha only"
+    DockerCompose down
+fi
+
+exit $FOUND_DIFFS
diff --git a/worker/draft.go b/worker/draft.go
index efc386c5e50..59d906be699 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -229,7 +229,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
 		ops:     make(map[op]*y.Closer),
 	}
 	if x.WorkerConfig.LudicrousMode {
-		n.ex = newExecutor()
+		n.ex = newExecutor(&m.Applied)
 	}
 	return n
 }
@@ -351,6 +351,17 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
 				return err
 			}
 		}
+
+		// If Dgraph is running in ludicrous mode and we get some schema we should wait for all
+		// active mutations to finish. Previously we were thinking of only waiting for active
+		// mutations related to predicates present in schema mutation. But this might cause issues
+		// as we call DropPrefix() on Badger while running schema mutations. DropPrefix() blocks
+		// writes on Badger and returns error if writes are tried. To avoid this we should wait for
+		// all active mutations to finish irrespective of predicates present in schema mutation.
+		if x.WorkerConfig.LudicrousMode && len(proposal.Mutations.Schema) > 0 {
+			n.ex.waitForActiveMutations()
+		}
+
 		if err := runSchemaMutation(ctx, proposal.Mutations.Schema, startTs); err != nil {
 			return err
 		}
@@ -434,7 +445,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
 	})
 
 	if x.WorkerConfig.LudicrousMode {
-		n.ex.addEdges(ctx, m.StartTs, m.Edges)
+		n.ex.addEdges(ctx, proposal)
 		return nil
 	}
 
@@ -503,14 +514,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
 			span.Annotatef(nil, "While applying mutations: %v", err)
 			return err
 		}
-		if x.WorkerConfig.LudicrousMode {
-			ts := proposal.Mutations.StartTs
-			return n.commitOrAbort(proposal.Key, &pb.OracleDelta{
-				Txns: []*pb.TxnStatus{
-					{StartTs: ts, CommitTs: ts},
-				},
-			})
-		}
+
 		span.Annotate(nil, "Done")
 		return nil
 	}
@@ -855,14 +859,14 @@ func (n *node) proposeSnapshot(discardN int) error {
 
 const (
 	maxPendingSize int64 = 64 << 20 // in bytes.
-	nodeApplyChan        = "raft node applyCh"
+	nodeApplyChan        = "pushing to raft node applyCh"
 )
 
 func rampMeter(address *int64, maxSize int64, component string) {
 	start := time.Now()
 	defer func() {
 		if dur := time.Since(start); dur > time.Second {
-			glog.Infof("Blocked pushing to %s for %v", component, dur.Round(time.Millisecond))
+			glog.Infof("Blocked %s for %v", component, dur.Round(time.Millisecond))
 		}
 	}()
 	for {
@@ -1487,13 +1491,18 @@ func (n *node) calculateSnapshot(startIdx uint64, discardN int) (*pb.Snapshot, e
 				span.Annotatef(nil, "Error: %v", err)
 				return nil, err
 			}
+
+			var start uint64
 			if proposal.Mutations != nil {
-				start := proposal.Mutations.StartTs
+				start = proposal.Mutations.StartTs
 				if start >= minPendingStart && snapshotIdx == 0 {
 					snapshotIdx = entry.Index - 1
 				}
 			}
-			if proposal.Delta != nil {
+			// In ludicrous mode commitTs for any transaction is same as startTs.
+			if x.WorkerConfig.LudicrousMode {
+				maxCommitTs = x.Max(maxCommitTs, start)
+			} else if proposal.Delta != nil {
 				for _, txn := range proposal.Delta.GetTxns() {
 					maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
 				}
diff --git a/worker/executor.go b/worker/executor.go
index e8e2eb4f73a..8e087878ab8 100644
--- a/worker/executor.go
+++ b/worker/executor.go
@@ -33,20 +33,24 @@ type subMutation struct {
 	edges   []*pb.DirectedEdge
 	ctx     context.Context
 	startTs uint64
+	index   uint64
 }
 
 type executor struct {
 	pendingSize int64
+	smCount     int64 // Stores count for active sub mutations.
 
 	sync.RWMutex
 	predChan map[string]chan *subMutation
 	closer   *y.Closer
+	applied  *y.WaterMark
 }
 
-func newExecutor() *executor {
+func newExecutor(applied *y.WaterMark) *executor {
 	ex := &executor{
 		predChan: make(map[string]chan *subMutation),
 		closer:   y.NewCloser(0),
+		applied:  applied,
 	}
 	go ex.shutdown()
 	return ex
@@ -80,7 +84,9 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
 			glog.Errorf("Error while waiting for writes: %v", err)
 		}
 
+		e.applied.Done(payload.index)
 		atomic.AddInt64(&e.pendingSize, -esize)
+		atomic.AddInt64(&e.smCount, -1)
 	}
 }
 
@@ -93,8 +99,8 @@ func (e *executor) shutdown() {
 	}
 }
 
-// getChannelUnderLock obtains the channel for the given pred. It must be called under e.Lock().
-func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) {
+// getChannel obtains the channel for the given pred. It must be called under e.Lock().
+func (e *executor) getChannel(pred string) (ch chan *subMutation) {
 	ch, ok := e.predChan[pred]
 	if ok {
 		return ch
@@ -111,9 +117,13 @@ const (
 	executorAddEdges          = "executor.addEdges"
 )
 
-func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
+func (e *executor) addEdges(ctx context.Context, proposal *pb.Proposal) {
 	rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges)
 
+	index := proposal.Index
+	startTs := proposal.Mutations.StartTs
+	edges := proposal.Mutations.Edges
+
 	payloadMap := make(map[string]*subMutation)
 	var esize int64
 	for _, edge := range edges {
@@ -122,6 +132,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
 			payloadMap[edge.Attr] = &subMutation{
 				ctx:     ctx,
 				startTs: startTs,
+				index:   index,
 			}
 			payload = payloadMap[edge.Attr]
 		}
@@ -138,9 +149,18 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
 	default:
 		// Closer is not closed. And we have the Lock, so sending on channel should be safe.
 		for attr, payload := range payloadMap {
-			e.getChannelUnderLock(attr) <- payload
+			e.applied.Begin(index)
+			atomic.AddInt64(&e.smCount, 1)
+			e.getChannel(attr) <- payload
 		}
 	}
 
 	atomic.AddInt64(&e.pendingSize, esize)
 }
+
+// waitForActiveMutations waits for all the mutations (currently active) to finish. This function
+// should be called before running any schema mutation.
+func (e *executor) waitForActiveMutations() {
+	glog.Infoln("executor: wait for active mutation to finish")
+	rampMeter(&e.smCount, 0, "waiting on active mutations to finish")
+}