feat: rewritten gossip sync to be async from block processing #711

Merged
32 commits merged on Jan 13, 2024
Changes from 26 commits
Commits
32 commits
bcc0c8d
strongly typed config
pompon0 Dec 13, 2023
26e43f1
WIP making it compile
pompon0 Dec 18, 2023
6f9539d
compiles with todos
pompon0 Dec 18, 2023
cb26d29
wip shifting payload to dal
pompon0 Dec 18, 2023
5ee4456
snapshot: before FinalBlock -> CommitQC
pompon0 Dec 19, 2023
b896301
simplified stuff
pompon0 Dec 19, 2023
dc103b0
snapshot
pompon0 Dec 19, 2023
0ea4776
last_in_batch in payload
pompon0 Dec 20, 2023
033d690
deduplicated gossip storage
pompon0 Dec 28, 2023
19850df
test db copying
pompon0 Dec 31, 2023
3039ef1
cloning db works
pompon0 Dec 31, 2023
93b7fa0
tests implemented, but fail
pompon0 Jan 1, 2024
31666b8
fast compilation
pompon0 Jan 2, 2024
2c3b870
consensus tests and documentation
pompon0 Jan 2, 2024
432ada4
Merge remote-tracking branch 'origin/main' into gprusak-consensus-con…
pompon0 Jan 3, 2024
261ad59
merged
pompon0 Jan 3, 2024
926cd26
zk fmt
pompon0 Jan 3, 2024
cc83a75
zk fmt, reverted en configuration
pompon0 Jan 3, 2024
27ed6b8
updated deps
pompon0 Jan 3, 2024
16e73f9
spellcheck
pompon0 Jan 3, 2024
827e2d1
removed unused deps
pompon0 Jan 3, 2024
4a9d4f3
applied comments
pompon0 Jan 3, 2024
ef6f949
added test of fetcher backfilling certs
pompon0 Jan 8, 2024
7e70bc0
removed the dal testonly method
pompon0 Jan 8, 2024
706559a
bumped era-consensus
pompon0 Jan 9, 2024
24ea1a6
Merge remote-tracking branch 'origin/main' into gprusak-consensus-con…
pompon0 Jan 10, 2024
5f12db2
missing changes
pompon0 Jan 10, 2024
3a6d733
sqlx queries
pompon0 Jan 10, 2024
7ec7094
applied comments
pompon0 Jan 10, 2024
dfa7497
lint
pompon0 Jan 10, 2024
622d26d
Merge branch 'main' into gprusak-consensus-config-work
pompon0 Jan 11, 2024
556e429
Merge branch 'main' into gprusak-consensus-config-work
pompon0 Jan 13, 2024
8 changes: 4 additions & 4 deletions core/lib/dal/Cargo.toml
@@ -18,9 +18,9 @@ zksync_system_constants = { path = "../constants" }
zksync_contracts = { path = "../contracts" }
zksync_types = { path = "../types" }
zksync_health_check = { path = "../health_check" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "84cdd9e45fd84bc1fac0b394c899ae33aef91afa" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "84cdd9e45fd84bc1fac0b394c899ae33aef91afa" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "84cdd9e45fd84bc1fac0b394c899ae33aef91afa" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5727a3e0b22470bb90092388f9125bcb366df613" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5727a3e0b22470bb90092388f9125bcb366df613" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5727a3e0b22470bb90092388f9125bcb366df613" }

itertools = "0.10.1"
thiserror = "1.0"
@@ -55,4 +55,4 @@ tracing = "0.1"
assert_matches = "1.5.0"

[build-dependencies]
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "84cdd9e45fd84bc1fac0b394c899ae33aef91afa" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5727a3e0b22470bb90092388f9125bcb366df613" }
2 changes: 1 addition & 1 deletion core/lib/dal/build.rs
@@ -3,7 +3,7 @@ fn main() {
zksync_protobuf_build::Config {
input_root: "src/models/proto".into(),
proto_root: "zksync/dal".into(),
dependencies: vec!["::zksync_consensus_roles::proto".parse().unwrap()],
dependencies: vec![],
protobuf_crate: "::zksync_protobuf".parse().unwrap(),
is_public: true,
}
@@ -0,0 +1,2 @@
DROP TABLE miniblocks_consensus;
ALTER TABLE miniblocks ADD COLUMN consensus JSONB NULL;
@@ -0,0 +1,11 @@
ALTER TABLE miniblocks DROP COLUMN consensus;

CREATE TABLE miniblocks_consensus (
number BIGINT NOT NULL,
certificate JSONB NOT NULL,
PRIMARY KEY(number),
CHECK((certificate->'message'->'proposal'->'number')::jsonb::numeric = number),
CONSTRAINT miniblocks_fk FOREIGN KEY(number)
REFERENCES miniblocks(number)
ON DELETE CASCADE
);
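The CHECK constraint ties every stored certificate to its miniblock: the block number embedded in the certificate JSON at message.proposal.number must equal the number column, and the foreign key removes certificates together with their miniblocks. Below is a minimal sqlx sketch of what writing and reading a row under this schema could look like; the function name, connection handling, and the shape of the cert JSON are illustrative assumptions, not the PR's DAL code (which serializes the consensus certificate via zksync_protobuf).

use anyhow::Context as _;
use sqlx::{Connection as _, Row as _};

// Hypothetical helper, for illustration only: stores a certificate for an
// already-sealed miniblock and reads it back. Assumes the sqlx "json" feature
// is enabled and that miniblock #42 already exists in `miniblocks` (the FK
// requires it).
async fn store_and_load_certificate(db_url: &str) -> anyhow::Result<()> {
    let mut conn = sqlx::PgConnection::connect(db_url).await?;

    let number: i64 = 42;
    // The proposal number inside the JSON must match the `number` column,
    // otherwise the CHECK constraint rejects the insert.
    let cert = serde_json::json!({
        "message": { "proposal": { "number": number } },
    });

    sqlx::query("INSERT INTO miniblocks_consensus (number, certificate) VALUES ($1, $2)")
        .bind(number)
        .bind(&cert)
        .execute(&mut conn)
        .await
        .context("INSERT INTO miniblocks_consensus")?;

    let row = sqlx::query("SELECT certificate FROM miniblocks_consensus WHERE number = $1")
        .bind(number)
        .fetch_one(&mut conn)
        .await
        .context("SELECT certificate")?;
    let stored: serde_json::Value = row.get("certificate");
    assert_eq!(stored["message"]["proposal"]["number"], serde_json::json!(number));
    Ok(())
}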
79 changes: 0 additions & 79 deletions core/lib/dal/src/blocks_dal.rs
@@ -15,7 +15,6 @@ use zksync_types::{
MAX_GAS_PER_PUBDATA_BYTE, U256,
};

pub use crate::models::storage_sync::ConsensusBlockFields;
use crate::{
instrument::InstrumentExt,
models::storage_block::{StorageL1Batch, StorageL1BatchHeader, StorageMiniblockHeader},
@@ -645,84 +644,6 @@ impl BlocksDal<'_, '_> {
Ok(())
}

/// Fetches the number of the last miniblock with consensus fields set.
/// Miniblocks with Consensus fields set constitute a prefix of sealed miniblocks,
/// so it is enough to traverse the miniblocks in descending order to find the last
/// with consensus fields.
///
/// If better efficiency is needed we can add an index on "miniblocks without consensus fields".
pub async fn get_last_miniblock_number_with_consensus_fields(
&mut self,
) -> anyhow::Result<Option<MiniblockNumber>> {
let Some(row) = sqlx::query!(
r#"
SELECT
number
FROM
miniblocks
WHERE
consensus IS NOT NULL
ORDER BY
number DESC
LIMIT
1
"#
)
.fetch_optional(self.storage.conn())
.await?
else {
return Ok(None);
};
Ok(Some(MiniblockNumber(row.number.try_into()?)))
}

/// Checks whether the specified miniblock has consensus field set.
pub async fn has_consensus_fields(&mut self, number: MiniblockNumber) -> sqlx::Result<bool> {
Ok(sqlx::query!(
r#"
SELECT
COUNT(*) AS "count!"
FROM
miniblocks
WHERE
number = $1
AND consensus IS NOT NULL
"#,
number.0 as i64
)
.fetch_one(self.storage.conn())
.await?
.count
> 0)
}

/// Sets consensus-related fields for the specified miniblock.
pub async fn set_miniblock_consensus_fields(
&mut self,
miniblock_number: MiniblockNumber,
consensus: &ConsensusBlockFields,
) -> anyhow::Result<()> {
let result = sqlx::query!(
r#"
UPDATE miniblocks
SET
consensus = $2
WHERE
number = $1
"#,
miniblock_number.0 as i64,
zksync_protobuf::serde::serialize(consensus, serde_json::value::Serializer).unwrap(),
)
.execute(self.storage.conn())
.await?;

anyhow::ensure!(
result.rows_affected() == 1,
"Miniblock #{miniblock_number} is not present in Postgres"
);
Ok(())
}

pub async fn get_last_sealed_miniblock_header(
&mut self,
) -> sqlx::Result<Option<MiniblockHeader>> {
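The removed helpers read and wrote consensus data through the now-dropped consensus JSONB column on miniblocks: finding the last miniblock with consensus fields, checking whether a given miniblock has them, and setting them. Since certificates now live in their own table, the equivalent lookup becomes a query over miniblocks_consensus. A hedged sketch of that shape under the new schema follows; the function name and placement are assumptions, as the PR's replacement DAL code is outside the files shown here.

use zksync_types::MiniblockNumber;

// Illustrative sketch: because certificates cover a contiguous prefix of
// sealed miniblocks, MAX(number) over the new table plays the role of the
// removed `get_last_miniblock_number_with_consensus_fields`.
async fn last_certified_miniblock(
    conn: &mut sqlx::PgConnection,
) -> anyhow::Result<Option<MiniblockNumber>> {
    let (max,): (Option<i64>,) =
        sqlx::query_as("SELECT MAX(number) FROM miniblocks_consensus")
            .fetch_one(conn)
            .await?;
    Ok(match max {
        Some(n) => Some(MiniblockNumber(n.try_into()?)),
        None => None,
    })
}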
154 changes: 90 additions & 64 deletions core/lib/dal/src/connection/mod.rs
@@ -10,14 +10,6 @@ use crate::{metrics::CONNECTION_METRICS, StorageProcessor};

pub mod holder;

/// Obtains the test database URL from the environment variable.
fn get_test_database_url() -> anyhow::Result<String> {
env::var("TEST_DATABASE_URL").context(
"TEST_DATABASE_URL must be set. Normally, this is done by the 'zk' tool. \
Make sure that you are running the tests with 'zk test rust' command or equivalent.",
)
}

/// Builder for [`ConnectionPool`]s.
pub struct ConnectionPoolBuilder {
database_url: String,
@@ -68,85 +60,117 @@ impl ConnectionPoolBuilder {
statement_timeout = self.statement_timeout
);
Ok(ConnectionPool {
database_url: self.database_url.clone(),
inner: pool,
max_size: self.max_size,
})
}
}

/// Constructs a new temporary database (with a randomized name)
/// by cloning the database template pointed by TEST_DATABASE_URL env var.
/// The template is expected to have all migrations from dal/migrations applied.
/// For efficiency, the Postgres container of TEST_DATABASE_URL should be
/// configured with option "fsync=off" - it disables waiting for disk synchronization
/// whenever you write to the DBs, therefore making it as fast as an in-memory Postgres instance.
/// The database is not cleaned up automatically, but rather the whole Postgres
/// container is recreated whenever you call "zk test rust".
pub(super) async fn create_test_db() -> anyhow::Result<url::Url> {
use rand::Rng as _;
use sqlx::{Connection as _, Executor as _};

const PREFIX: &str = "test-";

let db_url = get_test_database_url().unwrap();
let mut db_url = url::Url::parse(&db_url)
.with_context(|| format!("{} is not a valid database address", db_url))?;
let db_name = db_url
.path()
.strip_prefix('/')
.with_context(|| format!("{} is not a valid database address", db_url.as_ref()))?
.to_string();
let db_copy_name = format!("{PREFIX}{}", rand::thread_rng().gen::<u64>());
db_url.set_path("");
let mut attempts = 10;
let mut conn = loop {
match sqlx::PgConnection::connect(db_url.as_ref()).await {
Ok(conn) => break conn,
Err(err) => {
attempts -= 1;
if attempts == 0 {
return Err(err).context("sqlx::PgConnection::connect()");
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
};
conn.execute(
format!("CREATE DATABASE \"{db_copy_name}\" WITH TEMPLATE \"{db_name}\"").as_str(),
)
.await
.context("failed to create a temporary database")?;
db_url.set_path(&db_copy_name);
Ok(db_url)
}

#[derive(Clone)]
pub struct ConnectionPool {
pub(crate) inner: PgPool,
database_url: String,
max_size: u32,
}

impl fmt::Debug for ConnectionPool {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
// We don't print the `database_url`, as it may contain
// sensitive information (e.g. the database password).
formatter
.debug_struct("ConnectionPool")
.field("max_size", &self.max_size)
.finish_non_exhaustive()
}
}

impl ConnectionPool {
pub async fn test_pool() -> ConnectionPool {
let db_url = create_test_db()
pub struct TestTemplate(url::Url);

impl TestTemplate {
fn db_name(&self) -> &str {
self.0.path().strip_prefix('/').unwrap()
}

fn url(&self, db_name: &str) -> url::Url {
let mut url = self.0.clone();
url.set_path(db_name);
url
}

async fn connect_to(db_url: &url::Url) -> sqlx::Result<sqlx::PgConnection> {
use sqlx::Connection as _;
let mut attempts = 10;
loop {
match sqlx::PgConnection::connect(db_url.as_ref()).await {
Ok(conn) => return Ok(conn),
Err(err) => {
attempts -= 1;
if attempts == 0 {
return Err(err);
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}

/// Obtains the test database URL from the environment variable.
pub fn empty() -> anyhow::Result<Self> {
let db_url = env::var("TEST_DATABASE_URL").context(
"TEST_DATABASE_URL must be set. Normally, this is done by the 'zk' tool. \
Make sure that you are running the tests with 'zk test rust' command or equivalent.",
)?;
Ok(Self(db_url.parse()?))
}

/// Closes the connection pool and disallows new connections to the underlying db,
/// so that the db can be used as a template.
pub async fn freeze(pool: ConnectionPool) -> anyhow::Result<Self> {
use sqlx::Executor as _;
let mut conn = pool.acquire_connection_retried().await?;
conn.execute(
"UPDATE pg_database SET datallowconn = false WHERE datname = current_database()",
)
.await
.context("SET dataallowconn = false")?;
drop(conn);
pool.inner.close().await;
Ok(Self(pool.database_url.parse()?))
}

/// Constructs a new temporary database (with a randomized name)
/// by cloning the database template pointed to by the TEST_DATABASE_URL env var.
/// The template is expected to have all migrations from dal/migrations applied.
/// For efficiency, the Postgres container of TEST_DATABASE_URL should be
/// configured with the "fsync=off" option - it disables waiting for disk synchronization
/// on every write, making the database almost as fast as an in-memory Postgres instance.
/// The database is not cleaned up automatically; instead, the whole Postgres
/// container is recreated whenever you call "zk test rust".
pub async fn create_db(&self) -> anyhow::Result<ConnectionPool> {
use rand::Rng as _;
use sqlx::Executor as _;

let mut conn = Self::connect_to(&self.url(""))
.await
.context("connect_to()")?;
let db_old = self.db_name();
let db_new = format!("test-{}", rand::thread_rng().gen::<u64>());
conn.execute(format!("CREATE DATABASE \"{db_new}\" WITH TEMPLATE \"{db_old}\"").as_str())
.await
.expect("Unable to prepare test database")
.to_string();
.context("CREATE DATABASE")?;

const TEST_MAX_CONNECTIONS: u32 = 50; // Expected to be enough for any unit test.
Self::builder(&db_url, TEST_MAX_CONNECTIONS)
ConnectionPool::builder(self.url(&db_new).as_ref(), TEST_MAX_CONNECTIONS)
.build()
.await
.unwrap()
.context("ConnectionPool::builder()")
}
}

impl ConnectionPool {
pub async fn test_pool() -> ConnectionPool {
TestTemplate::empty().unwrap().create_db().await.unwrap()
}

/// Initializes a builder for connection pools.
@@ -261,10 +285,12 @@ mod tests {

#[tokio::test]
async fn setting_statement_timeout() {
let db_url = create_test_db()
let db_url = TestTemplate::empty()
.unwrap()
.create_db()
.await
.expect("Unable to prepare test database")
.to_string();
.unwrap()
.database_url;

let pool = ConnectionPool::singleton(&db_url)
.set_statement_timeout(Some(Duration::from_secs(1)))
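Putting the pieces together: the zk tool prepares a migrated template database and exposes it via TEST_DATABASE_URL, TestTemplate::empty() picks that URL up, and create_db() clones the template into a throwaway database per test; freeze() covers the case where the template itself has to be produced from a live pool. A minimal usage sketch, assuming the connection module re-exports shown here (only TestTemplate's methods come from the diff above):

use zksync_dal::connection::{ConnectionPool, TestTemplate}; // paths assumed

// Each caller gets an isolated copy of the template database, so tests can
// run in parallel without stepping on each other's data.
async fn fresh_test_pool() -> anyhow::Result<ConnectionPool> {
    let template = TestTemplate::empty()?; // reads TEST_DATABASE_URL
    template.create_db().await
}

// If the template has to be built inside the test process (e.g. after running
// migrations against a fresh database), freeze the pool first so Postgres
// accepts the database as a template.
async fn freeze_into_template(pool: ConnectionPool) -> anyhow::Result<TestTemplate> {
    TestTemplate::freeze(pool).await
}

The new ConnectionPool::test_pool() in the diff is essentially the first helper with the errors unwrapped.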