Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[testbed] Fix loadgenerator race condition #32351

Merged
merged 11 commits into from
May 1, 2024
72 changes: 26 additions & 46 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ type ProviderSender struct {

options LoadOptions

// Record information about previous errors to avoid flood of error messages.
prevErr error
sendType string
sendType string
generateFunc func() error
}

// NewLoadGenerator creates a ProviderSender to send DataProvider-generated telemetry via a DataSender.
Expand All @@ -90,12 +89,15 @@ func NewLoadGenerator(dataProvider DataProvider, sender DataSender) (LoadGenerat
switch t := ps.Sender.(type) {
case TraceDataSender:
ps.sendType = "traces"
ps.generateFunc = ps.generateTrace
case MetricDataSender:
ps.sendType = "metrics"
ps.generateFunc = ps.generateMetrics
case LogDataSender:
ps.sendType = "logs"
ps.generateFunc = ps.generateLog
default:
ps.sendType = fmt.Sprintf("invalid-%T", t)
return nil, fmt.Errorf("failed creating load generator, unhandled data type %T", t)
}

return ps, nil
Expand Down Expand Up @@ -210,19 +212,17 @@ func (ps *ProviderSender) generate() {
defer workers.Done()
t := time.NewTicker(time.Second / time.Duration(ps.options.DataItemsPerSecond/ps.options.ItemsPerBatch/numWorkers))
defer t.Stop()

var prevErr error
for {
select {
case <-t.C:
switch ps.Sender.(type) {
case TraceDataSender:
ps.generateTrace()
case MetricDataSender:
ps.generateMetrics()
case LogDataSender:
ps.generateLog()
default:
log.Printf("Invalid type of ProviderSender sender")
err := ps.generateFunc()
// log the error if it is different from the previous result
if err != nil && (prevErr == nil || err.Error() != prevErr.Error()) {
log.Printf("%v", err)
}
prevErr = err
case <-ps.stopSignal:
return
}
Expand All @@ -236,19 +236,18 @@ func (ps *ProviderSender) generate() {
ps.Sender.Flush()
}

func (ps *ProviderSender) generateTrace() {
func (ps *ProviderSender) generateTrace() error {
traceSender := ps.Sender.(TraceDataSender)

traceData, done := ps.Provider.GenerateTraces()
if done {
return
return nil
}

for {
err := traceSender.ConsumeTraces(context.Background(), traceData)
if err == nil {
ps.prevErr = nil
break
return nil
}

if !consumererror.IsPermanent(err) {
Expand All @@ -257,29 +256,22 @@ func (ps *ProviderSender) generateTrace() {
}

ps.permanentErrors.Add(uint64(traceData.SpanCount()))

// update prevErr to err if it's different than last observed error
if ps.prevErr == nil || ps.prevErr.Error() != err.Error() {
ps.prevErr = err
log.Printf("Cannot send traces: %v", err)
}
break
return fmt.Errorf("cannot send traces: %w", err)
}
}

func (ps *ProviderSender) generateMetrics() {
func (ps *ProviderSender) generateMetrics() error {
metricSender := ps.Sender.(MetricDataSender)

metricData, done := ps.Provider.GenerateMetrics()
if done {
return
return nil
}

for {
err := metricSender.ConsumeMetrics(context.Background(), metricData)
if err == nil {
ps.prevErr = nil
break
return nil
}

if !consumererror.IsPermanent(err) {
Expand All @@ -288,28 +280,22 @@ func (ps *ProviderSender) generateMetrics() {
}

ps.permanentErrors.Add(uint64(metricData.DataPointCount()))

// update prevErr to err if it's different than last observed error
if ps.prevErr == nil || ps.prevErr.Error() != err.Error() {
ps.prevErr = err
log.Printf("Cannot send metrics: %v", err)
}
break
return fmt.Errorf("cannot send metrics: %w", err)
}
}

func (ps *ProviderSender) generateLog() {
func (ps *ProviderSender) generateLog() error {
logSender := ps.Sender.(LogDataSender)

logData, done := ps.Provider.GenerateLogs()
if done {
return
return nil
}

for {
err := logSender.ConsumeLogs(context.Background(), logData)
if err == nil {
ps.prevErr = nil
break
return nil
}

if !consumererror.IsPermanent(err) {
Expand All @@ -318,12 +304,6 @@ func (ps *ProviderSender) generateLog() {
}

ps.permanentErrors.Add(uint64(logData.LogRecordCount()))

// update prevErr to err if it's different than last observed error
if ps.prevErr == nil || ps.prevErr.Error() != err.Error() {
ps.prevErr = err
log.Printf("Cannot send logs: %v", err)
}
break
return fmt.Errorf("cannot send logs: %w", err)
}
}