Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br-stream: use raft router to apply kv files for sst_importer #24

Merged
merged 4 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/br-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where
async fn starts_flush_ticks(router: Router) {
let ticker = tick(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 5));
loop {
// wait 10s to trigger tick
// wait 1min to trigger tick
let _ = ticker.recv().unwrap();
debug!("backup stream trigger flush tick");
router.tick().await;
Expand Down
1 change: 1 addition & 0 deletions components/br-stream/src/event_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl<S: Snapshot> EventLoader<S> {
/// Like [`cdc::Initializer`], but supports initialize over range.
/// Note: maybe we can merge those two structures?
#[derive(Clone)]
#[allow(dead_code)]
pub struct InitialDataLoader<E, R, RT> {
router: RT,
regions: R,
Expand Down
1 change: 0 additions & 1 deletion components/br-stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
#![feature(inherent_ascii_escape)]
pub mod config;
mod endpoint;
pub mod errors;
Expand Down
3 changes: 2 additions & 1 deletion components/br-stream/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl RouterInner {
}

pub async fn unregister_task(&self, task_name: &str) {
if let Some(_) = self.tasks.lock().await.remove(task_name) {
if self.tasks.lock().await.remove(task_name).is_some() {
info!(
"backup stream unregister task";
"task" => task_name,
Expand Down Expand Up @@ -985,6 +985,7 @@ impl std::fmt::Debug for DataFile {
struct KeyRange(Vec<u8>);

#[derive(Clone, Debug)]
#[allow(dead_code)]
struct TaskRange {
end: Vec<u8>,
task_name: String,
Expand Down
1 change: 1 addition & 0 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
// Import SST service.
let import_service = ImportSSTService::new(
self.config.import.clone(),
self.config.raft_store.raft_entry_max_size,
self.router.clone(),
engines.engines.kv.clone(),
servers.importer.clone(),
Expand Down
83 changes: 27 additions & 56 deletions components/sst_importer/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use kvproto::import_sstpb::*;
use encryption::{encryption_method_to_db_encryption_method, DataKeyManager};
use engine_rocks::{get_env, RocksSstReader};
use engine_traits::{
name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, Mutable,
SSTMetaInfo, SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder,
WriteBatch, CF_DEFAULT, CF_WRITE,
name_to_cf, CfName, EncryptionKeyManager, FileEncryptionInfo, Iterator, KvEngine, SSTMetaInfo,
SeekKey, SstCompressionType, SstExt, SstReader, SstWriter, SstWriterBuilder, CF_DEFAULT,
CF_WRITE,
};
use file_system::{get_io_rate_limiter, OpenOptions};
use kvproto::kvrpcpb::ApiVersion;
Expand Down Expand Up @@ -141,38 +141,6 @@ impl SSTImporter {
self.dir.exist(meta).unwrap_or(false)
}

// Donwloads and apply a KV file from an external storage.
pub fn apply<E: KvEngine>(
&self,
meta: &KvMeta,
backend: &StorageBackend,
rewrite_rule: &RewriteRule,
speed_limiter: Limiter,
engine: E,
) -> Result<Option<Range>> {
debug!("apply start";
"url" => ?backend,
"meta" => ?meta,
"rewrite_rule" => ?rewrite_rule,
);
match self.do_download_and_apply::<E>(
meta,
backend,
rewrite_rule,
&speed_limiter,
engine,
) {
Ok(r) => {
info!("apply"; "meta" => ?meta, "range" => ?r);
Ok(r)
}
Err(e) => {
error!(%e; "apply failed"; "meta" => ?meta,);
Err(e)
}
}
}

// Downloads an SST file from an external storage.
//
// This method is blocking. It performs the following transformations before
Expand Down Expand Up @@ -294,20 +262,17 @@ impl SSTImporter {
Ok(())
}

fn do_download_and_apply<E: KvEngine>(
pub fn do_download_kv_file(
&self,
meta: &KvMeta,
backend: &StorageBackend,
rewrite_rule: &RewriteRule,
speed_limiter: &Limiter,
engine: E,
) -> Result<Option<Range>> {
) -> Result<PathBuf> {
let name = meta.get_name();
let cf = meta.get_cf();
let path = self.dir.get_import_path(name)?;
let start = Instant::now();
self.download_file_from_external_storage(
// current length is 0. which means won't check the file length.
// don't check file length after download file for now.
meta.get_length(),
name,
path.temp.clone(),
Expand All @@ -322,8 +287,18 @@ impl SSTImporter {
.with_label_values(&["download"])
.observe(start.saturating_elapsed().as_secs_f64());

// iterator `path.temp` file and performs rewrites and apply.
let file = File::open(path.temp)?;
Ok(path.temp)
}

pub fn do_apply_kv_file<P: AsRef<Path>>(
&self,
restore_ts: u64,
file_path: P,
rewrite_rule: &RewriteRule,
build_fn: &mut dyn FnMut(Vec<u8>, Vec<u8>),
) -> Result<Option<Range>> {
// iterator file and performs rewrites and apply.
let file = File::open(file_path)?;
let mut reader = BufReader::new(file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer)?;
Expand All @@ -336,13 +311,12 @@ impl SSTImporter {
let perform_rewrite = old_prefix != new_prefix;

// perform iteration and key rewrite.
let mut key = keys::data_key(new_prefix);
let mut key = new_prefix.to_vec();
let new_prefix_data_key_len = key.len();
let mut smallest_key = None;
let mut largest_key = None;

let start = Instant::now();
let mut wb = engine.write_batch();
loop {
if !event_iter.valid() {
break;
Expand All @@ -351,11 +325,11 @@ impl SSTImporter {
let iter_key = event_iter.key().to_vec();

let ts = Key::decode_ts_from(&iter_key)?;
if ts > TimeStamp::new(meta.get_restore_ts()) {
// we assume the keys in file are sorted by ts.
// so if we met the key not satisfy the ts.
if ts > TimeStamp::new(restore_ts) {
// we assume the keys in file are sorted by ts.
// so if we met the key not satisfy the ts.
// we can easily filter the remain keys.
break
break;
}

smallest_key = smallest_key.map_or_else(
Expand Down Expand Up @@ -383,20 +357,17 @@ impl SSTImporter {

debug!(
"perform rewrite new key: {:?}, new key prefix: {:?}, old key prefix: {:?}",
log_wrappers::Value::key(keys::origin_key(&key)),
log_wrappers::Value::key(&key),
log_wrappers::Value::key(new_prefix),
log_wrappers::Value::key(old_prefix),
);
} else {
key = keys::data_key(event_iter.key());
key = event_iter.key().to_vec();
}
let value = Cow::Borrowed(event_iter.value());
// TODO handle delete cf
wb.put_cf(cf, &key, &value)?;
let value = event_iter.value().to_vec();
build_fn(key.clone(), value);
}
wb.write()?;
let label = if perform_rewrite { "rewrite" } else { "normal" };
info!("apply file finished {}", name);
IMPORTER_APPLY_DURATION
.with_label_values(&[label])
.observe(start.saturating_elapsed().as_secs_f64());
Expand Down
1 change: 1 addition & 0 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl Simulator for ServerCluster {
};
let import_service = ImportSSTService::new(
cfg.import.clone(),
cfg.raft_store.raft_entry_max_size,
sim_router.clone(),
engines.kv.clone(),
Arc::clone(&importer),
Expand Down
Loading