-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wrr.go
105 lines (85 loc) · 1.79 KB
/
wrr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package wrr
import "sync"
type Wrr[T any] struct {
queues map[string]*queue[T]
forceQueues *queue[T]
totalWeight int
lock *sync.Mutex
}
type Builder[T any] struct {
wrr *Wrr[T]
}
func New[T any]() *Builder[T] {
return &Builder[T]{
wrr: &Wrr[T]{
queues: make(map[string]*queue[T]),
forceQueues: nil,
totalWeight: 0,
lock: &sync.Mutex{},
},
}
}
func (builder *Builder[T]) AddPriority(priority *Priority) *Builder[T] {
builder.wrr.queues[priority.Name] = &queue[T]{
weight: priority.Weight,
currWeight: 0,
lock: &sync.Mutex{},
objs: make([]*T, 0),
}
builder.wrr.totalWeight += priority.Weight
return builder
}
func (builder *Builder[T]) AddPriorities(priorities ...*Priority) *Builder[T] {
for _, priority := range priorities {
builder.AddPriority(priority)
}
return builder
}
func (builder *Builder[T]) Build() (*Wrr[T], error) {
if len(builder.wrr.queues) == 0 {
return nil, ErrUncompleted
}
return builder.wrr, nil
}
func (wrr *Wrr[T]) Push(p *Priority, obj *T) error {
if p == nil || obj == nil {
return ErrIllegalParams
}
var q *queue[T]
if p.Force {
q = wrr.forceQueues
} else {
q = wrr.queues[p.Name]
}
if q == nil {
return ErrUncompleted
}
q.lock.Lock()
defer q.lock.Unlock()
q.objs = append(q.objs, obj)
return nil
}
func (wrr *Wrr[T]) Pop() (*T, error) {
var q *queue[T]
if wrr.forceQueues != nil && len(wrr.forceQueues.objs) > 0 {
q = wrr.forceQueues
return q.pop(nil)
}
wrr.lock.Lock()
defer wrr.lock.Unlock()
for _, qs := range wrr.queues {
if len(qs.objs) == 0 {
continue
}
qs.currWeight += qs.weight
if q == nil || q.currWeight < qs.currWeight {
q = qs
}
}
if q == nil {
return nil, ErrEmptyData
}
return q.pop(func() {
q.currWeight -= wrr.totalWeight
})
}