Skip to content

Commit

Permalink
Log the progress of substreams (#4935)
Browse files Browse the repository at this point in the history
* Update protobuf structure

* Log progress message once every 30 sec.

* superfluous trace_id

* better formating

* spelling

* humane representation of time

* concise logs

* factor out the log string code

* move loging code to other file

* don't allow multiple trace_id in logs

* proper handling of trace_id

* v0.33.0 (#4886)

* cargo: update workspace crates' version to v0.33.0

* update NEWS.md for v0.33.0

* Add new API Version to validate when setting fields not defined in the schema  (#4894)

* build(deps): bump chrono from 0.4.26 to 0.4.31 (#4876)

Bumps [chrono](https://github.com/chronotope/chrono) from 0.4.26 to 0.4.31.
- [Release notes](https://github.com/chronotope/chrono/releases)
- [Changelog](https://github.com/chronotope/chrono/blob/main/CHANGELOG.md)
- [Commits](chronotope/chrono@v0.4.26...v0.4.31)

---
updated-dependencies:
- dependency-name: chrono
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* build(deps): bump webpki from 0.22.0 to 0.22.1 (#4857)

Bumps [webpki](https://github.com/briansmith/webpki) from 0.22.0 to 0.22.1.
- [Commits](https://github.com/briansmith/webpki/commits)

---
updated-dependencies:
- dependency-name: webpki
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* runtime: only include valid fields in entity for store_set

* graph, runtime: add new apiVersion to validate fields not defined in the schema

* graph: update tests for setting invalid field

* tests: add runner tests for undefined field setting validation in apiVersion 0.0.8

* graph: add check_invalid_fields method to HostExports

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* update NEWS.md

* tests: add .gitignore for api-version test

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* docker: upgrade cloudbuild machineType

* fix(graphql): change CDN to JS Deliver for GraphiQL (#4941)

* fix(graphql): change CDN to JS Deliver for GraphiQL

* fix(graphql): add crossorigin prop

* build(deps): bump toml from 0.7.6 to 0.8.4

Bumps [toml](https://github.com/toml-rs/toml) from 0.7.6 to 0.8.4.
- [Commits](toml-rs/toml@toml-v0.7.6...toml-v0.8.4)

---
updated-dependencies:
- dependency-name: toml
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump semver from 1.0.18 to 1.0.20

Bumps [semver](https://github.com/dtolnay/semver) from 1.0.18 to 1.0.20.
- [Release notes](https://github.com/dtolnay/semver/releases)
- [Commits](dtolnay/semver@1.0.18...1.0.20)

---
updated-dependencies:
- dependency-name: semver
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* docs: Update information about `graphman config check`

* docs: Add a guide for sharding

* tests: Update runner-tests package.json

* fix(tests): yarn workspace to install all subfolders

* ci: update to node 20

* fix(graphql): add crossorigin prop (#4948)

* Fix issues in runner tests (#4962)

* tests: Don't compare WASM backtraces

While it would be nice to test them in isolation, here it is just
making these high-level tests too sensitive to unrelated changes.

* tests: Test PoI as hex and update to current value

This changed probably because graph-cli and -ts were updated.

* tests: Make `data_source_revert` more robust to code updates

Previously it would break whenever the graft base subgraph id would change,
now that id is updated automatically.

* Update tests/runner-tests/data-source-revert/package.json

Co-authored-by: Saihajpreet Singh <saihajpreet.singh@gmail.com>

---------

Co-authored-by: Saihajpreet Singh <saihajpreet.singh@gmail.com>

* store/test-store: test more types for DataSource context in manifest

* server/index-node : lazy load features from manifest when a subgraph is not deployed

* [Feature] Add support for `endBlock` in data sources (#4787)

* graph,chain,store/test-store : Allow new param `endBlock` in manifest

* core,graph,store: ignore end_block reached datasources in match_and_decode, include them in processed datasources

* tests : add runner tests for end-block

* core: move TriggerFilter construction into SubgraphRunner.run_inner

* core: filter out endBlock reached subgraphs when constructing TriggerFilter

* chain,core: refactor endBlock implementation

* refactor `SubgraphRunner.run_inner` to extract `build_filter`

* core : handle reverts for endBlock

* chain,graph: set min_spec_version requirements for endBlock

* core: refaction `build_filter`

* tests: runner test for endblock on reorg

* core: restart block stream in the next block for endblock reached ds

* graph: bump specVersion requirement for endBlock

* core: refactor build_filter logic

* core, tests, graph : make TriggerFilters testable

* chain/startknet: endBlock support for starknet

* chain,core,graph: refactor end_block implementation

* core: refactor build_filter

* Add comments for end-block runner tests

* graph, runtime, chain: Add GasMetrics for DIPS experiments

* graph: refactor GasCounter for trackiing gas metrics, add a new env for gas metrics

* tests: Revamp the integration tests

With these changes, we do not use truffler/ganache anymore. Tests are now
written as normal Rust tests. See `tests/README.md` for details

* runner tests: Do not clobber api-version tests

Two different tests are run in the same directory; make sure that the files
they generate and use do not clobber each other.

* workflows: Do not compile with --verbose for runner tests

* graph, store: Track connection wait time in the trace

There are other places during query execution where we might be waiting for
connections, but this at least records the times for getting the actual
data.

* graph, store: Track permit wait times in query traces

* fix: Increase RPC base backoff (#4984)

* fix: Increase RPC base backoff

* fix: Add 'stack underflow' to deterministic geth errors

* Make `SubstreamsBlockIngestor` start at chain's head if no cursor already exist (#4951)

Fixes #4942

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Zoran Cvetkov <zoran@edgeandnode.com>
Co-authored-by: Krishnanand V P <44740264+incrypto32@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: incrypto32 <vpkrishnanand@gmail.com>
Co-authored-by: Saihajpreet Singh <saihajpreet.singh@gmail.com>
Co-authored-by: David Lutterkort <lutter@watzmann.net>
Co-authored-by: Leonardo Yvens <leoyvens@gmail.com>
Co-authored-by: Matthieu Vachon <matt@streamingfast.io>
  • Loading branch information
9 people authored Nov 13, 2023
1 parent 2bc67bb commit 79703ba
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 100 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ hex = "0.4.3"
http = "0.2.3"
futures = "0.1.21"
graphql-parser = "0.4.0"
humantime = "2.1.0"
lazy_static = "1.4.0"
num-bigint = { version = "^0.2.6", features = ["serde"] }
num_cpus = "1.16.0"
Expand Down
125 changes: 90 additions & 35 deletions graph/proto/substreams-rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ message Response {
oneof message {
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events.
BlockScopedData block_scoped_data = 3;
BlockScopedData block_scoped_data = 3;
BlockUndoSignal block_undo_signal = 4;
Error fatal_error = 5;

// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotData debug_snapshot_data = 10;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotComplete debug_snapshot_complete = 11;

}
}

Expand All @@ -68,7 +68,7 @@ message Response {
// with a block number above 'last_valid_block' has been reverted
// on-chain. Delete that data and restart from 'last_valid_cursor'
message BlockUndoSignal {
sf.substreams.v1.BlockRef last_valid_block = 1;
sf.substreams.v1.BlockRef last_valid_block = 1;
string last_valid_cursor = 2;
}

Expand All @@ -86,6 +86,9 @@ message BlockScopedData {

message SessionInit {
string trace_id = 1;
uint64 resolved_start_block = 2;
uint64 linear_handoff_block = 3;
uint64 max_parallel_workers = 4;
}

message InitialSnapshotComplete {
Expand Down Expand Up @@ -124,45 +127,93 @@ message OutputDebugInfo {
bool cached = 3;
}

// ModulesProgress is a message that is sent every 500ms
message ModulesProgress {
repeated ModuleProgress modules = 1;
// previously: repeated ModuleProgress modules = 1;
// these previous `modules` messages were sent in bursts and are not sent anymore.
reserved 1;
// List of jobs running on tier2 servers
repeated Job running_jobs = 2;
// Execution statistics for each module
repeated ModuleStats modules_stats = 3;
// Stages definition and completed block ranges
repeated Stage stages = 4;

ProcessedBytes processed_bytes = 5;
}

message ModuleProgress {
string name = 1;
message ProcessedBytes {
uint64 total_bytes_read = 1;
uint64 total_bytes_written = 2;
}

oneof type {
ProcessedRanges processed_ranges = 2;
InitialState initial_state = 3;
ProcessedBytes processed_bytes = 4;
Failed failed = 5;
}

message ProcessedRanges {
repeated BlockRange processed_ranges = 1;
}
message InitialState {
uint64 available_up_to_block = 2;
}
message ProcessedBytes {
uint64 total_bytes_read = 1;
uint64 total_bytes_written = 2;
uint64 bytes_read_delta = 3;
uint64 bytes_written_delta = 4;
uint64 nano_seconds_delta = 5;
}
message Failed {
string reason = 1;
repeated string logs = 2;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 3;
}
message Error {
string module = 1;
string reason = 2;
repeated string logs = 3;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 4;
}

message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;

message Job {
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
}

message Stage {
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
}

// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2
// All the 'count' and 'time_ms' values may include duplicate for each stage going over that module
message ModuleStats {
// name of the module
string name = 1;

// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module code
uint64 total_processing_time_ms = 3;

//// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;

// total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called from that module code
uint64 total_store_read_count = 6;

// total_store_write_count is the sum of all store Write operations called from that module code (store-only)
uint64 total_store_write_count = 10;

// total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only)
// note that DeletePrefix can be a costly operation on large stores
uint64 total_store_deleteprefix_count = 11;

// store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;

// total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;

// store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way.
bool store_currently_merging = 14;

// highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only)
uint64 highest_contiguous_block = 15;
}

message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}

message StoreDelta {
Expand All @@ -179,3 +230,7 @@ message StoreDelta {
bytes new_value = 5;
}

message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;
}
27 changes: 26 additions & 1 deletion graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use async_stream::stream;
use futures03::Stream;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};

use super::substreams_block_stream::SubstreamsLogData;
use super::{Block, BlockPtr, Blockchain};
use crate::anyhow::Result;
use crate::components::store::{BlockNumber, DeploymentLocator};
Expand Down Expand Up @@ -356,10 +358,16 @@ pub trait SubstreamsMapper<C: Blockchain>: Send + Sync {
&self,
logger: &mut Logger,
message: Option<Message>,
log_data: &mut SubstreamsLogData,
) -> Result<Option<BlockStreamEvent<C>>, SubstreamsError> {
match message {
Some(SubstreamsMessage::Session(session_init)) => {
*logger = logger.new(o!("trace_id" => session_init.trace_id));
info!(
&logger,
"Received session init";
"session" => format!("{:?}", session_init),
);
log_data.trace_id = session_init.trace_id;
return Ok(None);
}
Some(SubstreamsMessage::BlockUndoSignal(undo)) => {
Expand All @@ -371,6 +379,7 @@ pub trait SubstreamsMapper<C: Blockchain>: Send + Sync {
hash: valid_block.id.trim_start_matches("0x").try_into()?,
number: valid_block.number as i32,
};
log_data.last_seen_block = valid_block.number;
return Ok(Some(BlockStreamEvent::Revert(
valid_ptr,
FirehoseCursor::from(undo.last_valid_cursor.clone()),
Expand Down Expand Up @@ -403,13 +412,29 @@ pub trait SubstreamsMapper<C: Blockchain>: Send + Sync {
};

let block = self.decode_triggers(&logger, &clock, &map_output).await?;
log_data.last_seen_block = block.block.number() as u64;

Ok(Some(BlockStreamEvent::ProcessBlock(
block,
FirehoseCursor::from(cursor.clone()),
)))
}

Some(SubstreamsMessage::Progress(progress)) => {
if log_data.last_progress.elapsed() > Duration::from_secs(30) {
info!(&logger, "{}", log_data.info_string(&progress); "trace_id" => &log_data.trace_id);
debug!(&logger, "{}", log_data.debug_string(&progress); "trace_id" => &log_data.trace_id);
trace!(
&logger,
"Received progress update";
"progress" => format!("{:?}", progress),
"trace_id" => &log_data.trace_id,
);
log_data.last_progress = Instant::now();
}
Ok(None)
}

// ignoring Progress messages and SessionInit
// We are only interested in Data and Undo signals
_ => Ok(None),
Expand Down
90 changes: 88 additions & 2 deletions graph/src/blockchain/substreams_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use crate::blockchain::block_stream::{BlockStream, BlockStreamEvent};
use crate::blockchain::Blockchain;
use crate::prelude::*;
use crate::substreams::Modules;
use crate::substreams_rpc::{Request, Response};
use crate::substreams_rpc::{ModulesProgress, Request, Response};
use crate::util::backoff::ExponentialBackoff;
use async_stream::try_stream;
use futures03::{Stream, StreamExt};
use humantime::format_duration;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -178,6 +179,8 @@ fn stream_blocks<C: Blockchain, F: SubstreamsMapper<C>>(
#[allow(unused_assignments)]
let mut skip_backoff = false;

let mut log_data = SubstreamsLogData::new();

try_stream! {
let endpoint = client.firehose_endpoint()?;
let mut logger = logger.new(o!("deployment" => deployment.clone(), "provider" => endpoint.provider.to_string()));
Expand Down Expand Up @@ -225,6 +228,7 @@ fn stream_blocks<C: Blockchain, F: SubstreamsMapper<C>>(
response,
mapper.as_ref(),
&mut logger,
&mut log_data,
).await {
Ok(block_response) => {
match block_response {
Expand Down Expand Up @@ -289,14 +293,15 @@ async fn process_substreams_response<C: Blockchain, F: SubstreamsMapper<C>>(
result: Result<Response, Status>,
mapper: &F,
logger: &mut Logger,
log_data: &mut SubstreamsLogData,
) -> Result<Option<BlockResponse<C>>, Error> {
let response = match result {
Ok(v) => v,
Err(e) => return Err(anyhow!("An error occurred while streaming blocks: {:#}", e)),
};

match mapper
.to_block_stream_event(logger, response.message)
.to_block_stream_event(logger, response.message, log_data)
.await
.context("Mapping message to BlockStreamEvent failed")?
{
Expand Down Expand Up @@ -326,3 +331,84 @@ impl<C: Blockchain> BlockStream<C> for SubstreamsBlockStream<C> {
SUBSTREAMS_BUFFER_STREAM_SIZE
}
}

pub struct SubstreamsLogData {
pub last_progress: Instant,
pub last_seen_block: u64,
pub trace_id: String,
}

impl SubstreamsLogData {
fn new() -> SubstreamsLogData {
SubstreamsLogData {
last_progress: Instant::now(),
last_seen_block: 0,
trace_id: "".to_string(),
}
}
pub fn info_string(&self, progress: &ModulesProgress) -> String {
format!(
"Substreams backend graph_out last block is {}, {} stages, {} jobs",
self.last_seen_block,
progress.stages.len(),
progress.running_jobs.len()
)
}
pub fn debug_string(&self, progress: &ModulesProgress) -> String {
let len = progress.stages.len();
let mut stages_str = "".to_string();
for i in (0..len).rev() {
let stage = &progress.stages[i];
let range = if stage.completed_ranges.len() > 0 {
let b = stage.completed_ranges.iter().map(|x| x.end_block).min();
format!(" up to {}", b.unwrap_or(0))
} else {
"".to_string()
};
let mlen = stage.modules.len();
let module = if mlen == 0 {
"".to_string()
} else if mlen == 1 {
format!(" ({})", stage.modules[0])
} else {
format!(" ({} +{})", stage.modules[mlen - 1], mlen - 1)
};
if !stages_str.is_empty() {
stages_str.push_str(", ");
}
stages_str.push_str(&format!("#{}{}{}", i, range, module));
}
let stage_str = if len > 0 {
format!(" Stages: [{}]", stages_str)
} else {
"".to_string()
};
let mut jobs_str = "".to_string();
let jlen = progress.running_jobs.len();
for i in 0..jlen {
let job = &progress.running_jobs[i];
if !jobs_str.is_empty() {
jobs_str.push_str(", ");
}
let duration_str = format_duration(Duration::from_millis(job.duration_ms));
jobs_str.push_str(&format!(
"#{} on Stage {} @ {} | +{}|{} elapsed {}",
i,
job.stage,
job.start_block,
job.processed_blocks,
job.stop_block - job.start_block,
duration_str
));
}
let job_str = if jlen > 0 {
format!(", Jobs: [{}]", jobs_str)
} else {
"".to_string()
};
format!(
"Substreams backend graph_out last block is {},{}{}",
self.last_seen_block, stage_str, job_str,
)
}
}
Loading

0 comments on commit 79703ba

Please sign in to comment.