Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create initial Coordinator V2 service #444

Merged
merged 46 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8e6e95d
feat: Create crate for coordinator control service
morgsmccauley Dec 12, 2023
c2f857b
feat: Fetch and print registry contract
morgsmccauley Dec 12, 2023
dd26099
feat: Used shared registry types
morgsmccauley Dec 14, 2023
025b315
refactor: Extract registry fetch to own mod
morgsmccauley Dec 14, 2023
3e05d9a
refactor: Use structured logging
morgsmccauley Dec 17, 2023
4ba5296
feat: Start all streams with stubbed data
morgsmccauley Dec 18, 2023
126296d
feat: Start stream with filter specified in registry
morgsmccauley Dec 18, 2023
64d835b
feat: Add `update_at`/`created_at` fields to registry
morgsmccauley Dec 18, 2023
c91f814
chore: Temporarily mock registry
morgsmccauley Dec 18, 2023
b19c607
feat: Write `last_indexed_block` per block stream
morgsmccauley Dec 18, 2023
0db45b0
feat: Start block stream from desired height
morgsmccauley Dec 18, 2023
a849a1a
refactor: Abstract block stream handling away
morgsmccauley Dec 18, 2023
32ca6b7
refactor: Allow mocking of `Registry`
morgsmccauley Dec 18, 2023
34b21f7
refactor: Allow mocking of `BlockStreamHandler`
morgsmccauley Dec 18, 2023
574b629
test: Add unit test for control loop
morgsmccauley Dec 18, 2023
b8cbcee
refactor: Rename for clarity
morgsmccauley Dec 18, 2023
19fe056
test: Test starting of block streams
morgsmccauley Dec 18, 2023
dd4f587
feat: Add `version` to block streams
morgsmccauley Dec 18, 2023
d011a22
feat: Write block stream version from coordinator
morgsmccauley Dec 18, 2023
bb157e7
feat: Stop streams not in registry
morgsmccauley Dec 19, 2023
fbc71d5
feat: Ignores streams with matching versions
morgsmccauley Dec 19, 2023
b803a03
feat: Restart streams with mismatched versions
morgsmccauley Dec 19, 2023
e0c3714
refactor: Avoid `.clone()`
morgsmccauley Dec 21, 2023
09cca2b
feat: Add support for `ActionFunctionCallRule`
morgsmccauley Dec 21, 2023
124cec9
feat: Log block stream requests
morgsmccauley Dec 21, 2023
a0568ed
fix: Map correct status values
morgsmccauley Dec 21, 2023
b70b25d
feat: Skip historical/delta lake processing for function/event rules
morgsmccauley Dec 21, 2023
4a81033
chore: Pin `near-lake-framework` to `0.7.5`
morgsmccauley Dec 21, 2023
dbdc1df
feat: Continuously loop registry config synchronization
morgsmccauley Dec 21, 2023
b2d8720
chore: Remove stubbed registry contract
morgsmccauley Dec 21, 2023
6e89697
chore: Use `near-primitives` feature for `registry-types`
morgsmccauley Jan 3, 2024
67e60fd
refactor: Rename `synchronise_registry_config` -> `synchronise_block_…
morgsmccauley Jan 7, 2024
83a2d92
feat: Start executors
morgsmccauley Jan 7, 2024
2977df0
feat: Add error context
morgsmccauley Jan 7, 2024
20b0fb4
feat: Restart executors with mismatched versions
morgsmccauley Jan 8, 2024
73609d8
feat: Stop executors not in registry
morgsmccauley Jan 8, 2024
a9cf247
refactor: Configure block stream redis stream from coordinator
morgsmccauley Jan 8, 2024
240ba3d
fix: Enable (hack) mocking by pinning lake framework version
morgsmccauley Jan 8, 2024
165111f
ci: Add checks for Coordinator
morgsmccauley Jan 8, 2024
62a3334
refactor: Rename to `last_published_block` as it has not been executed
morgsmccauley Jan 8, 2024
d7aa296
chore: Remove "historical" name references from logging
morgsmccauley Jan 11, 2024
e373d4c
fix: Publish blocks to correct redis stream
morgsmccauley Jan 11, 2024
abc7272
chore: Fix spelling
morgsmccauley Jan 11, 2024
762a078
fixup! fix: Publish blocks to correct redis stream
morgsmccauley Jan 11, 2024
d250f1f
refactor: Rename for clarity
morgsmccauley Jan 11, 2024
46745d2
fix: Fix rate limiting on protoc setup
morgsmccauley Jan 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .github/workflows/coordinator-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Coordinator

on:
push:
branches: [ main ]
paths:
- "coordinator/**"
pull_request:
paths:
- "coordinator/**"

env:
CARGO_TERM_COLOR: always

jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Check
working-directory: ./coordinator
run: cargo check

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Test
working-directory: ./coordinator
run: cargo test


format:
runs-on: ubuntu-20.04
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.70.0
override: true
profile: minimal
components: rustfmt
- name: Check formatting
working-directory: ./coordinator
run: |
cargo fmt -- --check

clippy:
runs-on: ubuntu-20.04
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.70.0
override: true
profile: minimal
components: clippy
- name: Clippy check
working-directory: ./coordinator
run: |
cargo clippy
2 changes: 1 addition & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ wildmatch = "2.1.1"

registry-types = { path = "../registry/types", features = ["near-primitives"] }

near-lake-framework = "0.7.4"
near-lake-framework = "=0.7.4"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing mocks for near-lake-framework depend on behaviour which does not exist in the next version.

I'm pinning this for now, when we upgrade we'll need to find another way to mock it.


[build-dependencies]
tonic-build = "0.10"
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
redis_stream: "morgs.near/test:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
Expand Down
69 changes: 38 additions & 31 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,52 @@ syntax = "proto3";

package blockstreamer;

// The BlockStreamer service provides RPCs to manage BlockStream instances.
// The BlockStreamer service provides RPCs to manage BlockStream instances
service BlockStreamer {
// Starts a new BlockStream process.
// Starts a new BlockStream process
rpc StartStream (StartStreamRequest) returns (StartStreamResponse);

// Stops an existing BlockStream process.
// Stops an existing BlockStream process
rpc StopStream (StopStreamRequest) returns (StopStreamResponse);

// Lists all current BlockStream processes.
// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);
}

// Request message for starting a BlockStream.
// Request message for starting a BlockStream
message StartStreamRequest {
// Which block height to start from.
// Which block height to start from
uint64 start_block_height = 1;
// The account ID which the indexer is defined under
// Account ID which the indexer is defined under
string account_id = 2;
// The name of the indexer
// Name of the indexer
string function_name = 3;
// The filter rule to apply to incoming blocks
// Block height corresponding to the created/updated height of the indexer
uint64 version = 4;
// Key of Redis Stream to publish blocks to
string redis_stream = 5;
// Filter rule to apply to incoming blocks
oneof rule {
ActionAnyRule action_any_rule = 4;
ActionFunctionCallRule action_function_call_rule = 5;
ActionAnyRule action_any_rule = 6;
ActionFunctionCallRule action_function_call_rule = 7;
}
}

// Match any action against the specified account
message ActionAnyRule {
// The account ID pattern to match against
// Account ID pattern to match against
string affected_account_id = 1;
darunrs marked this conversation as resolved.
Show resolved Hide resolved
// The status of the action to match against
// Status of the action to match against
Status status = 2;
}

// Match a specific function call against the specified account
message ActionFunctionCallRule {
// The account ID pattern to match against
// Account ID pattern to match against
string affected_account_id = 1;
// The function name to match against
// Function name to match against
string function_name = 2;
// The status of the action to match against
// Status of the action to match against
Status status = 3;
}

Expand All @@ -54,40 +58,43 @@ enum Status {
STATUS_ANY = 3;
}

// Response message for starting a BlockStream.
// Response message for starting a BlockStream
message StartStreamResponse {
// ID or handle of the started BlockStream.
// ID or handle of the started BlockStream
string stream_id = 1;
}

// Request message for stopping a BlockStream.
// Request message for stopping a BlockStream
message StopStreamRequest {
// ID or handle of the BlockStream to stop.
// ID or handle of the BlockStream to stop
string stream_id = 1;
}

// Response message for stopping a BlockStream.
// Response message for stopping a BlockStream
message StopStreamResponse {
// Confirmation message or status.
// Confirmation message or status
string status = 1;
}

// Request message for listing BlockStreams.
// Request message for listing BlockStreams
message ListStreamsRequest {
// Optional filters or parameters for listing streams.
// Optional filters or parameters for listing streams
}

// Response message for listing BlockStreams.
// Response message for listing BlockStreams
message ListStreamsResponse {
// List of active BlockStreams.
// List of active BlockStreams
repeated StreamInfo streams = 1;
}

// Information about a single BlockStream instance.
// Information about a single BlockStream instance
message StreamInfo {
darunrs marked this conversation as resolved.
Show resolved Hide resolved
// ID or handle of the BlockStream
string stream_id = 1;
int64 start_block_height = 2;
string indexer_name = 3;
string chain_id = 4;
string status = 5;
// Account ID of the indexer
string account_id = 3;
// Function name of the indexer
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
}
Loading
Loading