From 5b5dbe61f950e1f499629bfb1b9f4434736aeaa6 Mon Sep 17 00:00:00 2001 From: Mike Seiler Date: Sun, 12 Mar 2023 13:21:49 -1000 Subject: [PATCH 1/3] fix duplicate queue disk usage -- fix by essentially removing it - on shuttle load we go through the level db with the saved pins and reconstruct the duplicate queue into a map from uint64 to bool and use that while the program is running --- pinner/pinmgr.go | 67 ++++++++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index 93cdae34..d00c74ba 100644 --- a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -1,9 +1,7 @@ package pinner import ( - "bytes" "context" - "encoding/gob" "os" "path/filepath" "strconv" @@ -66,9 +64,9 @@ type PinManager struct { pinQueueIn chan *operation.PinningOperation pinQueueOut chan *operation.PinningOperation pinComplete chan *operation.PinningOperation - duplicateGuard *leveldb.DB - activePins map[uint]int // used to limit the number of pins per user - pinQueueCount map[uint]int // keep track of queue count per user + duplicateGuard map[uint64]bool // track whether a content id already exists in the queue + activePins map[uint]int // used to limit the number of pins per user + pinQueueCount map[uint]int // keep track of queue count per user pinQueue *goque.PrefixQueue pinQueueLk sync.Mutex RunPinFunc PinFunc @@ -143,7 +141,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log if opts.QueueDataDir == "" { log.Fatal("Deque needs queue data dir") } - + duplicateGuard := buildDupliceGuardFromPinQueue(opts.QueueDataDir, log) pinQueue := createDQue(opts.QueueDataDir, log) //we need to have a variable pinQueueCount which keeps track in memory count in the queue //Since the disk dequeue is durable @@ -157,7 +155,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log pinQueueIn: make(chan *operation.PinningOperation, 64), pinQueueOut: make(chan *operation.PinningOperation), pinComplete: make(chan *operation.PinningOperation, 64), - duplicateGuard: createLevelDB(opts.QueueDataDir, log), + duplicateGuard: duplicateGuard, RunPinFunc: pinfunc, StatusChangeFunc: scf, maxActivePerUser: opts.MaxActivePerUser, @@ -178,11 +176,9 @@ func (pm *PinManager) complete(po *operation.PinningOperation) { defer pm.pinQueueLk.Unlock() opData := getPinningData(po) - err := pm.duplicateGuard.Delete(createLevelDBKey(opData, pm.log), nil) - if err != nil { - //Delete will not returns error if key doesn't exist - pm.log.Errorf("Error deleting item from duplicate guard ", err) - } + + //delete from duplicateGuard + delete(pm.duplicateGuard, createLevelDBKey(opData, pm.log)) pm.activePins[po.UserId]-- if pm.activePins[po.UserId] == 0 { @@ -297,33 +293,29 @@ func (pm *PinManager) closeQueueDataStructures() { if err != nil { pm.log.Fatal(err) } - - err = pm.duplicateGuard.Close() - if err != nil { - pm.log.Fatal(err) - } } -func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) []byte { - var buffer bytes.Buffer - enc := gob.NewEncoder(&buffer) - if err := enc.Encode(value.ContId); err != nil { - log.Fatal("Unable to encode value") - } - return buffer.Bytes() +func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) uint64 { + return value.ContId } -func createLevelDB(QueueDataDir string, log *zap.SugaredLogger) *leveldb.DB { - dname := filepath.Join(QueueDataDir, "duplicateGuard") - err := os.MkdirAll(dname, os.ModePerm) - if err != nil { - log.Fatal("Unable to create directory for LevelDB. Out of disk? Too many open files? try ulimit -n 50000") - } +func buildDupliceGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { + ret := make(map[uint64]bool) + dname := filepath.Join(QueueDataDir, "pinQueueMsgPack") db, err := leveldb.OpenFile(dname, nil) if err != nil { - log.Fatal("Unable to create LevelDB. Out of disk? Too many open files? try ulimit -n 50000") + return ret } - return db + iter := db.NewIterator(nil, nil) + for iter.Next() { + entry, err := decodeMsgPack(iter.Value()) + if err != nil { + continue + } + ret[entry.ContId] = true + } + db.Close() + return ret } // queue defines the unique queue for a prefix. @@ -379,7 +371,8 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) { pm.log.Debugf("adding pin(%d) operation to the pinner queue", po.ContId) poData := getPinningData(po) - if _, err := pm.duplicateGuard.Get(createLevelDBKey(poData, pm.log), nil); err != leveldb.ErrNotFound { + _, exists := pm.duplicateGuard[createLevelDBKey(poData, pm.log)] + if exists { //work already exists in the queue not adding duplicate return } @@ -400,10 +393,10 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) { pm.log.Fatal("Unable to add pin to queue.", err) } - err = pm.duplicateGuard.Put(createLevelDBKey(poData, pm.log), []byte{255}, nil) - if err != nil { - pm.log.Fatal("Unable to add to duplicate guard.") - } + // set cont id to true so it is not enqueued multiple times + pm.duplicateGuard[createLevelDBKey(poData, pm.log)] = true + + //increase global queue count by 1 pm.pinQueueCount[u]++ } From 7d1e3147a4d77ec667641511bde2b23837f29daa Mon Sep 17 00:00:00 2001 From: Mike Seiler Date: Sun, 12 Mar 2023 15:52:30 -1000 Subject: [PATCH 2/3] fix: gosec on db.close and spelling on fn name --- pinner/pinmgr.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index d00c74ba..995eee4c 100644 --- a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -141,7 +141,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log if opts.QueueDataDir == "" { log.Fatal("Deque needs queue data dir") } - duplicateGuard := buildDupliceGuardFromPinQueue(opts.QueueDataDir, log) + duplicateGuard := buildDuplicateeGuardFromPinQueue(opts.QueueDataDir, log) pinQueue := createDQue(opts.QueueDataDir, log) //we need to have a variable pinQueueCount which keeps track in memory count in the queue //Since the disk dequeue is durable @@ -299,7 +299,7 @@ func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) uint64 return value.ContId } -func buildDupliceGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { +func buildDuplicateeGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { ret := make(map[uint64]bool) dname := filepath.Join(QueueDataDir, "pinQueueMsgPack") db, err := leveldb.OpenFile(dname, nil) @@ -314,7 +314,11 @@ func buildDupliceGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) } ret[entry.ContId] = true } - db.Close() + + err = db.Close() + if err != nil { + log.Fatal(err) + } return ret } From 16b9416636eb9cd80bb813bf92c0ada9daa0be87 Mon Sep 17 00:00:00 2001 From: Mike Seiler Date: Sun, 12 Mar 2023 15:54:21 -1000 Subject: [PATCH 3/3] bad @ spell --- pinner/pinmgr.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index 995eee4c..14fb0a6e 100644 --- a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -141,7 +141,7 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log if opts.QueueDataDir == "" { log.Fatal("Deque needs queue data dir") } - duplicateGuard := buildDuplicateeGuardFromPinQueue(opts.QueueDataDir, log) + duplicateGuard := buildDuplicateGuardFromPinQueue(opts.QueueDataDir, log) pinQueue := createDQue(opts.QueueDataDir, log) //we need to have a variable pinQueueCount which keeps track in memory count in the queue //Since the disk dequeue is durable @@ -299,7 +299,7 @@ func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) uint64 return value.ContId } -func buildDuplicateeGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { +func buildDuplicateGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { ret := make(map[uint64]bool) dname := filepath.Join(QueueDataDir, "pinQueueMsgPack") db, err := leveldb.OpenFile(dname, nil)