Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Pipeline #3233

Merged
merged 19 commits into from
Sep 28, 2022
Merged

Refactor Pipeline #3233

merged 19 commits into from
Sep 28, 2022

Conversation

MrAlias
Copy link
Contributor

@MrAlias MrAlias commented Sep 23, 2022

pipeline type

The pipeline type is declared with a mapping of instrument aggregations and other data needed to export measurements:

// pipeline connects all of the instruments created by a meter provider to a Reader.
// This is the object that will be `Reader.register()` when a meter provider is created.
//
// As instruments are created the instrument should be checked if it exists in the
// views of a the Reader, and if so each aggregator should be added to the pipeline.
type pipeline struct {
resource *resource.Resource
sync.Mutex
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
callbacks []func(context.Context)
}

However, as the documentation states, a "pipeline connects all of the instruments created by a meter provider to a Reader", it not only serves as the "sync point" for instruments, but also the "pull point" for a Reader. There is a one-to-one mapping between a Reader and the views it was registered with and the pipeline.

This strongly indicates there are fields that are missing from the type. Namely, the Reader and []view.View it corresponds to. By adding these fields, they will no longer need to be tracked elsewhere. E.g.

type pipeline struct {
	resource *resource.Resource

	reader Reader
	views  []view.View

	sync.Mutex
	aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
	callbacks    []func(context.Context)
}

pipelineRegistry type

The pipelineRegistry is declared as a struct containing two maps:

// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve
// new Aggregators from a pipelineRegistry.
type pipelineRegistry struct {
views map[Reader][]view.View
pipelines map[Reader]*pipeline
}

After its creation, the pipelineRegistry fields do not change (a by-product of a MeterProvider only allowing Reader registration during initialization). There is never a need to look-up existing Readers to ensure a unique insert.

The only iteration of these fields is done in parallel:

for rdr, views := range reg.views {
pipe := reg.pipelines[rdr]

This strongly indicates that these fields can be unified into a single field. When the Reader and []view.View data are moved into the pipeline (see above), both fields and the type itself are flattened into a slice of pipelines. E.g.

// pipelines is the group of pipelines connecting Readers with instrument
// measurement.
type pipelines []*pipeline

This new type still provides the iteration like before, but now over a slice type instead of by linking two maps:

	for _, pipe := range pipelines {
		// The Reader and views are now just fields of pipe.
		reader, views := pipe.reader, pipe.views
		/* ... */
	}

Facilitators

There are three "create" functions currently associated with the creation of internal.Aggreagtors for an instrument:

  1. func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
  2. func createAggregatorsForReader[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) {
  3. func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] {

Multiple of these create funcs were declared to help break apart a long set of logic and to interface with the generic internal.Aggreagtor type.

The use and purpose of each function is not immediately clear to a developer. It requires cognitive load to understand the complexity these introduce.

Furthermore, both createAggregators and createAggregatorsForReader are closer in function to methods on a pipelineRegistry and Reader/[]view.View pair respectively. With the Reader/[]view.View pair being moved into the pipeline type (see above), it makes sense to define each one as a method. However, methods cannot be declared with generics. The receiver itself needs to be defined with a generic which is problematic.

Instead, use the facilitator pattern to accomplish this. Not only will this allow the interface with generic and non-generic types, it will help name things for the purpose they serve.

inserter

Create an inserter type that acts as a facilitator of a pipeline (see above) to return []internal.Aggregator for a new instrument from a resolver (see below).

// inserter facilitates inserting of new instruments into a pipeline.
type inserter[N int64 | float64] struct {
	pipeline *pipeline
}

// Instrument inserts instrument inst with instUnit returning the Aggregators that need to be updated with measurments for that instrument.
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { /* ... */}

// aggregator returns the Aggregator for an instrument configuration.
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { /* ... */}

The inserter.Instrument method replaces use of the createAggregatorsForReader function, and inserter.aggregator replaces use of the createAggregator function.

resolver

Create a resolver type that acts as a facilitator of inserter (see above) to return []internal.Aggregator for a new instrument from an instrument provider.

// resolver facilitates resolving Aggregators an instrument needs to aggregate
// measurements with while updating all pipelines that need to pull from those
// aggregations.
type resolver[N int64 | float64] struct {
	inserters []*inserter[N]
}

// Aggregators returns the Aggregators instrument inst needs to update when it makes a measurement.
func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { /* ... */}

The resolver.Aggregators replaces the use of the createAggregators function.

@MrAlias MrAlias added pkg:SDK Related to an SDK package area:metrics Part of OpenTelemetry Metrics labels Sep 23, 2022
@MrAlias MrAlias added this to the Metric SDK: Beta milestone Sep 23, 2022
@codecov
Copy link

codecov bot commented Sep 23, 2022

Codecov Report

Merging #3233 (d45bee9) into main (12e16d4) will increase coverage by 0.0%.
The diff coverage is 96.2%.

Additional details and impacted files

Impacted file tree graph

@@          Coverage Diff          @@
##            main   #3233   +/-   ##
=====================================
  Coverage   77.3%   77.3%           
=====================================
  Files        159     159           
  Lines      11167   11184   +17     
=====================================
+ Hits        8637    8653   +16     
  Misses      2333    2333           
- Partials     197     198    +1     
Impacted Files Coverage Δ
sdk/metric/pipeline.go 96.0% <95.2%> (-1.3%) ⬇️
sdk/metric/instrument_provider.go 80.0% <100.0%> (ø)
sdk/metric/meter.go 100.0% <100.0%> (ø)
sdk/metric/provider.go 100.0% <100.0%> (ø)
exporters/jaeger/jaeger.go 90.3% <0.0%> (-0.9%) ⬇️
sdk/trace/batch_span_processor.go 81.9% <0.0%> (+1.7%) ⬆️

@MrAlias MrAlias added the Skip Changelog PRs that do not require a CHANGELOG.md entry label Sep 23, 2022
@MrAlias MrAlias changed the title WIP Refactor Pipeline Refactor Pipeline Sep 23, 2022
@MrAlias MrAlias marked this pull request as ready for review September 23, 2022 19:33
MrAlias added a commit to MrAlias/opentelemetry-go that referenced this pull request Sep 23, 2022
@MadVikingGod
Copy link
Contributor

I like where this has landed. At a high level, I think it is much cleaner than what I fabrocobbled together.

I do wonder how this will solve the problem raised in #3229. In particular, the case when I do something like:

ctr, err := mp.SyncInt64().Counter("name") // This should be ok
ctr, err := mp.SyncFloat64().Counter("name") // This shouldn't produce aggregators, and should produce and err

Do you think there will be any problem adding something like a caching layer?

@MrAlias
Copy link
Contributor Author

MrAlias commented Sep 26, 2022

I like where this has landed. At a high level, I think it is much cleaner than what I fabrocobbled together.

I do wonder how this will solve the problem raised in #3229. In particular, the case when I do something like:

ctr, err := mp.SyncInt64().Counter("name") // This should be ok
ctr, err := mp.SyncFloat64().Counter("name") // This shouldn't produce aggregators, and should produce and err

Do you think there will be any problem adding something like a caching layer?

Yeah, it should be pretty easy: MrAlias#660

MrAlias added a commit to MrAlias/opentelemetry-go that referenced this pull request Sep 27, 2022
MrAlias added a commit to MrAlias/opentelemetry-go that referenced this pull request Sep 27, 2022
@MrAlias MrAlias merged commit aca054b into open-telemetry:main Sep 28, 2022
MrAlias added a commit to MrAlias/opentelemetry-go that referenced this pull request Sep 28, 2022
@jmacd
Copy link
Contributor

jmacd commented Sep 28, 2022

💯

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:metrics Part of OpenTelemetry Metrics pkg:SDK Related to an SDK package Skip Changelog PRs that do not require a CHANGELOG.md entry
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants