Skip to content

Commit

Permalink
[BREAKING] feat(flags): expand badger to accept all valid options (#7677
Browse files Browse the repository at this point in the history
)

Makes --badger flag to accept all the valid badger options (except Logger & EncryptionKey). Additionally. it accepts input for "compression" with following values [none, snappy, zstd:<level>].
Corresponding PRs in badger: dgraph-io/badger#1688, dgraph-io/badger#1689
It also renames "goroutines" subflag of --badger to "numgoroutines". Also, it removes "cache-size" and "cache-percentage" subflag from --badger in bulk command.

Additionally, this also moves the max-retries under --limit.
  • Loading branch information
NamanJain8 authored Apr 6, 2021
1 parent f9d045a commit 14c2b72
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 89 deletions.
34 changes: 16 additions & 18 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"syscall"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/ee/audit"

Expand Down Expand Up @@ -128,16 +129,13 @@ they form a Raft group and provide synchronous replication.
grpc.EnableTracing = false

flag.String("badger", worker.BadgerDefaults, z.NewSuperFlagHelp(worker.BadgerDefaults).
Head("Badger options").
Head("Badger options (Refer to badger documentation for all possible options)").
Flag("compression",
`[none, zstd:level, snappy] Specifies the compression algorithm and
compression level (if applicable) for the postings directory."none" would disable
compression, while "zstd:1" would set zstd compression at level 1.`).
Flag("goroutines",
Flag("numgoroutines",
"The number of goroutines to use in badger.Stream.").
Flag("max-retries",
"Commits to disk will give up after these number of retries to prevent locking the "+
"worker in a failed state. Use -1 to retry infinitely.").
String())

// Cache flags.
Expand Down Expand Up @@ -212,6 +210,9 @@ they form a Raft group and provide synchronous replication.
Flag("query-timeout",
"Maximum time after which a query execution will fail. If set to"+
" 0, the timeout is infinite.").
Flag("max-retries",
"Commits to disk will give up after these number of retries to prevent locking the "+
"worker in a failed state. Use -1 to retry infinitely.").
Flag("txn-abort-after", "Abort any pending transactions older than this duration."+
" The liveness of a transaction is determined by its last mutation.").
String())
Expand Down Expand Up @@ -633,22 +634,19 @@ func run() {
pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100

badger := z.NewSuperFlag(Alpha.Conf.GetString("badger")).MergeAndCheckDefault(
worker.BadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))
cacheOpts := fmt.Sprintf("blockcachesize=%d; indexcachesize=%d; ",
pstoreBlockCacheSize, pstoreIndexCacheSize)
bopts := badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults + cacheOpts).
FromSuperFlag(Alpha.Conf.GetString("badger"))

security := z.NewSuperFlag(Alpha.Conf.GetString("security")).MergeAndCheckDefault(
worker.SecurityDefaults)
conf := audit.GetAuditConf(Alpha.Conf.GetString("audit"))
opts := worker.Options{
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PostingDirCompression: ctype,
PostingDirCompressionLevel: clevel,
CacheMb: totalCache,
CachePercentage: cachePercentage,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
CacheMb: totalCache,
CachePercentage: cachePercentage,

MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Expand Down Expand Up @@ -712,8 +710,7 @@ func run() {
TLSServerConfig: tlsServerConf,
HmacSecret: opts.HmacSecret,
Audit: opts.Audit != nil,
Badger: badger,
MaxRetries: badger.GetInt64("max-retries"),
Badger: bopts,
}
x.WorkerConfig.Parse(Alpha.Conf)

Expand All @@ -734,6 +731,7 @@ func run() {
x.Config.BlockClusterWideDrop = x.Config.Limit.GetBool("disallow-drop")
x.Config.LimitNormalizeNode = int(x.Config.Limit.GetInt64("normalize-node"))
x.Config.QueryTimeout = x.Config.Limit.GetDuration("query-timeout")
x.Config.MaxRetries = x.Config.Limit.GetInt64("max-retries")

x.Config.GraphQL = z.NewSuperFlag(Alpha.Conf.GetString("graphql")).MergeAndCheckDefault(
worker.GraphQLDefaults)
Expand Down
13 changes: 4 additions & 9 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/credentials"

"github.com/dgraph-io/badger/v3"
bo "github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v3/y"

"github.com/dgraph-io/dgraph/chunker"
Expand Down Expand Up @@ -85,12 +84,8 @@ type options struct {
// ........... Badger options ..........
// EncryptionKey is the key used for encryption. Enterprise only feature.
EncryptionKey x.SensitiveByteSlice
// BadgerCompression is the compression algorithm to use while writing to badger.
BadgerCompression bo.CompressionType
// BadgerCompressionlevel is the compression level to use while writing to badger.
BadgerCompressionLevel int
BlockCacheSize int64
IndexCacheSize int64
// Badger options.
Badger badger.Options
}

type state struct {
Expand Down Expand Up @@ -236,8 +231,8 @@ func (ld *loader) mapStage() {
}
ld.xids = xidmap.New(xidmap.XidMapOptions{
UidAssigner: ld.zero,
DB: db,
Dir: filepath.Join(ld.opt.TmpDir, bufferDir),
DB: db,
Dir: filepath.Join(ld.opt.TmpDir, bufferDir),
})

fs := filestore.NewFileStore(ld.opt.DataFiles)
Expand Down
11 changes: 5 additions & 6 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,17 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB
key = nil
}

opt := badger.DefaultOptions(dir).
opt := r.state.opt.Badger.
WithDir(dir).WithValueDir(dir).
WithSyncWrites(false).
WithEncryptionKey(key).
WithBlockCacheSize(r.opt.BlockCacheSize).
WithIndexCacheSize(r.opt.IndexCacheSize)
WithEncryptionKey(key)

opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
// Overwrite badger options based on the options provided by the user.
if compression {
opt.Compression = r.state.opt.BadgerCompression
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
opt.Compression = r.state.opt.Badger.Compression
opt.ZSTDCompressionLevel = r.state.opt.Badger.ZSTDCompressionLevel
}

db, err := badger.OpenManaged(opt)
Expand Down
35 changes: 11 additions & 24 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/filestore"
"github.com/dgraph-io/dgraph/protos/pb"
Expand All @@ -47,8 +48,7 @@ var Bulk x.SubCommand

var defaultOutDir = "./out"

const BulkBadgerDefaults = "compression=snappy; goroutines=8;" +
" cache-mb=64; cache-percentage=70,30;"
const BulkBadgerDefaults = "compression=snappy; numgoroutines=8;"

func init() {
Bulk.Cmd = &cobra.Command{
Expand Down Expand Up @@ -122,18 +122,13 @@ func init() {
"Namespace onto which to load the data. If not set, will preserve the namespace.")

flag.String("badger", BulkBadgerDefaults, z.NewSuperFlagHelp(BulkBadgerDefaults).
Head("Badger options").
Head("Badger options (Refer to badger documentation for all possible options)").
Flag("compression",
"Specifies the compression algorithm and compression level (if applicable) for the "+
`postings directory. "none" would disable compression, while "zstd:1" would set `+
"zstd compression at level 1.").
Flag("goroutines",
Flag("numgoroutines",
"The number of goroutines to use in badger.Stream.").
Flag("cache-mb",
"Total size of cache (in MB) per shard in the reducer.").
Flag("cache-percentage",
"Cache percentages summing up to 100 for various caches. (FORMAT: BlockCacheSize,"+
"IndexCacheSize)").
String())

x.RegisterClientTLSFlags(flag)
Expand All @@ -142,9 +137,12 @@ func init() {
}

func run() {
badger := z.NewSuperFlag(Bulk.Conf.GetString("badger")).MergeAndCheckDefault(
BulkBadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))
cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger.
cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ",
(70*cacheSize)/100, (30*cacheSize)/100)

bopts := badger.DefaultOptions("").FromSuperFlag(BulkBadgerDefaults + cacheDefaults).
FromSuperFlag(Bulk.Conf.GetString("badger"))
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
Expand Down Expand Up @@ -172,25 +170,14 @@ func run() {
NewUids: Bulk.Conf.GetBool("new_uids"),
ClientDir: Bulk.Conf.GetString("xidmap"),
Namespace: Bulk.Conf.GetUint64("force-namespace"),

// Badger options
BadgerCompression: ctype,
BadgerCompressionLevel: clevel,
Badger: bopts,
}

x.PrintVersion()
if opt.Version {
os.Exit(0)
}

totalCache := int64(badger.GetUint64("cache-mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(badger.GetString("cache-percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100

_, opt.EncryptionKey = ee.GetKeys(Bulk.Conf)
if len(opt.EncryptionKey) == 0 {
if opt.Encrypted || opt.EncryptedOut {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/Shopify/sarama v1.27.2
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f
github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a
github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f h1:dZpGNLp9YUpq4h2DRcWAjW5dWj47SM3W3NK71z6FRa0=
github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE=
github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a h1:KUJzMbhVSuSDkXXkV0yI1Uj/hGNOGTUEc0dbusDixas=
github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE=
github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e h1:kdH2yqGYUl5xJARdI5kN1fjhVUV2sLC+vL1CVXhcAfo=
github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e/go.mod h1:zCfS4R3E/UC/PhETXJYq/Blia0eCH1EQqKrWDvvimxE=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
bp := &BackupProcessor{
DB: db,
Request: req,
threads: make([]*threadLocal, x.WorkerConfig.Badger.GetUint64("goroutines")),
threads: make([]*threadLocal, x.WorkerConfig.Badger.NumGoroutines),
}
if req.SinceTs > 0 && db != nil {
bp.txn = db.NewTransactionAt(req.ReadTs, false)
Expand Down
12 changes: 0 additions & 12 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"path/filepath"
"time"

bo "github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/dgraph/x"
)

Expand All @@ -37,24 +36,13 @@ const (
type Options struct {
// PostingDir is the path to the directory storing the postings..
PostingDir string
// PostingDirCompression is the compression algorithem used to compression Postings directory.
PostingDirCompression bo.CompressionType
// PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. A
// higher value means more CPU intensive compression and better compression
// ratio.
PostingDirCompressionLevel int
// WALDir is the path to the directory storing the write-ahead log.
WALDir string
// MutationsMode is the mode used to handle mutation requests.
MutationsMode int
// AuthToken is the token to be passed for Alter HTTP requests.
AuthToken string

// PBlockCacheSize is the size of block cache for pstore
PBlockCacheSize int64
// PIndexCacheSize is the size of index cache for pstore
PIndexCacheSize int64

// HmacSecret stores the secret used to sign JSON Web Tokens (JWT).
HmacSecret x.SensitiveByteSlice
// AccessJwtTtl is the TTL for the access JWT.
Expand Down
2 changes: 1 addition & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
return
}
txn.Update()
err := x.RetryUntilSuccess(int(x.WorkerConfig.MaxRetries),
err := x.RetryUntilSuccess(int(x.Config.MaxRetries),
10*time.Millisecond, func() error {
err := txn.CommitToDisk(writer, commit)
if err == badger.ErrBannedKey {
Expand Down
15 changes: 5 additions & 10 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ const (
// breaks.
AclDefaults = `access-ttl=6h; refresh-ttl=30d; secret-file=;`
AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;`
BadgerDefaults = `compression=snappy; goroutines=8; max-retries=-1;`
BadgerDefaults = `compression=snappy; numgoroutines=8;`
RaftDefaults = `learner=false; snapshot-after-entries=10000; ` +
`snapshot-after-duration=30m; pending-proposals=256; idx=; group=;`
SecurityDefaults = `token=; whitelist=;`
LudicrousDefaults = `enabled=false; concurrency=2000;`
CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` +
`client_key=;`
LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` +
`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m;`
`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` +
` max-retries=-1;`
ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;`
GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` +
`lambda-url=;`
Expand Down Expand Up @@ -98,10 +99,6 @@ func setBadgerOptions(opt badger.Options) badger.Options {
// saved by disabling it.
opt.DetectConflicts = false

glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel)
opt.Compression = Config.PostingDirCompression
opt.ZSTDCompressionLevel = Config.PostingDirCompressionLevel

// Settings for the data directory.
return opt
}
Expand Down Expand Up @@ -131,11 +128,9 @@ func (s *ServerState) initStorage() {
// All the writes to posting store should be synchronous. We use batched writers
// for posting lists, so the cost of sync writes is amortized.
x.Check(os.MkdirAll(Config.PostingDir, 0700))
opt := badger.DefaultOptions(Config.PostingDir).
opt := x.WorkerConfig.Badger.
WithDir(Config.PostingDir).WithValueDir(Config.PostingDir).
WithNumVersionsToKeep(math.MaxInt32).
WithNumGoroutines(int(x.WorkerConfig.Badger.GetUint64("goroutines"))).
WithBlockCacheSize(Config.PBlockCacheSize).
WithIndexCacheSize(Config.PIndexCacheSize).
WithNamespaceOffset(x.NamespaceOffset)
opt = setBadgerOptions(opt)

Expand Down
9 changes: 4 additions & 5 deletions x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/ristretto/z"
"github.com/spf13/viper"
)
Expand All @@ -43,6 +44,7 @@ type Options struct {
BlockClusterWideDrop bool
LimitNormalizeNode int
QueryTimeout time.Duration
MaxRetries int64

// GraphQL options:
//
Expand Down Expand Up @@ -96,8 +98,8 @@ type WorkerOptions struct {
TLSServerConfig *tls.Config
// Raft stores options related to Raft.
Raft *z.SuperFlag
// Badger stores options related to Badger.
Badger *z.SuperFlag
// Badger stores the badger options.
Badger badger.Options
// WhiteListedIPRanges is a list of IP ranges from which requests will be allowed.
WhiteListedIPRanges []IPRange
// StrictMutations will cause mutations to unknown predicates to fail if set to true.
Expand All @@ -108,9 +110,6 @@ type WorkerOptions struct {
HmacSecret SensitiveByteSlice
// AbortOlderThan tells Dgraph to discard transactions that are older than this duration.
AbortOlderThan time.Duration
// MaxRetries indicates the number of retries Dgraph do to prevent locking the worker in a
// failed state.
MaxRetries int64
// ProposedGroupId will be used if there's a file in the p directory called group_id with the
// proposed group ID for this server.
ProposedGroupId uint32
Expand Down

0 comments on commit 14c2b72

Please sign in to comment.