Update blockTracker as messages are indexed #284
Caution: Review failed. The pull request is closed.
Walkthrough
This pull request introduces several changes across multiple files, primarily focusing on enhancing type safety and functionality related to blockchain block tracking. A new interface
Changes
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (11)
pkg/indexer/interface.go (1)
5-8: Consider adding godoc comments.
Adding documentation would improve clarity about:
- The interface's purpose in block tracking
- Expected behavior of GetLatestBlock
- Possible error cases from UpdateLatestBlock
+// IBlockTracker defines the interface for tracking the latest processed block number
+// in a blockchain indexing context.
 type IBlockTracker interface {
+	// GetLatestBlock returns the most recently processed block number
 	GetLatestBlock() uint64
+	// UpdateLatestBlock updates the latest processed block number
+	// Returns an error if the update operation fails
 	UpdateLatestBlock(ctx context.Context, block uint64) error
 }
pkg/indexer/indexer_test.go (2)
22-24: Consider tightening mock expectations
The UpdateLatestBlock expectation uses mock.Anything for both arguments, which might be too permissive. Consider validating the actual block number being passed to ensure the correct block is being tracked.
-blockTracker.EXPECT().UpdateLatestBlock(mock.Anything, mock.Anything).Return(nil)
+blockTracker.EXPECT().UpdateLatestBlock(mock.Anything, event.BlockNumber).Return(nil)
Synchronization improvement needed, but retry behavior is consistent
The codebase shows a consistent retry pattern in pkg/indexer/indexer.go with a documented 100ms sleep for retryable errors. The test in indexer_test.go specifically verifies this retry behavior by simulating a retryable error on the first attempt. However, the test synchronization using time.Sleep should be improved.
Consider this synchronization improvement:
-go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker)
-time.Sleep(200 * time.Millisecond)
+done := make(chan struct{})
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+go func() {
+	indexLogs(ctx, channel, testutils.NewLog(t), logStorer, blockTracker)
+	close(done)
+}()
+select {
+case <-done:
+case <-time.After(time.Second):
+	t.Fatal("test timed out")
+}
🔗 Analysis chain
Line range hint 59-63: Verify retry behavior and improve test synchronization
The test uses time.Sleep, which could be flaky. Additionally, let's verify that the retry behavior is consistent across the codebase.
Run this script to verify retry behavior consistency:
Consider the same synchronization improvement as suggested for TestIndexLogsSuccess:
-go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker)
-time.Sleep(200 * time.Millisecond)
+done := make(chan struct{})
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+go func() {
+	indexLogs(ctx, channel, testutils.NewLog(t), logStorer, blockTracker)
+	close(done)
+}()
+select {
+case <-done:
+case <-time.After(time.Second):
+	t.Fatal("test timed out")
+}
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for consistent retry behavior across the codebase
# Look for retry-related constants or configurations
rg -A 2 "retry|backoff|attempt"
# Look for similar error handling patterns
ast-grep --pattern 'if $err != nil { $$$ return $$.NewLogStorageError($$$) $$$ }'
Length of output: 3287
pkg/indexer/blockTracker.go (2)
Line range hint 80-100: Consider documenting the zero block behavior
The function correctly handles the case of new contracts by returning 0 when no rows are found. Consider adding a comment to explicitly document this behavior, as it's an important contract of the function.
 func loadLatestBlock(
 	ctx context.Context,
 	contractAddress string,
 	querier *queries.Queries,
 ) (uint64, error) {
+	// Returns 0 for new contracts (when no block has been indexed yet)
 	latestBlock, err := querier.GetLatestBlock(ctx, contractAddress)
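The zero-block contract described above can be sketched in isolation. This is only an illustration under assumptions: `blockQuerier` and `fakeQuerier` are hypothetical stand-ins for the generated queries type, the stored value is assumed to be an `int64`, and the "no rows" case is assumed to surface as `sql.ErrNoRows`.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// blockQuerier is a hypothetical stand-in for the generated queries type.
type blockQuerier interface {
	GetLatestBlock(ctx context.Context, contractAddress string) (int64, error)
}

// fakeQuerier maps contract addresses to their last indexed block.
type fakeQuerier map[string]int64

func (f fakeQuerier) GetLatestBlock(_ context.Context, addr string) (int64, error) {
	block, ok := f[addr]
	if !ok {
		return 0, sql.ErrNoRows // no row yet: the contract has never been indexed
	}
	return block, nil
}

// loadLatestBlock returns 0 for new contracts (no block indexed yet) and
// passes any other query error through to the caller.
func loadLatestBlock(ctx context.Context, addr string, q blockQuerier) (uint64, error) {
	block, err := q.GetLatestBlock(ctx, addr)
	if errors.Is(err, sql.ErrNoRows) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return uint64(block), nil
}

// loadKnownAndNew exercises both paths against the fake.
func loadKnownAndNew() (known, fresh uint64, err error) {
	q := fakeQuerier{"0xabc": 42}
	ctx := context.Background()
	if known, err = loadLatestBlock(ctx, "0xabc", q); err != nil {
		return 0, 0, err
	}
	fresh, err = loadLatestBlock(ctx, "0xdef", q)
	return known, fresh, err
}

func main() {
	known, fresh, err := loadKnownAndNew()
	fmt.Println(known, fresh, err)
}
```

Treating "no rows" as block 0 rather than as an error means a newly configured contract starts indexing from the beginning without special handling by the caller.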
Line range hint 1-100: Well-designed concurrency pattern
The implementation uses a good combination of atomic operations for fast reads and a mutex for coordinating DB updates. The double-checked locking pattern in UpdateLatestBlock prevents unnecessary DB operations while maintaining thread safety.
pkg/blockchain/rpcLogStreamer_test.go (1)
Add test coverage for blockTracker integration in rpcLogStreamer_test.go
Based on the verification results:
- The indexer_test.go shows blockTracker is being used and tested for updating latest blocks
- The rpcLogStreamer_test.go's buildStreamer function doesn't include blockTracker integration
- The PR title indicates blockTracker is now part of indexing, but the test doesn't verify this integration
Suggested test additions:
- Mock the blockTracker interface in the test setup
- Verify blockTracker.UpdateLatestBlock is called with correct block numbers
- Test scenarios where blockTracker operations fail
🔗 Analysis chain
Line range hint 59-85: Add test coverage for blockTracker integration.
Given that the PR title mentions "Actually use blockTracker as part of indexing", this test should include scenarios verifying the integration with the blockTracker interface.
Let's check if there are other test files covering the blockTracker integration:
Let me check the content of the test file and the actual implementation to better understand the blockTracker integration.
Let me check the implementation of the rpcLogStreamer to understand how it integrates with blockTracker.
Let me try one final search to locate the blockTracker integration:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for test files that might contain blockTracker tests
rg -l -g '*_test.go' 'blockTracker'
# Search for the interface implementation
ast-grep --pattern 'interface IBlockTracker { $$$ }'
Length of output: 120
Script:
#!/bin/bash
# Check the blockTracker test coverage in indexer_test.go
rg -A 10 'blockTracker' pkg/indexer/indexer_test.go
# Find the actual implementation of blockTracker
ast-grep --pattern 'type $_ struct { blockTracker $_ $$$ }'
# Find any mock implementations
rg -l 'mock.*blockTracker'
Length of output: 1288
Script:
#!/bin/bash
# Check the rpcLogStreamer implementation
rg -A 10 'type RPCLogStreamer' pkg/blockchain/
# Find where blockTracker is used in rpcLogStreamer
rg -A 5 'blockTracker' pkg/blockchain/
# Find the interface definition
ast-grep --pattern 'interface IBlockTracker { $$$ }'
# Check if there are any mocks for blockTracker
fd -g 'mock*' pkg/indexer/
Length of output: 173
Script:
#!/bin/bash
# Search for files containing both rpcLogStreamer and blockTracker
rg -l 'rpcLogStreamer.*blockTracker|blockTracker.*rpcLogStreamer'
# Look for the struct definition and its constructor
ast-grep --pattern 'type RPCLogStreamer struct { $$$ }'
# Check the buildStreamer function in the test file
rg -A 10 'func buildStreamer' pkg/blockchain/rpcLogStreamer_test.go
Length of output: 471
pkg/mocks/indexer/mock_IBlockTracker.go (1)
1-128: Well-structured mock for testing block tracking functionality
This mock implementation will enable proper testing of the indexing logic with block tracking. The comprehensive API and automatic expectations checking will help maintain test quality.
Consider adding example usage in the package documentation to help other developers write tests using this mock.
pkg/blockchain/rpcLogStreamer.go (2)
Line range hint 133-144: Consider graceful shutdown instead of panic
While the max disconnect time check is important, panicking might not be the best approach in a production environment. Consider:
- Implementing a graceful shutdown
- Adding metrics/alerts for disconnect duration
- Allowing configuration of the behavior (panic vs. shutdown)
-panic(
-	"Max disconnect time exceeded. Node might drift too far away from expected state",
-)
+r.logger.Error("Max disconnect time exceeded. Initiating shutdown...")
+r.Stop()
+return
179-179: Ensure consistent integer types in metrics
The metrics call uses int while the block numbers are uint64, which could cause issues with very large block numbers.
-metrics.EmitCurrentBlock(contractAddress, int(highestBlock))
+metrics.EmitCurrentBlock(contractAddress, highestBlock) // Update metrics function to use uint64
pkg/indexer/indexer.go (2)
Line range hint 198-216: Ensure thread safety of BlockTracker when accessed concurrently
The indexLogs function is called in multiple goroutines, each potentially accessing shared resources within BlockTracker. Verify that BlockTracker is implemented in a thread-safe manner to prevent data races and ensure consistent behavior across concurrent operations.
143-147: Enhance error context when initializing block trackers
Currently, if NewBlockTracker fails, the error is returned without additional context. Improving the error messages by including details such as the contract address or specific operation can aid in debugging and maintaining the system.
Also applies to: 160-164
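One way to add that context is to wrap the error with `fmt.Errorf` and the `%w` verb, so callers can still match the underlying cause. The sketch below is illustrative only: `newBlockTracker`, `initTracker`, and `errTrackerInit` are hypothetical names, and the constructor always fails so that the wrapped message is visible.

```go
package main

import (
	"errors"
	"fmt"
)

// errTrackerInit simulates a failure inside the tracker constructor.
var errTrackerInit = errors.New("database unavailable")

// newBlockTracker is a hypothetical constructor that always fails here.
func newBlockTracker(contractAddress string) (uint64, error) {
	return 0, errTrackerInit
}

// initTracker wraps constructor failures with the tracker name and the
// contract address, keeping the original error reachable via errors.Is.
func initTracker(name, contractAddress string) (uint64, error) {
	block, err := newBlockTracker(contractAddress)
	if err != nil {
		return 0, fmt.Errorf(
			"initializing %s block tracker for contract %s: %w",
			name, contractAddress, err,
		)
	}
	return block, nil
}

func main() {
	_, err := initTracker("messages", "0x1234")
	fmt.Println(err)
	fmt.Println(errors.Is(err, errTrackerInit))
}
```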
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- .mockery.yaml (1 hunk)
- pkg/blockchain/rpcLogStreamer.go (7 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
- pkg/indexer/blockTracker.go (2 hunks)
- pkg/indexer/indexer.go (9 hunks)
- pkg/indexer/indexer_test.go (2 hunks)
- pkg/indexer/interface.go (1 hunk)
- pkg/mocks/indexer/mock_IBlockTracker.go (1 hunk)
🔇 Additional comments (16)
pkg/indexer/interface.go (1)
5-8: LGTM! Well-designed interface with proper type safety.
The interface is well-structured with:
- Appropriate use of uint64 for block numbers
- Proper error handling for updates
- Context support for cancellation/timeout
.mockery.yaml (2)
17-19: LGTM! Configuration follows the established pattern.
The addition of the IBlockTracker interface is properly formatted and follows the existing structure in the mockery configuration.
17-19: Verify interface existence and usage.
Let's verify that the interface exists and is being used in the codebase.
✅ Verification successful
Interface exists and is actively used in the codebase
The IBlockTracker interface is properly defined in pkg/indexer/interface.go with two methods: GetLatestBlock() and UpdateLatestBlock(). It's being used across multiple files including the implementation (indexer.go), tests (indexer_test.go), and has a corresponding mock (mock_IBlockTracker.go).
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the existence and usage of IBlockTracker interface
# Check if the interface exists
echo "Checking interface definition..."
ast-grep --pattern 'type IBlockTracker interface {
$$$
}'
# Check interface usage
echo "Checking interface usage..."
rg -l "IBlockTracker" --type go
Length of output: 575
pkg/indexer/indexer_test.go (2)
13-14: LGTM! Good separation of mock imports
The separation of mocks into specific packages (indexer and storer) improves code organization and maintainability.
40-42: Same feedback as TestIndexLogsSuccess regarding mock expectations
Consider tightening the UpdateLatestBlock expectations to validate the actual block number being tracked.
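The idea behind tightening the expectation can be shown without any mocking library. In the sketch below a hand-rolled fake records each call so the check compares the exact block number instead of accepting any argument. The names `recordingTracker`, `trackEvent`, and `recordedUpdates` are hypothetical and do not come from the repository.

```go
package main

import (
	"context"
	"fmt"
)

// recordingTracker is a hand-rolled fake that remembers every block number
// passed to UpdateLatestBlock.
type recordingTracker struct{ updates []uint64 }

func (r *recordingTracker) UpdateLatestBlock(_ context.Context, block uint64) error {
	r.updates = append(r.updates, block)
	return nil
}

// trackEvent is a hypothetical stand-in for the part of the indexing loop
// that reports progress after an event has been stored.
func trackEvent(ctx context.Context, t *recordingTracker, eventBlock uint64) error {
	return t.UpdateLatestBlock(ctx, eventBlock)
}

// recordedUpdates runs one event through trackEvent and returns what the
// fake observed.
func recordedUpdates(eventBlock uint64) []uint64 {
	tracker := &recordingTracker{}
	if err := trackEvent(context.Background(), tracker, eventBlock); err != nil {
		return nil
	}
	return tracker.updates
}

func main() {
	got := recordedUpdates(100)
	// A strict check: exactly one update, carrying the event's block number.
	ok := len(got) == 1 && got[0] == 100
	fmt.Println(got, ok)
}
```

A permissive matcher would pass even if the loop reported the wrong block, which is the gap the review comment points at.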
pkg/indexer/blockTracker.go (1)
Line range hint 38-43: LGTM! The function rename improves clarity
The change from getLatestBlock to loadLatestBlock better reflects that this is a database operation. The error handling and thread-safe initialization using atomic store are well implemented.
pkg/blockchain/rpcLogStreamer_test.go (1)
59-60: LGTM! Consistent type safety improvements.
The change to uint64 for block numbers maintains consistency with the function parameter changes.
pkg/mocks/indexer/mock_IBlockTracker.go (5)
1-10: LGTM! Clean and minimal imports
The file header and imports are appropriate for a mock implementation.
11-23: LGTM! Well-structured mock implementation
The mock struct and expecter implementation follow mockery's best practices, providing a clean API for test setup.
116-128: LGTM! Robust constructor implementation
The constructor follows testing best practices:
- Automatic expectations checking
- Proper cleanup registration
- Clear interface requirements
24-68: Verify interface compatibility for GetLatestBlock
The implementation looks correct, but let's verify it matches the interface definition.
✅ Verification successful
Mock implementation correctly matches the interface
The mock implementation of GetLatestBlock() in MockIBlockTracker perfectly matches the interface definition found in pkg/indexer/interface.go. The method signature returns uint64 as required, and the implementation includes appropriate type assertions and panic handling for mock scenarios.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the GetLatestBlock method signature matches the interface
# Search for the interface definition
ast-grep --pattern 'interface IBlockTracker {
$$$
GetLatestBlock() uint64
$$$
}'
Length of output: 88
Script:
#!/bin/bash
# Let's try to find the interface definition using ripgrep
rg "type IBlockTracker interface" -A 5
Length of output: 258
69-114: Verify interface compatibility for UpdateLatestBlock
The implementation looks correct, but let's verify it matches the interface definition.
✅ Verification successful
Mock implementation matches interface definition perfectly
The mock implementation in pkg/mocks/indexer/mock_IBlockTracker.go correctly implements the UpdateLatestBlock method as defined in the interface IBlockTracker (pkg/indexer/interface.go). The method signature matches exactly:
- Same parameter types: context.Context and uint64
- Same return type: error
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the UpdateLatestBlock method signature matches the interface
# Search for the interface definition
ast-grep --pattern 'interface IBlockTracker {
$$$
UpdateLatestBlock(ctx context.Context, block uint64) error
$$$
}'
Length of output: 123
Script:
#!/bin/bash
# Let's try to find the interface definition using ripgrep
rg "type IBlockTracker interface" -A 10
Length of output: 259
pkg/blockchain/rpcLogStreamer.go (3)
Line range hint 47-56: LGTM: Type change is consistent and implementation is correct
The change from int to uint64 for the fromBlock parameter is appropriate for blockchain block numbers and maintains consistency with other changes.
Line range hint 66-71: LGTM: Struct field type change is appropriate
The change to uint64 for the fromBlock field aligns with blockchain domain requirements and maintains type consistency.
21-24: Verify L2 reorg assumptions and risks
The change to uint64 for block numbers is appropriate. However, setting LAG_FROM_HIGHEST_BLOCK to 0 assumes L2s have negligible reorg risk. While this is generally true, it's worth verifying:
to 0 assumes L2s have negligible reorg risk. While this is generally true, it's worth verifying:
- Which L2s this will be deployed to
- Their historical reorg depths
- The impact of potential reorgs on the indexing process
✅ Verification successful
Setting LAG_FROM_HIGHEST_BLOCK to 0 is acceptable with implemented safeguards
After reviewing the full implementation, setting LAG_FROM_HIGHEST_BLOCK to 0 for L2s is acceptable because:
- The code includes a robust error handling mechanism with automatic retries
- There's a maxDisconnectTime parameter that forces a shutdown if the node drifts too far from the expected state
- The implementation includes a paginated log fetching system that can handle reorgs implicitly by refetching blocks
- The streaming mechanism can recover from temporary disruptions while maintaining consistency
The change from int to uint64 is also correct as it better represents block numbers, which are always non-negative.
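One consequence of moving to uint64 is worth spelling out: unsigned subtraction wraps around instead of going negative. If the lag constant were ever raised above zero, a plain `highest - lag` on a young chain would produce a very large block number. The sketch below shows a clamped subtraction. `safeToBlock` is a hypothetical helper, and how the repository actually applies LAG_FROM_HIGHEST_BLOCK is not shown in this review.

```go
package main

import "fmt"

// lagFromHighestBlock mirrors the constant discussed above; 0 means the
// streamer reads right up to the chain head.
const lagFromHighestBlock uint64 = 0

// safeToBlock returns highestBlock minus lag, clamped at 0 so that the
// unsigned subtraction can never wrap around.
func safeToBlock(highestBlock, lag uint64) uint64 {
	if lag > highestBlock {
		return 0
	}
	return highestBlock - lag
}

func main() {
	fmt.Println(safeToBlock(100, lagFromHighestBlock)) // lag of 0 leaves the head unchanged
	fmt.Println(safeToBlock(100, 12))                  // a non-zero lag trails the head
	fmt.Println(safeToBlock(5, 12))                    // clamped instead of wrapping
}
```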
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any reorg handling logic or comments about reorgs
rg -i "reorg" --type go
Length of output: 211
Script:
#!/bin/bash
# Let's check for L2-related configurations or comments
rg -i "l2" --type go
# Also check for any block processing logic that might handle chain reorganizations
ast-grep --pattern 'func $_($$$) {
$$$
block$$$
$$$
}'
Length of output: 371
Script:
#!/bin/bash
# Let's examine the full context of rpcLogStreamer to understand the block processing logic
cat pkg/blockchain/rpcLogStreamer.go
# Also check for any configuration files that might contain chain-specific settings
fd -e yaml -e json -e toml . | xargs grep -l "chain\|network"
Length of output: 6399
pkg/indexer/indexer.go (1)
149-149: Verify correctness of initial block numbers from GetLatestBlock
When initializing the log streams with GetLatestBlock(), ensure that the block numbers returned are accurate. Incorrect block numbers might lead to missed events or processing duplicates. It's important to confirm that the block tracker is correctly persisting and retrieving the latest processed block.
Also applies to: 166-166
756e259 to 4e21e1f
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 1
🛑 Comments failed to post (1)
pkg/blockchain/rpcLogStreamer.go (1)
179-179: 🛠️ Refactor suggestion
Update metrics function to use uint64
The metrics.EmitCurrentBlock function is still using int while we're passing a uint64 block number. This could cause issues with very large block numbers.
Consider updating the metrics function signature to:
-func EmitCurrentBlock(contractAddress string, block int)
+func EmitCurrentBlock(contractAddress string, block uint64)
Committable suggestion skipped: line range outside the PR's diff.
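Until the signature changes, any caller that still needs an `int` has to narrow the value. A bare `int(block)` silently wraps for values above the platform's maximum int, so a checked conversion is safer. The helper `blockToInt` below is hypothetical and only illustrates the hazard.

```go
package main

import (
	"fmt"
	"math"
)

// blockToInt converts a block number to int, returning an error instead of
// silently wrapping when the value does not fit.
func blockToInt(block uint64) (int, error) {
	if block > uint64(math.MaxInt) {
		return 0, fmt.Errorf("block number %d overflows int", block)
	}
	return int(block), nil
}

func main() {
	v, err := blockToInt(42)
	fmt.Println(v, err)

	// The largest uint64 cannot be represented as an int on any platform.
	_, err = blockToInt(^uint64(0))
	fmt.Println(err)
}
```

Changing the metrics function to accept uint64, as the review suggests, removes the need for this conversion altogether.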
9988ae5 to f3496fc
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
pkg/indexer/indexer.go (1)
143-146: Consider adding debug logs for initial block numbers.
While the block tracker initialization looks good, it would be helpful to add debug logs showing the initial block numbers being used. This can aid in debugging and monitoring.
 messagesTracker, err := NewBlockTracker(ctx, cfg.MessagesContractAddress, querier)
 if err != nil {
 	return nil, err
 }
+logger.Debug("initialized messages block tracker", zap.Uint64("startBlock", messagesTracker.GetLatestBlock()))
 identityUpdatesTracker, err := NewBlockTracker(ctx, cfg.IdentityUpdatesContractAddress, querier)
 if err != nil {
 	return nil, err
 }
+logger.Debug("initialized identity updates block tracker", zap.Uint64("startBlock", identityUpdatesTracker.GetLatestBlock()))
Also applies to: 160-163
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- .mockery.yaml (1 hunk)
- pkg/blockchain/rpcLogStreamer.go (7 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (3 hunks)
- pkg/indexer/blockTracker.go (2 hunks)
- pkg/indexer/indexer.go (9 hunks)
- pkg/indexer/indexer_test.go (2 hunks)
- pkg/indexer/interface.go (1 hunk)
- pkg/mocks/indexer/mock_IBlockTracker.go (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (7)
- .mockery.yaml
- pkg/blockchain/rpcLogStreamer.go
- pkg/blockchain/rpcLogStreamer_test.go
- pkg/indexer/blockTracker.go
- pkg/indexer/indexer_test.go
- pkg/indexer/interface.go
- pkg/mocks/indexer/mock_IBlockTracker.go
🧰 Additional context used
📓 Learnings (1)
pkg/indexer/indexer.go (1)
Learnt from: neekolas
PR: xmtp/xmtpd#284
File: pkg/indexer/indexer.go:214-216
Timestamp: 2024-11-20T16:46:25.460Z
Learning: In `pkg/indexer/indexer.go`, it's acceptable if the `blockTracker` data becomes stale because the application will simply repeat some messages on the next startup. There's no need to implement retry logic for `UpdateLatestBlock` failures in the `indexLogs` function.
🔇 Additional comments (4)
pkg/indexer/indexer.go (4)
Line range hint 67-91: LGTM! Block tracking integration looks good.
The changes properly integrate block tracking into both message and identity update indexing paths while maintaining proper error handling.
Also applies to: 116-116
125-129: LGTM! Well-structured struct changes.
The new block tracker fields are logically grouped with their corresponding channels and follow consistent naming conventions.
214-216: LGTM! Block tracker update implementation is appropriate.
The implementation correctly updates the block tracker after successful log storage. The error handling approach (log and continue) is appropriate given the clarification that stale block tracker data is acceptable since messages will be repeated on the next startup.
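The log-and-continue approach can be sketched as follows. This is an illustration under assumptions, not the code from pkg/indexer/indexer.go: `blockTracker`, `flakyTracker`, `processBlocks`, and `runWithOneFailure` are hypothetical, and the step that stores each log is elided.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// blockTracker is the subset of the tracker interface this sketch needs.
type blockTracker interface {
	UpdateLatestBlock(ctx context.Context, block uint64) error
}

// flakyTracker fails for one specific block to simulate a transient DB error.
type flakyTracker struct {
	latest uint64
	failOn uint64
}

func (f *flakyTracker) UpdateLatestBlock(_ context.Context, block uint64) error {
	if block == f.failOn {
		return errors.New("db write failed")
	}
	if block > f.latest {
		f.latest = block
	}
	return nil
}

// processBlocks updates the tracker after each (elided) successful store.
// A failed update is logged and skipped: a stale tracker only means some
// messages are replayed on the next startup.
func processBlocks(ctx context.Context, tracker blockTracker, blocks []uint64) int {
	processed := 0
	for _, block := range blocks {
		// Storing the log for this block would happen here.
		if err := tracker.UpdateLatestBlock(ctx, block); err != nil {
			log.Printf("error updating latest block %d: %v", block, err)
		}
		processed++
	}
	return processed
}

// runWithOneFailure processes three blocks where the middle update fails.
func runWithOneFailure() (processed int, latest uint64) {
	tracker := &flakyTracker{failOn: 11}
	processed = processBlocks(context.Background(), tracker, []uint64{10, 11, 12})
	return processed, tracker.latest
}

func main() {
	processed, latest := runWithOneFailure()
	fmt.Println(processed, latest)
}
```

The failed update for block 11 does not stop the loop, and the next successful update moves the tracker past it.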
91-91: Verify BlockTracker implementation is thread-safe.
The block tracker is accessed from multiple goroutines (messages and identity updates). Let's verify that the BlockTracker implementation properly handles concurrent access.
Also applies to: 116-116
✅ Verification successful
Let me check the actual implementation of BlockTracker to understand how the atomic variable is used.
Let me check the methods that access and modify the latestBlock field to ensure thread safety.
BlockTracker implementation is thread-safe
The implementation properly handles concurrent access through:
- Use of atomic.Uint64 for the latestBlock field ensuring atomic read/write operations
- Proper usage of Store() and Load() methods for all access to latestBlock
- Additional sync.Mutex field available for other synchronization needs
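A minimal sketch of how these pieces might fit together is shown below, with an atomic fast path and a re-check under the mutex before the slower write. It is not the repository's BlockTracker: the `tracker` type, its `persisted` slice (standing in for database writes), and `runConcurrentUpdates` are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker keeps the latest block in an atomic for lock-free reads and uses a
// mutex to serialize the slower persistence step.
type tracker struct {
	latest    atomic.Uint64
	mu        sync.Mutex
	persisted []uint64 // stand-in for database writes, guarded by mu
}

func (t *tracker) GetLatestBlock() uint64 { return t.latest.Load() }

func (t *tracker) UpdateLatestBlock(block uint64) {
	// Fast path: skip the lock when the block is not newer.
	if block <= t.latest.Load() {
		return
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	// Re-check under the lock: another goroutine may have advanced the value.
	if block <= t.latest.Load() {
		return
	}
	t.persisted = append(t.persisted, block)
	t.latest.Store(block)
}

// runConcurrentUpdates reports blocks 1..n from n goroutines and returns the
// final latest block and the number of simulated writes.
func runConcurrentUpdates(n uint64) (latest uint64, writes int) {
	t := &tracker{}
	var wg sync.WaitGroup
	for i := uint64(1); i <= n; i++ {
		wg.Add(1)
		go func(block uint64) {
			defer wg.Done()
			t.UpdateLatestBlock(block)
		}(i)
	}
	wg.Wait()
	return t.GetLatestBlock(), len(t.persisted)
}

func main() {
	latest, writes := runConcurrentUpdates(50)
	fmt.Println(latest, writes <= 50)
}
```

Because stale updates return before taking the lock, the number of simulated writes depends on scheduling, but the final value is always the highest block reported.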
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check BlockTracker implementation for thread safety mechanisms
# Look for mutex usage or other synchronization primitives in BlockTracker
echo "Checking BlockTracker implementation for thread safety..."
ast-grep --pattern 'type BlockTracker struct {
$$$
sync.$_
$$$
}'
# Look for atomic operations
rg 'atomic\.' --type go
Length of output: 283
Script:
#!/bin/bash
# Check the BlockTracker implementation
echo "Checking BlockTracker implementation..."
rg -A 20 "type BlockTracker struct" --type go
# Check for methods that modify latestBlock
echo -e "\nChecking methods that modify latestBlock..."
ast-grep --pattern 'func ($_ *BlockTracker) $$$latestBlock$$$'
Length of output: 1428
Script:
#!/bin/bash
# Check methods of BlockTracker that access or modify latestBlock
echo "Checking BlockTracker methods..."
rg -B 2 -A 10 "func \(\*BlockTracker\)" --type go
# Also check for Store/Load operations on atomic.Uint64
echo -e "\nChecking atomic operations..."
rg "\.Store\(|\.Load\(" --type go
Length of output: 825
f3496fc to 08f2515
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- .mockery.yaml (1 hunk)
- pkg/blockchain/rpcLogStreamer.go (7 hunks)
- pkg/blockchain/rpcLogStreamer_test.go (3 hunks)
- pkg/indexer/blockTracker.go (2 hunks)
- pkg/indexer/indexer.go (9 hunks)
- pkg/indexer/indexer_test.go (2 hunks)
- pkg/indexer/interface.go (1 hunk)
- pkg/mocks/indexer/mock_IBlockTracker.go (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (7)
- .mockery.yaml
- pkg/blockchain/rpcLogStreamer.go
- pkg/blockchain/rpcLogStreamer_test.go
- pkg/indexer/blockTracker.go
- pkg/indexer/indexer.go
- pkg/indexer/interface.go
- pkg/mocks/indexer/mock_IBlockTracker.go
🔇 Additional comments (3)
pkg/indexer/indexer_test.go (3)
13-14: LGTM! Clean mock organization
The separation of mocks into specific packages (indexerMocks and storerMocks) improves code organization and clarity.
21-25: LGTM! Proper block tracking setup
The block tracking setup is well-structured:
- Uses appropriate uint64 type for block numbers
- Correctly sets up mock expectations for UpdateLatestBlock
Also applies to: 28-29
34-35: Skip comment about time.Sleep
08f2515 to d58f07c
tl;dr
- Use blockTracker as part of indexing the chain
- rpcLogStreamer: use a uint64 instead of int type for block numbers, for consistency
Summary by CodeRabbit
New Features
- IBlockTracker, for tracking and updating the latest blockchain block.
- IBlockTracker, to facilitate testing.
Bug Fixes
- int to uint64, for better type safety in block number handling.
Tests
- IBlockTracker interface and updating test functions for consistency.
Chores