diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index 93cdae34..14fb0a6e 100644 --- a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -1,9 +1,7 @@ package pinner import ( - "bytes" "context" - "encoding/gob" "os" "path/filepath" "strconv" @@ -66,9 +64,9 @@ type PinManager struct { pinQueueIn chan *operation.PinningOperation pinQueueOut chan *operation.PinningOperation pinComplete chan *operation.PinningOperation - duplicateGuard *leveldb.DB - activePins map[uint]int // used to limit the number of pins per user - pinQueueCount map[uint]int // keep track of queue count per user + duplicateGuard map[uint64]bool // track whether a content id already exists in the queue + activePins map[uint]int // used to limit the number of pins per user + pinQueueCount map[uint]int // keep track of queue count per user pinQueue *goque.PrefixQueue pinQueueLk sync.Mutex RunPinFunc PinFunc @@ -143,7 +141,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log if opts.QueueDataDir == "" { log.Fatal("Deque needs queue data dir") } - + duplicateGuard := buildDuplicateGuardFromPinQueue(opts.QueueDataDir, log) pinQueue := createDQue(opts.QueueDataDir, log) //we need to have a variable pinQueueCount which keeps track in memory count in the queue //Since the disk dequeue is durable @@ -157,7 +155,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log pinQueueIn: make(chan *operation.PinningOperation, 64), pinQueueOut: make(chan *operation.PinningOperation), pinComplete: make(chan *operation.PinningOperation, 64), - duplicateGuard: createLevelDB(opts.QueueDataDir, log), + duplicateGuard: duplicateGuard, RunPinFunc: pinfunc, StatusChangeFunc: scf, maxActivePerUser: opts.MaxActivePerUser, @@ -178,11 +176,9 @@ func (pm *PinManager) complete(po *operation.PinningOperation) { defer pm.pinQueueLk.Unlock() opData := getPinningData(po) - err := pm.duplicateGuard.Delete(createLevelDBKey(opData, pm.log), nil) - if err != nil { - //Delete will not returns error if key doesn't exist - pm.log.Errorf("Error deleting item from duplicate guard ", err) - } + + //delete from duplicateGuard + delete(pm.duplicateGuard, createLevelDBKey(opData, pm.log)) pm.activePins[po.UserId]-- if pm.activePins[po.UserId] == 0 { @@ -297,33 +293,33 @@ func (pm *PinManager) closeQueueDataStructures() { if err != nil { pm.log.Fatal(err) } - - err = pm.duplicateGuard.Close() - if err != nil { - pm.log.Fatal(err) - } } -func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) []byte { - var buffer bytes.Buffer - enc := gob.NewEncoder(&buffer) - if err := enc.Encode(value.ContId); err != nil { - log.Fatal("Unable to encode value") - } - return buffer.Bytes() +func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) uint64 { + return value.ContId } -func createLevelDB(QueueDataDir string, log *zap.SugaredLogger) *leveldb.DB { - dname := filepath.Join(QueueDataDir, "duplicateGuard") - err := os.MkdirAll(dname, os.ModePerm) +func buildDuplicateGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { + ret := make(map[uint64]bool) + dname := filepath.Join(QueueDataDir, "pinQueueMsgPack") + db, err := leveldb.OpenFile(dname, nil) if err != nil { - log.Fatal("Unable to create directory for LevelDB. Out of disk? Too many open files? try ulimit -n 50000") + return ret } - db, err := leveldb.OpenFile(dname, nil) + iter := db.NewIterator(nil, nil) + for iter.Next() { + entry, err := decodeMsgPack(iter.Value()) + if err != nil { + continue + } + ret[entry.ContId] = true + } + + err = db.Close() if err != nil { - log.Fatal("Unable to create LevelDB. Out of disk? Too many open files? try ulimit -n 50000") + log.Fatal(err) } - return db + return ret } // queue defines the unique queue for a prefix. @@ -379,7 +375,8 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) { pm.log.Debugf("adding pin(%d) operation to the pinner queue", po.ContId) poData := getPinningData(po) - if _, err := pm.duplicateGuard.Get(createLevelDBKey(poData, pm.log), nil); err != leveldb.ErrNotFound { + _, exists := pm.duplicateGuard[createLevelDBKey(poData, pm.log)] + if exists { //work already exists in the queue not adding duplicate return } @@ -400,10 +397,10 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) { pm.log.Fatal("Unable to add pin to queue.", err) } - err = pm.duplicateGuard.Put(createLevelDBKey(poData, pm.log), []byte{255}, nil) - if err != nil { - pm.log.Fatal("Unable to add to duplicate guard.") - } + // set cont id to true so it is not enqueued multiple times + pm.duplicateGuard[createLevelDBKey(poData, pm.log)] = true + + //increase global queue count by 1 pm.pinQueueCount[u]++ }