Bulk loader allocates reserved predicates in first reduce shard. #4202

Merged: 11 commits, Oct 24, 2019
21 changes: 15 additions & 6 deletions dgraph/cmd/bulk/merge_shards.go
@@ -32,7 +32,12 @@ const (
)

func mergeMapShardsIntoReduceShards(opt options) {
mapShards := shardDirs(filepath.Join(opt.TmpDir, mapShardDir))
shardDirs := readShardDirs(filepath.Join(opt.TmpDir, mapShardDir))
// First shard is handled differently because it contains reserved predicates.
firstShard := shardDirs[0]
// Sort the rest of the shards by size to allow the largest shards to be shuffled first.
shardDirs = shardDirs[1:]
sortBySize(shardDirs)

var reduceShards []string
for i := 0; i < opt.ReduceShards; i++ {
@@ -41,9 +46,15 @@ func mergeMapShardsIntoReduceShards(opt options) {
reduceShards = append(reduceShards, shardDir)
}

// Put the first map shard in the first reduce shard since it contains all the reserved
// predicates.
reduceShard := filepath.Join(reduceShards[0], filepath.Base(firstShard))
fmt.Printf("Shard %s -> Reduce %s\n", firstShard, reduceShard)
x.Check(os.Rename(firstShard, reduceShard))

// Heuristic: put the largest map shard into the smallest reduce shard
// until there are no more map shards left. Should be a good approximation.
for _, shard := range mapShards {
for _, shard := range shardDirs {
sortBySize(reduceShards)
reduceShard := filepath.Join(
reduceShards[len(reduceShards)-1], filepath.Base(shard))
@@ -52,7 +63,7 @@ func mergeMapShardsIntoReduceShards(opt options) {
}
}

func shardDirs(d string) []string {
func readShardDirs(d string) []string {
dir, err := os.Open(d)
x.Check(err)
shards, err := dir.Readdirnames(0)
@@ -61,9 +72,7 @@ func shardDirs(d string) []string {
for i, shard := range shards {
shards[i] = filepath.Join(d, shard)
}

// Allow largest shards to be shuffled first.
sortBySize(shards)
sort.Strings(shards)
return shards
}

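As a note on the change above: the placement logic is a greedy bin-packing heuristic. The first map shard (reserved predicates) is pinned to reduce shard 0, and each remaining map shard, taken largest first, goes into whichever reduce shard currently holds the least data. A minimal, self-contained sketch of that heuristic, with hypothetical shard names and sizes rather than the loader's real directory-size accounting:

package main

import (
	"fmt"
	"sort"
)

// mapShard stands in for a map-shard directory and its on-disk size.
type mapShard struct {
	name string
	size int64
}

// assign pins the first map shard to reduce shard 0 (reserved predicates),
// then places each remaining shard, largest first, into the reduce shard
// that currently holds the least data.
func assign(mapShards []mapShard, numReduce int) [][]mapShard {
	out := make([][]mapShard, numReduce)
	sizes := make([]int64, numReduce)

	out[0] = append(out[0], mapShards[0])
	sizes[0] += mapShards[0].size

	rest := append([]mapShard(nil), mapShards[1:]...)
	sort.Slice(rest, func(i, j int) bool { return rest[i].size > rest[j].size })

	for _, s := range rest {
		smallest := 0
		for i := 1; i < len(sizes); i++ {
			if sizes[i] < sizes[smallest] {
				smallest = i
			}
		}
		out[smallest] = append(out[smallest], s)
		sizes[smallest] += s.size
	}
	return out
}

func main() {
	shards := []mapShard{{"000", 10}, {"001", 50}, {"002", 30}, {"003", 20}}
	for i, group := range assign(shards, 2) {
		fmt.Println("reduce shard", i, "<-", group)
	}
}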
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
@@ -46,7 +46,7 @@ type reducer struct {
}

func (r *reducer) run() error {
dirs := shardDirs(filepath.Join(r.opt.TmpDir, reduceShardDir))
dirs := readShardDirs(filepath.Join(r.opt.TmpDir, reduceShardDir))
x.AssertTrue(len(dirs) == r.opt.ReduceShards)
x.AssertTrue(len(r.opt.shardOutputDirs) == r.opt.ReduceShards)

8 changes: 8 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -38,6 +38,7 @@ import (
var Bulk x.SubCommand

var defaultOutDir = "./out"
var groupFile = "group_id"

func init() {
Bulk.Cmd = &cobra.Command{
@@ -192,6 +193,13 @@ func run() {
dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
x.Check(os.MkdirAll(dir, 0700))
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)

groupFile := filepath.Join(dir, groupFile)
f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600)
x.Check(err)
x.Check2(f.WriteString(strconv.Itoa(i + 1)))
x.Check2(f.WriteString("\n"))
x.Check(f.Close())
}

// Create a directory just for bulk loader's usage.
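The group_id marker added above is a plain-text file holding the proposed group number for each output p directory (reduce shard i proposes group i+1). A small illustrative sketch of that write, not the loader's exact code; the directory paths here are hypothetical:

package main

import (
	"io/ioutil"
	"log"
	"path/filepath"
	"strconv"
)

// writeGroupFile records the proposed group ID inside an output p directory,
// one number followed by a newline, mirroring the i+1 write in the diff above.
func writeGroupFile(dir string, shardIdx int) error {
	contents := strconv.Itoa(shardIdx+1) + "\n"
	return ioutil.WriteFile(filepath.Join(dir, "group_id"), []byte(contents), 0600)
}

func main() {
	// Hypothetical output layout: out/<shard>/p, as produced by the bulk loader.
	for i, dir := range []string{"out/0/p", "out/1/p", "out/2/p"} {
		if err := writeGroupFile(dir, i); err != nil {
			log.Printf("skipping %s: %v", dir, err)
		}
	}
}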
11 changes: 10 additions & 1 deletion dgraph/cmd/bulk/shard_map.go
@@ -16,7 +16,11 @@

package bulk

import "sync"
import (
"sync"

"github.com/dgraph-io/dgraph/x"
)

type shardMap struct {
sync.RWMutex
@@ -33,6 +37,11 @@ func newShardMap(numShards int) *shardMap {
}

func (m *shardMap) shardFor(pred string) int {
// Always assign NQuads with reserved predicates to the first map shard.
if x.IsReservedPredicate(pred) {
return 0
}

m.RLock()
shard, ok := m.predToShard[pred]
m.RUnlock()
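To make the shardFor change concrete, here is a simplified, self-contained stand-in for the shard map. Only the reserved-predicate pinning comes from the diff; the isReservedPredicate stand-in and the first-seen, round-robin assignment for ordinary predicates are assumptions made for illustration:

package main

import (
	"fmt"
	"strings"
	"sync"
)

// isReservedPredicate is a stand-in for x.IsReservedPredicate.
func isReservedPredicate(pred string) bool {
	return strings.HasPrefix(pred, "dgraph.")
}

type shardMap struct {
	sync.Mutex
	numShards   int
	nextShard   int
	predToShard map[string]int
}

func newShardMap(numShards int) *shardMap {
	return &shardMap{numShards: numShards, predToShard: map[string]int{}}
}

// shardFor pins reserved predicates to map shard 0 and hands every other
// predicate a shard the first time it is seen, cycling through the shards.
func (m *shardMap) shardFor(pred string) int {
	if isReservedPredicate(pred) {
		return 0
	}
	m.Lock()
	defer m.Unlock()
	shard, ok := m.predToShard[pred]
	if !ok {
		shard = m.nextShard
		m.predToShard[pred] = shard
		m.nextShard = (m.nextShard + 1) % m.numShards
	}
	return shard
}

func main() {
	m := newShardMap(3)
	for _, p := range []string{"dgraph.type", "name", "friend", "dgraph.graphql.schema"} {
		fmt.Println(p, "->", m.shardFor(p))
	}
}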
16 changes: 15 additions & 1 deletion edgraph/server.go
@@ -19,9 +19,11 @@ package edgraph
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
@@ -60,6 +62,7 @@ import (
const (
methodMutate = "Server.Mutate"
methodQuery = "Server.Query"
groupFile = "group_id"
)

// ServerState holds the state of the Dgraph server.
@@ -96,8 +99,19 @@ func InitServerState() {
State.needTs = make(chan tsReq, 100)

State.initStorage()

go State.fillTimestampRequests()

contents, err := ioutil.ReadFile(filepath.Join(Config.PostingDir, groupFile))
if err != nil {
return
}

glog.Infof("Found group_id file inside posting directory %s. Will attempt to read.",
Config.PostingDir)
groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)

Review comment (ineffassign): ineffectual assignment to err.

if err != nil {
glog.Warningf("Could not read %s file inside posting directory %s.",
groupFile, Config.PostingDir)
}
x.WorkerConfig.ProposedGroupId = uint32(groupId)
}

func (s *ServerState) runVlogGC(store *badger.DB) {
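The ineffassign note above refers to the ParseUint error in this hunk. A small standalone sketch of the startup-side read with that error explicitly handled; the function name and plain-log fallback are illustrative rather than Dgraph's actual API:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"path/filepath"
	"strconv"
	"strings"
)

// proposedGroup returns the group ID stored in a group_id file inside the
// posting directory, or 0 when the file is absent or malformed.
func proposedGroup(postingDir string) uint32 {
	contents, err := ioutil.ReadFile(filepath.Join(postingDir, "group_id"))
	if err != nil {
		return 0 // no marker file: the bulk loader proposed nothing
	}
	id, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)
	if err != nil {
		log.Printf("could not parse group_id file in %s: %v", postingDir, err)
		return 0
	}
	return uint32(id)
}

func main() {
	fmt.Println("proposed group:", proposedGroup("./p"))
}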
93 changes: 64 additions & 29 deletions systest/21million/docker-compose.yml
@@ -1,43 +1,78 @@
# Docker compose file for use with test-21million.sh.
# Auto-generated with: [./compose -a 3 -z 1 -r 1 -d data -w]
#
version: "3.5"
services:
zero1:
alpha1:
image: dgraph/dgraph:latest
container_name: zero1
working_dir: /data/zero1
ports:
- 5180:5180
- 6180:6180
container_name: alpha1
working_dir: /data/alpha1
labels:
cluster: test
ports:
- 8180:8180
- 9180:9180
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph zero -o 100 --my=zero1:5180 --logtostderr --bindall

alpha1:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
- data:/data
command: /gobin/dgraph alpha -o 100 --my=alpha1:7180 --lru_mb=1024 --zero=zero1:5180
--logtostderr -v=2 --idx=1 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
alpha2:
image: dgraph/dgraph:latest
container_name: alpha1
working_dir: /data/alpha1
container_name: alpha2
working_dir: /data/alpha2
depends_on:
- alpha1
labels:
cluster: test
ports:
- 8182:8182
- 9182:9182
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
- type: volume
source: data
target: /data/alpha1
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
- data:/data
command: /gobin/dgraph alpha -o 102 --my=alpha2:7182 --lru_mb=1024 --zero=zero1:5180
--logtostderr -v=2 --idx=2 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
alpha3:
image: dgraph/dgraph:latest
container_name: alpha3
working_dir: /data/alpha3
depends_on:
- alpha2
labels:
cluster: test
ports:
- 8180:8180
- 9180:9180
- 8183:8183
- 9183:9183
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
- data:/data
command: /gobin/dgraph alpha -o 103 --my=alpha3:7183 --lru_mb=1024 --zero=zero1:5180
--logtostderr -v=2 --idx=3 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
zero1:
image: dgraph/dgraph:latest
container_name: zero1
working_dir: /data/zero1
labels:
cluster: test
command: >
/gobin/dgraph alpha --my=alpha1:7180 --zero=zero1:5180 -o 100 --logtostderr
--whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 --lru_mb=1024

ports:
- 5180:5180
- 6180:6180
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
- data:/data
command: /gobin/dgraph zero -o 100 --idx=1 --my=zero1:5180 --replicas=1 --logtostderr
-v=2 --bindall
volumes:
data:
13 changes: 13 additions & 0 deletions systest/21million/queries/query-052
@@ -0,0 +1,13 @@
{
q(func: has(dgraph.type)) {
count(uid)
}
}
---
{
"q": [
{
"count": 3508686
}
]
}
15 changes: 10 additions & 5 deletions systest/21million/test-21million.sh
@@ -4,7 +4,6 @@ set -e
readonly ME=${0##*/}
readonly SRCDIR=$(dirname $0)

QUERY_DIR=$SRCDIR/queries
BENCHMARKS_REPO="$GOPATH/src/github.com/dgraph-io/benchmarks"
SCHEMA_FILE="$BENCHMARKS_REPO/data/21million.schema"
DATA_FILE="$BENCHMARKS_REPO/data/21million.rdf.gz"
@@ -99,16 +98,22 @@ DockerCompose logs -f zero1 | grep -q -m1 "I've become the leader"

if [[ $LOADER == bulk ]]; then
Info "bulk loading data set"
DockerCompose run -v $BENCHMARKS_REPO:$BENCHMARKS_REPO --name bulk_load --rm alpha1 \
DockerCompose run -v $BENCHMARKS_REPO:$BENCHMARKS_REPO --name bulk_load zero1 \
bash -s <<EOF
mkdir -p /data/alpha1
mkdir -p /data/alpha2
mkdir -p /data/alpha3
/gobin/dgraph bulk --schema=$SCHEMA_FILE --files=$DATA_FILE \
--format=rdf --zero=zero1:5180 --out=/data/alpha1/bulk
mv /data/alpha1/bulk/0/p /data/alpha1
--format=rdf --zero=zero1:5180 --out=/data/zero1/bulk \
--reduce_shards 3 --map_shards 9
mv /data/zero1/bulk/0/p /data/alpha1
mv /data/zero1/bulk/1/p /data/alpha2
mv /data/zero1/bulk/2/p /data/alpha3
EOF
fi

Info "bringing up alpha container"
DockerCompose up -d --force-recreate alpha1
DockerCompose up -d --force-recreate alpha1 alpha2 alpha3

Info "waiting for alpha to be ready"
DockerCompose logs -f alpha1 | grep -q -m1 "Server is ready"
9 changes: 8 additions & 1 deletion worker/groups.go
@@ -94,13 +94,20 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
id, err := raftwal.RaftId(walStore)
x.Check(err)
x.WorkerConfig.RaftId = id

// If the w directory already contains raft information, ignore the proposed
// group ID stored inside the p directory.
if id > 0 {
x.WorkerConfig.ProposedGroupId = 0
}
}
glog.Infof("Current Raft Id: %#x\n", x.WorkerConfig.RaftId)

// Successfully connect with dgraphzero, before doing anything else.

// Connect with Zero leader and figure out what group we should belong to.
m := &pb.Member{Id: x.WorkerConfig.RaftId, Addr: x.WorkerConfig.MyAddr}
m := &pb.Member{Id: x.WorkerConfig.RaftId, GroupId: x.WorkerConfig.ProposedGroupId,
Addr: x.WorkerConfig.MyAddr}
var connState *pb.ConnectionState
var err error
for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
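The guard added in this hunk encodes a simple precedence rule: a node that already has RAFT state keeps its existing group, so the bulk loader's proposal only applies to a fresh node. A tiny sketch of that rule, with illustrative names:

package main

import "fmt"

// effectiveProposal drops the proposed group when the node already has a RAFT
// identity, so a restart never tries to move an existing member to a new group.
func effectiveProposal(existingRaftID uint64, proposed uint32) uint32 {
	if existingRaftID > 0 {
		return 0 // existing node: let Zero keep it where it is
	}
	return proposed
}

func main() {
	fmt.Println(effectiveProposal(0, 2)) // fresh node: group 2 is proposed
	fmt.Println(effectiveProposal(7, 2)) // node with RAFT state: proposal ignored
}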
3 changes: 3 additions & 0 deletions x/config.go
@@ -71,6 +71,9 @@ type WorkerOptions struct {
// SnapshotAfter indicates the number of entries in the RAFT logs that are needed
// to allow a snapshot to be created.
SnapshotAfter int
// ProposedGroupId will be used if there's a file in the p directory called group_id with the
// proposed group ID for this server.
ProposedGroupId uint32
}

// WorkerConfig stores the global instance of the worker package's options.