
Reader fragment #749

Merged · 8 commits · Oct 3, 2024

Conversation

@cody-littley (Contributor) commented on Sep 5, 2024:

Why are these changes needed?

PR 8 of 9 for the traffic generator project. This PR adds a worker that reads blobs.

Checks

  • I've made sure the lint is passing in this PR.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, in that case, please comment that they are not relevant.
  • Testing Strategy
    • Unit tests
    • Integration tests
    • This PR is not tested :(

@cody-littley self-assigned this on Sep 5, 2024
@ian-shim (Contributor) left a comment:

Looks good! Just a few comments.

Resolved review threads:

  • tools/traffic/workers/mock_chain_client.go (outdated)
  • api/clients/mock/retrieval_client.go
  • tools/traffic/workers/blob_reader.go (six threads, most outdated)
	assignments := chunks.Assignments

	data, err := reader.retriever.CombineChunks(chunks)
	if err != nil {
Contributor:

This is an interesting case and may warrant different error handling than the read failure above. Read failures may happen for different reasons but mostly suggest issues with network health. A blob recovery failure here almost certainly suggests a bug, so we may want to set up a noisier alert for this one.

Contributor Author:

This makes an error log and updates a metric. What did you have in mind for a noisier alert?
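(For illustration: a minimal sketch of one way to give recovery failures their own signal, reusing the NewCountMetric/Increment API that appears elsewhere in this PR; the metric name and surrounding variables are illustrative, not taken from the actual change. An alert with a low threshold could then key on the recovery-failure counter alone.)

	// Hedged sketch, not the PR's actual code: give blob-recovery failures a
	// dedicated counter, separate from ordinary read failures, so alerting can
	// treat them as a likely bug rather than transient network trouble.
	recoveryFailure := generatorMetrics.NewCountMetric("recovery_failure")

	data, err := reader.retriever.CombineChunks(chunks)
	if err != nil {
		// Recovery failing after chunks were successfully read almost certainly
		// indicates a bug, so it increments its own, noisier counter.
		reader.logger.Error("failed to recombine blob from chunks", "err:", err)
		recoveryFailure.Increment()
		return
	}
	_ = data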

	metrics: &blobReaderMetrics{
		generatorMetrics:        generatorMetrics,
		fetchBatchHeaderMetric:  generatorMetrics.NewLatencyMetric("fetch_batch_header"),
		fetchBatchHeaderSuccess: generatorMetrics.NewCountMetric("fetch_batch_header_success"),
Contributor:

How does it compare to having a metric with a "status" label for success/failure or valid/invalid (instead of one metric for each status)?

@cody-littley (Contributor Author) commented on Sep 10, 2024:

The way this code is currently set up, all of the latency metrics use the same base metric name but with different labels. The same is true for the count metrics and the gauge metrics. If anything, I feel like I'm using labels too much right now. 🙃

The way I should have set up this metrics API would have been to allow each entry to specify both a metric name and a label. Since this pattern extends to all three components of the traffic generator (the writer, the status checker, and the reader), should I work on this as part of this PR or split that work into a separate one?
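(For context, a hedged sketch of the name-plus-label shape being discussed, written against the standard Prometheus Go client rather than this repo's metrics wrapper; the metric name and help text are made up for illustration.)

	package metrics

	import "github.com/prometheus/client_golang/prometheus"

	// One counter, partitioned by a "status" label, instead of one metric per
	// outcome. Each outcome becomes a label value on the same series family.
	var fetchBatchHeaderTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "fetch_batch_header_total",
			Help: "Number of batch header fetches, partitioned by outcome.",
		},
		[]string{"status"},
	)

	func init() {
		prometheus.MustRegister(fetchBatchHeaderTotal)
	}

	// Success path:  fetchBatchHeaderTotal.WithLabelValues("success").Inc()
	// Failure path:  fetchBatchHeaderTotal.WithLabelValues("failure").Inc()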

Contributor:

Maybe we can work on this + refactoring existing metrics in a separate PR?

Contributor Author:

sounds like a plan

Contributor:

Proper wrapping may help avoid boilerplate, but over-wrapping can also be an issue, making it less clear what's included. I think two things are worth considering here:

  • labels, useful for both readability and visibility of cardinality
  • documentation, useful for understanding what this metric is about

Also, I'd suggest dropping the "Metric" suffix in naming; it doesn't provide information. E.g. invalidBlobMetric would be much more informative as numInvalidBlobs, which makes clear it's a counter for invalid blobs.

That said, it sounds good to me to do this in a separate PR -- smaller/modular PRs are good.

Contributor Author:

Makes sense. Will address this in the follow up PR.


	ctxTimeout, cancel := context.WithTimeout(*r.ctx, r.config.FetchBatchHeaderTimeout)
	defer cancel()
	batchHeader, err := metrics.InvokeAndReportLatency(r.metrics.fetchBatchHeaderMetric,
Contributor:

I wouldn't consider mixing core logic with monitoring code quite clean; I'd prefer a separation.

Contributor Author:

There are a few patterns I could utilize for reporting latency. Which do you prefer?

Pattern A

Wrap the base function in another function that calls the base function and records the amount of time it took to execute. (This is the pattern currently in the PR)

Pattern B

Measure the time before and after the function call.

start := time.Now()
foo()
end := time.Now()
metrics.reportDurationOfFoo(start, end)

Pattern C

Embed the measurement of the time to execute the method inside the method itself.

func foo() {
    start := time.Now()
    // business logic
    end := time.Now()
    metrics.reportDurationOfFoo(start, end)
}

Pattern D

Is there a strategy you like that I didn't mention?

Contributor:

Both B & C seem reasonable and clear. I'd choose one or the other based on which method is the more appropriate for reporting the metrics.

Contributor Author:

I've removed the invokeAndReportLatency() helper in favor of pattern B.
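(A minimal sketch of pattern B as applied to the batch header fetch, reusing the metric fields that appear elsewhere in this PR; fetchBatchHeader below is a placeholder for the real call, and the exact success-path code may differ.)

	start := time.Now()
	batchHeader, err := fetchBatchHeader(ctxTimeout) // placeholder for the actual fetch
	if err != nil {
		r.logger.Error("failed to get batch header", "err:", err)
		r.metrics.fetchBatchHeaderFailure.Increment()
		return
	}
	// Measure around the call site, then hand the duration to the metrics layer.
	r.metrics.fetchBatchHeaderSuccess.Increment()
	r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start))
	_ = batchHeader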

		batchHeader.BlobHeadersRoot,
		core.QuorumID(0))
	})
	cancel()
Contributor:

Why cancel the context here? Do you want to wait for timeout?

Contributor Author:

Yes, the intention was to wait for a timeout. My understanding of this block was that the context would be automatically cancelled if the timeout elapsed, and that RetrieveBlobChunks would block until either the work completed or the context was cancelled. It's very possible I'm not using a good coding pattern here. What would be the proper way to implement these semantics?

Contributor:

Do we just need to ensure that the ctxTimeout is canceled within the method's scope?
Why not defer cancel() like we do in L135?

Contributor Author:

change made
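(For clarity, the shape the suggestion points at: create the timeout context, defer cancel() immediately so the context is released on every exit path, and let the retrieval call return once the work finishes or the deadline passes. retrieveChunks and the config field name below are placeholders, not the PR's identifiers.)

	ctxTimeout, cancel := context.WithTimeout(*r.ctx, r.config.RetrieveBlobChunksTimeout) // field name illustrative
	defer cancel() // released whether the call succeeds, fails, or times out

	// The call is expected to observe ctxTimeout internally; no explicit wait
	// on the timeout is needed in the caller.
	chunks, err := retrieveChunks(ctxTimeout) // placeholder for the real retrieval call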

func (r *BlobReader) reportChunk(operatorId core.OperatorID) {
	metric, exists := r.metrics.operatorSuccessMetrics[operatorId]
	if !exists {
		metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_returned_chunk", operatorId))
Contributor:

So there will be hundreds of metrics; I'm not sure that's quite usable.

Contributor Author:

I agree that this is going to be noisy. Is it critical for this utility to determine which nodes are failing to return chunks, or would it be ok just to report the fraction of nodes that return chunks?

If knowledge of which nodes are failing to return chunks is important, do you have any recommendations for how to extract this sort of data without using metrics this way?

Contributor:

Hmm, would keeping track of counts only for failed retrievals reduce the number of metrics here?

Contributor Author:

Based on our in-person discussion, the plan is to address which metrics we enable in production as a follow-up task.

Contributor:

I think this could be updated in this PR

Contributor:

This part needs to be careful about cardinality. Sounds good to follow up and cap the scope here. Let me know if you need some input on refactoring this later.

@cody-littley (Contributor Author) commented on Oct 3, 2024:

The primary reason I'd like to address metrics as a follow-up is that there are other metrics I added in prior PRs that also need to be considered, and it makes sense to me to do all of the metrics work at once. I'm willing to do it in this PR if you feel strongly, but want to make you aware that the scope of this PR will grow if we address all of the metrics issues here.
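(For illustration, a hedged sketch of the earlier suggestion to count only failures per operator, mirroring the reportChunk structure above; operatorFailureMetrics and the metric name are hypothetical. Healthy operators then emit no extra series, which caps the metric count at the number of misbehaving operators.)

	func (r *BlobReader) reportMissingChunk(operatorId core.OperatorID) {
		// Only failures get a per-operator counter, bounding cardinality.
		metric, exists := r.metrics.operatorFailureMetrics[operatorId]
		if !exists {
			metric = r.metrics.generatorMetrics.NewCountMetric(
				fmt.Sprintf("operator_%x_missing_chunk", operatorId))
			r.metrics.operatorFailureMetrics[operatorId] = metric
		}
		metric.Increment()
	}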

@ian-shim (Contributor) left a comment:

lgtm, @jianoaix can you chime in on Cody's responses?

@ian-shim (Contributor) left a comment:

lgtm


	case <-(*r.ctx).Done():
		err := (*r.ctx).Err()
		if err != nil {
			r.logger.Info("blob r context closed", "err:", err)
Contributor:

Typo: this should read "blob reader context closed".

Contributor Author:

fixed


	} else {
		end := time.Now()
		duration := end.Sub(start)
		r.metrics.fetchBatchHeaderMetric.ReportLatency(duration)
Contributor:

These 3 lines can be simplified as r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start))

Contributor Author:

change made

r.logger.Error("failed to get batch header", "err:", err)
r.metrics.fetchBatchHeaderFailure.Increment()
return
} else {
Contributor:

No need for an else after a return.

Contributor Author:

simplified
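(For reference, roughly how the error path reads once the else is dropped; the success-path lines are assumed from the snippets above and may not match the final code exactly.)

	if err != nil {
		r.logger.Error("failed to get batch header", "err:", err)
		r.metrics.fetchBatchHeaderFailure.Increment()
		return
	}

	// Success path continues at the same indentation level, no else needed.
	r.metrics.fetchBatchHeaderSuccess.Increment()
	r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start))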

r.logger.Error("failed to read chunks", "err:", err)
r.metrics.readFailureMetric.Increment()
return
} else {
Contributor:

similar here

Contributor Author:

done


	if err != nil {
		tracker.getStatusErrorCountMetric.Increment()
		return nil, err
	} else {
Contributor:

Here and below: can be simplified as mentioned above

Contributor Author:

done

@cody-littley merged commit b3d1c35 into Layr-Labs:master on Oct 3, 2024 (6 checks passed).
@cody-littley deleted the reader-fragment branch on October 3, 2024.