Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to control memory usage for header map #2239

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {

let sync_shared = Arc::new(SyncShared::with_tmpdir(
shared.clone(),
args.config
.network
.sync
.as_ref()
.cloned()
.unwrap_or_default(),
args.config.tmp_dir.as_ref(),
));
let network_state = Arc::new(
Expand Down
1 change: 1 addition & 0 deletions network/src/protocols/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn net_service_start(name: String) -> Node {
upnp: false,
bootnode_mode: true,
max_send_buffer: None,
sync: None,
};

let network_state =
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ fn setup_node(height: u64) -> (Shared, ChainController, RpcServer) {
.start(Some("rpc-test-network"))
.expect("Start network service failed")
};
let sync_shared = Arc::new(SyncShared::new(shared.clone()));
let sync_shared = Arc::new(SyncShared::new(shared.clone(), Default::default()));
let synchronizer = Synchronizer::new(chain_controller.clone(), Arc::clone(&sync_shared));
let indexer_store = {
let mut indexer_config = IndexerConfig::default();
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ ckb-chain = { path = "../chain" }
ckb-shared = { path = "../shared" }
ckb-store = { path = "../store" }
ckb-db = { path = "../db" }
ckb-app-config = {path = "../util/app-config"}
ckb-types = {path = "../util/types"}
ckb-network = { path = "../network" }
ckb-logger = {path = "../util/logger"}
Expand Down
2 changes: 1 addition & 1 deletion sync/src/relayer/tests/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub(crate) fn build_chain(tip: BlockNumber) -> (Relayer, OutPoint) {
.expect("processing block should be ok");
}

let sync_shared = Arc::new(SyncShared::new(shared));
let sync_shared = Arc::new(SyncShared::new(shared, Default::default()));
(
Relayer::new(
chain_controller,
Expand Down
2 changes: 1 addition & 1 deletion sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ mod tests {
}

fn gen_synchronizer(chain_controller: ChainController, shared: Shared) -> Synchronizer {
let shared = Arc::new(SyncShared::new(shared));
let shared = Arc::new(SyncShared::new(shared, Default::default()));
Synchronizer::new(chain_controller, shared)
}

Expand Down
5 changes: 4 additions & 1 deletion sync/src/tests/sync_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ fn test_insert_parent_unknown_block() {
let chain_service = ChainService::new(shared.clone(), table);
chain_service.start::<&str>(None)
};
(SyncShared::new(shared), chain_controller)
(
SyncShared::new(shared, Default::default()),
chain_controller,
)
};

let block = shared1
Expand Down
2 changes: 1 addition & 1 deletion sync/src/tests/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn setup_node(height: u64) -> (TestNode, Shared) {
.expect("process block should be OK");
}

let sync_shared = Arc::new(SyncShared::new(shared.clone()));
let sync_shared = Arc::new(SyncShared::new(shared.clone(), Default::default()));
let synchronizer = Synchronizer::new(chain_controller, sync_shared);
let mut node = TestNode::default();
let protocol = Arc::new(RwLock::new(synchronizer)) as Arc<_>;
Expand Down
2 changes: 1 addition & 1 deletion sync/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn build_chain(tip: BlockNumber) -> (SyncShared, ChainController) {
chain_service.start::<&str>(None)
};
generate_blocks(&shared, &chain_controller, tip);
let sync_shared = SyncShared::new(shared);
let sync_shared = SyncShared::new(shared, Default::default());
(sync_shared, chain_controller)
}

Expand Down
17 changes: 15 additions & 2 deletions sync/src/types/header_map/backend_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path;

use ckb_db::internal::{
ops::{Delete as _, GetPinned as _, Open as _, Put as _},
DB,
BlockBasedOptions, Options, DB,
};
use ckb_logger::{debug, warn};
use ckb_types::{packed::Byte32, prelude::*};
Expand Down Expand Up @@ -47,7 +47,20 @@ impl KeyValueBackend for RocksDBBackend {
builder.tempdir()
};
if let Ok(cache_dir) = cache_dir_res {
if let Ok(db) = DB::open_default(cache_dir.path()) {
// We minimize memory usage at all costs here.
// If we want to use more memory, we should increase the limit of KeyValueMemory.
let opts = {
let mut block_opts = BlockBasedOptions::default();
block_opts.disable_cache();
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_block_based_table_factory(&block_opts);
opts.set_write_buffer_size(4 * 1024 * 1024);
opts.set_max_write_buffer_number(2);
opts.set_min_write_buffer_number_to_merge(1);
opts
};
if let Ok(db) = DB::open(&opts, cache_dir.path()) {
debug!(
"open a key-value database({}) to save header map into disk",
cache_dir.path().to_str().unwrap_or("")
Expand Down
13 changes: 9 additions & 4 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
MAX_TIP_AGE, NORMAL_INDEX, POW_INTERVAL, RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
TIME_TRACE_SIZE,
};
use ckb_app_config::SyncConfig;
use ckb_chain::chain::ChainController;
use ckb_chain_spec::consensus::Consensus;
use ckb_logger::{debug, debug_target, error, trace};
Expand Down Expand Up @@ -1174,11 +1175,11 @@ pub struct SyncShared {
}

impl SyncShared {
pub fn new(shared: Shared) -> SyncShared {
Self::with_tmpdir::<PathBuf>(shared, None)
pub fn new(shared: Shared, sync_config: SyncConfig) -> SyncShared {
Self::with_tmpdir::<PathBuf>(shared, sync_config, None)
}

pub fn with_tmpdir<P>(shared: Shared, tmpdir: Option<P>) -> SyncShared
pub fn with_tmpdir<P>(shared: Shared, sync_config: SyncConfig, tmpdir: Option<P>) -> SyncShared
where
P: AsRef<Path>,
{
Expand All @@ -1190,7 +1191,11 @@ impl SyncShared {
)
};
let shared_best_header = RwLock::new(HeaderView::new(header, total_difficulty));
let header_map = HeaderMap::new(tmpdir, 300_000, 20_000);
let header_map = HeaderMap::new(
tmpdir,
sync_config.header_map.primary_limit,
sync_config.header_map.backend_close_threshold,
);

let state = SyncState {
n_sync_started: AtomicUsize::new(0),
Expand Down
1 change: 1 addition & 0 deletions test/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl Net {
upnp: false,
bootnode_mode: false,
max_send_buffer: None,
sync: None,
};

let network_state =
Expand Down
2 changes: 1 addition & 1 deletion util/app-config/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use miner::{
ClientConfig as MinerClientConfig, Config as MinerConfig, DummyConfig, EaglesongSimpleConfig,
ExtraHashFunction, WorkerConfig as MinerWorkerConfig,
};
pub use network::Config as NetworkConfig;
pub use network::{Config as NetworkConfig, HeaderMapConfig, SyncConfig};
pub use network_alert::Config as NetworkAlertConfig;
pub use notify::Config as NotifyConfig;
pub use rpc::{Config as RpcConfig, Module as RpcModule};
Expand Down
24 changes: 24 additions & 0 deletions util/app-config/src/configs/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ pub struct Config {
pub bootnode_mode: bool,
// Max send buffer size
pub max_send_buffer: Option<usize>,
pub sync: Option<SyncConfig>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct SyncConfig {
#[serde(default)]
pub header_map: HeaderMapConfig,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeaderMapConfig {
yangby-cryptape marked this conversation as resolved.
Show resolved Hide resolved
// The maximum size of data in memory
pub primary_limit: usize,
// Disable cache if the size of data in memory less than this threshold
pub backend_close_threshold: usize,
}

impl Default for HeaderMapConfig {
fn default() -> Self {
Self {
primary_limit: 300_000,
backend_close_threshold: 20_000,
}
}
}

pub(crate) fn generate_random_key() -> [u8; 32] {
Expand Down