Skip to content

Commit

Permalink
fix(bulkLoader): Use flags for cache (#6322) (#6467)
Browse files Browse the repository at this point in the history
Bulk loader uses caches in compression and this PR adds flags to make it
configurable.

Bulk loader was setting compressionLevel but not the compression option.
As a result of this, badger wasn't compressing any data. This PR fixes
this setting compression if compressionLevel is greater than 0.

(cherry picked from commit 99341dc)
  • Loading branch information
Ibrahim Jarif authored Sep 28, 2020
1 parent 74daf3a commit f3a2091
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 44 deletions.
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type options struct {
EncryptionKey x.SensitiveByteSlice
// BadgerCompressionlevel is the compression level to use while writing to badger.
BadgerCompressionLevel int
BlockCacheSize int64
IndexCacheSize int64
}

type state struct {
Expand Down
35 changes: 15 additions & 20 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,22 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB
}
}

opt := badger.DefaultOptions(dir).WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).WithBlockCacheSize(1 << 20).
WithEncryptionKey(r.opt.EncryptionKey).WithCompression(bo.None)

opt := badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).
WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).
WithEncryptionKey(r.opt.EncryptionKey).
WithBlockCacheSize(r.opt.BlockCacheSize).
WithIndexCacheSize(r.opt.IndexCacheSize)

opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
// Overwrite badger options based on the options provided by the user.
r.setBadgerOptions(&opt, compression)
if compression {
opt.Compression = bo.ZSTD
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
}

db, err := badger.OpenManaged(opt)
x.Check(err)
Expand All @@ -160,20 +169,6 @@ func (r *reducer) createTmpBadger() *badger.DB {
return db
}

func (r *reducer) setBadgerOptions(opt *badger.Options, compression bool) {
if !compression {
opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
return
}
// Set the compression level.
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
if r.state.opt.BadgerCompressionLevel < 1 {
x.Fatalf("Invalid compression level: %d. It should be greater than zero",
r.state.opt.BadgerCompressionLevel)
}
}

type mapIterator struct {
fd *os.File
reader *bufio.Reader
Expand Down
65 changes: 41 additions & 24 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,61 @@ func init() {
// Options around how to set up Badger.
flag.Int("badger.compression_level", 1,
"The compression level for Badger. A higher value uses more resources.")
flag.Int64("badger.cache_mb", 0, "Total size of cache (in MB) per shard in reducer.")
flag.String("badger.cache_percentage", "0,100",
"Cache percentages summing up to 100 for various caches"+
" (FORMAT: BlockCacheSize, IndexCacheSize).")

// Encryption and Vault options
enc.RegisterFlags(flag)
}

func run() {
var err error
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumReducers: Bulk.Conf.GetInt("reducers"),
Version: Bulk.Conf.GetBool("version"),
StoreXids: Bulk.Conf.GetBool("store_xids"),
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),
ClientDir: Bulk.Conf.GetString("xidmap"),
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumReducers: Bulk.Conf.GetInt("reducers"),
Version: Bulk.Conf.GetBool("version"),
StoreXids: Bulk.Conf.GetBool("store_xids"),
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),
ClientDir: Bulk.Conf.GetString("xidmap"),
// Badger options
BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
}

x.PrintVersion()
if opt.Version {
os.Exit(0)
}
if opt.BadgerCompressionLevel < 0 {
fmt.Printf("Invalid compression level: %d. It should be non-negative",
opt.BadgerCompressionLevel)
}

totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100

if opt.EncryptionKey, err = enc.ReadKey(Bulk.Conf); err != nil {
fmt.Printf("unable to read key %v", err)
return
Expand Down

0 comments on commit f3a2091

Please sign in to comment.