From 5ae9166f90ff989c557e45fcb9592ac59afa0fa0 Mon Sep 17 00:00:00 2001 From: rghetia Date: Fri, 22 Mar 2019 09:22:20 -0700 Subject: [PATCH] Add support for reader. (#1049) * Add support for reader. * handle multiple call to reader.Stop and test for producer while reader is stopped. * fix typo. * fix review comment and refactor reader to metricexport package. * change Reader to IntervalReader and add Reader. Also include ctx in ExportMetric api. * modify export interface to return error and make it plural. * remove option and provide Start function. * move exporter.go to metricexport package. * added option for reader. * make constants private. --- metric/metricexport/export.go | 26 +++ metric/metricexport/reader.go | 187 +++++++++++++++++++++ metric/metricexport/reader_test.go | 260 +++++++++++++++++++++++++++++ 3 files changed, 473 insertions(+) create mode 100644 metric/metricexport/export.go create mode 100644 metric/metricexport/reader.go create mode 100644 metric/metricexport/reader_test.go diff --git a/metric/metricexport/export.go b/metric/metricexport/export.go new file mode 100644 index 000000000..23f4a864a --- /dev/null +++ b/metric/metricexport/export.go @@ -0,0 +1,26 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricexport + +import ( + "context" + + "go.opencensus.io/metric/metricdata" +) + +// Exporter is an interface that exporters implement to export the metric data. +type Exporter interface { + ExportMetrics(ctx context.Context, data []*metricdata.Metric) error +} diff --git a/metric/metricexport/reader.go b/metric/metricexport/reader.go new file mode 100644 index 000000000..44ace7008 --- /dev/null +++ b/metric/metricexport/reader.go @@ -0,0 +1,187 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package metricexport + +import ( + "fmt" + "time" + + "context" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" + "go.opencensus.io/trace" + "sync" +) + +var ( + defaultSampler = trace.ProbabilitySampler(0.0001) + errReportingIntervalTooLow = fmt.Errorf("reporting interval less than %d", minimumReportingDuration) + errAlreadyStarted = fmt.Errorf("already started") + errIntervalReaderNil = fmt.Errorf("interval reader is nil") + errExporterNil = fmt.Errorf("exporter is nil") + errReaderNil = fmt.Errorf("reader is nil") +) + +const ( + defaultReportingDuration = 60 * time.Second + minimumReportingDuration = 1 * time.Second + defaultSpanName = "ExportMetrics" +) + +// ReaderOptions contains options pertaining to metrics reader. +type ReaderOptions struct { + // SpanName is the name used for span created to export metrics. + SpanName string +} + +// Reader reads metrics from all producers registered +// with producer manager and exports those metrics using provided +// exporter. +type Reader struct { + sampler trace.Sampler + + spanName string +} + +// IntervalReader periodically reads metrics from all producers registered +// with producer manager and exports those metrics using provided +// exporter. Call Reader.Stop() to stop the reader. +type IntervalReader struct { + // ReportingInterval it the time duration between two consecutive + // metrics reporting. defaultReportingDuration is used if it is not set. + // It cannot be set lower than minimumReportingDuration. + ReportingInterval time.Duration + + exporter Exporter + timer *time.Ticker + quit, done chan bool + mu sync.RWMutex + reader *Reader +} + +// ReaderOption apply changes to ReaderOptions. +type ReaderOption func(*ReaderOptions) + +// WithSpanName makes new reader to use given span name when exporting metrics. +func WithSpanName(spanName string) ReaderOption { + return func(o *ReaderOptions) { + o.SpanName = spanName + } +} + +// NewReader returns a reader configured with specified options. +func NewReader(o ...ReaderOption) *Reader { + var opts ReaderOptions + for _, op := range o { + op(&opts) + } + reader := &Reader{defaultSampler, defaultSpanName} + if opts.SpanName != "" { + reader.spanName = opts.SpanName + } + return reader +} + +// NewIntervalReader creates a reader. Once started it periodically +// reads metrics from all producers and exports them using provided exporter. +func NewIntervalReader(reader *Reader, exporter Exporter) (*IntervalReader, error) { + if exporter == nil { + return nil, errExporterNil + } + if reader == nil { + return nil, errReaderNil + } + + r := &IntervalReader{ + exporter: exporter, + reader: reader, + } + return r, nil +} + +// Start starts the IntervalReader which periodically reads metrics from all +// producers registered with global producer manager. If the reporting interval +// is not set prior to calling this function then default reporting interval +// is used. +func (ir *IntervalReader) Start() error { + if ir == nil { + return errIntervalReaderNil + } + ir.mu.Lock() + defer ir.mu.Unlock() + var reportingInterval = defaultReportingDuration + if ir.ReportingInterval != 0 { + if ir.ReportingInterval < minimumReportingDuration { + return errReportingIntervalTooLow + } + reportingInterval = ir.ReportingInterval + } + + if ir.done != nil { + return errAlreadyStarted + } + ir.timer = time.NewTicker(reportingInterval) + ir.quit = make(chan bool) + ir.done = make(chan bool) + + go ir.startInternal() + return nil +} + +func (ir *IntervalReader) startInternal() { + for { + select { + case <-ir.timer.C: + ir.reader.ReadAndExport(ir.exporter) + case <-ir.quit: + ir.timer.Stop() + ir.done <- true + return + } + } +} + +// Stop stops the reader from reading and exporting metrics. +// Additional call to Stop are no-ops. +func (ir *IntervalReader) Stop() { + if ir == nil { + return + } + ir.mu.Lock() + defer ir.mu.Unlock() + if ir.quit == nil { + return + } + ir.quit <- true + <-ir.done + close(ir.quit) + close(ir.done) + ir.quit = nil +} + +// ReadAndExport reads metrics from all producer registered with +// producer manager and then exports them using provided exporter. +func (r *Reader) ReadAndExport(exporter Exporter) { + ctx, span := trace.StartSpan(context.Background(), r.spanName, trace.WithSampler(r.sampler)) + defer span.End() + producers := metricproducer.GlobalManager().GetAll() + data := []*metricdata.Metric{} + for _, producer := range producers { + data = append(data, producer.Read()...) + } + // TODO: [rghetia] add metrics for errors. + exporter.ExportMetrics(ctx, data) +} diff --git a/metric/metricexport/reader_test.go b/metric/metricexport/reader_test.go new file mode 100644 index 000000000..a043530e8 --- /dev/null +++ b/metric/metricexport/reader_test.go @@ -0,0 +1,260 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package metricexport + +import ( + "context" + "sync" + "testing" + "time" + + "go.opencensus.io/metric" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" +) + +var ( + ir1 *IntervalReader + ir2 *IntervalReader + reader1 = NewReader(WithSpanName("test-export-span")) + exporter1 = &metricExporter{} + exporter2 = &metricExporter{} + gaugeEntry *metric.Int64GaugeEntry + duration1 = time.Duration(1000 * time.Millisecond) + duration2 = time.Duration(2000 * time.Millisecond) +) + +type metricExporter struct { + sync.Mutex + metrics []*metricdata.Metric +} + +func (e *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + e.Lock() + defer e.Unlock() + + e.metrics = append(e.metrics, metrics...) + return nil +} + +func init() { + r := metric.NewRegistry() + metricproducer.GlobalManager().AddProducer(r) + g, _ := r.AddInt64Gauge("active_request", "Number of active requests, per method.", metricdata.UnitDimensionless, "method") + gaugeEntry, _ = g.GetEntry(metricdata.NewLabelValue("foo")) +} + +func TestNewReaderWitDefaultOptions(t *testing.T) { + r := NewReader() + + if r.spanName != defaultSpanName { + t.Errorf("span name: got %v, want %v\n", r.spanName, defaultSpanName) + } +} + +func TestNewReaderWitSpanName(t *testing.T) { + spanName := "test-span" + r := NewReader(WithSpanName(spanName)) + + if r.spanName != spanName { + t.Errorf("span name: got %+v, want %v\n", r.spanName, spanName) + } +} + +func TestNewReader(t *testing.T) { + r := NewReader() + + gaugeEntry.Add(1) + + r.ReadAndExport(exporter1) + + checkExportedCount(exporter1, 1, t) + checkExportedMetricDesc(exporter1, "active_request", t) + resetExporter(exporter1) +} + +func TestNewIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Add(1) + + time.Sleep(1500 * time.Millisecond) + checkExportedCount(exporter1, 1, t) + checkExportedMetricDesc(exporter1, "active_request", t) + ir1.Stop() + resetExporter(exporter1) +} + +func TestManualReadForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + reader1.ReadAndExport(exporter1) + gaugeEntry.Set(4) + + time.Sleep(1500 * time.Millisecond) + + checkExportedCount(exporter1, 2, t) + checkExportedValues(exporter1, []int64{1, 4}, t) // one for manual read other for time based. + checkExportedMetricDesc(exporter1, "active_request", t) + ir1.Stop() + resetExporter(exporter1) +} + +func TestProducerWithIntervalReaderStop(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + ir1.Stop() + + gaugeEntry.Add(1) + + time.Sleep(1500 * time.Millisecond) + + checkExportedCount(exporter1, 0, t) + checkExportedMetricDesc(exporter1, "active_request", t) + resetExporter(exporter1) +} + +func TestProducerWithMultipleIntervalReaders(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + ir2 = createAndStart(exporter2, duration2, t) + + gaugeEntry.Add(1) + + time.Sleep(2500 * time.Millisecond) + + checkExportedCount(exporter1, 2, t) + checkExportedMetricDesc(exporter1, "active_request", t) + checkExportedCount(exporter2, 1, t) + checkExportedMetricDesc(exporter2, "active_request", t) + ir1.Stop() + ir2.Stop() + resetExporter(exporter1) + resetExporter(exporter1) +} + +func TestIntervalReaderMultipleStop(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + stop := make(chan bool, 1) + go func() { + ir1.Stop() + ir1.Stop() + stop <- true + }() + + select { + case _ = <-stop: + case <-time.After(1 * time.Second): + t.Fatalf("ir1 stop got blocked") + } +} + +func TestIntervalReaderMultipleStart(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + ir1.Start() + + gaugeEntry.Add(1) + + time.Sleep(1500 * time.Millisecond) + + checkExportedCount(exporter1, 1, t) + checkExportedMetricDesc(exporter1, "active_request", t) + ir1.Stop() + resetExporter(exporter1) +} + +func TestNewIntervalReaderWithNilReader(t *testing.T) { + _, err := NewIntervalReader(nil, exporter1) + if err == nil { + t.Fatalf("expected error but got nil\n") + } +} + +func TestNewIntervalReaderWithNilExporter(t *testing.T) { + _, err := NewIntervalReader(reader1, nil) + if err == nil { + t.Fatalf("expected error but got nil\n") + } +} + +func TestNewIntervalReaderStartWithInvalidInterval(t *testing.T) { + ir, err := NewIntervalReader(reader1, exporter1) + ir.ReportingInterval = time.Duration(500 * time.Millisecond) + err = ir.Start() + if err == nil { + t.Fatalf("expected error but got nil\n") + } +} + +func checkExportedCount(exporter *metricExporter, wantCount int, t *testing.T) { + exporter.Lock() + defer exporter.Unlock() + gotCount := len(exporter.metrics) + if gotCount != wantCount { + t.Fatalf("exported metric count: got %d, want %d\n", gotCount, wantCount) + } +} + +func checkExportedValues(exporter *metricExporter, wantValues []int64, t *testing.T) { + exporter.Lock() + defer exporter.Unlock() + gotCount := len(exporter.metrics) + wantCount := len(wantValues) + if gotCount != wantCount { + t.Errorf("exported metric count: got %d, want %d\n", gotCount, wantCount) + return + } + for i, wantValue := range wantValues { + var gotValue int64 + switch v := exporter.metrics[i].TimeSeries[0].Points[0].Value.(type) { + case int64: + gotValue = v + default: + t.Errorf("expected float64 value but found other %T", exporter.metrics[i].TimeSeries[0].Points[0].Value) + } + if gotValue != wantValue { + t.Errorf("values idx %d, got: %v, want %v", i, gotValue, wantValue) + } + } +} + +func checkExportedMetricDesc(exporter *metricExporter, wantMdName string, t *testing.T) { + exporter.Lock() + defer exporter.Unlock() + for _, metric := range exporter.metrics { + gotMdName := metric.Descriptor.Name + if gotMdName != wantMdName { + t.Errorf("got %s, want %s\n", gotMdName, wantMdName) + } + } + exporter.metrics = nil +} + +func resetExporter(exporter *metricExporter) { + exporter.Lock() + defer exporter.Unlock() + exporter.metrics = nil +} + +// createAndStart stops the current processors and creates a new one. +func createAndStart(exporter *metricExporter, d time.Duration, t *testing.T) *IntervalReader { + ir, _ := NewIntervalReader(reader1, exporter) + ir.ReportingInterval = d + err := ir.Start() + if err != nil { + t.Fatalf("error creating reader %v\n", err) + } + return ir +}