Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonXZLiu committed Dec 10, 2020
1 parent 71cfcf8 commit 273fa79
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
6 changes: 4 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -275,11 +276,12 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
var mu sync.Mutex
var wg sync.WaitGroup

wg.Add(maxConcurrentRequests) // used to wait for workers to be finished
concurrencyLimit := int(math.Min(maxConcurrentRequests, float64(len(requests))))
wg.Add(concurrencyLimit) // used to wait for workers to be finished

// Run concurrencyLimit of workers until there
// is no more requests to execute in the input channel.
for i := 0; i < maxConcurrentRequests; i++ {
for i := 0; i < concurrencyLimit; i++ {
go func() {
defer wg.Done()

Expand Down
6 changes: 4 additions & 2 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
sizeOfCurrentBatch += sizeOfSeries
}

wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)
if len(tsArray) != 0 {
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)
}

return requests, nil
}
Expand Down

0 comments on commit 273fa79

Please sign in to comment.