Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/development' into development
Browse files Browse the repository at this point in the history
  • Loading branch information
aviaIguazio committed Jul 18, 2019
2 parents 2b539aa + ed98df3 commit b51fbb3
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- 1.9.x
- 1.12.x
- tip

go_import_path: github.com/v3io/http_blaster
Expand Down
15 changes: 7 additions & 8 deletions examples/csv_2_kv_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ title = "Workload example"
[workloads]
[workloads.load_to_t1]
name="load_to_t1"
Generator="csv2kv"
container="1"
Target="t2/"
Generator="csv2kv_perf"
container="bigdata"
Target="mytable/"
workers=10
duration="60s"
payload="examples/payloads/order-book-sample.csv"
schema="examples/schemas/schema_example.json"





[workloads.load_to_t1.header]
X-v3io-session-key="a90a2aa8-663a-497d-98d0-47f260080615"
X-v3io-function="PutItem"

3 changes: 3 additions & 0 deletions httpblaster/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (self *Executor) load_request_generator() (chan *request_generators.Request
case request_generators.CSV2KV:
req_gen = &request_generators.Csv2KV{}
break
case request_generators.CSV2KVPERF:
req_gen = &request_generators.Csv2KVPerf{}
break
case request_generators.CSVUPDATEKV:
req_gen = &request_generators.CsvUpdateKV{}
break
Expand Down
1 change: 1 addition & 0 deletions httpblaster/request_generators/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
CSV2TSDB = "csv2tsdb"
STATS2TSDB = "stats2tsdb"
FAKER2KV = "faker2kv"
CSV2KVPERF = "csv2kv_perf"
)

type RequestCommon struct {
Expand Down
145 changes: 145 additions & 0 deletions httpblaster/request_generators/csv2kv_perf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package request_generators

import (
"encoding/csv"
log "github.com/sirupsen/logrus"
"github.com/v3io/http_blaster/httpblaster/config"
"github.com/v3io/http_blaster/httpblaster/igz_data"
"io"
"os"
"runtime"
"strings"
"sync"
"time"
)

type Csv2KVPerf struct {
workload config.Workload
RequestCommon
}

func (self *Csv2KVPerf) UseCommon(c RequestCommon) {

}

func (self *Csv2KVPerf) generate_request(ch_records chan []string, ch_req chan *Request, host string,
wg *sync.WaitGroup, done chan struct{}) {
defer wg.Done()
parser := igz_data.EmdSchemaParser{}
var contentType string = "text/html"
e := parser.LoadSchema(self.workload.Schema, "", "")
if e != nil {
panic(e)
}
for {
select {
case <-done:
return
case r := <-ch_records:
json_payload := parser.EmdFromCSVRecord(r)
req := AcquireRequest()
self.PrepareRequest(contentType, self.workload.Header, "PUT",
self.base_uri, json_payload, host, req.Request)
ch_req <- req
}
}
}

func (self *Csv2KVPerf) generate(ch_req chan *Request, payload string, host string, done chan struct{}) {
defer close(ch_req)

ch_records:= self.generate_records(payload, host, done)

wg := sync.WaitGroup{}
wg.Add(runtime.NumCPU())
for c := 0; c < runtime.NumCPU(); c++ {
go self.generate_request(ch_records, ch_req, host, &wg, done)
}

wg.Wait()
}

func (self *Csv2KVPerf)generate_records(payload string, host string, done chan struct{}) (chan []string) {
var ch_records chan []string = make(chan []string, 1000)

go func(){
parser := igz_data.EmdSchemaParser{}
e := parser.LoadSchema(self.workload.Schema, "", "")
if e != nil {
panic(e)
}
for {
ch_files := self.FilesScan(self.workload.Payload)
for f := range ch_files {
log.Info("Scaning file ",f," for records")
fp, err := os.Open(f)
if err != nil {
panic(err)
}

r := csv.NewReader(fp)
r.Comma = parser.JsonSchema.Settings.Separator.Rune
var line_count= 0
for {
record, err := r.Read()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}

if strings.HasPrefix(record[0], "#") {
log.Println("Skipping scv header ", strings.Join(record[:], ","))
} else {
ch_records <- record
line_count++
if line_count%1024 == 0 {
log.Printf("line: %d from file %s was submitted", line_count, f)
}
}
select {
case <-done:
log.Info("stopping")
close(ch_records)
return
default:
}

}
fp.Close()
}

//close(ch_records)
}
}()
return ch_records
}



func (self *Csv2KVPerf) GenerateRequests(global config.Global, wl config.Workload, tls_mode bool, host string, ret_ch chan *Response, worker_qd int) chan *Request {
self.workload = wl
if self.workload.Header == nil {
self.workload.Header = make(map[string]string)
}
self.workload.Header["X-v3io-function"] = "PutItem"

self.SetBaseUri(tls_mode, host, self.workload.Container, self.workload.Target)

ch_req := make(chan *Request, worker_qd)

done := make(chan struct{})
go func() {
select {
case <-time.After(self.workload.Duration.Duration):
log.Info("Time has come")
close(done)
}
}()


go self.generate(ch_req, self.workload.Payload, host, done)

return ch_req
}

0 comments on commit b51fbb3

Please sign in to comment.