From b169849c44c031f7f42bf0ddfa24937bf225ed62 Mon Sep 17 00:00:00 2001 From: Ashish Goswami Date: Fri, 10 Jul 2020 20:07:33 +0530 Subject: [PATCH] fix(dgraph): Handle schema updates correctly in ludicrous mode (#5814) Fixes: DGRAPH-1656 Currently schema updates are not handled correctly in ludicrous mode. This PR fixed it. Before starting any schema mutation, we wait for all the active mutations to finish and then apply schema mutation. (cherry picked from commit 994a07fe33e2b85edae4c4ca94bef088919e3143) --- systest/21million/test-21million.sh | 3 + systest/ludicrous/1predicate-c.schema | 1 + systest/ludicrous/1predicate.schema | 1 + systest/ludicrous/docker-compose.yml | 78 ++++++++++++++ systest/ludicrous/queries/query-001 | 13 +++ systest/ludicrous/run_test.go | 102 ++++++++++++++++++ systest/ludicrous/test.sh | 142 ++++++++++++++++++++++++++ worker/draft.go | 37 ++++--- worker/executor.go | 30 +++++- 9 files changed, 388 insertions(+), 19 deletions(-) create mode 100644 systest/ludicrous/1predicate-c.schema create mode 100644 systest/ludicrous/1predicate.schema create mode 100644 systest/ludicrous/docker-compose.yml create mode 100644 systest/ludicrous/queries/query-001 create mode 100644 systest/ludicrous/run_test.go create mode 100755 systest/ludicrous/test.sh diff --git a/systest/21million/test-21million.sh b/systest/21million/test-21million.sh index 145e880dd4e..21f1cb8bf79 100755 --- a/systest/21million/test-21million.sh +++ b/systest/21million/test-21million.sh @@ -52,6 +52,9 @@ options: for easier post-test review --load-only load data but do not run tests --quiet just report which queries differ, without a diff + --mode normal = run dgraph in normal mode + none = run dgraph in normal mode + ludicrous = run dgraph in ludicrous mode EOF exit 0 fi diff --git a/systest/ludicrous/1predicate-c.schema b/systest/ludicrous/1predicate-c.schema new file mode 100644 index 00000000000..327e5062506 --- /dev/null +++ b/systest/ludicrous/1predicate-c.schema @@ -0,0 +1 
@@ +pred: [int] @count . diff --git a/systest/ludicrous/1predicate.schema b/systest/ludicrous/1predicate.schema new file mode 100644 index 00000000000..113528a3df7 --- /dev/null +++ b/systest/ludicrous/1predicate.schema @@ -0,0 +1 @@ +pred: [int] . diff --git a/systest/ludicrous/docker-compose.yml b/systest/ludicrous/docker-compose.yml new file mode 100644 index 00000000000..c0b8663ba96 --- /dev/null +++ b/systest/ludicrous/docker-compose.yml @@ -0,0 +1,78 @@ +# Auto-generated with: [./compose -a 3 -z 1 -r 1 -d data -w] +# +version: "3.5" +services: + alpha1: + image: dgraph/dgraph:latest + container_name: alpha1 + working_dir: /data/alpha1 + labels: + cluster: test + ports: + - 8180:8180 + - 9180:9180 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - data:/data + command: /gobin/dgraph alpha -o 100 --my=alpha1:7180 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=1 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --ludicrous_mode + alpha2: + image: dgraph/dgraph:latest + container_name: alpha2 + working_dir: /data/alpha2 + depends_on: + - alpha1 + labels: + cluster: test + ports: + - 8182:8182 + - 9182:9182 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - data:/data + command: /gobin/dgraph alpha -o 102 --my=alpha2:7182 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=2 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --ludicrous_mode + alpha3: + image: dgraph/dgraph:latest + container_name: alpha3 + working_dir: /data/alpha3 + depends_on: + - alpha2 + labels: + cluster: test + ports: + - 8183:8183 + - 9183:9183 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - data:/data + command: /gobin/dgraph alpha -o 103 --my=alpha3:7183 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=3 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --ludicrous_mode + zero1: + image: dgraph/dgraph:latest + container_name: zero1 + 
working_dir: /data/zero1 + labels: + cluster: test + ports: + - 5180:5180 + - 6180:6180 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - data:/data + command: /gobin/dgraph zero -o 100 --idx=1 --my=zero1:5180 --replicas=1 --logtostderr + -v=2 --bindall --ludicrous_mode +volumes: + data: diff --git a/systest/ludicrous/queries/query-001 b/systest/ludicrous/queries/query-001 new file mode 100644 index 00000000000..e33d6c3a996 --- /dev/null +++ b/systest/ludicrous/queries/query-001 @@ -0,0 +1,13 @@ +{ + result(func: gt(count(pred), 5)) { + count(pred) + } +} +--- +{ + "result": [ + { + "count(pred)": 30000 + } + ] +} diff --git a/systest/ludicrous/run_test.go b/systest/ludicrous/run_test.go new file mode 100644 index 00000000000..fbbb8f964d4 --- /dev/null +++ b/systest/ludicrous/run_test.go @@ -0,0 +1,102 @@ +// +build standalone + +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "flag" + "io/ioutil" + "path" + "runtime" + "strings" + "testing" + "time" + + "github.com/dgraph-io/dgraph/chunker" + "github.com/dgraph-io/dgraph/testutil" + "github.com/stretchr/testify/require" +) + +// JSON output can be hundreds of lines and diffs can scroll off the terminal before you +// can look at them. This option allows saving the JSON to a specified directory instead +// for easier reviewing after the test completes. 
+var savedir = flag.String("savedir", "", + "directory to save json from test failures in") +var quiet = flag.Bool("quiet", false, + "just output whether json differs, not a diff") + +func TestQueries(t *testing.T) { + _, thisFile, _, _ := runtime.Caller(0) + queryDir := path.Join(path.Dir(thisFile), "queries") + + // For this test we DON'T want to start with an empty database. + dg, err := testutil.DgraphClient(testutil.SockAddr) + if err != nil { + t.Fatalf("Error while getting a dgraph client: %v", err) + } + + files, err := ioutil.ReadDir(queryDir) + if err != nil { + t.Fatalf("Error reading directory: %s", err.Error()) + } + + savepath := "" + diffs := 0 + for _, file := range files { + if !strings.HasPrefix(file.Name(), "query-") { + continue + } + t.Run(file.Name(), func(t *testing.T) { + filename := path.Join(queryDir, file.Name()) + reader, cleanup := chunker.FileReader(filename, nil) + bytes, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatalf("Error reading file: %s", err.Error()) + } + contents := string(bytes[:]) + cleanup() + + // The test query and expected result are separated by a delimiter. + bodies := strings.SplitN(contents, "\n---\n", 2) + + // If a query takes too long to run, it probably means dgraph is stuck and there's + // no point in waiting longer or trying more tests. 
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + resp, err := dg.NewTxn().Query(ctx, bodies[0]) + cancel() + if ctx.Err() == context.DeadlineExceeded { + t.Fatal("aborting test due to query timeout") + } + require.NoError(t, err) + + t.Logf("running %s", file.Name()) + if *savedir != "" { + savepath = path.Join(*savedir, file.Name()) + } + + if !testutil.EqualJSON(t, bodies[1], string(resp.GetJson()), savepath, *quiet) { + diffs++ + } + }) + } + + if *savedir != "" && diffs > 0 { + t.Logf("test json saved in directory: %s", *savedir) + } +} diff --git a/systest/ludicrous/test.sh b/systest/ludicrous/test.sh new file mode 100755 index 00000000000..b0edb84cfb0 --- /dev/null +++ b/systest/ludicrous/test.sh @@ -0,0 +1,142 @@ +#!/bin/bash + +set -e +readonly ME=${0##*/} +readonly SRCDIR=$(dirname $0) + +SCHEMA_C_FILE="1predicate.schema" +SCHEMA_FILE="1predicate-c.schema" +DATA_FILE="1predicate.rdf.gz" + +function Info { + echo -e "INFO: $*" +} + +function DockerCompose { + docker-compose -p dgraph "$@" +} +function DgraphLive { + dgraph live --ludicrous_mode "$@" +} + +HELP= CLEANUP= SAVEDIR= LOAD_ONLY= QUIET= + +ARGS=$(/usr/bin/getopt -n$ME -o"h" -l"help,cleanup:,savedir:,load-only,quiet" -- "$@") || exit 1 +eval set -- "$ARGS" +while true; do + case "$1" in + -h|--help) HELP=yes; ;; + --cleanup) CLEANUP=${2,,}; shift ;; + --savedir) SAVEDIR=${2,,}; shift ;; + --load-only) LOAD_ONLY=yes ;; + --quiet) QUIET=yes ;; + --) shift; break ;; + esac + shift +done + +if [[ $HELP ]]; then + cat <<EOF +usage: $ME [-h|--help] [--cleanup=<all|servers|none>] [--savedir=path] [--mode=<normal|ludicrous|none>] + +options: + + --cleanup all = take down containers and data volume (default) + servers = take down dgraph zero and alpha but leave data volume up + none = leave up containers and data volume + --savedir=path specify a directory to save test failure json in + for easier post-test review + --load-only load data but do not run tests + --quiet just report which queries differ, without a diff +EOF + exit 0 +fi + + +# default to cleaning 
up both services and volume +if [[ -z $CLEANUP ]]; then + CLEANUP=all +elif [[ $CLEANUP != all && $CLEANUP != servers && $CLEANUP != none ]]; then + echo >&2 "$ME: cleanup must be 'all' or 'servers' or 'none'" + exit 1 +fi + +# default to quiet mode if diffs are being saved in a directory +if [[ -n $SAVEDIR ]]; then + QUIET=yes +fi + +Info "entering directory $SRCDIR" +cd $SRCDIR + +Info "removing old data (if any)" +DockerCompose down -v --remove-orphans + +Info "bringing up zero container" +DockerCompose up -d --remove-orphans --force-recreate zero1 + +Info "waiting for zero to become leader" +DockerCompose logs -f zero1 | grep -q -m1 "I've become the leader" + +Info "bringing up alpha container" +DockerCompose up -d --force-recreate alpha1 alpha2 alpha3 + +Info "waiting for alpha to be ready" +DockerCompose logs -f alpha1 | grep -q -m1 "Server is ready" +# after the server prints the log "Server is ready", it may be still loading data from badger +Info "sleeping for 10 seconds for the server to be ready" +sleep 10 + +for i in {1..10}; do sleep 1; curl 'http://localhost:8180/alter' --data-binary $'@1predicate-c.schema'; echo "schema-c"$i; done & +for i in {1..10}; do sleep 1; curl 'http://localhost:8180/alter' --data-binary $'@1predicate.schema'; echo "schema"$i; done & + +Info "live loading data set" +DgraphLive --schema=$SCHEMA_FILE --files=$DATA_FILE --format=rdf --zero=:5180 --alpha=:9180 --logtostderr --batch=1 & + +wait; + +sleep 10 + + +if [[ $LOAD_ONLY ]]; then + Info "exiting after data load" + exit 0 +fi + +# replace variables if set with the corresponding option +SAVEDIR=${SAVEDIR:+-savedir=$SAVEDIR} +QUIET=${QUIET:+-quiet} + +Info "running benchmarks/regression queries" + +if [[ ! 
-z "$TEAMCITY_VERSION" ]]; then + # Make TeamCity aware of Go tests + export GOFLAGS="-json" +fi +go test -v -tags standalone $SAVEDIR $QUIET || FOUND_DIFFS=1 + +if [[ $FOUND_DIFFS -eq 0 ]]; then + Info "no diffs found in query results" +else + Info "Cluster logs for alpha1" + docker logs alpha1 + Info "Cluster logs for alpha2" + docker logs alpha2 + Info "Cluster logs for alpha3" + docker logs alpha3 + Info "Cluster logs for zero1" + docker logs zero1 + Info "found some diffs in query results" +fi + +if [[ $CLEANUP == all ]]; then + Info "bringing down zero and alpha and data volumes" + DockerCompose down -v +elif [[ $CLEANUP == none ]]; then + Info "leaving up zero and alpha" +else + Info "bringing down zero and alpha only" + DockerCompose down +fi + +exit $FOUND_DIFFS diff --git a/worker/draft.go b/worker/draft.go index 906366c96ee..bd3895797de 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -229,7 +229,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) * ops: make(map[op]*y.Closer), } if x.WorkerConfig.LudicrousMode { - n.ex = newExecutor() + n.ex = newExecutor(&m.Applied) } return n } @@ -351,6 +351,17 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr return err } } + + // If Dgraph is running in ludicrous mode and we get some schema we should wait for all + // active mutations to finish. Previously we were thinking of only waiting for active + // mutations related to predicates present in schema mutation. But this might cause issues + // as we call DropPrefix() on Badger while running schema mutations. DropPrefix() blocks + // writes on Badger and returns error if writes are tried. To avoid this we should wait for + // all active mutations to finish irrespective of predicates present in schema mutation. 
+ if x.WorkerConfig.LudicrousMode && len(proposal.Mutations.Schema) > 0 { + n.ex.waitForActiveMutations() + } + if err := runSchemaMutation(ctx, proposal.Mutations.Schema, startTs); err != nil { return err } @@ -434,7 +445,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr }) if x.WorkerConfig.LudicrousMode { - n.ex.addEdges(ctx, m.StartTs, m.Edges) + n.ex.addEdges(ctx, proposal) return nil } @@ -503,14 +514,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error { span.Annotatef(nil, "While applying mutations: %v", err) return err } - if x.WorkerConfig.LudicrousMode { - ts := proposal.Mutations.StartTs - return n.commitOrAbort(proposal.Key, &pb.OracleDelta{ - Txns: []*pb.TxnStatus{ - {StartTs: ts, CommitTs: ts}, - }, - }) - } + span.Annotate(nil, "Done") return nil } @@ -851,14 +855,14 @@ func (n *node) proposeSnapshot(discardN int) error { const ( maxPendingSize int64 = 64 << 20 // in bytes. - nodeApplyChan = "raft node applyCh" + nodeApplyChan = "pushing to raft node applyCh" ) func rampMeter(address *int64, maxSize int64, component string) { start := time.Now() defer func() { if dur := time.Since(start); dur > time.Second { - glog.Infof("Blocked pushing to %s for %v", component, dur.Round(time.Millisecond)) + glog.Infof("Blocked %s for %v", component, dur.Round(time.Millisecond)) } }() for { @@ -1483,13 +1487,18 @@ func (n *node) calculateSnapshot(startIdx uint64, discardN int) (*pb.Snapshot, e span.Annotatef(nil, "Error: %v", err) return nil, err } + + var start uint64 if proposal.Mutations != nil { - start := proposal.Mutations.StartTs + start = proposal.Mutations.StartTs if start >= minPendingStart && snapshotIdx == 0 { snapshotIdx = entry.Index - 1 } } - if proposal.Delta != nil { + // In ludicrous mode commitTs for any transaction is same as startTs. 
+ if x.WorkerConfig.LudicrousMode { + maxCommitTs = x.Max(maxCommitTs, start) + } else if proposal.Delta != nil { for _, txn := range proposal.Delta.GetTxns() { maxCommitTs = x.Max(maxCommitTs, txn.CommitTs) } diff --git a/worker/executor.go b/worker/executor.go index e8e2eb4f73a..8e087878ab8 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -33,20 +33,24 @@ type subMutation struct { edges []*pb.DirectedEdge ctx context.Context startTs uint64 + index uint64 } type executor struct { pendingSize int64 + smCount int64 // Stores count for active sub mutations. sync.RWMutex predChan map[string]chan *subMutation closer *y.Closer + applied *y.WaterMark } -func newExecutor() *executor { +func newExecutor(applied *y.WaterMark) *executor { ex := &executor{ predChan: make(map[string]chan *subMutation), closer: y.NewCloser(0), + applied: applied, } go ex.shutdown() return ex @@ -80,7 +84,9 @@ func (e *executor) processMutationCh(ch chan *subMutation) { glog.Errorf("Error while waiting for writes: %v", err) } + e.applied.Done(payload.index) atomic.AddInt64(&e.pendingSize, -esize) + atomic.AddInt64(&e.smCount, -1) } } @@ -93,8 +99,8 @@ func (e *executor) shutdown() { } } -// getChannelUnderLock obtains the channel for the given pred. It must be called under e.Lock(). -func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) { +// getChannel obtains the channel for the given pred. It must be called under e.Lock(). 
+func (e *executor) getChannel(pred string) (ch chan *subMutation) { ch, ok := e.predChan[pred] if ok { return ch @@ -111,9 +117,13 @@ const ( executorAddEdges = "executor.addEdges" ) -func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) { +func (e *executor) addEdges(ctx context.Context, proposal *pb.Proposal) { rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges) + index := proposal.Index + startTs := proposal.Mutations.StartTs + edges := proposal.Mutations.Edges + payloadMap := make(map[string]*subMutation) var esize int64 for _, edge := range edges { @@ -122,6 +132,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir payloadMap[edge.Attr] = &subMutation{ ctx: ctx, startTs: startTs, + index: index, } payload = payloadMap[edge.Attr] } @@ -138,9 +149,18 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir default: // Closer is not closed. And we have the Lock, so sending on channel should be safe. for attr, payload := range payloadMap { - e.getChannelUnderLock(attr) <- payload + e.applied.Begin(index) + atomic.AddInt64(&e.smCount, 1) + e.getChannel(attr) <- payload } } atomic.AddInt64(&e.pendingSize, esize) } + +// waitForActiveMutations waits for all the mutations (currently active) to finish. This function +// should be called before running any schema mutation. +func (e *executor) waitForActiveMutations() { + glog.Infoln("executor: wait for active mutation to finish") + rampMeter(&e.smCount, 0, "waiting on active mutations to finish") +}