Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generics to subscriber and publisher and fix potential deadlock #602

Merged
merged 10 commits into from
Oct 10, 2024

Conversation

janezpodhostnik
Copy link
Contributor

@janezpodhostnik janezpodhostnik commented Sep 30, 2024

Description

With generics the code is easier to read and some checks are no longer needed.

A change also include in this is the removal of the uuid of the subscriber. Pointer comparison can be used instead, making it a bit simpler.

I found a bug with the error channel on the subscriber. Sending an error to that channel would just block forever, since the channel was always nil.


For contributor use:

  • Targeted PR against master branch
  • Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
  • Code follows the standards mentioned here.
  • Updated relevant documentation
  • Re-reviewed Files changed in the Github PR explorer
  • Added appropriate labels

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced type safety in the StreamAPI and event ingestion engines by introducing generics for publishers.
    • Updated publisher types to specifically handle blocks, transactions, and logs.
  • Bug Fixes

    • Improved clarity and consistency in method signatures across various services.
  • Chores

    • Removed direct dependency on github.com/google/uuid, now managed indirectly.

Copy link
Contributor

coderabbitai bot commented Sep 30, 2024

Walkthrough

The changes in this pull request involve significant updates to the StreamAPI struct and its methods in the api/stream.go file, focusing on enhancing type safety through the use of generics. The publisher fields of the StreamAPI struct and the associated constructor and method signatures have been modified to accommodate specific types. Additionally, a new generic subscription function has been introduced, allowing for more flexible handling of different types of publishers and callbacks.

Changes

File Change Summary
api/stream.go Updated StreamAPI struct to use generics for publisher fields; modified constructor and method signatures accordingly.
go.mod Removed direct dependency on github.com/google/uuid v1.6.0 while adding it as an indirect dependency.

Possibly related PRs

  • Check data integrity for EVM events #529: The changes in the StreamAPI struct and its methods in api/stream.go involve handling events and transactions, which relates to the data integrity checks for EVM events introduced in the models/events.go file. Both PRs focus on enhancing the handling and validation of blockchain-related data.

Suggested reviewers

  • sideninja
  • peterargue

🐇 In the code we hop and play,
With types so clear, we pave the way.
Generics now in every nook,
A safer path, just take a look!
From streams to blocks, all types align,
In our rabbit hole, the code will shine! 🌟


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 47ecbea and e615281.

📒 Files selected for processing (1)
  • go.mod (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • go.mod

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@@ -162,8 +147,7 @@ func (s *StreamAPI) newSubscription(

subs := models.NewSubscription(callback(notifier, rpcSub))

rpcSub.ID = rpc.ID(subs.ID().String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit hesitant about removing this assignment here. If I remember correctly, this was added by Gregor, due to some issue with the event streaming API. But on the other hand, the CI is passing, so maybe it's not needed after all 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up, I'll take a look at the history, maybe there were some clues what specific problem this was solving.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't find anything, we can just merge it. It doesn't seem to break anything, and the E2E tests do exercise this part.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janezpodhostnik I did investigate this. What I found is:

rpcSub.ID

has the format: 0x8b841deff1dbca0881c995ad77574d17.
I think Gregor changed it to uuid, to match the format of the event streaming API,
e.g.: 69a20431-f601-43da-99f1-aa4bfd4d1bac

But we're using entirely the subscription functionality from Geth, so there's no need to go with the uuid format.
I think we're all good here. It would make sense though, to log the proper subscription-id, e.g.

l := logger.With().Str("subscription-id", string(rpcSub.ID)).Logger()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 47ecbea. Please check if that is what you had in mind.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, good idea adding both 👍

Copy link
Collaborator

@m-Peter m-Peter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!
Just one small remark/concern, maybe it's nothing to worry about though.

Base automatically changed from janezp/refactor-engine to main October 2, 2024 14:01
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (2)
services/traces/engine.go (1)

Line range hint 1-150: Summary: Successful implementation of generics and alignment with PR objectives

The changes in this file successfully implement generics for the subscriber and publisher components, aligning well with the PR objectives. Key improvements include:

  1. Consistent use of generics for blocksPublisher in both the Engine struct and the NewTracesIngestionEngine function.
  2. Refactoring of the Notify method to use a specific *models.Block type instead of any.
  3. Removal of the ID method, indicating a change in the subscription mechanism as mentioned in the PR objectives.

These changes enhance code readability, improve type safety, and eliminate the need for certain checks, as intended. The modifications streamline the codebase and reduce the potential for runtime errors.

Consider documenting the rationale behind removing the ID method and any changes to the subscription mechanism in the code comments or PR description. This will help future maintainers understand the architectural decisions made in this PR.

services/traces/engine_test.go (1)

30-30: Summary: Successful implementation of generics with potential for refactoring.

The changes consistently implement generics for the NewPublisher method across all test cases, improving type safety and readability as per the PR objectives. The existing test logic remains unchanged, indicating backward compatibility.

Consider refactoring these test cases to reduce code duplication, possibly by creating a helper function for setting up the NewPublisher[*models.Block]() instance and other common setup steps.

Would you like assistance in creating a helper function to reduce code duplication across these test cases?

Also applies to: 116-116, 233-233

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 4798d65 and 548db50.

📒 Files selected for processing (9)
  • api/stream.go (7 hunks)
  • bootstrap/bootstrap.go (4 hunks)
  • go.mod (1 hunks)
  • models/stream.go (1 hunks)
  • services/ingestion/engine.go (3 hunks)
  • services/ingestion/engine_test.go (6 hunks)
  • services/requester/pool.go (1 hunks)
  • services/traces/engine.go (3 hunks)
  • services/traces/engine_test.go (3 hunks)
🔇 Additional comments (32)
services/requester/pool.go (3)

38-38: Approve the updated parameter type in NewTxPool

The modification of the transactionsPublisher parameter type to *models.Publisher[*gethTypes.Transaction] in the NewTxPool function is consistent with the change in the TxPool struct. This ensures type consistency and improves the overall type safety of the code.

To ensure this change is properly implemented across the codebase, please run the following script to check for any calls to NewTxPool that might need updating:

#!/bin/bash
# Description: Check for calls to NewTxPool that might need updating

# Search for NewTxPool calls
echo "Searching for NewTxPool calls:"
rg --type go "NewTxPool\(" -C 3

# Search for NewTxPool function definition
echo "Searching for NewTxPool function definition:"
ast-grep --lang go --pattern 'func NewTxPool($_) $_'

32-38: Overall impact: Improved type safety with generics

The introduction of generics for the Publisher type in both the TxPool struct and its constructor NewTxPool aligns well with the PR objectives. These changes enhance code readability, improve type safety, and potentially simplify the codebase by reducing the need for type assertions.

While the changes appear to be low-risk and localized, it's crucial to ensure consistency throughout the codebase.

To ensure a comprehensive implementation of this change:

  1. Review all usages of TxPool and NewTxPool across the project.
  2. Update any code that interacts with the txPublisher field to leverage the new generic type.
  3. Verify that all tests related to TxPool are updated and passing.
  4. Consider adding new tests that specifically verify the behavior with the generic Publisher[*gethTypes.Transaction].

Run the following script to get an overview of TxPool usage across the project:

#!/bin/bash
# Description: Overview of TxPool usage across the project

# Search for TxPool struct usage
echo "Searching for TxPool struct usage:"
rg --type go "type.*TxPool.*struct" -C 3

# Search for NewTxPool function calls
echo "Searching for NewTxPool function calls:"
rg --type go "NewTxPool\(" -C 3

# Search for txPublisher field access
echo "Searching for txPublisher field access:"
rg --type go "\.txPublisher" -C 3

# Search for Publisher type usage
echo "Searching for Publisher type usage:"
rg --type go "Publisher(\[.*\])?" -C 3

This script will help identify areas that might need attention due to the introduction of generics.


32-32: Approve the generic type for txPublisher

The introduction of the generic type *gethTypes.Transaction for the txPublisher field enhances type safety and aligns with the PR objective of incorporating generics. This change improves code readability and reduces the need for type assertions.

To ensure this change doesn't introduce any issues, please run the following script to check for any other occurrences of TxPool that might need updating:

✅ Verification successful

Verified: txPublisher Update is Safe

The change to the txPublisher field in the TxPool struct is localized and does not affect other parts of the codebase. All usages of txPublisher are confined to services/requester/pool.go, ensuring that the introduction of the generic type *gethTypes.Transaction does not introduce any issues.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for other occurrences of TxPool that might need updating

# Search for TxPool usage
echo "Searching for TxPool usage:"
rg --type go "TxPool" -C 3

# Search for txPublisher usage within TxPool
echo "Searching for txPublisher usage within TxPool:"
ast-grep --lang go --pattern 'struct TxPool {
  $$$
  txPublisher $_
  $$$
}'

Length of output: 17541

services/traces/engine.go (3)

32-32: Excellent use of generics for type safety

The change from *models.Publisher to *models.Publisher[*models.Block] for the blocksPublisher field is a great improvement. This use of generics enhances type safety and code readability by explicitly specifying that the publisher deals with *models.Block types. It eliminates the need for type assertions when working with published data, reducing the potential for runtime errors.


41-41: Consistent use of generics in constructor

The update to the NewTracesIngestionEngine function signature, changing the blocksPublisher parameter type to *models.Publisher[*models.Block], is consistent with the earlier change to the Engine struct. This ensures type consistency throughout the codebase and reinforces the benefits of using generics for improved type safety and readability.


72-72: Improved type safety in Notify method

The change to the Notify method signature from func (e *Engine) Notify(data any) to func (e *Engine) Notify(block *models.Block) is a significant improvement. This change:

  1. Eliminates the need for type assertion within the method.
  2. Aligns with the use of generics in the blocksPublisher field.
  3. Improves type safety and makes the code more straightforward.

These improvements contribute to better code readability and reduce the potential for runtime errors.

To ensure this change doesn't affect other parts of the codebase, please run the following verification script:

This script will help identify any other places in the codebase that might need to be updated to match this new signature.

✅ Verification successful

Further Verification Required for Notify Method Changes

The initial verification identified additional Notify method implementations and usages:

  • models/stream.go:func (b *Subscription[T]) Notify(data T)
  • cmd/main/main.go:signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)
  • Multiple calls in api/stream.go

To ensure that the signature change in services/traces/engine.go does not adversely affect other parts of the codebase, please run the following additional verification scripts:

These scripts will help isolate and verify the impact of the Notify method signature change across the codebase.


Notify Method Signature Change Verified Successfully

The modification to the Notify method in services/traces/engine.go has been thoroughly verified and confirmed to be safe. This change:

  • Does not affect other Notify method implementations in models/stream.go.
  • Does not interfere with signal.Notify or any notifier.Notify calls in api/stream.go.
  • Maintains consistent and improved type safety within the affected method.

No further actions are required.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any other implementations or uses of the Notify method that might be affected by this change.

# Test: Search for other Notify method implementations or interface definitions
rg --type go -e 'func.*Notify\s*\([^)]*\)' -e 'type.*interface.*\{.*Notify.*\}'

# Test: Search for calls to the Notify method
rg --type go -e '\.Notify\s*\('

Length of output: 597


Script:

#!/bin/bash
# Description: Identify all Notify method implementations excluding services/traces/engine.go
rg --type go 'func\s+\(.*\)\s+Notify\s*\(.*\)' | grep -v 'services/traces/engine.go'

# Description: Check all Notify method calls excluding external signal.Notify
rg --type go '\.Notify\s*\(' | grep -v 'signal.Notify'

Length of output: 476

services/traces/engine_test.go (3)

116-116: LGTM: Consistent use of generic type parameter.

The change to use models.NewPublisher[*models.Block]() is consistent with the previous test case and correctly implements the generic type parameter for the blocks publisher. This maintains consistency across test cases and aligns with the PR objective.


233-233: LGTM: Consistent implementation of generics across all test cases.

The change to use models.NewPublisher[*models.Block]() maintains consistency with the previous test cases and correctly implements the generic type parameter for the block broadcaster. This ensures uniformity across all test scenarios in the file.


30-30: LGTM: Generic type parameter correctly applied.

The change to use models.NewPublisher[*models.Block]() correctly implements the generic type parameter for the block publisher. This aligns with the PR objective of adding generics to improve code readability and type safety.

To ensure consistency across the codebase, let's verify the usage of NewPublisher:

✅ Verification successful

Verified: All NewPublisher usages are now generic.

The usage of models.NewPublisher[*models.Block]() has been correctly updated across the codebase. All instances utilize the generic type parameter, ensuring consistency and enhancing type safety.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of NewPublisher across the codebase

# Test: Search for all occurrences of NewPublisher
echo "Occurrences of NewPublisher:"
rg --type go 'NewPublisher\[.*\]'

# Test: Check for any remaining non-generic usage of NewPublisher
echo "\nPotential non-generic usage of NewPublisher:"
rg --type go 'NewPublisher\(\)' -g '!*_test.go'

Length of output: 1590

services/ingestion/engine.go (5)

9-9: LGTM: Import statement added for geth types.

The addition of the gethTypes import is necessary for the updated type definitions in the Engine struct. Good job on aliasing it to avoid potential naming conflicts.


45-46: Excellent use of generics for improved type safety.

The updates to blocksPublisher and logsPublisher field types in the Engine struct are well-implemented:

  1. blocksPublisher is now *models.Publisher[*models.Block]
  2. logsPublisher is now *models.Publisher[[]*gethTypes.Log]

These changes enhance type safety and code clarity, aligning perfectly with the PR objectives. The specific types used for each publisher are appropriate and will help prevent type-related errors at compile-time.


57-58: LGTM: Function signature updated consistently with struct changes.

The NewEventIngestionEngine function signature has been correctly updated to match the changes in the Engine struct:

  1. blocksPublisher *models.Publisher[*models.Block]
  2. logsPublisher *models.Publisher[[]*gethTypes.Log]

This update maintains type consistency throughout the codebase and ensures the function remains compatible with the updated Engine struct. Good job on keeping everything in sync!


Line range hint 1-309: Summary: Excellent implementation of generics for improved type safety.

Overall, the changes in this file successfully introduce generics to the Publisher types used in the Engine struct and its associated methods. Key points:

  1. The use of generics enhances type safety and code clarity.
  2. Changes are consistent across struct fields and function signatures.
  3. The implementation maintains backward compatibility, minimizing the impact on the existing codebase.

These updates align well with the PR objectives of enhancing readability and simplifying the codebase. The introduction of generics should help prevent type-related errors at compile-time and make the code more self-documenting.

Great job on this implementation!


Line range hint 201-209: LGTM: Unchanged method implementation maintains compatibility.

The processEvents method implementation remains unchanged, which is good for maintaining backward compatibility. The Publish method calls for both blocksPublisher and logsPublisher are still valid with the new generic types.

However, to ensure full type safety:

Could you please verify that the Publish method of the generic Publisher type is correctly implemented to handle *models.Block and []*gethTypes.Log types respectively? This will confirm that the type safety extends to the method calls as well.

go.mod (1)

84-84: Approve change and verify UUID usage removal

The change from a direct to an indirect dependency for github.com/google/uuid aligns with the PR objectives of modifying the handling of subscriber's UUID. This suggests that the project no longer directly uses the UUID package, which is consistent with the proposed switch to pointer comparison for subscribers.

To ensure that all direct usage of the UUID package has been removed from the codebase, please run the following verification script:

If the script returns any results, please review those occurrences and ensure they have been properly addressed in this PR.

bootstrap/bootstrap.go (5)

13-13: LGTM: Import addition aligns with generics implementation.

The addition of the gethTypes import is consistent with the PR's objective of implementing generics and improving type safety. This change supports the use of Ethereum types in the Publishers struct.


40-42: Excellent use of generics in the Publishers struct.

The implementation of generics in the Publishers struct significantly enhances type safety and code clarity. Each field now has a specific type parameter:

  • Block uses *models.Block
  • Transaction uses *gethTypes.Transaction
  • Logs uses []*gethTypes.Log

This change aligns perfectly with the PR's objective of improving code readability and eliminating the need for certain type checks.


76-78: LGTM: Publishers initialization correctly uses generics.

The initialization of the Publishers struct has been updated to use the new generic types, which is consistent with the struct definition. This change ensures type safety and clarity in the code.


213-217: LGTM: Improved formatting for txPool initialization.

The txPool initialization has been reformatted for better readability. This change improves code clarity without altering the functionality.


Line range hint 1-524: Overall assessment: Excellent implementation of generics.

The changes in this file successfully implement generics in the Publishers struct and its usage, aligning perfectly with the PR's objectives. Key improvements include:

  1. Enhanced type safety and clarity in the Publishers struct.
  2. Consistent use of generics in the Publishers initialization.
  3. Improved code readability through better formatting.

These changes will likely simplify the codebase and reduce the need for certain type checks, as intended. The implementation is clean and well-executed.

services/ingestion/engine_test.go (6)

71-72: Excellent use of generics for type-specific publishers

The change from generic publishers to type-specific ones (models.NewPublisher[*models.Block]() and models.NewPublisher[[]*gethTypes.Log]()) enhances type safety and code clarity. This aligns well with the PR objective of incorporating generics into the subscriber and publisher components.


151-152: Consistent use of type-specific publishers across test cases

The changes here mirror those in the previous test case, maintaining consistency in the use of type-specific publishers. This uniformity is commendable and helps ensure that all test cases accurately reflect the updated implementation.


266-267: Thorough application of type-specific publishers across all test cases

The consistent application of type-specific publishers (models.NewPublisher[*models.Block]() and models.NewPublisher[[]*gethTypes.Log]()) across all test cases demonstrates a thorough and systematic update. This ensures that all scenarios are tested with the new generic implementation.


369-370: Comprehensive coverage of test scenarios with type-specific publishers

The consistent use of type-specific publishers (models.NewPublisher[*models.Block]() and models.NewPublisher[[]*gethTypes.Log]()) in this test case further demonstrates the comprehensive coverage of various scenarios. This thorough approach ensures that the new generic implementation is tested under different conditions.


418-419: Improved code formatting

This minor formatting adjustment, adding a comma and moving the closing brace to a new line, enhances code readability. While not directly related to the generics implementation, it's a welcome improvement that aligns with good coding practices.


Line range hint 1-619: Summary: Successful incorporation of generics in publisher components

The changes in this file consistently implement type-specific publishers across all test cases, aligning perfectly with the PR objective of incorporating generics into the subscriber and publisher components. The thorough and systematic update ensures that all test scenarios accurately reflect the new generic implementation.

Key points:

  1. Consistent use of models.NewPublisher[*models.Block]() and models.NewPublisher[[]*gethTypes.Log]() across all test cases.
  2. No modifications to the core test logic, maintaining the integrity of existing tests.
  3. Improved type safety and code clarity through the use of specific types.

These changes should contribute to a more robust and type-safe implementation in the main code. Great job on maintaining consistency and improving the overall quality of the codebase.

api/stream.go (6)

27-29: Use of Generics in Publisher Fields Enhances Type Safety

Updating the blocksPublisher, transactionsPublisher, and logsPublisher fields to use generics improves type safety and code clarity by explicitly defining the types of data being published.


38-40: Constructor Parameters Updated to Reflect Generic Types

The parameters in NewStreamAPI are updated to use generic Publisher types, ensuring consistency with the struct fields and enhancing type safety.


56-61: Callback Function in NewHeads Method Updated Correctly

The callback function now accepts *models.Block, aligning with the generic newSubscription function. This change ensures that the correct data type is handled in the subscription.


88-93: Correct Update to Callback Function in NewPendingTransactions

The callback function in NewPendingTransactions now accepts *gethTypes.Transaction, matching the generic type in newSubscription. This ensures proper handling of transaction data.


111-116: Updated Callback Function in Logs Method Aligns with Generic Subscription

The callback function now accepts []*gethTypes.Log, correctly reflecting the data type used in the logsPublisher. This change aligns with the generic newSubscription function and ensures type-safe handling of log data.


135-139: Generalized newSubscription Function with Generics and Logger Parameter

The newSubscription function is now generic over type T and includes a logger parameter. This enhances flexibility, allowing for different data types in subscriptions, and provides better logging capabilities.

models/stream.go Outdated Show resolved Hide resolved
models/stream.go Show resolved Hide resolved
api/stream.go Outdated Show resolved Hide resolved
@janezpodhostnik
Copy link
Contributor Author

@m-Peter After you comment I went back and added some tests.

Good thing I did. There was a bug in the subscription. The err channel was never initialized. so sending to it would just block forever!

I also added a failsafe for sending to the err channel in case no one is listening. I added a panic, but I'm also thinking just logging it might be a better option.

PTAL

@janezpodhostnik janezpodhostnik changed the title Add generics to subscriber and publisher Add generics to subscriber and publisher and fix potential deadlock Oct 2, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (1)
models/stream_test.go (1)

87-90: Flaky test due to timing assumptions

The test relies on reaching at least 50 calls within a fixed time frame, which can result in flaky tests depending on system load and scheduling.

Consider using synchronization primitives to control the flow of the test, ensuring that subscribers receive the expected number of calls without relying on timing.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 548db50 and 44112e1.

📒 Files selected for processing (2)
  • models/stream.go (1 hunks)
  • models/stream_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • models/stream.go
🔇 Additional comments (2)
models/stream_test.go (2)

13-43: Well-structured tests enhance readability

The use of subtests with t.Run improves the organization and readability of the test suite, making it easier to understand and maintain.


98-123: Effective validation of error propagation

The error handling test correctly verifies that errors returned by subscription callbacks are properly received through the error channel.

models/stream_test.go Outdated Show resolved Hide resolved
models/stream_test.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 44112e1 and df0be10.

📒 Files selected for processing (1)
  • models/stream_test.go (1 hunks)
🔇 Additional comments (1)
models/stream_test.go (1)

14-125: Tests are comprehensive and well-structured

The test suite effectively covers various scenarios, including edge cases and concurrent operations, ensuring the robustness of the publisher-subscriber model.

models/stream_test.go Outdated Show resolved Hide resolved
models/stream_test.go Outdated Show resolved Hide resolved
Copy link
Collaborator

@m-Peter m-Peter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!
Just a small change on the subscription-id value that is logged.

err := b.callback(data)
if err != nil {
b.err <- err
select {
case b.err <- err:
Copy link
Member

@zhangchiqing zhangchiqing Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the subscription cares about callback returning error or not.

I think the subscription's responsibility is to deliver the data to the callback. If there is error, it's the callback's job to handle it, log it, or even crash.

The callback has all the context of why this would error, so it's better being handled there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed!
Right now I just added a band-aid, but I will definitely fix this further with a following PR.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (1)
models/stream_test.go (1)

6-6: Consider using sync/atomic for all atomic operations.

For consistency, consider using sync/atomic for all atomic operations in the file. You can replace atomic.Uint64 with uint64 and use atomic.LoadUint64 and atomic.AddUint64 for atomic operations.

Apply this diff to use sync/atomic consistently:

 type mockSubscription struct {
 	*models.Subscription[mockData]
-	callCount atomic.Uint64
+	callCount uint64
 }
 
 func newMockSubscription() *mockSubscription {
 	s := &mockSubscription{}
 	s.Subscription = models.NewSubscription[mockData](func(data mockData) error {
-		s.callCount.Add(1)
+		atomic.AddUint64(&s.callCount, 1)
 		return nil
 	})
 	return s
 }
 
 func (s *mockSubscription) CallCount() uint64 {
-	return s.callCount.Load()
+	return atomic.LoadUint64(&s.callCount)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between df0be10 and 0180c2e.

📒 Files selected for processing (2)
  • models/stream_test.go (1 hunks)
  • services/ingestion/engine.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • services/ingestion/engine.go
🔇 Additional comments (1)
models/stream_test.go (1)

1-165: LGTM!

The test suite is comprehensive and well-structured, covering various scenarios for the publisher-subscriber model. The use of sub-tests, mock objects, and synchronization primitives ensures thorough testing of the functionality.

Comment on lines +113 to +140
t.Run("error handling", func(t *testing.T) {
p := newMockPublisher()
s := &mockSubscription{}
errContent := fmt.Errorf("failed to process data")

s.Subscription = models.NewSubscription[mockData](func(data mockData) error {
s.callCount.Add(1)
return errContent
})

p.Subscribe(s)

shouldReceiveError := make(chan struct{})
ready := make(chan struct{})
go func() {
close(ready)
select {
case err := <-s.Error():
require.ErrorIs(t, err, errContent)
case <-shouldReceiveError:
require.Fail(t, "should have received error")
}
}()
<-ready

p.Publish(mockData{})
close(shouldReceiveError)
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper cleanup of goroutines in the error handling test.

The error handling test spawns a goroutine to listen for errors, but it doesn't ensure proper cleanup of the goroutine. If the test fails or panics, the goroutine may continue running in the background.

To ensure proper cleanup, use a defer statement to close the shouldReceiveError channel and wait for the goroutine to exit. Apply this diff:

 func Test_Stream(t *testing.T) {
     // ...
 
     t.Run("error handling", func(t *testing.T) {
         // ...
 
         shouldReceiveError := make(chan struct{})
+        defer close(shouldReceiveError)
+
         ready := make(chan struct{})
+        done := make(chan struct{})
+        defer func() {
+            <-done
+        }()
+
         go func() {
+            defer close(done)
             close(ready)
             select {
             case err := <-s.Error():
                 require.ErrorIs(t, err, errContent)
             case <-shouldReceiveError:
                 require.Fail(t, "should have received error")
             }
         }()
         <-ready
 
         p.Publish(mockData{})
-        close(shouldReceiveError)
     })
 }

This ensures that the shouldReceiveError channel is always closed, and the test waits for the goroutine to finish before proceeding.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
t.Run("error handling", func(t *testing.T) {
p := newMockPublisher()
s := &mockSubscription{}
errContent := fmt.Errorf("failed to process data")
s.Subscription = models.NewSubscription[mockData](func(data mockData) error {
s.callCount.Add(1)
return errContent
})
p.Subscribe(s)
shouldReceiveError := make(chan struct{})
ready := make(chan struct{})
go func() {
close(ready)
select {
case err := <-s.Error():
require.ErrorIs(t, err, errContent)
case <-shouldReceiveError:
require.Fail(t, "should have received error")
}
}()
<-ready
p.Publish(mockData{})
close(shouldReceiveError)
})
t.Run("error handling", func(t *testing.T) {
p := newMockPublisher()
s := &mockSubscription{}
errContent := fmt.Errorf("failed to process data")
s.Subscription = models.NewSubscription[mockData](func(data mockData) error {
s.callCount.Add(1)
return errContent
})
p.Subscribe(s)
shouldReceiveError := make(chan struct{})
defer close(shouldReceiveError)
ready := make(chan struct{})
done := make(chan struct{})
defer func() {
<-done
}()
go func() {
defer close(done)
close(ready)
select {
case err := <-s.Error():
require.ErrorIs(t, err, errContent)
case <-shouldReceiveError:
require.Fail(t, "should have received error")
}
}()
<-ready
p.Publish(mockData{})
})

Comment on lines +46 to +111
t.Run("concurrent subscribe, publish, unsubscribe, publish", func(t *testing.T) {

p := newMockPublisher()

stopPublishing := make(chan struct{})

published := make(chan struct{})

// publishing
go func() {
for {
select {
case <-stopPublishing:
return
case <-time.After(time.Millisecond * 1):
p.Publish(mockData{})

select {
case published <- struct{}{}:
default:
}
}
}
}()

waitAllSubscribed := sync.WaitGroup{}
waitAllUnsubscribed := sync.WaitGroup{}

// 10 goroutines adding 10 subscribers each
// and then unsubscribe all
waitAllSubscribed.Add(10)
waitAllUnsubscribed.Add(10)
for i := 0; i < 10; i++ {
go func() {
subscriptions := make([]*mockSubscription, 10)

for j := 0; j < 10; j++ {
s := newMockSubscription()
subscriptions[j] = s
p.Subscribe(s)

}
waitAllSubscribed.Done()
waitAllSubscribed.Wait()

// wait for all subscribers to receive data
for i := 0; i < 10; i++ {
<-published
}

for _, s := range subscriptions {
p.Unsubscribe(s)
}

// there should be at least 1 call
for j := 0; j < 10; j++ {
require.GreaterOrEqual(t, subscriptions[j].CallCount(), uint64(10))
}

waitAllUnsubscribed.Done()
}()
}

waitAllUnsubscribed.Wait()
close(stopPublishing)
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Insufficient Buffer Size for published Channel

The published channel is currently unbuffered, which may lead to blocking the publishing goroutine if subscribers are not ready to receive. This can cause the test to hang or not function as expected.

  • Recommendation: Add a buffer size to the published channel to prevent blocking. For example:
    published := make(chan struct{}, 100) // Adjust the buffer size as needed
🔗 Analysis chain

Verify the correctness of the concurrent test.

The concurrent test is well-designed and uses appropriate synchronization primitives. However, there are a few points to verify:

  1. Ensure that the published channel has sufficient buffer size to avoid blocking the publishing goroutine.
  2. Confirm that waiting for 10 published events per subscriber is sufficient to guarantee that all subscribers receive the expected number of calls.
  3. Verify that the GreaterOrEqual assertion with a threshold of 10 is appropriate and accounts for potential race conditions.

Run the following script to analyze the concurrent behavior:

Review the output to ensure that:

  1. The published channel has a sufficient buffer size.
  2. The publishing goroutine sends to the published channel.
  3. Each subscriber waits for 10 published events.
  4. The assertion checks for at least 10 calls per subscriber.

If any of these points are not met, consider adjusting the test accordingly.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash

# Find the size of the `published` channel buffer.
rg --type go 'published\s*:=\s*make\(chan\s+struct\{\},\s*(\d+)\)'

# Check if the publishing goroutine sends to the `published` channel.
rg --type go 'case\s+published\s*<-\s*struct\{\}\{\}:'

# Analyze the waiting logic for published events.
rg --type go 'for\s+i\s*:=\s*0;\s*i\s*<\s*10;\s*i\+\+\s*\{[\s\S]*<-published[\s\S]*\}'

# Examine the assertion for the expected number of calls.
rg --type go 'require\.GreaterOrEqual\(t,\s*subscriptions\[j\]\.CallCount\(\),\s*uint64\(10\)\)'

Length of output: 465

@janezpodhostnik janezpodhostnik merged commit f29d065 into main Oct 10, 2024
2 checks passed
@janezpodhostnik janezpodhostnik deleted the janez/generic-publisher-subscriber branch October 10, 2024 00:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: ✅ Done
Development

Successfully merging this pull request may close these issues.

3 participants