From ef4afeb0d0cf4fc08868683603cd6e474be9be18 Mon Sep 17 00:00:00 2001 From: rghetia Date: Thu, 11 Apr 2019 21:06:30 -0700 Subject: [PATCH] fix race condition in reading record and updating record. (#1104) --- stats/view/worker.go | 2 + stats/view/worker_commands.go | 4 ++ stats/view/worker_test.go | 87 +++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) diff --git a/stats/view/worker.go b/stats/view/worker.go index 37279b39e..2f3c018af 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -236,6 +236,8 @@ func (w *worker) reportView(v *viewInternal, now time.Time) { } func (w *worker) reportUsage(now time.Time) { + w.mu.Lock() + defer w.mu.Unlock() for _, v := range w.views { w.reportView(v, now) } diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go index ba6203a50..0267e179a 100644 --- a/stats/view/worker_commands.go +++ b/stats/view/worker_commands.go @@ -121,6 +121,8 @@ type retrieveDataResp struct { } func (cmd *retrieveDataReq) handleCommand(w *worker) { + w.mu.Lock() + defer w.mu.Unlock() vi, ok := w.views[cmd.v] if !ok { cmd.c <- &retrieveDataResp{ @@ -153,6 +155,8 @@ type recordReq struct { } func (cmd *recordReq) handleCommand(w *worker) { + w.mu.Lock() + defer w.mu.Unlock() for _, m := range cmd.ms { if (m == stats.Measurement{}) { // not registered continue diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index d43014648..8d4546ea4 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricexport" "go.opencensus.io/stats" "go.opencensus.io/tag" ) @@ -397,6 +399,91 @@ func TestUnregisterReportsUsage(t *testing.T) { } } +func TestWorkerRace(t *testing.T) { + restart() + ctx := context.Background() + + m1 := stats.Int64("measure", "desc", "unit") + view1 := &View{Name: "count", Measure: m1, Aggregation: Count()} + m2 := stats.Int64("measure2", "desc", "unit") + view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()} + + // 1. This will export every microsecond. + SetReportingPeriod(time.Microsecond) + + if err := Register(view1, view2); err != nil { + t.Fatalf("cannot register: %v", err) + } + + e := &countExporter{} + RegisterExporter(e) + + // Synchronize and make sure every goroutine has terminated before we exit + var waiter sync.WaitGroup + waiter.Add(3) + defer waiter.Wait() + + doneCh := make(chan bool) + // 2. Record write routine at 700ns + go func() { + defer waiter.Done() + tick := time.NewTicker(700 * time.Nanosecond) + defer tick.Stop() + + defer func() { + close(doneCh) + }() + + for i := 0; i < 1e3; i++ { + stats.Record(ctx, m1.M(1)) + stats.Record(ctx, m2.M(1)) + stats.Record(ctx, m2.M(1)) + <-tick.C + } + }() + + // 2. Simulating RetrieveData 900ns + go func() { + defer waiter.Done() + tick := time.NewTicker(900 * time.Nanosecond) + defer tick.Stop() + + for { + select { + case <-doneCh: + return + case <-tick.C: + RetrieveData(view1.Name) + } + } + }() + + // 4. Export via Reader routine at 800ns + go func() { + defer waiter.Done() + tick := time.NewTicker(800 * time.Nanosecond) + defer tick.Stop() + + reader := metricexport.Reader{} + for { + select { + case <-doneCh: + return + case <-tick.C: + // Perform some collection here + reader.ReadAndExport(&testExporter{}) + } + } + }() +} + +type testExporter struct { +} + +func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + return nil +} + type countExporter struct { sync.Mutex count int64