Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix duplicate queue disk usage #945

Merged
merged 3 commits into from
Mar 13, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 33 additions & 36 deletions pinner/pinmgr.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package pinner

import (
"bytes"
"context"
"encoding/gob"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -66,9 +64,9 @@ type PinManager struct {
pinQueueIn chan *operation.PinningOperation
pinQueueOut chan *operation.PinningOperation
pinComplete chan *operation.PinningOperation
duplicateGuard *leveldb.DB
activePins map[uint]int // used to limit the number of pins per user
pinQueueCount map[uint]int // keep track of queue count per user
duplicateGuard map[uint64]bool // track whether a content id already exists in the queue
activePins map[uint]int // used to limit the number of pins per user
pinQueueCount map[uint]int // keep track of queue count per user
pinQueue *goque.PrefixQueue
pinQueueLk sync.Mutex
RunPinFunc PinFunc
Expand Down Expand Up @@ -143,7 +141,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log
if opts.QueueDataDir == "" {
log.Fatal("Deque needs queue data dir")
}

duplicateGuard := buildDuplicateeGuardFromPinQueue(opts.QueueDataDir, log)
pinQueue := createDQue(opts.QueueDataDir, log)
//we need to have a variable pinQueueCount which keeps track in memory count in the queue
//Since the disk dequeue is durable
Expand All @@ -157,7 +155,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log
pinQueueIn: make(chan *operation.PinningOperation, 64),
pinQueueOut: make(chan *operation.PinningOperation),
pinComplete: make(chan *operation.PinningOperation, 64),
duplicateGuard: createLevelDB(opts.QueueDataDir, log),
duplicateGuard: duplicateGuard,
RunPinFunc: pinfunc,
StatusChangeFunc: scf,
maxActivePerUser: opts.MaxActivePerUser,
Expand All @@ -178,11 +176,9 @@ func (pm *PinManager) complete(po *operation.PinningOperation) {
defer pm.pinQueueLk.Unlock()

opData := getPinningData(po)
err := pm.duplicateGuard.Delete(createLevelDBKey(opData, pm.log), nil)
if err != nil {
//Delete will not returns error if key doesn't exist
pm.log.Errorf("Error deleting item from duplicate guard ", err)
}

//delete from duplicateGuard
delete(pm.duplicateGuard, createLevelDBKey(opData, pm.log))

pm.activePins[po.UserId]--
if pm.activePins[po.UserId] == 0 {
Expand Down Expand Up @@ -297,33 +293,33 @@ func (pm *PinManager) closeQueueDataStructures() {
if err != nil {
pm.log.Fatal(err)
}

err = pm.duplicateGuard.Close()
if err != nil {
pm.log.Fatal(err)
}
}

func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) []byte {
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(value.ContId); err != nil {
log.Fatal("Unable to encode value")
}
return buffer.Bytes()
func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) uint64 {
return value.ContId
}

func createLevelDB(QueueDataDir string, log *zap.SugaredLogger) *leveldb.DB {
dname := filepath.Join(QueueDataDir, "duplicateGuard")
err := os.MkdirAll(dname, os.ModePerm)
func buildDuplicateeGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool {
ret := make(map[uint64]bool)
dname := filepath.Join(QueueDataDir, "pinQueueMsgPack")
db, err := leveldb.OpenFile(dname, nil)
if err != nil {
log.Fatal("Unable to create directory for LevelDB. Out of disk? Too many open files? try ulimit -n 50000")
return ret
}
db, err := leveldb.OpenFile(dname, nil)
iter := db.NewIterator(nil, nil)
for iter.Next() {
entry, err := decodeMsgPack(iter.Value())
if err != nil {
continue
}
ret[entry.ContId] = true
}

err = db.Close()
if err != nil {
log.Fatal("Unable to create LevelDB. Out of disk? Too many open files? try ulimit -n 50000")
log.Fatal(err)
}
return db
return ret
}

// queue defines the unique queue for a prefix.
Expand Down Expand Up @@ -379,7 +375,8 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) {
pm.log.Debugf("adding pin(%d) operation to the pinner queue", po.ContId)

poData := getPinningData(po)
if _, err := pm.duplicateGuard.Get(createLevelDBKey(poData, pm.log), nil); err != leveldb.ErrNotFound {
_, exists := pm.duplicateGuard[createLevelDBKey(poData, pm.log)]
if exists {
//work already exists in the queue not adding duplicate
return
}
Expand All @@ -400,10 +397,10 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) {
pm.log.Fatal("Unable to add pin to queue.", err)
}

err = pm.duplicateGuard.Put(createLevelDBKey(poData, pm.log), []byte{255}, nil)
if err != nil {
pm.log.Fatal("Unable to add to duplicate guard.")
}
// set cont id to true so it is not enqueued multiple times
pm.duplicateGuard[createLevelDBKey(poData, pm.log)] = true

//increase global queue count by 1
pm.pinQueueCount[u]++
}

Expand Down