-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathbender.go
182 lines (156 loc) · 6.33 KB
/
bender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
Copyright 2014-2016 Pinterest, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bender
import (
"math"
"sync"
"time"
)
// An IntervalGenerator is a function that takes the current Unix epoch time
// (in nanoseconds) and returns a non-negative time (also in nanoseconds)
// until the next request should be sent. Bender provides functions to create
// interval generators for uniform and exponential distributions, each of
// which takes the target throughput (requests per second) and returns an
// IntervalGenerator. Neither of the included generators makes use of the
// function argument, but it is there for cases in which the simulated
// intervals are time dependent (you want to simulate the daily traffic
// variation of a web site, for example).
type IntervalGenerator func(int64) int64
// RequestExecutor is a function that takes the current Unix Epoch time (in
// nanoseconds) and a request, sends the request to the service, waits for
// the response, optionally validates it, and returns the response (or nil)
// along with an error (nil on success). This function is timed by the load
// tester, so it should do as little else as possible, and everything it does
// will be added to the reported service latency.
type RequestExecutor func(int64, interface{}) (interface{}, error)
// StartEvent is sent once at the start of the load test, before any requests
// are executed.
type StartEvent struct {
	// The Unix epoch time in nanoseconds at which the load test started.
	Start int64
}
// EndEvent is sent once at the end of the load test, after which no more
// events are sent and the recorder channel is closed.
type EndEvent struct {
	// The Unix epoch times in nanoseconds at which the load test started and ended.
	Start, End int64
}
// WaitEvent is sent once for each request before sleeping for the given
// interval (only by LoadTestThroughput, which controls inter-request timing).
type WaitEvent struct {
	// The next wait time (in nanoseconds) and the accumulated overage time (the difference between
	// the actual wait time and the intended wait time).
	Wait, Overage int64
}
// StartRequestEvent is sent before a request is executed. The sending of this event happens before
// the timing of the request starts, to avoid potential issues, so it contains the timestamp of the
// event send, and not the timestamp of the request start.
type StartRequestEvent struct {
	// The Unix epoch time (in nanoseconds) at which this event was created, which will be earlier
	// than the sending of the associated request (for performance reasons)
	Time int64
	// The request that will be sent; nothing good can come from modifying it.
	Request interface{}
}
// EndRequestEvent is sent after a request has completed (whether it succeeded
// or failed).
type EndRequestEvent struct {
	// The Unix epoch times (in nanoseconds) at which the request was started and finished.
	Start, End int64
	// The response data returned by the request executor (may be nil).
	Response interface{}
	// An error, or nil if there was no error.
	Err error
}
// LoadTestThroughput starts a load test in which the caller controls the interval between requests
// being sent. See the package documentation for details on the arguments to this function.
func LoadTestThroughput(intervals IntervalGenerator, requests chan interface{}, requestExec RequestExecutor, recorder chan interface{}) {
	go func() {
		testStart := time.Now().UnixNano()
		recorder <- &StartEvent{testStart}

		var inFlight sync.WaitGroup
		var overage int64
		lastTick := time.Now().UnixNano()
		for r := range requests {
			// Ask the generator for the next interval, then burn down any
			// accumulated overage (time we previously overslept) against it.
			wait := intervals(lastTick)
			credit := int64(math.Min(float64(wait), float64(overage)))
			wait -= credit
			overage -= credit
			recorder <- &WaitEvent{wait, overage}
			time.Sleep(time.Duration(wait))

			inFlight.Add(1)
			go func(req interface{}) {
				defer inFlight.Done()
				// The event is sent before timing begins so the recorder send
				// does not count against the measured request latency.
				recorder <- &StartRequestEvent{time.Now().UnixNano(), req}
				begin := time.Now().UnixNano()
				res, err := requestExec(time.Now().UnixNano(), req)
				recorder <- &EndRequestEvent{begin, time.Now().UnixNano(), res, err}
			}(r)

			// Whatever we spent beyond the intended wait becomes overage to be
			// credited against future intervals.
			overage += time.Now().UnixNano() - lastTick - wait
			lastTick = time.Now().UnixNano()
		}
		// All requests have been issued; wait for in-flight ones to finish
		// before announcing the end and closing the recorder.
		inFlight.Wait()
		recorder <- &EndEvent{testStart, time.Now().UnixNano()}
		close(recorder)
	}()
}
// empty is a zero-width token passed through the permits channel; only the
// send/receive itself carries meaning.
type empty struct{}
// WorkerSemaphore controls the number of "workers" that can be running as part of a load test
// using LoadTestConcurrency.
type WorkerSemaphore struct {
	// permits is unbuffered: each Signal send rendezvous with a Wait receive.
	permits chan empty
}
// NewWorkerSemaphore creates an empty WorkerSemaphore (no workers).
func NewWorkerSemaphore() *WorkerSemaphore {
	// TODO(charles): Signal and Wait block due to permits being unbuffered, should we add a buffer?
	s := new(WorkerSemaphore)
	s.permits = make(chan empty)
	return s
}
// Signal adds n workers to the pool of workers that are currently sending requests. If no requests
// are outstanding, this will block until a request is ready to send.
func (s WorkerSemaphore) Signal(n int) {
	// Each send hands one permit to a blocked Wait (the channel is unbuffered).
	for ; n > 0; n-- {
		s.permits <- empty{}
	}
}
// Wait removes n workers from the pool. If all workers are busy, then this will wait until the
// next worker is finished, and remove it.
func (s WorkerSemaphore) Wait(n int) bool {
	// Each receive consumes one permit; blocks until a Signal provides it.
	for ; n > 0; n-- {
		<-s.permits
	}
	return true
}
// LoadTestConcurrency starts a load test in which the caller controls the number of goroutines that
// are sending requests. See the package documentation for details on the arguments to this
// function.
func LoadTestConcurrency(workers *WorkerSemaphore, requests chan interface{}, requestExec RequestExecutor, recorder chan interface{}) {
	go func() {
		start := time.Now().UnixNano()
		recorder <- &StartEvent{start}

		var wg sync.WaitGroup
		for request := range requests {
			// Block until a worker slot is available.
			workers.Wait(1)
			wg.Add(1)
			go func(req interface{}) {
				defer func() {
					wg.Done()
					workers.Signal(1)
				}()
				// Fix: the event must carry its own creation time (not the load
				// test start time, as it did before), and it is sent before the
				// request timing begins so the recorder send is not counted in
				// the reported latency — matching LoadTestThroughput and the
				// StartRequestEvent documentation.
				recorder <- &StartRequestEvent{time.Now().UnixNano(), req}
				reqStart := time.Now().UnixNano()
				res, err := requestExec(time.Now().UnixNano(), req)
				recorder <- &EndRequestEvent{reqStart, time.Now().UnixNano(), res, err}
			}(request)
		}
		// All requests have been dispatched; wait for in-flight goroutines to
		// finish before announcing the end and closing the recorder.
		wg.Wait()
		recorder <- &EndEvent{start, time.Now().UnixNano()}
		close(recorder)
	}()
}