Skip to content

Commit

Permalink
feat(node_framework): Synchronize pools layer with logic in initializ…
Browse files Browse the repository at this point in the history
…e_components (#2079)

## What ❔

There was some new logic related to the configuration of DAL/pools in
`initialize_components` that wasn't mirrored in the pools layer.
This PR changes the pools layer to match logic in
`initialize_components`.

## Why ❔

We do not intend the framework to change the way the server works.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
popzxc authored May 29, 2024
1 parent a6ec15c commit 3202461
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 18 deletions.
52 changes: 48 additions & 4 deletions core/node/node_framework/src/implementations/layers/pools_layer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::sync::Arc;

use zksync_config::configs::{DatabaseSecrets, PostgresConfig};
use zksync_dal::{ConnectionPool, Core};
use zksync_db_connection::healthcheck::ConnectionPoolHealthCheck;

use crate::{
implementations::resources::pools::{MasterPool, PoolResource, ProverPool, ReplicaPool},
implementations::resources::{
healthcheck::AppHealthCheckResource,
pools::{MasterPool, PoolResource, ProverPool, ReplicaPool},
},
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
};
Expand Down Expand Up @@ -74,30 +81,67 @@ impl WiringLayer for PoolsLayer {
));
}

if self.with_master || self.with_replica {
if let Some(threshold) = self.config.slow_query_threshold() {
ConnectionPool::<Core>::global_config().set_slow_query_threshold(threshold)?;
}
if let Some(threshold) = self.config.long_connection_threshold() {
ConnectionPool::<Core>::global_config().set_long_connection_threshold(threshold)?;
}
}

if self.with_master {
let pool_size = self.config.max_connections()?;
let pool_size_master = self.config.max_connections_master().unwrap_or(pool_size);

context.insert_resource(PoolResource::<MasterPool>::new(
self.secrets.master_url()?,
self.config.max_connections()?,
self.config.statement_timeout(),
pool_size_master,
None,
None,
))?;
}

if self.with_replica {
// We're most interested in setting acquire / statement timeouts for the API server, which puts the most load
// on Postgres.
context.insert_resource(PoolResource::<ReplicaPool>::new(
self.secrets.replica_url()?,
self.config.max_connections()?,
self.config.statement_timeout(),
self.config.acquire_timeout(),
))?;
}

if self.with_prover {
context.insert_resource(PoolResource::<ProverPool>::new(
self.secrets.prover_url()?,
self.config.max_connections()?,
self.config.statement_timeout(),
None,
None,
))?;
}

// Insert health checks for the core pool.
let connection_pool = if self.with_replica {
context
.get_resource::<PoolResource<ReplicaPool>>()
.await?
.get()
.await?
} else {
context
.get_resource::<PoolResource<MasterPool>>()
.await?
.get()
.await?
};
let db_health_check = ConnectionPoolHealthCheck::new(connection_pool);
let AppHealthCheckResource(app_health) = context.get_resource_or_default().await;
app_health
.insert_custom_component(Arc::new(db_health_check))
.map_err(WiringError::internal)?;

Ok(())
}
}
19 changes: 5 additions & 14 deletions core/node/node_framework/src/implementations/resources/pools.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
fmt,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
Expand All @@ -16,28 +15,17 @@ use zksync_types::url::SensitiveUrl;
use crate::resource::Resource;

/// Represents a connection pool to a certain kind of database.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct PoolResource<P: PoolKind> {
connections_count: Arc<AtomicU32>,
url: SensitiveUrl,
max_connections: u32,
statement_timeout: Option<Duration>,
acquire_timeout: Option<Duration>,
unbound_pool: Arc<Mutex<Option<ConnectionPool<P::DbMarker>>>>,
_kind: std::marker::PhantomData<P>,
}

impl<P: PoolKind> fmt::Debug for PoolResource<P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PoolResource")
.field("connections_count", &self.connections_count)
.field("url", &self.url)
.field("max_connections", &self.max_connections)
.field("statement_timeout", &self.statement_timeout)
.field("unbound_pool", &self.unbound_pool)
.finish_non_exhaustive()
}
}

impl<P: PoolKind> Resource for PoolResource<P> {
fn name() -> String {
format!("common/{}_pool", P::kind_str())
Expand All @@ -49,12 +37,14 @@ impl<P: PoolKind> PoolResource<P> {
url: SensitiveUrl,
max_connections: u32,
statement_timeout: Option<Duration>,
acquire_timeout: Option<Duration>,
) -> Self {
Self {
connections_count: Arc::new(AtomicU32::new(0)),
url,
max_connections,
statement_timeout,
acquire_timeout,
unbound_pool: Arc::new(Mutex::new(None)),
_kind: std::marker::PhantomData,
}
Expand All @@ -63,6 +53,7 @@ impl<P: PoolKind> PoolResource<P> {
fn builder(&self) -> ConnectionPoolBuilder<P::DbMarker> {
let mut builder = ConnectionPool::builder(self.url.clone(), self.max_connections);
builder.set_statement_timeout(self.statement_timeout);
builder.set_acquire_timeout(self.acquire_timeout);
builder
}

Expand Down

0 comments on commit 3202461

Please sign in to comment.