Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
IanSmith123 committed Sep 22, 2021
1 parent 5c7417d commit f750489
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
4 changes: 3 additions & 1 deletion pkg/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
continue
} else {
go func(fr *reader.FileReader, filename string) {
if err := fr.Read(); err != nil {
numReadFailed, err := fr.Read()
statsMgr.NumReadFailed += numReadFailed
if err != nil {
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(filename)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ func (r *FileReader) prepareDataFile() (*string, error) {
return &filepath, nil
}

func (r *FileReader) Read() error {
func (r *FileReader) Read() (numErrorLines int64, err error) {
filePath, err := r.prepareDataFile()
if err != nil {
return err
return numErrorLines, err
}
file, err := os.Open(*filePath)
if err != nil {
return errors.Wrap(errors.ConfigError, err)
return numErrorLines, errors.Wrap(errors.ConfigError, err)
}
defer func() {
if err := file.Close(); err != nil {
Expand All @@ -136,7 +136,7 @@ func (r *FileReader) Read() error {

r.DataReader.InitReader(file)

lineNum, numErrorLines := 0, 0
lineNum := 0

if !r.WithHeader {
r.startLog()
Expand Down Expand Up @@ -179,5 +179,5 @@ func (r *FileReader) Read() error {
fpath, _ := base.FormatFilePath(*r.File.Path)
logger.Infof("Total lines of file(%s) is: %d, error lines: %d", fpath, lineNum, numErrorLines)

return nil
return numErrorLines, nil
}
19 changes: 10 additions & 9 deletions pkg/stats/statsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
)

type StatsMgr struct {
StatsCh chan base.Stats
DoneCh chan bool
NumFailed int64
totalCount int64
totalBatches int64
totalLatency int64
totalReqTime int64
StatsCh chan base.Stats
DoneCh chan bool
NumFailed int64
NumReadFailed int64
totalCount int64
totalBatches int64
totalLatency int64
totalReqTime int64
}

func NewStatsMgr(numReadingFiles int) *StatsMgr {
Expand Down Expand Up @@ -58,8 +59,8 @@ func (s *StatsMgr) print(prefix string, now time.Time) {
avgLatency := s.totalLatency / s.totalBatches
avgReq := s.totalReqTime / s.totalBatches
rps := float64(s.totalCount) / secs
logger.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)",
prefix, secs, s.totalCount, s.NumFailed, avgLatency, avgReq, rps)
logger.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Read Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)",
prefix, secs, s.totalCount, s.NumFailed, s.NumReadFailed, avgLatency, avgReq, rps)
}

func (s *StatsMgr) startWorker(numReadingFiles int) {
Expand Down

0 comments on commit f750489

Please sign in to comment.