Skip to content

Commit

Permalink
Added time --input-raw-max-wait
Browse files Browse the repository at this point in the history
In some low traffic cases you can have cases when time between request minutes.
Additionally increasing speed can be not an option.
Now you can "skip" this pauses, by seetting max wait time
  • Loading branch information
buger committed Jul 8, 2021
1 parent 8e76559 commit 19ad90a
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 15 deletions.
8 changes: 7 additions & 1 deletion input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,13 @@ type FileInput struct {
loop bool
readDepth int
dryRun bool
maxWait time.Duration

stats *expvar.Map
}

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int, dryRun bool) (i *FileInput) {
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
i = new(FileInput)
i.data = make(chan []byte, 1000)
i.exit = make(chan bool)
Expand All @@ -211,6 +212,7 @@ func NewFileInput(path string, loop bool, readDepth int, dryRun bool) (i *FileIn
i.readDepth = readDepth
i.stats = expvar.NewMap("file-" + path)
i.dryRun = dryRun
i.maxWait = maxWait

if err := i.init(); err != nil {
return
Expand Down Expand Up @@ -351,6 +353,10 @@ func (i *FileInput) emit() {
diff = int64(float64(diff) / i.speedFactor)
}

if i.maxWait > 0 && diff > int64(i.maxWait) {
diff = int64(i.maxWait)
}

if diff >= 0 {
lastTime = payload.timestamp

Expand Down
12 changes: 6 additions & 6 deletions input_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
file2.Write([]byte(payloadSeparator))
file2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false)

for i := '1'; i <= '4'; i++ {
msg, _ := input.PluginRead()
Expand All @@ -130,7 +130,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) {
file.Write([]byte("1 3 250000000\nrequest3"))
file.Write([]byte(payloadSeparator))

input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, false)
input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, 0, false)

start := time.Now().UnixNano()
for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
file2.Write([]byte(payloadSeparator))
file2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false)

for i := '1'; i <= '4'; i++ {
msg, _ := input.PluginRead()
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestInputFileLoop(t *testing.T) {
file.Write([]byte(payloadSeparator))
file.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, false)
input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, 0, false)

// Even if we have just 2 requests in file, it should indifinitly loop
for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestInputFileCompressed(t *testing.T) {
name2 := output2.file.Name()
output2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false)
for i := 0; i < 2000; i++ {
input.PluginRead()
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func CreateCaptureFile(requestGenerator *RequestGenerator) *CaptureFile {
func ReadFromCaptureFile(captureFile *os.File, count int, callback writeCallback) (err error) {
wg := new(sync.WaitGroup)

input := NewFileInput(captureFile.Name(), false, 100, false)
input := NewFileInput(captureFile.Name(), false, 100, 0, false)
output := NewTestOutput(func(msg *Message) {
callback(msg)
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion output_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFileOutput(t *testing.T) {
emitter.Close()

var counter int64
input2 := NewFileInput("/tmp/test_requests.gor", false, 100, false)
input2 := NewFileInput("/tmp/test_requests.gor", false, 100, 0, false)
output2 := NewTestOutput(func(*Message) {
atomic.AddInt64(&counter, 1)
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewPlugins() *InOutPlugins {
}

for _, options := range Settings.InputFile {
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileDryRun)
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
}

for _, path := range Settings.OutputFile {
Expand Down
2 changes: 1 addition & 1 deletion s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestInputFileFromS3(t *testing.T) {
<-output.closeCh
}

input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false, 100, false)
input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false, 100, 0, false)

buf := make([]byte, 1000)
for i := 0; i <= 19999; i++ {
Expand Down
12 changes: 7 additions & 5 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ type AppSettings struct {
OutputTCPConfig TCPOutputConfig
OutputTCPStats bool `json:"output-tcp-stats"`

InputFile MultiOption `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
InputFileReadDepth int `json:"input-file-read-depth"`
InputFileDryRun bool `json:"input-file-dry-run"`
OutputFile MultiOption `json:"output-file"`
InputFile MultiOption `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
InputFileReadDepth int `json:"input-file-read-depth"`
InputFileDryRun bool `json:"input-file-dry-run"`
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
OutputFile MultiOption `json:"output-file"`
OutputFileConfig FileOutputConfig

InputRAW MultiOption `json:"input_raw"`
Expand Down Expand Up @@ -117,6 +118,7 @@ func init() {
flag.BoolVar(&Settings.InputFileLoop, "input-file-loop", false, "Loop input files, useful for performance testing.")
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")
flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.")
flag.DurationVar(&Settings.InputFileMaxWait, "input-file-max-wait", 0, "Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s")

flag.Var(&Settings.OutputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")
Expand Down

0 comments on commit 19ad90a

Please sign in to comment.