Skip to content

Commit

Permalink
feat(node_framework): Unify Task types + misc improvements (#2325)
Browse files Browse the repository at this point in the history
## What ❔

- Unifies `Task` types. Now we only have a single `Task` type with
different `TaskKind` specifiers.
- Refactors `ZkStackService::run` so that it's more readable.
- Updates the framework documentation.
- Minor improvements here and there.

## Why ❔

- Preparing framework for the previously proposed refactoring (e.g.
`FromContext` / `IntoContext` IO flow).
- Preparing framework for the publishing.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored Jun 26, 2024
1 parent 8b1fbab commit 298a97e
Show file tree
Hide file tree
Showing 26 changed files with 530 additions and 1,011 deletions.
2 changes: 0 additions & 2 deletions core/node/node_framework/examples/showcase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ struct DatabaseResource(pub Arc<dyn Database>);
///
/// For the latter requirement, there exists an `Unique` wrapper that can be used to store non-`Clone`
/// resources. It's not used in this example, but it's a useful thing to know about.
///
/// Finally, there are other wrappers for resources as well, like `ResourceCollection` and `LazyResource`.
impl Resource for DatabaseResource {
fn name() -> String {
// The convention for resource names is `<scope>/<name>`. In this case, the scope is `common`, but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use zksync_config::configs::chain::CircuitBreakerConfig;
use crate::{
implementations::resources::circuit_breakers::CircuitBreakersResource,
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -44,7 +44,7 @@ impl WiringLayer for CircuitBreakerCheckerLayer {
circuit_breaker_checker,
};

node.add_unconstrained_task(Box::new(task));
node.add_task(Box::new(task));
Ok(())
}
}
Expand All @@ -55,15 +55,16 @@ struct CircuitBreakerCheckerTask {
}

#[async_trait::async_trait]
impl UnconstrainedTask for CircuitBreakerCheckerTask {
impl Task for CircuitBreakerCheckerTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"circuit_breaker_checker".into()
}

async fn run_unconstrained(
mut self: Box<Self>,
stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.circuit_breaker_checker.run(stop_receiver.0).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use zksync_node_api_server::healthcheck::HealthCheckHandle;
use crate::{
implementations::resources::healthcheck::AppHealthCheckResource,
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -41,7 +41,7 @@ impl WiringLayer for HealthCheckLayer {
app_health_check,
};

node.add_unconstrained_task(Box::new(task));
node.add_task(Box::new(task));
Ok(())
}
}
Expand All @@ -53,15 +53,16 @@ struct HealthCheckTask {
}

#[async_trait::async_trait]
impl UnconstrainedTask for HealthCheckTask {
impl Task for HealthCheckTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"healthcheck_server".into()
}

async fn run_unconstrained(
mut self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
let handle =
HealthCheckHandle::spawn_server(self.config.bind_addr(), self.app_health_check.clone());
stop_receiver.0.changed().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use zksync_types::{commitment::L1BatchCommitmentMode, Address};

use crate::{
implementations::resources::eth_interface::EthInterfaceResource,
precondition::Precondition,
service::{ServiceContext, StopReceiver},
task::TaskId,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -51,19 +50,23 @@ impl WiringLayer for L1BatchCommitmentModeValidationLayer {
query_client,
);

context.add_precondition(Box::new(task));
context.add_task(Box::new(task));

Ok(())
}
}

#[async_trait::async_trait]
impl Precondition for L1BatchCommitmentModeValidationTask {
impl Task for L1BatchCommitmentModeValidationTask {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"l1_batch_commitment_mode_validation".into()
}

async fn check(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).exit_on_success().run(stop_receiver.0).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core};
use crate::{
implementations::resources::pools::{PoolResource, ReplicaPool},
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand All @@ -32,7 +32,7 @@ impl WiringLayer for PostgresMetricsLayer {
async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let replica_pool_resource = context.get_resource::<PoolResource<ReplicaPool>>().await?;
let pool_for_metrics = replica_pool_resource.get_singleton().await?;
context.add_unconstrained_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics }));
context.add_task(Box::new(PostgresMetricsScrapingTask { pool_for_metrics }));

Ok(())
}
Expand All @@ -44,15 +44,16 @@ struct PostgresMetricsScrapingTask {
}

#[async_trait::async_trait]
impl UnconstrainedTask for PostgresMetricsScrapingTask {
impl Task for PostgresMetricsScrapingTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"postgres_metrics_scraping".into()
}

async fn run_unconstrained(
self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
tokio::select! {
() = PostgresMetrics::run_scraping(self.pool_for_metrics, SCRAPE_INTERVAL) => {
tracing::warn!("Postgres metrics scraping unexpectedly stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use zksync_vlog::prometheus::PrometheusExporterConfig;
use crate::{
implementations::resources::healthcheck::AppHealthCheckResource,
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -46,18 +46,22 @@ impl WiringLayer for PrometheusExporterLayer {
prometheus_health_updater,
});

node.add_unconstrained_task(task);
node.add_task(task);
Ok(())
}
}

#[async_trait::async_trait]
impl UnconstrainedTask for PrometheusExporterTask {
impl Task for PrometheusExporterTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"prometheus_exporter".into()
}

async fn run_unconstrained(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
let prometheus_task = self.config.run(stop_receiver.0);
self.prometheus_health_updater
.update(HealthStatus::Ready.into());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use crate::{
main_node_client::MainNodeClientResource,
pools::{MasterPool, PoolResource},
},
precondition::Precondition,
service::{ServiceContext, StopReceiver},
task::TaskId,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -45,7 +44,7 @@ impl WiringLayer for ReorgDetectorCheckerLayer {
let pool = pool_resource.get().await?;

// Create and insert precondition.
context.add_precondition(Box::new(CheckerPrecondition {
context.add_task(Box::new(CheckerPrecondition {
pool: pool.clone(),
reorg_detector: ReorgDetector::new(main_node_client, pool),
}));
Expand All @@ -60,12 +59,16 @@ pub struct CheckerPrecondition {
}

#[async_trait::async_trait]
impl Precondition for CheckerPrecondition {
impl Task for CheckerPrecondition {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"reorg_detector_checker".into()
}

async fn check(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
// Given that this is a precondition -- i.e. something that starts before some invariants are met,
// we need to first ensure that there is at least one batch in the database (there may be none if
// either genesis or snapshot recovery has not been performed yet).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
reverter::BlockReverterResource,
},
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedOneshotTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -46,7 +46,7 @@ impl WiringLayer for ReorgDetectorRunnerLayer {
let reverter = context.get_resource::<BlockReverterResource>().await?.0;

// Create and insert task.
context.add_unconstrained_oneshot_task(Box::new(RunnerUnconstrainedOneshotTask {
context.add_task(Box::new(RunnerUnconstrainedOneshotTask {
reorg_detector: ReorgDetector::new(main_node_client, pool),
reverter,
}));
Expand All @@ -61,15 +61,16 @@ pub struct RunnerUnconstrainedOneshotTask {
}

#[async_trait::async_trait]
impl UnconstrainedOneshotTask for RunnerUnconstrainedOneshotTask {
impl Task for RunnerUnconstrainedOneshotTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedOneshotTask
}

fn id(&self) -> TaskId {
"reorg_detector_runner".into()
}

async fn run_unconstrained_oneshot(
mut self: Box<Self>,
stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(mut self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
match self.reorg_detector.run_once(stop_receiver.0.clone()).await {
Ok(()) => {}
Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => {
Expand Down
15 changes: 8 additions & 7 deletions core/node/node_framework/src/implementations/layers/sigint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use tokio::sync::oneshot;

use crate::{
service::{ServiceContext, StopReceiver},
task::{TaskId, UnconstrainedTask},
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand All @@ -23,7 +23,7 @@ impl WiringLayer for SigintHandlerLayer {

async fn wire(self: Box<Self>, mut node: ServiceContext<'_>) -> Result<(), WiringError> {
// SIGINT may happen at any time, so we must handle it as soon as it happens.
node.add_unconstrained_task(Box::new(SigintHandlerTask));
node.add_task(Box::new(SigintHandlerTask));
Ok(())
}
}
Expand All @@ -32,15 +32,16 @@ impl WiringLayer for SigintHandlerLayer {
struct SigintHandlerTask;

#[async_trait::async_trait]
impl UnconstrainedTask for SigintHandlerTask {
impl Task for SigintHandlerTask {
fn kind(&self) -> TaskKind {
TaskKind::UnconstrainedTask
}

fn id(&self) -> TaskId {
"sigint_handler".into()
}

async fn run_unconstrained(
self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
let (sigint_sender, sigint_receiver) = oneshot::channel();
let mut sigint_sender = Some(sigint_sender); // Has to be done this way since `set_handler` requires `FnMut`.
ctrlc::set_handler(move || {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::{
implementations::resources::{
eth_interface::EthInterfaceResource, main_node_client::MainNodeClientResource,
},
precondition::Precondition,
service::{ServiceContext, StopReceiver},
task::TaskId,
task::{Task, TaskId, TaskKind},
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -54,19 +53,23 @@ impl WiringLayer for ValidateChainIdsLayer {
main_node_client,
);

context.add_precondition(Box::new(task));
context.add_task(Box::new(task));

Ok(())
}
}

#[async_trait::async_trait]
impl Precondition for ValidateChainIdsTask {
impl Task for ValidateChainIdsTask {
fn kind(&self) -> TaskKind {
TaskKind::Precondition
}

fn id(&self) -> TaskId {
"validate_chain_ids".into()
}

async fn check(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
(*self).run_once(stop_receiver.0).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ pub struct SyncStateResource(pub SyncState);

impl Resource for SyncStateResource {
fn name() -> String {
"sync_state".into()
"common/sync_state".into()
}
}
11 changes: 1 addition & 10 deletions core/node/node_framework/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
//! # ZK Stack node initialization framework.
//!
//! ## Introduction
//!
//! This crate provides core abstractions that allow one to compose a ZK Stack node.
//! Main concepts used in this crate are:
//! - [`WiringLayer`](wiring_layer::WiringLayer) - builder interface for tasks.
//! - [`Task`](task::Task) - a unit of work that can be executed by the node.
//! - [`Resource`](resource::Resource) - a piece of logic that can be shared between tasks. Most resources are
//! represented by generic interfaces and also serve as points of customization for tasks.
//! - [`ResourceProvider`](resource::ResourceProvider) - a trait that allows one to provide resources to the node.
//! - [`ZkStackService`](service::ZkStackService) - a container for tasks and resources that takes care of initialization, running
//! and shutting down.
//!
//! The general flow to compose a node is as follows:
//! - Create a [`ResourceProvider`](resource::ResourceProvider) that can provide all the resources that the node needs.
//! - Create a [`ZkStackService`](node::ZkStackService) with that [`ResourceProvider`](resource::ResourceProvider).
//! - Add tasks to the node.
//! - Run it.
//! - [`ZkStackServiceBuilder`](service::ZkStackServiceBuilder) - a builder for the service.
pub mod implementations;
pub mod precondition;
pub mod resource;
pub mod service;
pub mod task;
Expand Down
Loading

0 comments on commit 298a97e

Please sign in to comment.