From 39d45074309b4ea11839e33a619c9dd149aa5a38 Mon Sep 17 00:00:00 2001 From: Daniel Mai Date: Wed, 22 May 2019 15:29:08 -0700 Subject: [PATCH] Vendor in latest badger with bug fix for StreamWriter. --- vendor/github.com/dgraph-io/badger/db.go | 76 ++++++++- vendor/github.com/dgraph-io/badger/errors.go | 3 + .../github.com/dgraph-io/badger/publisher.go | 151 ++++++++++++++++++ .../dgraph-io/badger/stream_writer.go | 23 ++- vendor/github.com/dgraph-io/badger/value.go | 25 ++- vendor/vendor.json | 44 ++--- 6 files changed, 289 insertions(+), 33 deletions(-) create mode 100644 vendor/github.com/dgraph-io/badger/publisher.go diff --git a/vendor/github.com/dgraph-io/badger/db.go b/vendor/github.com/dgraph-io/badger/db.go index 8eb52788236..24e7b1a401e 100644 --- a/vendor/github.com/dgraph-io/badger/db.go +++ b/vendor/github.com/dgraph-io/badger/db.go @@ -18,6 +18,7 @@ package badger import ( "bytes" + "context" "encoding/binary" "encoding/hex" "expvar" @@ -32,6 +33,7 @@ import ( "time" "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/skl" "github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/y" @@ -54,8 +56,11 @@ type closers struct { memtable *y.Closer writes *y.Closer valueGC *y.Closer + pub *y.Closer } +type callback func(kv *pb.KVList) + // DB provides the various functions required to interact with Badger. // DB is thread-safe. type DB struct { @@ -76,6 +81,7 @@ type DB struct { vhead valuePointer // less than or equal to a pointer to the last vlog value put into mt writeCh chan *request flushChan chan flushTask // For flushing memtables. + closeOnce sync.Once // For closing DB only once. // Number of log rotates since the last memtable flush. We will access this field via atomic // functions. Since we are not going to use any 64bit atomic functions, there is no need for @@ -85,6 +91,8 @@ type DB struct { blockWrites int32 orc *oracle + + pub *publisher } const ( @@ -267,6 +275,7 @@ func Open(opt Options) (db *DB, err error) { dirLockGuard: dirLockGuard, valueDirGuard: valueDirLockGuard, orc: newOracle(opt), + pub: newPublisher(), } // Calculate initial size. @@ -323,16 +332,26 @@ func Open(opt Options) (db *DB, err error) { db.closers.valueGC = y.NewCloser(1) go db.vlog.waitOnGC(db.closers.valueGC) + db.closers.pub = y.NewCloser(1) + go db.pub.listenForUpdates(db.closers.pub) + valueDirLockGuard = nil dirLockGuard = nil manifestFile = nil return db, nil } -// Close closes a DB. It's crucial to call it to ensure all the pending updates -// make their way to disk. Calling DB.Close() multiple times is not safe and would -// cause panic. -func (db *DB) Close() (err error) { +// Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to +// disk. Calling DB.Close() multiple times would still only close the DB once. +func (db *DB) Close() error { + var err error + db.closeOnce.Do(func() { + err = db.close() + }) + return err +} + +func (db *DB) close() (err error) { db.elog.Printf("Closing database") atomic.StoreInt32(&db.blockWrites, 1) @@ -342,6 +361,8 @@ func (db *DB) Close() (err error) { // Stop writes next. db.closers.writes.SignalAndWait() + db.closers.pub.SignalAndWait() + // Now close the value log. if vlogErr := db.vlog.Close(); vlogErr != nil { err = errors.Wrap(vlogErr, "DB.Close") @@ -610,6 +631,8 @@ func (db *DB) writeRequests(reqs []*request) error { return err } + db.elog.Printf("Sending updates to subscribers") + db.pub.sendUpdates(reqs) db.elog.Printf("Writing to memtable") var count int for _, b := range reqs { @@ -662,6 +685,8 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) { req.Entries = entries req.Wg = sync.WaitGroup{} req.Wg.Add(1) + req.IncrRef() // for db write + req.IncrRef() // for publisher updates db.writeCh <- req // Handled in doWrites. y.NumPuts.Add(int64(len(entries))) @@ -1409,3 +1434,46 @@ func (db *DB) DropPrefix(prefix []byte) error { db.opt.Infof("DropPrefix done") return nil } + +// Subscribe can be used watch key changes for the given key prefix. +func (db *DB) Subscribe(ctx context.Context, cb callback, prefix []byte, prefixes ...[]byte) error { + if cb == nil { + return ErrNilCallback + } + prefixes = append(prefixes, prefix) + c := y.NewCloser(1) + recvCh, id := db.pub.newSubscriber(c, prefixes...) + slurp := func(batch *pb.KVList) { + defer func() { + if len(batch.GetKv()) > 0 { + cb(batch) + } + }() + for { + select { + case kvs := <-recvCh: + batch.Kv = append(batch.Kv, kvs.Kv...) + default: + return + } + } + } + for { + select { + case <-c.HasBeenClosed(): + slurp(new(pb.KVList)) + // Drain if any pending updates. + c.Done() + // No need to delete here. Closer will be called only while + // closing DB. Subscriber will be deleted by cleanSubscribers. + return nil + case <-ctx.Done(): + c.Done() + db.pub.deleteSubscriber(id) + // Delete the subscriber to avoid further updates. + return ctx.Err() + case batch := <-recvCh: + slurp(batch) + } + } +} diff --git a/vendor/github.com/dgraph-io/badger/errors.go b/vendor/github.com/dgraph-io/badger/errors.go index a0c1806870c..cad66cb16ef 100644 --- a/vendor/github.com/dgraph-io/badger/errors.go +++ b/vendor/github.com/dgraph-io/badger/errors.go @@ -102,4 +102,7 @@ var ( // ErrBlockedWrites is returned if the user called DropAll. During the process of dropping all // data from Badger, we stop accepting new writes, by returning this error. ErrBlockedWrites = errors.New("Writes are blocked, possibly due to DropAll or Close") + + // ErrNilCallback is returned when subscriber's callback is nil. + ErrNilCallback = errors.New("Callback cannot be nil") ) diff --git a/vendor/github.com/dgraph-io/badger/publisher.go b/vendor/github.com/dgraph-io/badger/publisher.go new file mode 100644 index 00000000000..60f4fc90031 --- /dev/null +++ b/vendor/github.com/dgraph-io/badger/publisher.go @@ -0,0 +1,151 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package badger + +import ( + "bytes" + "sync" + + "github.com/dgraph-io/badger/pb" + "github.com/dgraph-io/badger/y" +) + +type subscriber struct { + prefixes [][]byte + sendCh chan<- *pb.KVList + subCloser *y.Closer +} + +type publisher struct { + sync.Mutex + pubCh chan requests + subscribers map[uint64]subscriber + nextID uint64 +} + +func newPublisher() *publisher { + return &publisher{ + pubCh: make(chan requests, 1000), + subscribers: make(map[uint64]subscriber), + nextID: 0, + } +} + +func (p *publisher) listenForUpdates(c *y.Closer) { + defer func() { + p.cleanSubscribers() + c.Done() + }() + slurp := func(batch []*request) { + for { + select { + case reqs := <-p.pubCh: + batch = append(batch, reqs...) + default: + p.publishUpdates(batch) + return + } + } + } + for { + select { + case <-c.HasBeenClosed(): + return + case reqs := <-p.pubCh: + slurp(reqs) + } + } +} + +func (p *publisher) publishUpdates(reqs requests) { + kvs := &pb.KVList{} + p.Lock() + defer func() { + p.Unlock() + // Release all the request. + reqs.DecrRef() + }() + for _, s := range p.subscribers { + for _, prefix := range s.prefixes { + for _, req := range reqs { + for _, e := range req.Entries { + // TODO: Use trie to find subscribers. + if bytes.HasPrefix(e.Key, prefix) { + k := y.SafeCopy(nil, e.Key) + kv := &pb.KV{ + Key: y.ParseKey(k), + Value: y.SafeCopy(nil, e.Value), + Meta: []byte{e.UserMeta}, + ExpiresAt: e.ExpiresAt, + Version: y.ParseTs(k), + } + kvs.Kv = append(kvs.Kv, kv) + } + } + } + } + if len(kvs.GetKv()) > 0 { + s.sendCh <- kvs + } + } +} + +func (p *publisher) newSubscriber(c *y.Closer, prefixes ...[]byte) (<-chan *pb.KVList, uint64) { + p.Lock() + defer p.Unlock() + ch := make(chan *pb.KVList, 1000) + id := p.nextID + // Increment next ID. + p.nextID++ + p.subscribers[id] = subscriber{ + prefixes: prefixes, + sendCh: ch, + subCloser: c, + } + return ch, id +} + +// cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB. +func (p *publisher) cleanSubscribers() { + p.Lock() + defer p.Unlock() + for id, s := range p.subscribers { + delete(p.subscribers, id) + s.subCloser.SignalAndWait() + } +} + +func (p *publisher) deleteSubscriber(id uint64) { + p.Lock() + defer p.Unlock() + if _, ok := p.subscribers[id]; !ok { + return + } + delete(p.subscribers, id) +} + +func (p *publisher) sendUpdates(reqs []*request) { + // TODO: Prefix check before pushing into pubCh. + if p.noOfSubscribers() != 0 { + p.pubCh <- reqs + } +} + +func (p *publisher) noOfSubscribers() int { + p.Lock() + defer p.Unlock() + return len(p.subscribers) +} diff --git a/vendor/github.com/dgraph-io/badger/stream_writer.go b/vendor/github.com/dgraph-io/badger/stream_writer.go index 99fea70c647..cdf8849c058 100644 --- a/vendor/github.com/dgraph-io/badger/stream_writer.go +++ b/vendor/github.com/dgraph-io/badger/stream_writer.go @@ -27,6 +27,8 @@ import ( "github.com/pkg/errors" ) +const headStreamId uint32 = math.MaxUint32 + // StreamWriter is used to write data coming from multiple streams. The streams must not have any // overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is // capable of generating such an output. So, this StreamWriter can be used at the other end to build @@ -157,7 +159,7 @@ func (sw *StreamWriter) Flush() error { // Encode and write the value log head into a new table. data := make([]byte, vptrSize) sw.head.Encode(data) - headWriter := sw.newWriter(math.MaxUint32) + headWriter := sw.newWriter(headStreamId) if err := headWriter.Add( y.KeyWithTs(head, sw.maxVersion), y.ValueStruct{Value: data}); err != nil { @@ -195,13 +197,13 @@ type sortedWriter struct { builder *table.Builder lastKey []byte - streamID uint32 + streamId uint32 } -func (sw *StreamWriter) newWriter(streamID uint32) *sortedWriter { +func (sw *StreamWriter) newWriter(streamId uint32) *sortedWriter { return &sortedWriter{ db: sw.db, - streamID: streamID, + streamId: streamId, throttle: sw.throttle, builder: table.NewTableBuilder(), } @@ -269,6 +271,10 @@ func (w *sortedWriter) createTable(data []byte) error { lc := w.db.lc var lhandler *levelHandler + // We should start the levels from 1, because we need level 0 to set the !badger!head key. We + // cannot mix up this key with other keys from the DB, otherwise we would introduce a range + // overlap violation. + y.AssertTrue(len(lc.levels) > 1) for _, l := range lc.levels[1:] { ratio := float64(l.getTotalSize()) / float64(l.maxTotalSize) if ratio < 1.0 { @@ -277,8 +283,15 @@ func (w *sortedWriter) createTable(data []byte) error { } } if lhandler == nil { + // If we're exceeding the size of the lowest level, shove it in the lowest level. Can't do + // better than that. lhandler = lc.levels[len(lc.levels)-1] } + if w.streamId == headStreamId { + // This is a special !badger!head key. We should store it at level 0, separate from all the + // other keys to avoid an overlap. + lhandler = lc.levels[0] + } // Now that table can be opened successfully, let's add this to the MANIFEST. change := &pb.ManifestChange{ Id: tbl.ID(), @@ -293,6 +306,6 @@ func (w *sortedWriter) createTable(data []byte) error { return err } w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n", - fileID, lhandler.level, w.streamID, humanize.Bytes(uint64(tbl.Size()))) + fileID, lhandler.level, w.streamId, humanize.Bytes(uint64(tbl.Size()))) return nil } diff --git a/vendor/github.com/dgraph-io/badger/value.go b/vendor/github.com/dgraph-io/badger/value.go index 3ae3d344c60..26edc185ef1 100644 --- a/vendor/github.com/dgraph-io/badger/value.go +++ b/vendor/github.com/dgraph-io/badger/value.go @@ -885,16 +885,37 @@ type request struct { Ptrs []valuePointer Wg sync.WaitGroup Err error + ref int32 +} + +func (req *request) IncrRef() { + atomic.AddInt32(&req.ref, 1) +} + +func (req *request) DecrRef() { + nRef := atomic.AddInt32(&req.ref, -1) + if nRef > 0 { + return + } + req.Entries = nil + requestPool.Put(req) } func (req *request) Wait() error { req.Wg.Wait() - req.Entries = nil err := req.Err - requestPool.Put(req) + req.DecrRef() // DecrRef after writing to DB. return err } +type requests []*request + +func (reqs requests) DecrRef() { + for _, req := range reqs { + req.DecrRef() + } +} + // sync function syncs content of latest value log file to disk. Syncing of value log directory is // not required here as it happens every time a value log file rotation happens(check createVlogFile // function). During rotation, previous value log file also gets synced to disk. It only syncs file diff --git a/vendor/vendor.json b/vendor/vendor.json index de3c9f6f7d4..47736f934ad 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -423,26 +423,26 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "fzZnUzHmwL4/i/EiRIlE0up4184=", + "checksumSHA1": "0UKCvZLZpSV9xNxjS3km2dcWYlk=", "path": "github.com/dgraph-io/badger", - "revision": "d98dd6856e8eb7b02dab0a802624b58a79847678", - "revisionTime": "2019-05-03T23:10:59Z", + "revision": "50073e3d8500778cf5cac2d73deb83514348192b", + "revisionTime": "2019-05-22T22:17:18Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=", "path": "github.com/dgraph-io/badger/options", - "revision": "d98dd6856e8eb7b02dab0a802624b58a79847678", - "revisionTime": "2019-05-03T23:10:59Z", + "revision": "50073e3d8500778cf5cac2d73deb83514348192b", + "revisionTime": "2019-05-22T22:17:18Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "yu19DxVlBi/XuPl0SRV46bkBNUI=", + "checksumSHA1": "SV7o4+eEK7/XNWC7H7Z5vWCoHP0=", "path": "github.com/dgraph-io/badger/pb", - "revision": "d98dd6856e8eb7b02dab0a802624b58a79847678", - "revisionTime": "2019-05-03T23:10:59Z", + "revision": "50073e3d8500778cf5cac2d73deb83514348192b", + "revisionTime": "2019-05-22T22:17:18Z", "version": "HEAD", "versionExact": "HEAD" }, @@ -457,24 +457,24 @@ { "checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=", "path": "github.com/dgraph-io/badger/skl", - "revision": "d98dd6856e8eb7b02dab0a802624b58a79847678", - "revisionTime": "2019-05-03T23:10:59Z", + "revision": "50073e3d8500778cf5cac2d73deb83514348192b", + "revisionTime": "2019-05-22T22:17:18Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "ovAuDsGfn83KcTieKJjvx93TyUU=", "path": "github.com/dgraph-io/badger/table", - "revision": "d98dd6856e8eb7b02dab0a802624b58a79847678", - "revisionTime": "2019-05-03T23:10:59Z", + "revision": "50073e3d8500778cf5cac2d73deb83514348192b", + "revisionTime": "2019-05-22T22:17:18Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "FKcWgmBresQu0pBFw+6SGC25pdE=", + "checksumSHA1": "KI48+d+XHzrlAenqQh/Re7swIWk=", "path": "github.com/dgraph-io/badger/y", - "revision": "d98dd6856e8eb7b02dab0a802624b58a79847678", - "revisionTime": "2019-05-03T23:10:59Z", + "revision": "50073e3d8500778cf5cac2d73deb83514348192b", + "revisionTime": "2019-05-22T22:17:18Z", "version": "HEAD", "versionExact": "HEAD" }, @@ -593,10 +593,10 @@ "revisionTime": "2016-01-25T20:49:56Z" }, { - "checksumSHA1": "Y2MOwzNZfl4NRNDbLCZa6sgx7O0=", + "checksumSHA1": "CGj8VcI/CpzxaNqlqpEVM7qElD4=", "path": "github.com/golang/protobuf/proto", - "revision": "e91709a02e0e8ff8b86b7aa913fdc9ae9498e825", - "revisionTime": "2019-04-09T05:09:43Z" + "revision": "b285ee9cfc6c881bb20c0d8dc73370ea9b9ec90f", + "revisionTime": "2019-05-17T06:12:10Z" }, { "checksumSHA1": "z4copNgeTN77OymdDKqLaIK/vSI=", @@ -899,14 +899,14 @@ { "checksumSHA1": "UxahDzW2v4mf/+aFxruuupaoIwo=", "path": "golang.org/x/net/internal/timeseries", - "revision": "3ec19112720433827bbce8be9342797f5a6aaaf9", - "revisionTime": "2019-05-12T12:16:34Z" + "revision": "f3200d17e092c607f615320ecaad13d87ad9a2b3", + "revisionTime": "2019-05-22T15:39:15Z" }, { "checksumSHA1": "HvmG9LfStMLF+hIC7xR4SxegMis=", "path": "golang.org/x/net/trace", - "revision": "3ec19112720433827bbce8be9342797f5a6aaaf9", - "revisionTime": "2019-05-12T12:16:34Z" + "revision": "f3200d17e092c607f615320ecaad13d87ad9a2b3", + "revisionTime": "2019-05-22T15:39:15Z" }, { "checksumSHA1": "REkmyB368pIiip76LiqMLspgCRk=",