Skip to content

Commit

Permalink
Rename Integrator to Processor (#863)
Browse files Browse the repository at this point in the history
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
jmacd and MrAlias authored Jun 23, 2020
1 parent 4e71b4e commit 2966505
Show file tree
Hide file tree
Showing 18 changed files with 139 additions and 139 deletions.
2 changes: 1 addition & 1 deletion api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Config struct {
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// using the recommended selector and standard integrator. See the pull.Options.
// using the recommended selector and standard processor. See the pull.Options.
func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) {
if config.Registry == nil {
config.Registry = prometheus.NewRegistry()
Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller

// NewExportPipeline sets up a complete export pipeline with the
// recommended setup, chaining a NewRawExporter into the recommended
// selectors and integrators.
// selectors and processors.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
exporter, err := NewRawExporter(config)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return newAgg, true
}

// ForEach does not use ExportKindSelected: use a real Integrator to
// ForEach does not use ExportKindSelected: use a real Processor to
// test ExportKind functionality.
func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Record) error) error {
for _, r := range p.updates {
Expand Down
6 changes: 3 additions & 3 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -117,8 +117,8 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}

selector := simple.NewWithExactDistribution()
integrator := integrator.New(selector, metricsdk.PassThroughExporter)
pusher := push.New(integrator, exp)
processor := processor.New(selector, metricsdk.PassThroughExporter)
pusher := push.New(processor, exp)
pusher.Start()

ctx := context.Background()
Expand Down
28 changes: 14 additions & 14 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)

// Integrator is responsible for deciding which kind of aggregation to
// Processor is responsible for deciding which kind of aggregation to
// use (via AggregatorSelector), gathering exported results from the
// SDK during collection, and deciding over which dimensions to group
// the exported data.
Expand All @@ -42,9 +42,9 @@ import (
//
// The `Process` method is called during collection in a
// single-threaded context from the SDK, after the aggregator is
// checkpointed, allowing the integrator to build the set of metrics
// checkpointed, allowing the processor to build the set of metrics
// currently being exported.
type Integrator interface {
type Processor interface {
// AggregatorSelector is responsible for selecting the
// concrete type of Aggregator used for a metric in the SDK.
//
Expand Down Expand Up @@ -177,18 +177,18 @@ type Exporter interface {
// The Context comes from the controller that initiated
// collection.
//
// The CheckpointSet interface refers to the Integrator that just
// The CheckpointSet interface refers to the Processor that just
// completed collection.
Export(context.Context, CheckpointSet) error

// ExportKindSelector is an interface used by the Integrator
// ExportKindSelector is an interface used by the Processor
// in deciding whether to compute Delta or Cumulative
// Aggregations when passing Records to this Exporter.
ExportKindSelector
}

// ExportKindSelector is a sub-interface of Exporter used to indicate
// whether the Integrator should compute Delta or Cumulative
// whether the Processor should compute Delta or Cumulative
// Aggregations.
type ExportKindSelector interface {
// ExportKindFor should return the correct ExportKind that
Expand All @@ -198,7 +198,7 @@ type ExportKindSelector interface {
}

// CheckpointSet allows a controller to access a complete checkpoint of
// aggregated metrics from the Integrator. This is passed to the
// aggregated metrics from the Processor. This is passed to the
// Exporter which may then use ForEach to iterate over the collection
// of aggregated metrics.
type CheckpointSet interface {
Expand All @@ -219,9 +219,9 @@ type CheckpointSet interface {

// Locker supports locking the checkpoint set. Collection
// into the checkpoint set cannot take place (in case of a
// stateful integrator) while it is locked.
// stateful processor) while it is locked.
//
// The Integrator attached to the Accumulator MUST be called
// The Processor attached to the Accumulator MUST be called
// with the lock held.
sync.Locker

Expand All @@ -232,7 +232,7 @@ type CheckpointSet interface {
}

// Metadata contains the common elements for exported metric data that
// are shared by the Accumulator->Integrator and Integrator->Exporter
// are shared by the Accumulator->Processor and Processor->Exporter
// steps.
type Metadata struct {
descriptor *metric.Descriptor
Expand All @@ -241,14 +241,14 @@ type Metadata struct {
}

// Accumulation contains the exported data for a single metric instrument
// and label set, as prepared by an Accumulator for the Integrator.
// and label set, as prepared by an Accumulator for the Processor.
type Accumulation struct {
Metadata
aggregator Aggregator
}

// Record contains the exported data for a single metric instrument
// and label set, as prepared by the Integrator for the Exporter.
// and label set, as prepared by the Processor for the Exporter.
// This includes the effective start and end time for the aggregation.
type Record struct {
Metadata
Expand All @@ -274,7 +274,7 @@ func (m Metadata) Resource() *resource.Resource {
}

// NewAccumulation allows Accumulator implementations to construct new
// Accumulations to send to Integrators. The Descriptor, Labels, Resource,
// Accumulations to send to Processors. The Descriptor, Labels, Resource,
// and Aggregator represent aggregate metric events received over a single
// collection period.
func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Accumulation {
Expand All @@ -294,7 +294,7 @@ func (r Accumulation) Aggregator() Aggregator {
return r.aggregator
}

// NewRecord allows Integrator implementations to construct export
// NewRecord allows Processor implementations to construct export
// records. The Descriptor, Labels, and Aggregator represent
// aggregate metric events received over a single collection period.
func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record {
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
)

type benchFixture struct {
Expand Down
32 changes: 16 additions & 16 deletions sdk/metric/controller/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand All @@ -32,11 +32,11 @@ import (
const DefaultCachePeriod time.Duration = 10 * time.Second

// Controller manages access to a *sdk.Accumulator and
// *basic.Integrator. Use Provider() for obtaining Meters. Use
// *basic.Processor. Use Provider() for obtaining Meters. Use
// Foreach() for accessing current records.
type Controller struct {
accumulator *sdk.Accumulator
integrator *integrator.Integrator
processor *processor.Processor
provider *registry.Provider
period time.Duration
lastCollect time.Time
Expand All @@ -53,25 +53,25 @@ func New(aselector export.AggregatorSelector, eselector export.ExportKindSelecto
for _, opt := range options {
opt.Apply(config)
}
integrator := integrator.New(aselector, eselector)
processor := processor.New(aselector, eselector)
accum := sdk.NewAccumulator(
integrator,
processor,
sdk.WithResource(config.Resource),
)
return &Controller{
accumulator: accum,
integrator: integrator,
processor: processor,
provider: registry.NewProvider(accum),
period: config.CachePeriod,
checkpoint: integrator.CheckpointSet(),
checkpoint: processor.CheckpointSet(),
clock: controllerTime.RealClock{},
}
}

// SetClock sets the clock used for caching. For testing purposes.
func (c *Controller) SetClock(clock controllerTime.Clock) {
c.integrator.Lock()
defer c.integrator.Unlock()
c.processor.Lock()
defer c.processor.Unlock()
c.clock = clock
}

Expand All @@ -84,17 +84,17 @@ func (c *Controller) Provider() metric.Provider {
// Foreach gives the caller read-locked access to the current
// export.CheckpointSet.
func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error {
c.integrator.RLock()
defer c.integrator.RUnlock()
c.processor.RLock()
defer c.processor.RUnlock()

return c.checkpoint.ForEach(ks, f)
}

// Collect requests a collection. The collection will be skipped if
// the last collection is aged less than the CachePeriod.
func (c *Controller) Collect(ctx context.Context) error {
c.integrator.Lock()
defer c.integrator.Unlock()
c.processor.Lock()
defer c.processor.Unlock()

if c.period > 0 {
now := c.clock.Now()
Expand All @@ -106,9 +106,9 @@ func (c *Controller) Collect(ctx context.Context) error {
c.lastCollect = now
}

c.integrator.StartCollection()
c.processor.StartCollection()
c.accumulator.Collect(ctx)
err := c.integrator.FinishCollection()
c.checkpoint = c.integrator.CheckpointSet()
err := c.processor.FinishCollection()
c.checkpoint = c.processor.CheckpointSet()
return err
}
2 changes: 1 addition & 1 deletion sdk/metric/controller/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

Expand Down
20 changes: 10 additions & 10 deletions sdk/metric/controller/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
"go.opentelemetry.io/otel/sdk/metric/integrator/basic"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
)

// DefaultPushPeriod is the default time interval between pushes.
Expand All @@ -36,7 +36,7 @@ type Controller struct {
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.Provider
integrator *basic.Integrator
processor *basic.Processor
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
Expand All @@ -60,15 +60,15 @@ func New(selector export.AggregatorSelector, exporter export.Exporter, opts ...O
c.Timeout = c.Period
}

integrator := basic.New(selector, exporter)
processor := basic.New(selector, exporter)
impl := sdk.NewAccumulator(
integrator,
processor,
sdk.WithResource(c.Resource),
)
return &Controller{
provider: registry.NewProvider(impl),
accumulator: impl,
integrator: integrator,
processor: processor,
exporter: exporter,
ch: make(chan struct{}),
period: c.Period,
Expand Down Expand Up @@ -139,16 +139,16 @@ func (c *Controller) tick() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

c.integrator.Lock()
defer c.integrator.Unlock()
c.processor.Lock()
defer c.processor.Unlock()

c.integrator.StartCollection()
c.processor.StartCollection()
c.accumulator.Collect(ctx)
if err := c.integrator.FinishCollection(); err != nil {
if err := c.processor.FinishCollection(); err != nil {
global.Handle(err)
}

if err := c.exporter.Export(ctx, c.integrator.CheckpointSet()); err != nil {
if err := c.exporter.Export(ctx, c.processor.CheckpointSet()); err != nil {
global.Handle(err)
}
}
6 changes: 3 additions & 3 deletions sdk/metric/controller/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
integratorTest "go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/metric/processor/test"
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/test"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -125,7 +125,7 @@ func (e *testExporter) resetRecords() ([]export.Record, int) {

func TestPushDoubleStop(t *testing.T) {
fix := newFixture(t)
p := push.New(integratorTest.AggregatorSelector(), fix.exporter)
p := push.New(processorTest.AggregatorSelector(), fix.exporter)
p.Start()
p.Stop()
p.Stop()
Expand Down
Loading

0 comments on commit 2966505

Please sign in to comment.