Skip to content

Commit

Permalink
Fix, refactor, document and test the slot limiters (#771)
Browse files Browse the repository at this point in the history
Fix, refactor, document and test the slot limiters and update the release notes

This fixes #770
  • Loading branch information
na-- authored Sep 18, 2018
1 parent 460ceba commit 9e7047c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 18 deletions.
50 changes: 34 additions & 16 deletions js/modules/k6/http/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,69 @@

package http

type SlotLimiter struct {
ch chan struct{}
}
import (
"sync"
)

// SlotLimiter can restrict the concurrent execution of tasks to the given `slots` limit
type SlotLimiter chan struct{}

// NewSlotLimiter initializes and returns a new SlotLimiter with the given slot count
func NewSlotLimiter(slots int) SlotLimiter {
if slots <= 0 {
return SlotLimiter{nil}
return nil
}

ch := make(chan struct{}, slots)
for i := 0; i < slots; i++ {
ch <- struct{}{}
}
return SlotLimiter{ch}
return ch
}

func (l *SlotLimiter) Begin() {
if l.ch != nil {
<-l.ch
// Begin uses up a slot to denote the start of a task exeuction. It's a noop if the number
// of slots is 0, and if no slots are available, it blocks and waits.
func (sl SlotLimiter) Begin() {
if sl != nil {
<-sl
}
}

func (l *SlotLimiter) End() {
if l.ch != nil {
l.ch <- struct{}{}
// End restores a slot and should be called at the end of a taks execution, preferably
// from a defer statement right after Begin()
func (sl SlotLimiter) End() {
if sl != nil {
sl <- struct{}{}
}
}

// MultiSlotLimiter can restrict the concurrent execution of different groups of tasks
// to the given `slots` limit. Each group is represented with a string ID.
type MultiSlotLimiter struct {
m map[string]*SlotLimiter
m map[string]SlotLimiter
slots int
mutex sync.Mutex
}

func NewMultiSlotLimiter(slots int) MultiSlotLimiter {
return MultiSlotLimiter{make(map[string]*SlotLimiter), slots}
// NewMultiSlotLimiter initializes and returns a new MultiSlotLimiter with the given slot count
func NewMultiSlotLimiter(slots int) *MultiSlotLimiter {
return &MultiSlotLimiter{make(map[string]SlotLimiter), slots, sync.Mutex{}}
}

func (l *MultiSlotLimiter) Slot(s string) *SlotLimiter {
// Slot is used to retrieve the corresponding slot to the given string ID. If no slot with that ID exists,
// it creates it and saves it for future use. It is safe to call this method concurrently.
func (l *MultiSlotLimiter) Slot(s string) SlotLimiter {
if l.slots == 0 {
return nil
}

l.mutex.Lock()
defer l.mutex.Unlock()

ll, ok := l.m[s]
if !ok {
tmp := NewSlotLimiter(l.slots)
ll = &tmp
ll = tmp
l.m[s] = ll
}
return ll
Expand Down
88 changes: 87 additions & 1 deletion js/modules/k6/http/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
package http

import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSlotLimiter(t *testing.T) {
func TestSlotLimiterSingleSlot(t *testing.T) {
t.Parallel()
l := NewSlotLimiter(1)
l.Begin()
done := false
Expand All @@ -38,7 +42,67 @@ func TestSlotLimiter(t *testing.T) {
l.End()
}

func TestSlotLimiterUnlimited(t *testing.T) {
t.Parallel()
l := NewSlotLimiter(0)
l.Begin()
l.Begin()
l.Begin()
}
func TestSlotLimiters(t *testing.T) {
t.Parallel()
testCases := []struct{ limit, launches, expMid int }{
{0, 0, 0},
{0, 1, 1},
{0, 5, 5},
{1, 5, 1},
{2, 5, 2},
{5, 6, 5},
{6, 5, 5},
{10, 7, 7},
}

for _, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("limit=%d,launches=%d", tc.limit, tc.launches), func(t *testing.T) {
t.Parallel()
l := NewSlotLimiter(tc.limit)
wg := sync.WaitGroup{}

if tc.limit == 0 {
wg.Add(tc.launches)
} else if tc.launches < tc.limit {
wg.Add(tc.launches)
} else {
wg.Add(tc.limit)
}

var counter uint32

for i := 0; i < tc.launches; i++ {
go func() {
l.Begin()
atomic.AddUint32(&counter, 1)
wg.Done()
}()
}
wg.Wait()
assert.Equal(t, uint32(tc.expMid), atomic.LoadUint32(&counter))

if tc.limit != 0 && tc.limit < tc.launches {
wg.Add(tc.launches - tc.limit)
for i := 0; i < tc.launches; i++ {
l.End()
}
wg.Wait()
assert.Equal(t, uint32(tc.launches), atomic.LoadUint32(&counter))
}
})
}
}

func TestMultiSlotLimiter(t *testing.T) {
t.Parallel()
t.Run("0", func(t *testing.T) {
l := NewMultiSlotLimiter(0)
assert.Nil(t, l.Slot("test"))
Expand All @@ -48,4 +112,26 @@ func TestMultiSlotLimiter(t *testing.T) {
assert.Equal(t, l.Slot("test"), l.Slot("test"))
assert.NotNil(t, l.Slot("test"))
})
t.Run("2", func(t *testing.T) {
l := NewMultiSlotLimiter(1)
wg := sync.WaitGroup{}
wg.Add(2)

var s1, s2 SlotLimiter
go func() {
s1 = l.Slot("ctest")
wg.Done()
}()
go func() {
s2 = l.Slot("ctest")
wg.Done()
}()
wg.Wait()

assert.NotNil(t, s1)
assert.Equal(t, s1, s2)
assert.Equal(t, s1, l.Slot("ctest"))
assert.NotEqual(t, s1, l.Slot("dtest"))
assert.NotNil(t, l.Slot("dtest"))
})
}
3 changes: 2 additions & 1 deletion release notes/upcoming.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,5 @@ Thanks to @sherrman for reporting the binary handling issues that prompted the a
* Config: Environment variables can now be used to modify k6's behavior in the `k6 login` subcommands. (#734)
* HTTP: Binary response bodies were mangled because there was no way to avoid converting them to UTF-16 JavaScript strings. (#749)
* Config: Stages were appended instead of overwritten from upper config "tiers", and were doubled when supplied via the CLI flag (#759)
* HAR converter: Fixed a panic due to a missing array length check (#760)
* HAR converter: Fixed a panic due to a missing array length check (#760)
* HTTP: `http.batch()` calls could panic because of a data race when the `batchPerHost` global option was used (#770)

0 comments on commit 9e7047c

Please sign in to comment.