diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index ed7281b6c73..1a8673e6b3e 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -35,6 +35,7 @@ import ( "syscall" "time" + "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/audit" @@ -128,16 +129,13 @@ they form a Raft group and provide synchronous replication. grpc.EnableTracing = false flag.String("badger", worker.BadgerDefaults, z.NewSuperFlagHelp(worker.BadgerDefaults). - Head("Badger options"). + Head("Badger options (Refer to badger documentation for all possible options)"). Flag("compression", `[none, zstd:level, snappy] Specifies the compression algorithm and compression level (if applicable) for the postings directory."none" would disable compression, while "zstd:1" would set zstd compression at level 1.`). - Flag("goroutines", + Flag("numgoroutines", "The number of goroutines to use in badger.Stream."). - Flag("max-retries", - "Commits to disk will give up after these number of retries to prevent locking the "+ - "worker in a failed state. Use -1 to retry infinitely."). String()) // Cache flags. @@ -212,6 +210,9 @@ they form a Raft group and provide synchronous replication. Flag("query-timeout", "Maximum time after which a query execution will fail. If set to"+ " 0, the timeout is infinite."). + Flag("max-retries", + "Commits to disk will give up after these number of retries to prevent locking the "+ + "worker in a failed state. Use -1 to retry infinitely."). Flag("txn-abort-after", "Abort any pending transactions older than this duration."+ " The liveness of a transaction is determined by its last mutation."). String()) @@ -633,22 +634,19 @@ func run() { pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100 pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100 - badger := z.NewSuperFlag(Alpha.Conf.GetString("badger")).MergeAndCheckDefault( - worker.BadgerDefaults) - ctype, clevel := x.ParseCompression(badger.GetString("compression")) + cacheOpts := fmt.Sprintf("blockcachesize=%d; indexcachesize=%d; ", + pstoreBlockCacheSize, pstoreIndexCacheSize) + bopts := badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults + cacheOpts). + FromSuperFlag(Alpha.Conf.GetString("badger")) security := z.NewSuperFlag(Alpha.Conf.GetString("security")).MergeAndCheckDefault( worker.SecurityDefaults) conf := audit.GetAuditConf(Alpha.Conf.GetString("audit")) opts := worker.Options{ - PostingDir: Alpha.Conf.GetString("postings"), - WALDir: Alpha.Conf.GetString("wal"), - PostingDirCompression: ctype, - PostingDirCompressionLevel: clevel, - CacheMb: totalCache, - CachePercentage: cachePercentage, - PBlockCacheSize: pstoreBlockCacheSize, - PIndexCacheSize: pstoreIndexCacheSize, + PostingDir: Alpha.Conf.GetString("postings"), + WALDir: Alpha.Conf.GetString("wal"), + CacheMb: totalCache, + CachePercentage: cachePercentage, MutationsMode: worker.AllowMutations, AuthToken: security.GetString("token"), @@ -712,8 +710,7 @@ func run() { TLSServerConfig: tlsServerConf, HmacSecret: opts.HmacSecret, Audit: opts.Audit != nil, - Badger: badger, - MaxRetries: badger.GetInt64("max-retries"), + Badger: bopts, } x.WorkerConfig.Parse(Alpha.Conf) @@ -734,6 +731,7 @@ func run() { x.Config.BlockClusterWideDrop = x.Config.Limit.GetBool("disallow-drop") x.Config.LimitNormalizeNode = int(x.Config.Limit.GetInt64("normalize-node")) x.Config.QueryTimeout = x.Config.Limit.GetDuration("query-timeout") + x.Config.MaxRetries = x.Config.Limit.GetInt64("max-retries") x.Config.GraphQL = z.NewSuperFlag(Alpha.Conf.GetString("graphql")).MergeAndCheckDefault( worker.GraphQLDefaults) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 55fe98b62c1..8d2b15fec33 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -36,7 +36,6 @@ import ( "google.golang.org/grpc/credentials" "github.com/dgraph-io/badger/v3" - bo "github.com/dgraph-io/badger/v3/options" "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/dgraph/chunker" @@ -85,12 +84,8 @@ type options struct { // ........... Badger options .......... // EncryptionKey is the key used for encryption. Enterprise only feature. EncryptionKey x.SensitiveByteSlice - // BadgerCompression is the compression algorithm to use while writing to badger. - BadgerCompression bo.CompressionType - // BadgerCompressionlevel is the compression level to use while writing to badger. - BadgerCompressionLevel int - BlockCacheSize int64 - IndexCacheSize int64 + // Badger options. + Badger badger.Options } type state struct { @@ -236,8 +231,8 @@ func (ld *loader) mapStage() { } ld.xids = xidmap.New(xidmap.XidMapOptions{ UidAssigner: ld.zero, - DB: db, - Dir: filepath.Join(ld.opt.TmpDir, bufferDir), + DB: db, + Dir: filepath.Join(ld.opt.TmpDir, bufferDir), }) fs := filestore.NewFileStore(ld.opt.DataFiles) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 9cc5d81ae88..8e34220edf2 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -130,18 +130,17 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB key = nil } - opt := badger.DefaultOptions(dir). + opt := r.state.opt.Badger. + WithDir(dir).WithValueDir(dir). WithSyncWrites(false). - WithEncryptionKey(key). - WithBlockCacheSize(r.opt.BlockCacheSize). - WithIndexCacheSize(r.opt.IndexCacheSize) + WithEncryptionKey(key) opt.Compression = bo.None opt.ZSTDCompressionLevel = 0 // Overwrite badger options based on the options provided by the user. if compression { - opt.Compression = r.state.opt.BadgerCompression - opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel + opt.Compression = r.state.opt.Badger.Compression + opt.ZSTDCompressionLevel = r.state.opt.Badger.ZSTDCompressionLevel } db, err := badger.OpenManaged(opt) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 22d76e95dbe..1a77d3ae24c 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" + "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/filestore" "github.com/dgraph-io/dgraph/protos/pb" @@ -47,8 +48,7 @@ var Bulk x.SubCommand var defaultOutDir = "./out" -const BulkBadgerDefaults = "compression=snappy; goroutines=8;" + - " cache-mb=64; cache-percentage=70,30;" +const BulkBadgerDefaults = "compression=snappy; numgoroutines=8;" func init() { Bulk.Cmd = &cobra.Command{ @@ -122,18 +122,13 @@ func init() { "Namespace onto which to load the data. If not set, will preserve the namespace.") flag.String("badger", BulkBadgerDefaults, z.NewSuperFlagHelp(BulkBadgerDefaults). - Head("Badger options"). + Head("Badger options (Refer to badger documentation for all possible options)"). Flag("compression", "Specifies the compression algorithm and compression level (if applicable) for the "+ `postings directory. "none" would disable compression, while "zstd:1" would set `+ "zstd compression at level 1."). - Flag("goroutines", + Flag("numgoroutines", "The number of goroutines to use in badger.Stream."). - Flag("cache-mb", - "Total size of cache (in MB) per shard in the reducer."). - Flag("cache-percentage", - "Cache percentages summing up to 100 for various caches. (FORMAT: BlockCacheSize,"+ - "IndexCacheSize)"). String()) x.RegisterClientTLSFlags(flag) @@ -142,9 +137,12 @@ func init() { } func run() { - badger := z.NewSuperFlag(Bulk.Conf.GetString("badger")).MergeAndCheckDefault( - BulkBadgerDefaults) - ctype, clevel := x.ParseCompression(badger.GetString("compression")) + cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger. + cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ", + (70*cacheSize)/100, (30*cacheSize)/100) + + bopts := badger.DefaultOptions("").FromSuperFlag(BulkBadgerDefaults + cacheDefaults). + FromSuperFlag(Bulk.Conf.GetString("badger")) opt := options{ DataFiles: Bulk.Conf.GetString("files"), DataFormat: Bulk.Conf.GetString("format"), @@ -172,10 +170,7 @@ func run() { NewUids: Bulk.Conf.GetBool("new_uids"), ClientDir: Bulk.Conf.GetString("xidmap"), Namespace: Bulk.Conf.GetUint64("force-namespace"), - - // Badger options - BadgerCompression: ctype, - BadgerCompressionLevel: clevel, + Badger: bopts, } x.PrintVersion() @@ -183,14 +178,6 @@ func run() { os.Exit(0) } - totalCache := int64(badger.GetUint64("cache-mb")) - x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative") - cachePercent, err := x.GetCachePercentages(badger.GetString("cache-percentage"), 2) - x.Check(err) - totalCache <<= 20 // Convert to MB. - opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100 - opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100 - _, opt.EncryptionKey = ee.GetKeys(Bulk.Conf) if len(opt.EncryptionKey) == 0 { if opt.Encrypted || opt.EncryptedOut { diff --git a/go.mod b/go.mod index fe1253e6a9a..ed342f969b7 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/Shopify/sarama v1.27.2 github.com/blevesearch/bleve v1.0.13 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f + github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e github.com/dgraph-io/gqlgen v0.13.2 github.com/dgraph-io/gqlparser/v2 v2.2.0 diff --git a/go.sum b/go.sum index c786c2a422a..fa8e18a4a31 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f h1:dZpGNLp9YUpq4h2DRcWAjW5dWj47SM3W3NK71z6FRa0= -github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE= +github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a h1:KUJzMbhVSuSDkXXkV0yI1Uj/hGNOGTUEc0dbusDixas= +github.com/dgraph-io/badger/v3 v3.0.0-20210405181011-d918b9904b2a/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE= github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e h1:kdH2yqGYUl5xJARdI5kN1fjhVUV2sLC+vL1CVXhcAfo= github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e/go.mod h1:zCfS4R3E/UC/PhETXJYq/Blia0eCH1EQqKrWDvvimxE= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= diff --git a/worker/backup_processor.go b/worker/backup_processor.go index c0e6f53cc80..c3283e8244e 100644 --- a/worker/backup_processor.go +++ b/worker/backup_processor.go @@ -62,7 +62,7 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { bp := &BackupProcessor{ DB: db, Request: req, - threads: make([]*threadLocal, x.WorkerConfig.Badger.GetUint64("goroutines")), + threads: make([]*threadLocal, x.WorkerConfig.Badger.NumGoroutines), } if req.SinceTs > 0 && db != nil { bp.txn = db.NewTransactionAt(req.ReadTs, false) diff --git a/worker/config.go b/worker/config.go index 0daae39e376..050fc8b156a 100644 --- a/worker/config.go +++ b/worker/config.go @@ -20,7 +20,6 @@ import ( "path/filepath" "time" - bo "github.com/dgraph-io/badger/v3/options" "github.com/dgraph-io/dgraph/x" ) @@ -37,12 +36,6 @@ const ( type Options struct { // PostingDir is the path to the directory storing the postings.. PostingDir string - // PostingDirCompression is the compression algorithem used to compression Postings directory. - PostingDirCompression bo.CompressionType - // PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. A - // higher value means more CPU intensive compression and better compression - // ratio. - PostingDirCompressionLevel int // WALDir is the path to the directory storing the write-ahead log. WALDir string // MutationsMode is the mode used to handle mutation requests. @@ -50,11 +43,6 @@ type Options struct { // AuthToken is the token to be passed for Alter HTTP requests. AuthToken string - // PBlockCacheSize is the size of block cache for pstore - PBlockCacheSize int64 - // PIndexCacheSize is the size of index cache for pstore - PIndexCacheSize int64 - // HmacSecret stores the secret used to sign JSON Web Tokens (JWT). HmacSecret x.SensitiveByteSlice // AccessJwtTtl is the TTL for the access JWT. diff --git a/worker/draft.go b/worker/draft.go index e1aedae1711..859c9c5cbfe 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -776,7 +776,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { return } txn.Update() - err := x.RetryUntilSuccess(int(x.WorkerConfig.MaxRetries), + err := x.RetryUntilSuccess(int(x.Config.MaxRetries), 10*time.Millisecond, func() error { err := txn.CommitToDisk(writer, commit) if err == badger.ErrBannedKey { diff --git a/worker/server_state.go b/worker/server_state.go index eeef349c49d..18fd1aba82c 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -40,7 +40,7 @@ const ( // breaks. AclDefaults = `access-ttl=6h; refresh-ttl=30d; secret-file=;` AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;` - BadgerDefaults = `compression=snappy; goroutines=8; max-retries=-1;` + BadgerDefaults = `compression=snappy; numgoroutines=8;` RaftDefaults = `learner=false; snapshot-after-entries=10000; ` + `snapshot-after-duration=30m; pending-proposals=256; idx=; group=;` SecurityDefaults = `token=; whitelist=;` @@ -48,7 +48,8 @@ const ( CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` + `client_key=;` LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` + - `mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m;` + `mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` + + ` max-retries=-1;` ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;` GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` + `lambda-url=;` @@ -98,10 +99,6 @@ func setBadgerOptions(opt badger.Options) badger.Options { // saved by disabling it. opt.DetectConflicts = false - glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel) - opt.Compression = Config.PostingDirCompression - opt.ZSTDCompressionLevel = Config.PostingDirCompressionLevel - // Settings for the data directory. return opt } @@ -131,11 +128,9 @@ func (s *ServerState) initStorage() { // All the writes to posting store should be synchronous. We use batched writers // for posting lists, so the cost of sync writes is amortized. x.Check(os.MkdirAll(Config.PostingDir, 0700)) - opt := badger.DefaultOptions(Config.PostingDir). + opt := x.WorkerConfig.Badger. + WithDir(Config.PostingDir).WithValueDir(Config.PostingDir). WithNumVersionsToKeep(math.MaxInt32). - WithNumGoroutines(int(x.WorkerConfig.Badger.GetUint64("goroutines"))). - WithBlockCacheSize(Config.PBlockCacheSize). - WithIndexCacheSize(Config.PIndexCacheSize). WithNamespaceOffset(x.NamespaceOffset) opt = setBadgerOptions(opt) diff --git a/x/config.go b/x/config.go index 2632c2b0fd7..1c032b65f93 100644 --- a/x/config.go +++ b/x/config.go @@ -21,6 +21,7 @@ import ( "net" "time" + "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/ristretto/z" "github.com/spf13/viper" ) @@ -43,6 +44,7 @@ type Options struct { BlockClusterWideDrop bool LimitNormalizeNode int QueryTimeout time.Duration + MaxRetries int64 // GraphQL options: // @@ -96,8 +98,8 @@ type WorkerOptions struct { TLSServerConfig *tls.Config // Raft stores options related to Raft. Raft *z.SuperFlag - // Badger stores options related to Badger. - Badger *z.SuperFlag + // Badger stores the badger options. + Badger badger.Options // WhiteListedIPRanges is a list of IP ranges from which requests will be allowed. WhiteListedIPRanges []IPRange // StrictMutations will cause mutations to unknown predicates to fail if set to true. @@ -108,9 +110,6 @@ type WorkerOptions struct { HmacSecret SensitiveByteSlice // AbortOlderThan tells Dgraph to discard transactions that are older than this duration. AbortOlderThan time.Duration - // MaxRetries indicates the number of retries Dgraph do to prevent locking the worker in a - // failed state. - MaxRetries int64 // ProposedGroupId will be used if there's a file in the p directory called group_id with the // proposed group ID for this server. ProposedGroupId uint32