Bulk loader allocates reserved predicates in first reduce shard. #4202

Merged (11 commits, Oct 24, 2019)

Changes from all commits
22 changes: 16 additions & 6 deletions dgraph/cmd/bulk/merge_shards.go
@@ -32,7 +32,13 @@ const (
 )
 
 func mergeMapShardsIntoReduceShards(opt options) {
-	mapShards := shardDirs(filepath.Join(opt.TmpDir, mapShardDir))
+	shardDirs := readShardDirs(filepath.Join(opt.TmpDir, mapShardDir))
+	// First shard is handled differently because it contains reserved predicates.
+	x.AssertTrue(len(shardDirs) > 0)
+	firstShard := shardDirs[0]
+	// Sort the rest of the shards by size to allow the largest shards to be shuffled first.
+	shardDirs = shardDirs[1:]
+	sortBySize(shardDirs)
 
 	var reduceShards []string
 	for i := 0; i < opt.ReduceShards; i++ {
@@ -41,9 +47,15 @@ func mergeMapShardsIntoReduceShards(opt options) {
 		reduceShards = append(reduceShards, shardDir)
 	}
 
+	// Put the first map shard in the first reduce shard since it contains all the reserved
+	// predicates.
+	reduceShard := filepath.Join(reduceShards[0], filepath.Base(firstShard))
+	fmt.Printf("Shard %s -> Reduce %s\n", firstShard, reduceShard)
+	x.Check(os.Rename(firstShard, reduceShard))
+
 	// Heuristic: put the largest map shard into the smallest reduce shard
 	// until there are no more map shards left. Should be a good approximation.
-	for _, shard := range mapShards {
+	for _, shard := range shardDirs {
 		sortBySize(reduceShards)
 		reduceShard := filepath.Join(
 			reduceShards[len(reduceShards)-1], filepath.Base(shard))
@@ -52,7 +64,7 @@ func mergeMapShardsIntoReduceShards(opt options) {
 	}
 }
 
-func shardDirs(d string) []string {
+func readShardDirs(d string) []string {
 	dir, err := os.Open(d)
 	x.Check(err)
 	shards, err := dir.Readdirnames(0)
@@ -61,9 +73,7 @@ func shardDirs(d string) []string {
 	for i, shard := range shards {
 		shards[i] = filepath.Join(d, shard)
 	}
-
-	// Allow largest shards to be shuffled first.
-	sortBySize(shards)
+	sort.Strings(shards)
 	return shards
 }

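A note for readers on the heuristic in the loop above: it is a greedy bin-packing pass, placing the largest remaining map shard into the currently smallest reduce shard. A minimal standalone sketch of the same idea, with plain int64 sizes standing in for the directory sizes that sortBySize computes (all names here are illustrative, not from the Dgraph codebase):

package main

import (
	"fmt"
	"sort"
)

// greedyPlace mimics the heuristic: process map shards from largest to
// smallest, always adding to the reduce shard with the smallest total.
func greedyPlace(mapSizes []int64, numReduce int) []int64 {
	sort.Slice(mapSizes, func(i, j int) bool { return mapSizes[i] > mapSizes[j] })
	totals := make([]int64, numReduce)
	for _, sz := range mapSizes {
		smallest := 0
		for i, t := range totals {
			if t < totals[smallest] {
				smallest = i
			}
		}
		totals[smallest] += sz
	}
	return totals
}

func main() {
	// Five map shards, two reduce shards: totals come out balanced.
	fmt.Println(greedyPlace([]int64{90, 70, 40, 30, 10}, 2)) // [120 120]
}
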
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
@@ -46,7 +46,7 @@ type reducer struct {
 }
 
 func (r *reducer) run() error {
-	dirs := shardDirs(filepath.Join(r.opt.TmpDir, reduceShardDir))
+	dirs := readShardDirs(filepath.Join(r.opt.TmpDir, reduceShardDir))
 	x.AssertTrue(len(dirs) == r.opt.ReduceShards)
 	x.AssertTrue(len(r.opt.shardOutputDirs) == r.opt.ReduceShards)

8 changes: 8 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -38,6 +38,7 @@ import (
 var Bulk x.SubCommand
 
 var defaultOutDir = "./out"
+var groupFile = "group_id"
 
 func init() {
 	Bulk.Cmd = &cobra.Command{
@@ -192,6 +193,13 @@ func run() {
 		dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
 		x.Check(os.MkdirAll(dir, 0700))
 		opt.shardOutputDirs = append(opt.shardOutputDirs, dir)
+
+		groupFile := filepath.Join(dir, groupFile)
+		f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600)
+		x.Check(err)
+		x.Check2(f.WriteString(strconv.Itoa(i + 1)))
+		x.Check2(f.WriteString("\n"))
+		x.Check(f.Close())
 	}
 
 	// Create a directory just for bulk loader's usage.
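With this change each output shard ends up with a group_id file under its p directory, e.g. out/0/p/group_id containing "1" and out/1/p/group_id containing "2" (the loader writes i + 1 because Alpha group IDs start at 1). A hedged round-trip sketch of the write-then-parse, using only the standard library and mirroring the way the server later reads the file (see the edgraph/server.go change below):

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

func main() {
	dir, err := ioutil.TempDir("", "shard") // stand-in for an out/<i>/p directory
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Write the file the way the bulk loader does: the group number plus newline.
	path := filepath.Join(dir, "group_id")
	i := 0 // shard index
	if err := ioutil.WriteFile(path, []byte(strconv.Itoa(i+1)+"\n"), 0600); err != nil {
		panic(err)
	}

	// Parse it back: trim whitespace, then ParseUint.
	contents, err := ioutil.ReadFile(path)
	if err != nil {
		panic(err)
	}
	group, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)
	fmt.Println(group, err) // 1 <nil>
}
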
11 changes: 10 additions & 1 deletion dgraph/cmd/bulk/shard_map.go
@@ -16,7 +16,11 @@
 
 package bulk
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/dgraph-io/dgraph/x"
+)
 
 type shardMap struct {
 	sync.RWMutex
@@ -33,6 +37,11 @@ func newShardMap(numShards int) *shardMap {
 }
 
 func (m *shardMap) shardFor(pred string) int {
+	// Always assign NQuads with reserved predicates to the first map shard.
+	if x.IsReservedPredicate(pred) {
+		return 0
+	}
+
 	m.RLock()
 	shard, ok := m.predToShard[pred]
 	m.RUnlock()
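x.IsReservedPredicate is what pins reserved predicates (the dgraph.-prefixed ones, such as dgraph.type) to map shard 0. A rough standalone approximation of shardFor, with a hash standing in for the loader's actual first-come predicate-to-shard map (the prefix check is an assumption about IsReservedPredicate's behavior, not a copy of it):

package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// shardFor approximates the rule above: reserved predicates always land on
// shard 0; everything else is spread across all shards. The real loader
// remembers the first shard assigned to each predicate instead of hashing.
func shardFor(pred string, numShards int) int {
	if strings.HasPrefix(pred, "dgraph.") { // stand-in for x.IsReservedPredicate
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(pred))
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	fmt.Println(shardFor("dgraph.type", 4)) // always 0
	fmt.Println(shardFor("name", 4))        // deterministic, in 0..3
}
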
6 changes: 6 additions & 0 deletions dgraph/cmd/zero/zero.go
@@ -512,6 +512,12 @@ func (s *Server) Connect(ctx context.Context,
 			// We need more servers here, so let's add it.
 			proposal.Member = m
 			return proposal
+		} else if m.ForceGroupId {
+			// If the group ID was taken from the group_id file, force the member
+			// to be in this group even if the group is at capacity. This should
+			// not happen if users properly initialize a cluster after a bulk load.
+			proposal.Member = m
+			return proposal
 		}
 		// Already have plenty of servers serving this group.
 	}
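The admission rule this branch adds is small but easy to misread, so here is a hedged distillation (the function name and the capacity check are illustrative; the real code compares group membership against Zero's configured replica count):

package main

import "fmt"

// admit mirrors the branch above: a member that still fits is admitted, and
// a member whose group ID came from a group_id file (ForceGroupId) is
// admitted even when the group is already at capacity.
func admit(groupSize, replicas int, forceGroupId bool) bool {
	if groupSize < replicas {
		return true // we need more servers here
	}
	return forceGroupId // full group: only a forced member gets in
}

func main() {
	fmt.Println(admit(3, 3, false)) // false: plenty of servers already
	fmt.Println(admit(3, 3, true))  // true: the group_id file wins
}
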
18 changes: 17 additions & 1 deletion edgraph/server.go
@@ -19,9 +19,11 @@ package edgraph
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"math"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
@@ -60,6 +62,7 @@
 const (
 	methodMutate = "Server.Mutate"
 	methodQuery  = "Server.Query"
+	groupFile    = "group_id"
 )
 
 // ServerState holds the state of the Dgraph server.
@@ -96,8 +99,21 @@ func InitServerState() {
 	State.needTs = make(chan tsReq, 100)
 
 	State.initStorage()
 	go State.fillTimestampRequests()
 
+	contents, err := ioutil.ReadFile(filepath.Join(Config.PostingDir, groupFile))
+	if err != nil {
+		return
+	}
+
+	glog.Infof("Found group_id file inside posting directory %s. Will attempt to read.",
+		Config.PostingDir)
+	groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)

Review comment: ineffectual assignment to err (from ineffassign)

+	if err != nil {
+		glog.Warningf("Could not read %s file inside posting directory %s.",
+			groupFile, Config.PostingDir)
+	}
+	x.WorkerConfig.ProposedGroupId = uint32(groupId)
 }
 
 func (s *ServerState) runVlogGC(store *badger.DB) {
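One detail worth noting in the parsing code: strconv.ParseUint is called with base 0, so the base is inferred from the string itself, and the trailing newline written by the bulk loader must be trimmed first. A quick standalone check:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Base 0 infers the base from the prefix: "0x" means hex, a leading
	// "0" means octal, anything else is decimal.
	for _, s := range []string{"2\n", "0x10", "010"} {
		g, err := strconv.ParseUint(strings.TrimSpace(s), 0, 32)
		fmt.Println(g, err)
	}
	// Output: 2 <nil>, 16 <nil>, 8 <nil>
}
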
1 change: 1 addition & 0 deletions protos/pb.proto
@@ -130,6 +130,7 @@ message Member {
   uint64 last_update = 6;
 
   bool cluster_info_only = 13;
+  bool force_group_id = 14;
 }
 
 message Group {