Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/deltatocumulative] partial linear pipeline #35048

Merged
merged 8 commits into from
Sep 27, 2024

Conversation

sh0rez
Copy link
Contributor

@sh0rez sh0rez commented Sep 6, 2024

Description:
Partially introduces a highly decoupled, linear processing pipeline.
Implemented as a standalone struct to make review easier, will refactor this later.
Instead of overloading Map.Store() to do aggregation, staleness and
limiting, this functionality is now explcitly handled in
ConsumeMetrics.

This highly aids readability and makes understanding this processor a
lot easier, as less mental context needs to be kept.

Notes to reviewer:
See 68dc901 for the main added logic.
Compare processor.go (old, nested) to linear.go (new, linear)

Replaces #34757

Link to tracking Issue: none

Testing: This is a refactor. Existing tests were not modified and still pass

Documentation: not needed

Copy link
Contributor

@ArthurSens ArthurSens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good to me as a first glance, but it would be awesome if we had some kind of end-to-end test with generated testdata, similar to what we have with intervalprocessor.

Not sure if I'm missing something, but I don't see a test that creates a new deltatocumulative processor through the Factory, call ConsumeMetrics and checks the result. I see for the original processor, but not for linear. I can also see the Chain object you created to call two processors together, but I'm not understanding where exactly it's used xD. In summary, I think you covered the e2e tests with linear but I'm struggling to understand how exactly

internal/exp/metrics/identity/stream.go Outdated Show resolved Hide resolved
@sh0rez
Copy link
Contributor Author

sh0rez commented Sep 10, 2024

@ArthurSens It looks good to me as a first glance, but it would be awesome if we had some kind of end-to-end test with generated testdata, similar to what we have with intervalprocessor.

It's there, in recently merged processor_test.go:

proc, err := self.NewFactory().CreateMetricsProcessor(
context.Background(),
processortest.NewNopSettings(),
cfg,
next,
)


@ArthurSens I can also see the Chain object you created to call two processors together, but I'm not understanding where exactly it's used xD

It's in factory.go. I first create both processors (Processor and Linear) as usual, then return them as Chain{}. This works, because Chain is a []processor.Metrics, which itself implements processor.Metrics on that slice

@ArthurSens
Copy link
Contributor

ArthurSens commented Sep 10, 2024

But if I'm reading things correctly, we're only calling the first processor of the Chain

https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35048/files#diff-5922c3911f502d9f26fbd71672eca70c790535c8e35d4bd61f14b10bafd82d7eR28-R33

EDIT:
ok nevermind, finally understood. One processor is the next of the other, so when we call the first processor in the chain, the first one will call the second 👍

@sh0rez
Copy link
Contributor Author

sh0rez commented Sep 11, 2024

@RichieSams can you take a look?

@RichieSams
Copy link
Contributor

@RichieSams can you take a look?

Yes. Apologies. I've been meaning to take a look at this for a while now. But kept getting waylaid. I'll review this afternoon

}
}

func (stale Tracker) Collect(max time.Duration) []identity.Stream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Can we rename this to CollectStale()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in processor.go I named it the field stale staleness.Tracker, so using it reads as p.stale.Collect().
Renaming this to p.stale.CollectStale() which I very slightly like less, because it stutters.

no strong opinion

}
linear := newLinear(pcfg, ltel, proc)

return Chain{linear, proc}, nil
Copy link
Contributor

@RichieSams RichieSams Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to chain them? What isn't yet implemented in Linear? IMO it would be much simpler (for metrics, file structure, etc) to just switch wholesale, rather than trying to keep both around.

The deltatocumulative-linear branch didn't have chain. So I was confused at first when reviewing the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linear only does sums on this branch.

Making linear do everything involves some fairly advanced generics usage, which I think deserves to be reviewed properly and probably separately :)

Given the code already exists on the non-partial branch, I expect to send the next patch right after merging this one.

We should take care to merge so that we only release after merging both.

@RichieSams
Copy link
Contributor

RichieSams commented Sep 17, 2024

Overall, I quite like the code. I personally would vote to do the change wholesale (which I believe is what the deltatocumulative-linear branch does). Rather than one datasource at at time, which requires the chain stuff and more confusion.

@sh0rez sh0rez requested a review from a team as a code owner September 23, 2024 09:12
@sh0rez sh0rez force-pushed the deltatocumulative-linear-sums branch from 84a4210 to 79a3988 Compare September 23, 2024 09:27
@jpkrohling jpkrohling added the Skip Changelog PRs that do not require a CHANGELOG.md entry label Sep 24, 2024
adds staleness.Tracker type to `internal/exp/metrics`, which does the
same as `staleness.Staleness`, but in a less coupled way
adds metrics for tracking operations of the linear pipeline
Introduces a highly decoupled, linear processing pipeline.
Instead of overloading `Map.Store()` to do aggregation, staleness and
limiting, this functionality is now explcitly handled in
`ConsumeMetrics`.

This highly aids readability and makes understanding this processor a
lot easier, as less mental context needs to be kept.
Datapoints are first processed by the linear pipeline, and then
forwarded to the traditional one for anything not yet implemented
@sh0rez sh0rez force-pushed the deltatocumulative-linear-sums branch from 79a3988 to e51c2f3 Compare September 24, 2024 12:56
Copy link
Contributor

@carrieedwards carrieedwards left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this refactor!

@jpkrohling jpkrohling changed the title deltatocumulative: partial linear pipeline [processor/deltatocumulative] partial linear pipeline Sep 27, 2024
@jpkrohling jpkrohling merged commit 2613b89 into open-telemetry:main Sep 27, 2024
156 checks passed
@github-actions github-actions bot added this to the next release milestone Sep 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants