feat: (re)genesis graceful shutdown #1821

Merged · 56 commits · Apr 15, 2024
Commits (56)
ed3ab94
snapshot fragments for json work
segfault-magnet Mar 30, 2024
42e0763
parquet fragment support, cleanup pending
segfault-magnet Mar 30, 2024
58b04be
add tests for fragments
segfault-magnet Mar 30, 2024
f440bb9
comment out denies
segfault-magnet Mar 30, 2024
3fb1586
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Mar 31, 2024
4936ed9
snapshot generation uses concurrent workers
segfault-magnet Apr 1, 2024
3a0b898
task_manager used for import/export of snapshot
segfault-magnet Apr 1, 2024
316b1c9
cleanup imports
segfault-magnet Apr 1, 2024
35449e6
use rayon in genesis importer
segfault-magnet Apr 1, 2024
633dc22
move files around
segfault-magnet Apr 2, 2024
b0fdfdc
enable deny lints, fix errors in chain-config
segfault-magnet Apr 2, 2024
1f0b3c3
wip, investigating features
segfault-magnet Apr 2, 2024
1804a7d
feature gate imports, fix unused deps
segfault-magnet Apr 2, 2024
1209564
ci checks
segfault-magnet Apr 2, 2024
e01d204
remove unused result
segfault-magnet Apr 2, 2024
892849f
use rayon for exporter
segfault-magnet Apr 2, 2024
c8c28f7
remove uuid dep
segfault-magnet Apr 2, 2024
90785f4
dry up fragments tests
segfault-magnet Apr 2, 2024
58a4e25
deduplicate tests
segfault-magnet Apr 2, 2024
cb27956
inline path
segfault-magnet Apr 2, 2024
4171e87
dedupe writer tests
segfault-magnet Apr 2, 2024
9d09510
restructure into import/export format
segfault-magnet Apr 2, 2024
d84bd72
format and cargo sort
segfault-magnet Apr 2, 2024
ebe836b
update change log
segfault-magnet Apr 2, 2024
6060bad
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 2, 2024
47e5432
entries filter
segfault-magnet Apr 4, 2024
3c1ee01
optimize
segfault-magnet Apr 4, 2024
b9c3906
shorten bounds
segfault-magnet Apr 5, 2024
992bcc0
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 5, 2024
ec85160
can cancel/resume regenesis, pending progress info and e2e tests
segfault-magnet Apr 6, 2024
1d046dc
wip
segfault-magnet Apr 8, 2024
c9301c6
Merge branch 'master' into feature/regenesis_graceful_shutdown
segfault-magnet Apr 10, 2024
6a70cca
Merge remote-tracking branch 'origin/master' into feature/regenesis_g…
segfault-magnet Apr 11, 2024
bebc214
progress with cli/logs behavior
segfault-magnet Apr 11, 2024
4468ab1
make reader tolerate missing tables
segfault-magnet Apr 11, 2024
5c8654e
reenable denies
segfault-magnet Apr 11, 2024
8aea56d
remove index from group.
segfault-magnet Apr 11, 2024
a2c72d5
use state watcher instead of spawning a tokio task
segfault-magnet Apr 11, 2024
d21628c
cleanup
segfault-magnet Apr 11, 2024
00c3af9
improve rendering
segfault-magnet Apr 11, 2024
1d39fa0
grammar
segfault-magnet Apr 11, 2024
92b89d0
revert debugging stuff
segfault-magnet Apr 11, 2024
aacb2b3
update change log
segfault-magnet Apr 11, 2024
35746c7
fix state watcher default only being available with test-helpers
segfault-magnet Apr 11, 2024
8f0e375
pub use Groups
segfault-magnet Apr 11, 2024
2da160b
reformat
segfault-magnet Apr 11, 2024
5100fd2
add is empty to groups
segfault-magnet Apr 11, 2024
541ec4f
refactor since enumerations implement n-th method of Iterator
segfault-magnet Apr 11, 2024
921d677
Merge branch 'master' into feature/regenesis_graceful_shutdown
xgreenx Apr 11, 2024
c06ac4c
Update CHANGELOG.md
segfault-magnet Apr 12, 2024
d0d10c7
use group_num instead of index in order for the final report to be
segfault-magnet Apr 12, 2024
3bde4fc
move indicatiff down the cargo toml
segfault-magnet Apr 12, 2024
278cb99
PR comments
segfault-magnet Apr 12, 2024
754e999
Merge branch 'master' into feature/regenesis_graceful_shutdown
xgreenx Apr 13, 2024
4f06887
Merge branch 'master' into feature/regenesis_graceful_shutdown
xgreenx Apr 15, 2024
1ceb372
Added a comment for follow up
xgreenx Apr 15, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,10 +10,12 @@ Description of the upcoming release here.

### Fixed

- [#1821](https://github.com/FuelLabs/fuel-core/pull/1821): Can handle missing tables in snapshot.
- [#1814](https://github.com/FuelLabs/fuel-core/pull/1814): Bugfix: the `iter_all_by_prefix` was not working for all tables. The change adds a `Rust` level filtering.

### Added

- [#1821](https://github.com/FuelLabs/fuel-core/pull/1821): Propagate shutdown signal to (re)genesis. Also add progress bar for (re)genesis.
- [#1813](https://github.com/FuelLabs/fuel-core/pull/1813): Added back support for `/health` endpoint.
- [#1799](https://github.com/FuelLabs/fuel-core/pull/1799): Snapshot creation is now concurrent.

33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -127,6 +127,7 @@ pin-project-lite = "0.2"
axum = "0.5"
once_cell = "1.16"
prometheus-client = "0.22.0"
indicatif = { version = "0.17", default-features = false }
itertools = { version = "0.12", default-features = false }
insta = "1.8"
tempfile = "3.4"
25 changes: 21 additions & 4 deletions bin/fuel-core/src/cli/run.rs
@@ -14,7 +14,10 @@ use anyhow::Context;
use clap::Parser;
use fuel_core::{
chain_config::default_consensus_dev_key,
combined_database::CombinedDatabaseConfig,
combined_database::{
CombinedDatabase,
CombinedDatabaseConfig,
},
producer::Config as ProducerConfig,
service::{
config::Trigger,
@@ -377,16 +380,30 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
info!("Fuel Core version v{}", env!("CARGO_PKG_VERSION"));
trace!("Initializing in TRACE mode.");
// initialize the server
let server = FuelService::new_node(config).await?;
let combined_database = CombinedDatabase::from_config(&config.combined_db_config)?;

let service = FuelService::new(combined_database, config)?;

// Genesis could take a long time depending on the snapshot size. Start needs to be
// interruptible by the shutdown_signal
tokio::select! {
result = service.start_and_await() => {
result?;
}
_ = shutdown_signal() => {
service.stop();
}
}

// pause the main task while service is running
tokio::select! {
result = server.await_stop() => {
result = service.await_stop() => {
result?;
}
_ = shutdown_signal() => {}
}

server.stop_and_await().await?;
service.stop_and_await().await?;

Ok(())
}
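The `run.rs` change above races `start_and_await()` against `shutdown_signal()` in a `tokio::select!`, so a long genesis can be interrupted. A minimal std-only sketch of the same "race the work against a cancel signal" idea follows — `run_with_shutdown` and the channel-based signal are hypothetical stand-ins, not the PR's API:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Race a long-running "genesis" against a shutdown signal: whichever
// event arrives first decides the outcome, as in the tokio::select! above.
fn run_with_shutdown(shutdown: mpsc::Receiver<()>) -> &'static str {
    let (done_tx, done_rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50)); // simulated genesis work
        let _ = done_tx.send(());
    });
    loop {
        if done_rx.try_recv().is_ok() {
            return "started";
        }
        if shutdown.try_recv().is_ok() {
            return "stopped"; // start interrupted by shutdown
        }
        thread::sleep(Duration::from_millis(1));
    }
}

fn main() {
    // No shutdown sent: the service finishes starting.
    let (_keep_alive, rx) = mpsc::channel::<()>();
    assert_eq!(run_with_shutdown(rx), "started");

    // Shutdown sent before start completes: start is interrupted.
    let (tx, rx) = mpsc::channel();
    tx.send(()).unwrap();
    assert_eq!(run_with_shutdown(rx), "stopped");
    println!("ok");
}
```

The real code runs the race twice: once while starting (so genesis is interruptible) and once while the service is running.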
8 changes: 2 additions & 6 deletions bin/fuel-core/src/cli/snapshot.rs
@@ -323,7 +323,7 @@ mod tests {
reader
.read::<T>()
.unwrap()
.map_ok(|group| group.data)
.into_iter()
.flatten_ok()
.try_collect()
.unwrap()
@@ -784,11 +784,7 @@
T::OwnedValue: serde::de::DeserializeOwned + core::fmt::Debug + PartialEq,
StateConfig: AsTable<T>,
{
let actual = reader
.read()
.unwrap()
.map(|group| group.unwrap().data)
.collect_vec();
let actual: Vec<_> = reader.read().unwrap().into_iter().try_collect().unwrap();

let expected = expected_data
.into_iter()
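The reader refactor above replaces `map_ok(|group| group.data)` with `into_iter().flatten_ok().try_collect()` (itertools): each group is now a plain fallible `Vec`, flattened into one collection with the first error aborting the whole read. A std-only sketch of that chain — `flatten_groups` is a hypothetical helper, not code from the PR:

```rust
// Flatten a sequence of fallible groups into one Vec, propagating the
// first error — the std equivalent of flatten_ok().try_collect().
fn flatten_groups<T, E>(groups: Vec<Result<Vec<T>, E>>) -> Result<Vec<T>, E> {
    let mut out = Vec::new();
    for group in groups {
        out.extend(group?); // first group-level error aborts collection
    }
    Ok(out)
}

fn main() {
    let ok: Vec<Result<Vec<u8>, String>> = vec![Ok(vec![1, 2]), Ok(vec![3])];
    assert_eq!(flatten_groups(ok).unwrap(), vec![1, 2, 3]);

    let bad: Vec<Result<Vec<u8>, String>> = vec![Ok(vec![1]), Err("io".into())];
    assert!(flatten_groups(bad).is_err());
    println!("ok");
}
```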
59 changes: 37 additions & 22 deletions crates/chain-config/src/config/state.rs
@@ -418,47 +418,47 @@ impl StateConfig {

let coins = reader
.read::<Coins>()?
.map_ok(|batch| batch.data)
.into_iter()
.flatten_ok()
.try_collect()?;

builder.add(coins);

let messages = reader
.read::<Messages>()?
.map_ok(|batch| batch.data)
.into_iter()
.flatten_ok()
.try_collect()?;

builder.add(messages);

let contract_state = reader
.read::<ContractsState>()?
.map_ok(|batch| batch.data)
.into_iter()
.flatten_ok()
.try_collect()?;

builder.add(contract_state);

let contract_balance = reader
.read::<ContractsAssets>()?
.map_ok(|batch| batch.data)
.into_iter()
.flatten_ok()
.try_collect()?;

builder.add(contract_balance);

let contract_code = reader
.read::<ContractsRawCode>()?
.map_ok(|batch| batch.data)
.into_iter()
.flatten_ok()
.try_collect()?;

builder.add(contract_code);

let contract_utxo = reader
.read::<ContractsLatestUtxo>()?
.map_ok(|batch| batch.data)
.into_iter()
.flatten_ok()
.try_collect()?;

@@ -532,7 +532,8 @@ impl StateConfig {
}

pub use reader::{
IntoIter,
GroupIter,
Groups,
SnapshotReader,
};
#[cfg(feature = "parquet")]
@@ -544,13 +545,6 @@ pub use writer::{
};
pub const MAX_GROUP_SIZE: usize = usize::MAX;

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Group<T> {
pub index: usize,
pub data: Vec<T>,
}
pub(crate) type GroupResult<T> = anyhow::Result<Group<T>>;

#[cfg(test)]
mod tests {
use std::path::Path;
@@ -717,6 +711,25 @@ mod tests {
pretty_assertions::assert_eq!(da_block_height, da_block_height_decoded);
}

#[test_case::test_case(given_parquet_writer)]
#[test_case::test_case(given_json_writer)]
fn missing_tables_tolerated(writer: impl FnOnce(&Path) -> SnapshotWriter) {
// given
let temp_dir = tempfile::tempdir().unwrap();
let writer = writer(temp_dir.path());
let snapshot = writer
.close(13.into(), 14u64.into(), &ChainConfig::local_testnet())
.unwrap();

let reader = SnapshotReader::open(snapshot).unwrap();

// when
let coins = reader.read::<Coins>().unwrap();

// then
assert_eq!(coins.into_iter().count(), 0);
}

fn assert_roundtrip<T>(
writer: impl FnOnce(&Path) -> SnapshotWriter,
reader: impl FnOnce(SnapshotMetadata, usize) -> SnapshotReader,
@@ -755,7 +768,11 @@
.close(10.into(), DaBlockHeight(11), &ChainConfig::local_testnet())
.unwrap();

let actual_groups = reader(snapshot, group_size).read().unwrap().collect_vec();
let actual_groups = reader(snapshot, group_size)
.read()
.unwrap()
.into_iter()
.collect_vec();

// then
assert_groups_identical(&expected_groups, actual_groups, skip_n_groups);
@@ -779,7 +796,7 @@
fn write_groups<T>(
&mut self,
encoder: &mut SnapshotWriter,
) -> Vec<Group<TableEntry<T>>>
) -> Vec<Vec<TableEntry<T>>>
where
T: TableWithBlueprint,
T::OwnedKey: serde::Serialize,
@@ -789,29 +806,27 @@
{
let groups = self.generate_groups();
for group in &groups {
encoder.write(group.data.clone()).unwrap();
encoder.write(group.clone()).unwrap();
}
groups
}

fn generate_groups<T>(&mut self) -> Vec<Group<T>>
fn generate_groups<T>(&mut self) -> Vec<Vec<T>>
where
T: Randomize,
{
::std::iter::repeat_with(|| T::randomize(&mut self.rand))
.chunks(self.group_size)
.into_iter()
.map(|chunk| chunk.collect_vec())
.enumerate()
.map(|(index, data)| Group { index, data })
.take(self.num_groups)
.collect()
}
}

fn assert_groups_identical<T>(
original: &[Group<T>],
read: impl IntoIterator<Item = Result<Group<T>, anyhow::Error>>,
original: &[Vec<T>],
read: impl IntoIterator<Item = Result<Vec<T>, anyhow::Error>>,
skip: usize,
) where
Vec<T>: PartialEq,
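With the `Group` struct removed, `generate_groups` in the test helpers above returns `Vec<Vec<T>>` built by the itertools `chunks(self.group_size)` call. A std-only sketch of that chunking step — `into_groups` is a hypothetical helper named for illustration:

```rust
// Split a flat sequence into fixed-size groups, the last group holding
// the remainder. group_size must be non-zero.
fn into_groups<T>(items: Vec<T>, group_size: usize) -> Vec<Vec<T>> {
    let mut groups = Vec::new();
    let mut iter = items.into_iter().peekable();
    while iter.peek().is_some() {
        groups.push(iter.by_ref().take(group_size).collect());
    }
    groups
}

fn main() {
    let groups = into_groups(vec![1, 2, 3, 4, 5], 2);
    assert_eq!(groups, vec![vec![1, 2], vec![3, 4], vec![5]]);
    println!("ok");
}
```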
3 changes: 1 addition & 2 deletions crates/chain-config/src/config/state/parquet.rs
@@ -3,7 +3,6 @@ pub mod encode;

#[cfg(test)]
mod tests {
use crate::Group;
use bytes::Bytes;
use itertools::Itertools;
use parquet::{
@@ -128,7 +127,7 @@ mod tests {
let mut decoder = Decoder::new(bytes).unwrap();

// when
let _: Group<_> = decoder.nth(1).unwrap().unwrap();
let _: Vec<_> = decoder.nth(1).unwrap().unwrap();

// then
let actually_read = bytes_read.load(std::sync::atomic::Ordering::SeqCst);
24 changes: 7 additions & 17 deletions crates/chain-config/src/config/state/parquet/decode.rs
@@ -14,27 +14,20 @@ use parquet::{
record::RowAccessor,
};

use crate::config::state::{
Group,
GroupResult,
};

pub struct Decoder<R: ChunkReader> {
data_source: SerializedFileReader<R>,
group_index: usize,
}

pub trait Decode<T> {
fn decode(bytes: &[u8]) -> anyhow::Result<T>
where
Self: Sized;
}

impl<R> Decoder<R>
where
R: ChunkReader + 'static,
{
fn current_group(&self) -> anyhow::Result<Group<Vec<u8>>> {
pub fn num_groups(&self) -> usize {
self.data_source.num_row_groups()
}

fn current_group(&self) -> anyhow::Result<Vec<Vec<u8>>> {
let data = self
.data_source
.get_row_group(self.group_index)?
@@ -51,18 +44,15 @@
})
.collect::<Result<Vec<_>, _>>()?;

Ok(Group {
index: self.group_index,
data,
})
Ok(data)
}
}

impl<R> Iterator for Decoder<R>
where
R: ChunkReader + 'static,
{
type Item = GroupResult<Vec<u8>>;
type Item = anyhow::Result<Vec<Vec<u8>>>;

fn next(&mut self) -> Option<Self::Item> {
if self.group_index >= self.data_source.metadata().num_row_groups() {
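The `decode.rs` change above drops the per-group `index`, so the parquet `Decoder` iterator now yields whole groups (`Vec<Vec<u8>>`) directly. A minimal std-only sketch of that iterator shape — an in-memory `Vec` stands in for the real `SerializedFileReader` row groups, and a `String` error for `anyhow::Error`:

```rust
// Iterator over row groups: each item is one whole group of raw byte
// records, with no group index carried alongside the data.
struct Decoder {
    groups: Vec<Vec<Vec<u8>>>, // stand-in for parquet row groups
    group_index: usize,
}

impl Iterator for Decoder {
    type Item = Result<Vec<Vec<u8>>, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.group_index >= self.groups.len() {
            return None;
        }
        let group = self.groups[self.group_index].clone();
        self.group_index += 1;
        Some(Ok(group))
    }
}

fn main() {
    let mut decoder = Decoder {
        groups: vec![vec![b"a".to_vec()], vec![b"b".to_vec(), b"c".to_vec()]],
        group_index: 0,
    };
    // nth(1) skips straight to the second group, as in the parquet test
    // that checks how many bytes a skip actually reads.
    let second = decoder.nth(1).unwrap().unwrap();
    assert_eq!(second.len(), 2);
    println!("ok");
}
```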