Allow local storage support directory and partition the log files by table (tikv#33)

* move the files to some dirs

Signed-off-by: Yu Juncen <yu745514916@live.com>

* set failpoints when enabled

Signed-off-by: Yu Juncen <yu745514916@live.com>

* update the file name to {date}-{min_ts}-{uuid}

Signed-off-by: Yu Juncen <yu745514916@live.com>

* don't make tests require the failpoints feature to be enabled

Signed-off-by: Yu Juncen <yu745514916@live.com>

* ignore already-exists errors

Signed-off-by: Yu Juncen <yu745514916@live.com>

* create parent dirs for restoring

Signed-off-by: Yu Juncen <yu745514916@live.com>

* added version spec

Signed-off-by: Yu Juncen <yu745514916@live.com>

* run cargo check

Signed-off-by: Yu Juncen <yu745514916@live.com>
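
Under the new layout, data files are partitioned per table under a v1/ prefix and schema files go to v1/schema-meta/; the file name itself is {date}-{min_ts}-{uuid}. With illustrative values (a made-up table id of 71, the TSO used by the test further down, and a placeholder UUID), the resulting paths look roughly like:

v1/t00000071/20220307-431656320867237891-<uuid>.log
v1/schema-meta/20220307-431656320867237891-<uuid>.log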
YuJuncen authored Mar 22, 2022
1 parent 4f47f87 commit 179cf9e
Showing 6 changed files with 137 additions and 43 deletions.
57 changes: 35 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -24,7 +24,7 @@ failpoints = [
"fail/failpoints",
"raftstore/failpoints",
"tikv_util/failpoints",
"engine_rocks/failpoints"
"engine_rocks/failpoints",
]
cloud-aws = [
"encryption_export/cloud-aws",
3 changes: 3 additions & 0 deletions components/backup-stream/Cargo.toml
@@ -12,6 +12,7 @@ default = ["test-engines-rocksdb"]
test-engines-rocksdb = [
"tikv/test-engines-rocksdb",
]
failpoints = ["fail/failpoints"]

[dependencies]
tokio = { version = "1.5", features = ["rt-multi-thread", "macros", "time", "sync"] }
@@ -41,6 +42,8 @@ openssl = "0.10"
dashmap = "5"
crossbeam = "0.8"
crossbeam-channel = "0.5"
chrono = "0.4"
fail = "0.4"

file_system = { path = "../file_system" }
tidb_query_datatype = {path = "../tidb_query_datatype", default-features = false}
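For context, the failpoints feature and the fail dependency added above let tests override the date format at runtime. A minimal, hypothetical sketch of driving the stream_format_date_time failpoint introduced in router.rs below (requires building with --features failpoints; the format string is arbitrary):

#[cfg(feature = "failpoints")]
#[test]
fn override_stream_date_format() {
    // Configure the failpoint to return a custom format string.
    fail::cfg("stream_format_date_time", "return(%Y%m)").unwrap();
    // ... exercise TempFileKey / the flush path here ...
    fail::remove("stream_format_date_time");
}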
46 changes: 36 additions & 10 deletions components/backup-stream/src/router.rs
@@ -2,6 +2,7 @@
use std::{
borrow::Borrow,
collections::HashMap,
fmt::Display,
io,
path::{Path, PathBuf},
result,
@@ -544,26 +545,44 @@ impl TempFileKey {
}
}

fn path_to_log_file(&self, min_ts: u64) -> String {
fn format_date_time(ts: u64) -> impl Display {
use chrono::prelude::*;
let millis = TimeStamp::physical(ts.into());
let dt = Utc.timestamp_millis(millis as _);
fail::fail_point!("stream_format_date_time", |s| {
return dt.format(&s.unwrap_or("%Y%m".to_owned())).to_string();
});
#[cfg(feature = "failpoints")]
return dt.format("%Y%m%d").to_string();
#[cfg(not(feature = "failpoints"))]
return dt.format("%Y%m%d");
}

fn path_to_log_file(&self, min_ts: u64, max_ts: u64) -> String {
format!(
// "/v1/t{:012}/{:012}-{}.log",
"v1_t{:012}_{:012}-{}.log",
"v1/t{:08}/{}-{:012}-{}.log",
self.table_id,
// We may delete a range of files, so use max_ts here to avoid wrongly removing records that are still needed.
Self::format_date_time(max_ts),
min_ts,
uuid::Uuid::new_v4()
)
}

fn path_to_schema_file(min_ts: u64) -> String {
// format!("/v1/m/{:012}-{}.log", min_ts, uuid::Uuid::new_v4())
format!("v1_m{:012}-{}.log", min_ts, uuid::Uuid::new_v4())
fn path_to_schema_file(min_ts: u64, max_ts: u64) -> String {
format!(
"v1/schema-meta/{}-{:012}-{}.log",
Self::format_date_time(max_ts),
min_ts,
uuid::Uuid::new_v4(),
)
}

fn file_name(&self, min_ts: TimeStamp) -> String {
fn file_name(&self, min_ts: TimeStamp, max_ts: TimeStamp) -> String {
if self.is_meta {
Self::path_to_schema_file(min_ts.into_inner())
Self::path_to_schema_file(min_ts.into_inner(), max_ts.into_inner())
} else {
self.path_to_log_file(min_ts.into_inner())
self.path_to_log_file(min_ts.into_inner(), max_ts.into_inner())
}
}
}
@@ -997,7 +1016,7 @@ impl DataFile {

/// generate the metadata in protocol buffer of the file.
fn generate_metadata(&mut self, file_key: &TempFileKey) -> Result<DataFileInfo> {
self.set_storage_path(file_key.file_name(self.min_ts));
self.set_storage_path(file_key.file_name(self.min_ts, self.max_ts));

let mut meta = DataFileInfo::new();
meta.set_sha_256(
@@ -1300,4 +1319,11 @@ mod tests {
let rts = router.do_flush("nothing", 1, ts).await.unwrap();
assert_eq!(ts.into_inner(), rts);
}

#[test]
fn test_format_datetime() {
let s = TempFileKey::format_date_time(431656320867237891);
let s = s.to_string();
assert_eq!(s, "20220307");
}
}
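
For reference, a standalone sketch that mirrors the new data-file naming above (assumptions: chrono 0.4 and uuid with the v4 feature; the table id and TSO values are made up, and the 18-bit shift reproduces what TimeStamp::physical does):

use chrono::prelude::*;
use uuid::Uuid;

// A TSO carries the physical time in its upper bits; dropping the 18 logical
// bits yields milliseconds since the epoch.
fn physical_millis(ts: u64) -> u64 {
    ts >> 18
}

fn path_to_log_file(table_id: i64, min_ts: u64, max_ts: u64) -> String {
    let date = Utc
        .timestamp_millis(physical_millis(max_ts) as i64)
        .format("%Y%m%d");
    // Partition by table; prefix with the date of max_ts so that deleting a
    // whole date range of files never drops records that are actually newer.
    format!("v1/t{:08}/{}-{:012}-{}.log", table_id, date, min_ts, Uuid::new_v4())
}

fn main() {
    // 431656320867237891 is the TSO used by test_format_datetime (2022-03-07).
    println!("{}", path_to_log_file(71, 431656320867237891, 431656320867237891));
}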
6 changes: 6 additions & 0 deletions components/external_storage/src/lib.rs
@@ -8,6 +8,7 @@ extern crate slog_global;
#[allow(unused_extern_crates)]
extern crate tikv_alloc;

use std::fs;
use std::io::{self, Write};
use std::marker::Unpin;
use std::sync::Arc;
@@ -81,6 +82,11 @@ pub trait ExternalStorage: 'static + Send + Sync {
file_crypter: Option<FileEncryptionInfo>,
) -> io::Result<()> {
let reader = self.read(storage_name);
if let Some(p) = restore_name.parent() {
// Try to create all parent dirs of the path (best effort).
// Ignore the error here: if creation really failed, the subsequent steps will surface it.
let _ = fs::create_dir_all(p);
}
let output: &mut dyn Write = &mut File::create(restore_name)?;
// the minimum speed of reading data, in bytes/second.
// if reading speed is slower than this rate, we will stop with
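A minimal sketch of the restore-side pattern added in external_storage above (the path and payload are made up; the real implementation streams from the storage backend with rate limiting):

use std::{fs, io::Write, path::Path};

fn restore_to(restore_name: &Path, data: &[u8]) -> std::io::Result<()> {
    if let Some(parent) = restore_name.parent() {
        // Best effort: ignore the error, since a directory that truly cannot
        // be created will make File::create below fail anyway.
        let _ = fs::create_dir_all(parent);
    }
    let mut output = fs::File::create(restore_name)?;
    output.write_all(data)
}

fn main() -> std::io::Result<()> {
    restore_to(Path::new("restore/v1/t00000071/example.log"), b"demo")
}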