Bulk loader allocates reserved predicates in first reduce shard. #4202

Merged 11 commits on Oct 24, 2019
14 changes: 11 additions & 3 deletions dgraph/cmd/bulk/merge_shards.go
@@ -33,6 +33,11 @@ const (

func mergeMapShardsIntoReduceShards(opt options) {
mapShards := shardDirs(filepath.Join(opt.TmpDir, mapShardDir))
// First shard is handled differently because it contains reserved predicates.
firstShard := mapShards[0]
// Sort the rest of the shards by size to allow the largest shards to be shuffled first.
mapShards = mapShards[1:]
sortBySize(mapShards)

var reduceShards []string
for i := 0; i < opt.ReduceShards; i++ {
@@ -41,6 +46,12 @@ func mergeMapShardsIntoReduceShards(opt options) {
reduceShards = append(reduceShards, shardDir)
}

// Put the first map shard in the first reduce shard since it contains all the reserved
// predicates.
reduceShard := filepath.Join(reduceShards[0], filepath.Base(firstShard))
fmt.Printf("Shard %s -> Reduce %s\n", firstShard, reduceShard)
x.Check(os.Rename(firstShard, reduceShard))

// Heuristic: put the largest map shard into the smallest reduce shard
// until there are no more map shards left. Should be a good approximation.
for _, shard := range mapShards {
@@ -61,9 +72,6 @@ func shardDirs(d string) []string {
for i, shard := range shards {
shards[i] = filepath.Join(d, shard)
}

// Allow largest shards to be shuffled first.
sortBySize(shards)
return shards
}
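
The sortBySize helper and the assignment loop are not shown in this diff. As a rough, standalone sketch of what a size-based ordering of shard directories could look like — the helper names, the use of filepath.Walk, and the descending order are assumptions, not the loader's actual implementation:

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
)

// dirSize sums the sizes of all regular files under dir.
func dirSize(dir string) int64 {
	var total int64
	_ = filepath.Walk(dir, func(_ string, info os.FileInfo, err error) error {
		if err == nil && !info.IsDir() {
			total += info.Size()
		}
		return nil
	})
	return total
}

func main() {
	// Pass shard directories on the command line; they are printed from
	// largest to smallest, mirroring the "largest shards first" ordering
	// used when handing map shards to reduce shards.
	dirs := os.Args[1:]
	sort.Slice(dirs, func(i, j int) bool { return dirSize(dirs[i]) > dirSize(dirs[j]) })
	for _, d := range dirs {
		fmt.Println(d, dirSize(d))
	}
}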

7 changes: 7 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -192,6 +192,13 @@ func run() {
dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
x.Check(os.MkdirAll(dir, 0700))
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)

groupFile := filepath.Join(dir, "group_id")
f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600)
x.Check(err)
x.Check2(f.WriteString(strconv.Itoa(i)))
x.Check2(f.WriteString("\n"))
x.Check(f.Close())
}

// Create a directory just for bulk loader's usage.
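
The hunk above drops a plain-text group_id marker into each reduce shard's p directory (e.g. <output dir>/0/p/group_id). A minimal round-trip sketch of that format — paths and values here are illustrative, not the loader's code — showing why the server-side reader later in this diff trims whitespace before parsing:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

func main() {
	dir, err := ioutil.TempDir("", "p")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Write the marker the way the bulk loader does: the group id followed
	// by a trailing newline.
	marker := filepath.Join(dir, "group_id")
	if err := ioutil.WriteFile(marker, []byte("1\n"), 0600); err != nil {
		panic(err)
	}

	// Read it back the way the server does: trim whitespace, then parse it
	// as a 32-bit unsigned integer.
	contents, err := ioutil.ReadFile(marker)
	if err != nil {
		panic(err)
	}
	groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 10, 32)
	if err != nil {
		panic(err)
	}
	fmt.Println("proposed group:", uint32(groupId))
}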
11 changes: 10 additions & 1 deletion dgraph/cmd/bulk/shard_map.go
@@ -16,7 +16,11 @@

package bulk

import "sync"
import (
"sync"

"github.com/dgraph-io/dgraph/x"
)

type shardMap struct {
sync.RWMutex
@@ -33,6 +37,11 @@ func newShardMap(numShards int) *shardMap {
}

func (m *shardMap) shardFor(pred string) int {
// Always assign NQuads with reserved predicates to the first map shard.
if x.IsReservedPredicate(pred) {
return 0
}

m.RLock()
shard, ok := m.predToShard[pred]
m.RUnlock()
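
To illustrate the routing rule above: reserved predicates (in Dgraph, roughly those under the dgraph. prefix, such as dgraph.type) always land in map shard 0, while ordinary predicates are spread across shards as before. A small sketch using a stand-in for x.IsReservedPredicate; the hash-based spread is only an assumption for the demo — the real loader assigns shards as predicates are first seen:

package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// isReserved stands in for x.IsReservedPredicate: predicates under the
// "dgraph." prefix are treated as reserved.
func isReserved(pred string) bool {
	return strings.HasPrefix(pred, "dgraph.")
}

// shardFor mirrors the invariant introduced above: reserved predicates are
// always routed to shard 0. Non-reserved predicates are hashed here purely
// for illustration.
func shardFor(pred string, numShards int) int {
	if isReserved(pred) {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(pred))
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	for _, pred := range []string{"dgraph.type", "name", "friend"} {
		fmt.Printf("%-12s -> shard %d\n", pred, shardFor(pred, 4))
	}
}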
11 changes: 11 additions & 0 deletions edgraph/server.go
@@ -19,9 +19,11 @@ package edgraph
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
@@ -60,6 +62,7 @@ import (
const (
methodMutate = "Server.Mutate"
methodQuery = "Server.Query"
groupIdPath = "group_id"
)

// ServerState holds the state of the Dgraph server.
@@ -97,6 +100,14 @@ func InitServerState() {

State.initStorage()

groupIdFile := filepath.Join(Config.PostingDir, groupIdPath)
if contents, err := ioutil.ReadFile(groupIdFile); err == nil {
groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 10, 32)
x.Checkf(err, "Error reading %s file inside posting directory %s",
groupIdPath, Config.PostingDir)
x.WorkerConfig.ProposedGroupId = uint32(groupId)
}

go State.fillTimestampRequests()
}

9 changes: 8 additions & 1 deletion worker/groups.go
@@ -94,13 +94,20 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
id, err := raftwal.RaftId(walStore)
x.Check(err)
x.WorkerConfig.RaftId = id

// If the w directory already contains raft information, ignore the proposed
// group ID stored inside the p directory.
if id > 0 {
x.WorkerConfig.ProposedGroupId = 0
}
}
glog.Infof("Current Raft Id: %#x\n", x.WorkerConfig.RaftId)

// Successfully connect with dgraphzero, before doing anything else.

// Connect with Zero leader and figure out what group we should belong to.
m := &pb.Member{Id: x.WorkerConfig.RaftId, Addr: x.WorkerConfig.MyAddr}
m := &pb.Member{Id: x.WorkerConfig.RaftId, GroupId: x.WorkerConfig.ProposedGroupId,
Addr: x.WorkerConfig.MyAddr}
var connState *pb.ConnectionState
var err error
for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
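
Net effect of the two changes above: the group id proposed via p/group_id is only honored on a node's very first start; once the w directory already holds a RAFT ID, the proposal is cleared and the existing group membership wins. A tiny sketch of that precedence (not the actual worker code):

package main

import "fmt"

// effectiveProposal mirrors the precedence above: a node that has already
// joined a group (raftID > 0 recovered from the w directory) ignores any
// group id proposed via p/group_id.
func effectiveProposal(raftID uint64, proposed uint32) uint32 {
	if raftID > 0 {
		return 0 // fall back to Zero/raft state, as before this change
	}
	return proposed
}

func main() {
	fmt.Println(effectiveProposal(0, 2)) // fresh node: proposes group 2
	fmt.Println(effectiveProposal(7, 2)) // existing node: proposal ignored (0)
}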
3 changes: 3 additions & 0 deletions x/config.go
@@ -71,6 +71,9 @@ type WorkerOptions struct {
// SnapshotAfter indicates the number of entries in the RAFT logs that are needed
// to allow a snapshot to be created.
SnapshotAfter int
// ProposedGroupId will be used if there's a file in the p directory called group_id with the
// proposed group ID for this server.
ProposedGroupId uint32
}

// WorkerConfig stores the global instance of the worker package's options.