Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
fix race condition in reading record and updating record. (#1104)
Browse files Browse the repository at this point in the history
  • Loading branch information
rghetia committed Apr 25, 2019
1 parent bf1b28d commit ef4afeb
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
2 changes: 2 additions & 0 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
}

func (w *worker) reportUsage(now time.Time) {
w.mu.Lock()
defer w.mu.Unlock()
for _, v := range w.views {
w.reportView(v, now)
}
Expand Down
4 changes: 4 additions & 0 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type retrieveDataResp struct {
}

func (cmd *retrieveDataReq) handleCommand(w *worker) {
w.mu.Lock()
defer w.mu.Unlock()
vi, ok := w.views[cmd.v]
if !ok {
cmd.c <- &retrieveDataResp{
Expand Down Expand Up @@ -153,6 +155,8 @@ type recordReq struct {
}

func (cmd *recordReq) handleCommand(w *worker) {
w.mu.Lock()
defer w.mu.Unlock()
for _, m := range cmd.ms {
if (m == stats.Measurement{}) { // not registered
continue
Expand Down
87 changes: 87 additions & 0 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -397,6 +399,91 @@ func TestUnregisterReportsUsage(t *testing.T) {
}
}

func TestWorkerRace(t *testing.T) {
restart()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}

// 1. This will export every microsecond.
SetReportingPeriod(time.Microsecond)

if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}

e := &countExporter{}
RegisterExporter(e)

// Synchronize and make sure every goroutine has terminated before we exit
var waiter sync.WaitGroup
waiter.Add(3)
defer waiter.Wait()

doneCh := make(chan bool)
// 2. Record write routine at 700ns
go func() {
defer waiter.Done()
tick := time.NewTicker(700 * time.Nanosecond)
defer tick.Stop()

defer func() {
close(doneCh)
}()

for i := 0; i < 1e3; i++ {
stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(1))
stats.Record(ctx, m2.M(1))
<-tick.C
}
}()

// 2. Simulating RetrieveData 900ns
go func() {
defer waiter.Done()
tick := time.NewTicker(900 * time.Nanosecond)
defer tick.Stop()

for {
select {
case <-doneCh:
return
case <-tick.C:
RetrieveData(view1.Name)
}
}
}()

// 4. Export via Reader routine at 800ns
go func() {
defer waiter.Done()
tick := time.NewTicker(800 * time.Nanosecond)
defer tick.Stop()

reader := metricexport.Reader{}
for {
select {
case <-doneCh:
return
case <-tick.C:
// Perform some collection here
reader.ReadAndExport(&testExporter{})
}
}
}()
}

type testExporter struct {
}

func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
return nil
}

type countExporter struct {
sync.Mutex
count int64
Expand Down

0 comments on commit ef4afeb

Please sign in to comment.