Skip to content

Commit

Permalink
fix(bulkLoader): Use flags for cache (#6322) (#6466)
Browse files Browse the repository at this point in the history
The bulk loader uses caches for compression, and this PR adds flags to make
the cache sizes configurable.

The bulk loader was setting compressionLevel but not the compression option.
As a result, badger wasn't compressing any data. This PR fixes this by
setting the compression option if compressionLevel is greater than 0.

(cherry picked from commit 99341dc)
  • Loading branch information
Ibrahim Jarif authored Sep 28, 2020
1 parent 4922a28 commit 54f58af
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type options struct {
BadgerKeyFile string
// BadgerCompressionLevel is the compression level to use while writing to badger.
BadgerCompressionLevel int
BlockCacheSize int64
IndexCacheSize int64
}

type state struct {
Expand Down
35 changes: 15 additions & 20 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,22 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB
}
}

opt := badger.DefaultOptions(dir).WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).WithBlockCacheSize(1 << 20).
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile)).WithCompression(bo.None)

opt := badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).
WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile)).
WithBlockCacheSize(r.opt.BlockCacheSize).
WithIndexCacheSize(r.opt.IndexCacheSize)

opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
// Overwrite badger options based on the options provided by the user.
r.setBadgerOptions(&opt, compression)
if compression {
opt.Compression = bo.ZSTD
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
}

db, err := badger.OpenManaged(opt)
x.Check(err)
Expand All @@ -161,20 +170,6 @@ func (r *reducer) createTmpBadger() *badger.DB {
return db
}

// setBadgerOptions overrides the compression-related fields of opt.
//
// When compression is false, it sets opt.Compression to bo.None and
// opt.ZSTDCompressionLevel to 0. When compression is true, it copies
// r.state.opt.BadgerCompressionLevel into opt.ZSTDCompressionLevel and then
// calls x.Fatalf if that level is below 1.
//
// NOTE(review): the compression-enabled path assigns only the level and never
// assigns opt.Compression, so the codec stays at whatever value the caller
// already stored in opt.
func (r *reducer) setBadgerOptions(opt *badger.Options, compression bool) {
if !compression {
// Compression disabled: reset both the codec and the level.
opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
return
}
// Set the compression level.
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
// The level is validated only after it has been written to opt.
// NOTE(review): x.Fatalf presumably terminates the process — confirm.
if r.state.opt.BadgerCompressionLevel < 1 {
x.Fatalf("Invalid compression level: %d. It should be greater than zero",
r.state.opt.BadgerCompressionLevel)
}
}

type mapIterator struct {
fd *os.File
reader *bufio.Reader
Expand Down
19 changes: 18 additions & 1 deletion dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func init() {
" schema and data files (if encrytped). Enterprise feature.")
flag.Int("badger.compression_level", 1,
"The compression level for Badger. A higher value uses more resources.")
flag.Int64("badger.cache_mb", 0, "Total size of cache (in MB) per shard in reducer.")
flag.String("badger.cache_percentage", "0,100",
"Cache percentages summing up to 100 for various caches"+
" (FORMAT: BlockCacheSize, IndexCacheSize).")
}

func run() {
Expand All @@ -135,7 +139,7 @@ func run() {
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),

// Badger options
BadgerKeyFile: Bulk.Conf.GetString("encryption_key_file"),
BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
}
Expand All @@ -149,6 +153,19 @@ func run() {
fmt.Printf("Cannot enable encryption: %s", x.ErrNotSupported)
os.Exit(1)
}
if opt.BadgerCompressionLevel < 0 {
fmt.Printf("Invalid compression level: %d. It should be non-negative",
opt.BadgerCompressionLevel)
}

totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert from MB to bytes.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100

if opt.Encrypted && opt.BadgerKeyFile == "" {
fmt.Printf("Must use --encryption_key_file option with --encrypted option.\n")
os.Exit(1)
Expand Down

0 comments on commit 54f58af

Please sign in to comment.