Skip to content

Commit

Permalink
resolve conflcit
Browse files Browse the repository at this point in the history
  • Loading branch information
snissn committed Oct 18, 2022
2 parents 2bdcd76 + eb587c8 commit 6476a92
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions pinner/pinmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pinner
import (
"context"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -36,7 +37,8 @@ func NewPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts) *Pi

return &PinManager{
pinQueue: createDQue(opts.QueueDataDir),
pinQueuePriority: createDQue(opts.QueueDataDir),
activePins: make(map[uint]int),
pinQueueCount: make(map[uint]int),
pinQueueIn: make(chan *PinningOperation, 64),
pinQueueOut: make(chan *PinningOperation),
pinComplete: make(chan *PinningOperation, 64),
Expand All @@ -62,9 +64,10 @@ type PinManager struct {
pinQueueIn chan *PinningOperation
pinQueueOut chan *PinningOperation
pinComplete chan *PinningOperation
pinQueue *goque.Queue
pinQueuePriority *goque.Queue
duplicateGuard map[PinningOperationData]bool
activePins map[uint]int // used to limit the number of pins per user
pinQueueCount map[uint]int // keep track of queue count per user
pinQueue *goque.PrefixQueue
pinQueueLk sync.Mutex
RunPinFunc PinFunc
StatusChangeFunc PinStatusFunc
Expand Down Expand Up @@ -171,7 +174,7 @@ func (po *PinningOperation) SetStatus(st types.PinningStatus) {
func (pm *PinManager) PinQueueSize() int {
pm.pinQueueLk.Lock()
defer pm.pinQueueLk.Unlock()
return int(pm.pinQueuePriority.Length() + pm.pinQueue.Length())
return int(pm.pinQueue.Length())
}

func (pm *PinManager) Add(op *PinningOperation) {
Expand Down Expand Up @@ -206,17 +209,37 @@ func (pm *PinManager) doPinning(op *PinningOperation) error {

func (pm *PinManager) popNextPinOp() *PinningOperation {

var pq *goque.Queue
if pm.pinQueuePriority.Length() > 0 {
pq = pm.pinQueuePriority
if pm.pinQueue.Length() == 0 {
return nil // no content in queue
}

var minCount int = 10000
var user uint

//if user id = 0 has any pins to work on, use that
if pm.pinQueueCount[0] > 0 {
user = 0
} else {
if pm.pinQueue.Length() == 0 {
return nil // no content in queue or priority queue
//if not find user with least number of active workers and use that
for u := range pm.pinQueueCount {
active := pm.activePins[u]
if active < minCount {
minCount = active
user = u
}
}
pq = pm.pinQueue
}
if minCount >= pm.maxActivePerUser && user != 0 {
//return nil if the min count is greater than the limit and user is not 0
return nil
}

item, err := pm.pinQueue.Dequeue(getUserForQueue(user))
pm.pinQueueCount[user]--
if pm.pinQueueCount[user] == 0 {
delete(pm.pinQueueCount, user)
}

item, err := pq.Dequeue()
// Dequeue the next item in the queue
if err != nil {
log.Fatal("Error dequeuing item ", err)
Expand All @@ -232,19 +255,21 @@ func (pm *PinManager) popNextPinOp() *PinningOperation {

}

func createDQue(QueueDataDir string) *goque.Queue {

//TODO figure out if we want to make this persistent or continue to use mkdirtemp and if so clean up the file
func createDQue(QueueDataDir string) *goque.PrefixQueue {

//TODO figure out if we want to make this persistent or continue to use mkdirtemp and if so how often should we clean up the file
dname, err := os.MkdirTemp(QueueDataDir, "pinqueue")
//fmt.Println("make", dname)
q, err := goque.OpenQueue(dname)
q, err := goque.OpenPrefixQueue(dname)
if err != nil {
log.Fatal("Unable to create Queue. Out of disk? Too many open files? try ulimit -n 50000")
}
return q
}

func getUserForQueue(UserId uint) []byte {
return []byte(strconv.Itoa(int(UserId)))
}

func (pm *PinManager) enqueuePinOp(po *PinningOperation) {

opdata := getPinningData(po)
Expand All @@ -258,13 +283,9 @@ func (pm *PinManager) enqueuePinOp(po *PinningOperation) {
if po.SkipLimiter {
u = 0
}
var dq *goque.Queue
if u == 0 {
dq = pm.pinQueuePriority
} else {
dq = pm.pinQueue
}
_, err := dq.EnqueueObject(po)

_, err := pm.pinQueue.EnqueueObject(getUserForQueue(u), po)
pm.pinQueueCount[u]++
if err != nil {
log.Fatal("Unable to add pin to queue.")
}
Expand Down

0 comments on commit 6476a92

Please sign in to comment.