Skip to content

Commit

Permalink
Cherry-pick TLS support to release-3.0, and set version to v3.0.11 (#44)
Browse files Browse the repository at this point in the history
* *: support TLS (#40)

* *: support TLS

Signed-off-by: kennytm <kennytm@gmail.com>

* tests/integrations: make the entire test run on TLS connection

also added a test to ensure non-TLS-client <-> TLS-server fails.

Signed-off-by: kennytm <kennytm@gmail.com>

* Cargo.toml: set version to 3.0.11

Signed-off-by: kennytm <kennytm@gmail.com>
  • Loading branch information
kennytm authored Mar 2, 2020
1 parent f818484 commit 6020574
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 155 deletions.
192 changes: 106 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tikv-importer"
version = "3.0.10"
version = "3.0.11"
authors = ["The TiKV Authors"]
description = "Tool to help ingesting large number of KV pairs into TiKV cluster"
license = "Apache-2.0"
Expand All @@ -22,32 +22,32 @@ path = "tests/integrations/import/mod.rs"
clap = "2.33"
crc = "1.8"
crossbeam = "0.5"
engine = { version = "0.0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.9" }
engine = { version = "0.0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.10" }
futures = "0.1"
futures-cpupool = "0.1"
grpcio = "0.4"
kvproto = { version = "0.0.1", git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
lazy_static = "1.3"
log_wrappers = { version = "0.0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.9" }
log_wrappers = { version = "0.0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.10" }
prometheus = { version = "0.4", default-features = false, features = ["nightly", "push", "process"] }
quick-error = "1.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
slog = { version = "2.4", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" }
tikv = { version = "3.0.2", git = "https://github.com/tikv/tikv.git", tag = "v3.0.9" }
tikv_util = { version = "0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.9" }
tikv = { version = "3.0.10", git = "https://github.com/tikv/tikv.git", tag = "v3.0.10" }
tikv_util = { version = "0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.10" }
uuid = { version = "0.6", features = [ "serde", "v4" ] }
tipb = { git = "https://github.com/pingcap/tipb.git" }
toml = "0.4"

# temporarily fix build break on lexical-core 0.4.0
static_assertions = { version = "=0.3.1" }
# temporarily fix build break on batch-system 0.1.0
crossbeam071 = { package = "crossbeam", version = "=0.7.1" }

[dev-dependencies]
tempdir = "0.3"
rand = "0.6"
test_util = { version = "0.0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.9" }
test_util = { version = "0.0.1", git = "https://github.com/tikv/tikv.git", tag = "v3.0.10" }

[features]
default = ['tikv/default']
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ audit: pre-audit
clean:
cargo clean

check:
cargo check --no-default-features --features "${ENABLE_FEATURES}"

build:
cargo build --no-default-features --features "${ENABLE_FEATURES}"

Expand Down
5 changes: 5 additions & 0 deletions etc/tikv-importer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]
[rocksdb.writecf]
compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]

[security]
## The path for TLS certificates. Empty string means disabling secure connections.
# ca-path = ""
# cert-path = ""
# key-path = ""

[import]
# the directory to store importing kv data.
Expand Down
17 changes: 12 additions & 5 deletions src/import/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,18 @@ pub struct Client {
env: Arc<Environment>,
channels: Mutex<HashMap<u64, Channel>>,
min_available_ratio: f64,
security_mgr: Arc<SecurityManager>,
}

impl Client {
pub fn new(pd_addr: &str, cq_count: usize, min_available_ratio: f64) -> Result<Client> {
pub fn new(
pd_addr: &str,
cq_count: usize,
min_available_ratio: f64,
security_mgr: Arc<SecurityManager>,
) -> Result<Client> {
let cfg = PdConfig::new(vec![pd_addr.to_owned()]);
let sec_mgr = SecurityManager::default();
let rpc_client = RpcClient::new(&cfg, Arc::new(sec_mgr))?;
let rpc_client = RpcClient::new(&cfg, security_mgr.clone())?;
let env = EnvBuilder::new()
.name_prefix("import-client")
.cq_count(cq_count)
Expand All @@ -78,6 +83,7 @@ impl Client {
env: Arc::new(env),
channels: Mutex::new(HashMap::default()),
min_available_ratio,
security_mgr,
})
}

Expand All @@ -94,8 +100,8 @@ impl Client {
HashMapEntry::Occupied(e) => Ok(e.get().clone()),
HashMapEntry::Vacant(e) => {
let store = self.pd.get_store(store_id)?;
let builder = ChannelBuilder::new(Arc::clone(&self.env));
let channel = builder.connect(store.get_address());
let builder = ChannelBuilder::new(self.env.clone());
let channel = self.security_mgr.connect(builder, store.get_address());
Ok(e.insert(channel).clone())
}
}
Expand Down Expand Up @@ -172,6 +178,7 @@ impl Clone for Client {
env: Arc::clone(&self.env),
channels: Mutex::new(HashMap::default()),
min_available_ratio: self.min_available_ratio,
security_mgr: self.security_mgr.clone(),
}
}
}
Expand Down
49 changes: 27 additions & 22 deletions src/import/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ use super::common::*;
use super::Result;
use crate::import::stream::SSTFile;
use engine::rocks::util::security::encrypted_env_from_cipher_file;
use tikv_util::security::SecurityConfig;
use tikv_util::security::SecurityManager;

/// Engine wraps rocksdb::DB with customized options to support efficient bulk
/// write.
pub struct Engine {
db: Arc<DB>,
uuid: Uuid,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
}

impl Engine {
pub fn new<P: AsRef<Path>>(
path: P,
uuid: Uuid,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<Engine> {
let db = {
let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&db_cfg);
Expand All @@ -60,7 +60,7 @@ impl Engine {
db: Arc::new(db),
uuid,
db_cfg,
security_cfg,
security_mgr,
})
}

Expand Down Expand Up @@ -96,7 +96,7 @@ impl Engine {
}

pub fn new_sst_writer(&self) -> Result<SSTWriter> {
SSTWriter::new(&self.db_cfg, &self.security_cfg, self.db.path())
SSTWriter::new(&self.db_cfg, &self.security_mgr, self.db.path())
}

pub fn get_size_properties(&self) -> Result<SizeProperties> {
Expand Down Expand Up @@ -221,12 +221,13 @@ pub struct SSTWriter {
}

impl SSTWriter {
pub fn new(db_cfg: &DbConfig, security_cfg: &SecurityConfig, path: &str) -> Result<SSTWriter> {
pub fn new(db_cfg: &DbConfig, security_mgr: &SecurityManager, path: &str) -> Result<SSTWriter> {
let mut env = Arc::new(Env::new_mem());
let mut base_env = None;
if !security_cfg.cipher_file.is_empty() {
let cipher_file = security_mgr.cipher_file();
if !cipher_file.is_empty() {
base_env = Some(Arc::clone(&env));
env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, Some(env))?;
env = encrypted_env_from_cipher_file(&cipher_file, Some(env))?;
}
let uuid = Uuid::new_v4().to_string();
// Placeholder. SstFileWriter don't actually use block cache.
Expand Down Expand Up @@ -384,14 +385,17 @@ mod tests {
use tikv::raftstore::store::RegionSnapshot;
use tikv::storage::mvcc::MvccReader;
use tikv::storage::BlockCacheConfig;
use tikv_util::file::file_exists;
use tikv_util::{
file::file_exists,
security::{SecurityConfig, SecurityManager},
};

fn new_engine() -> (TempDir, Engine) {
let dir = TempDir::new("test_import_engine").unwrap();
let uuid = Uuid::new_v4();
let db_cfg = DbConfig::default();
let security_cfg = SecurityConfig::default();
let engine = Engine::new(dir.path(), uuid, db_cfg, security_cfg).unwrap();
let security_mgr = Arc::default();
let engine = Engine::new(dir.path(), uuid, db_cfg, security_mgr).unwrap();
(dir, engine)
}

Expand Down Expand Up @@ -429,33 +433,34 @@ mod tests {

#[test]
fn test_sst_writer() {
test_sst_writer_with(1, &[CF_WRITE], &SecurityConfig::default());
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityConfig::default());
test_sst_writer_with(1, &[CF_WRITE], &SecurityManager::default());
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityManager::default());

let temp_dir = TempDir::new("/tmp/encrypted_env_from_cipher_file").unwrap();
let security_cfg = create_security_cfg(&temp_dir);
test_sst_writer_with(1, &[CF_WRITE], &security_cfg);
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_cfg);
let security_mgr = create_security_mgr(&temp_dir);
test_sst_writer_with(1, &[CF_WRITE], &security_mgr);
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_mgr);
}

fn create_security_cfg(temp_dir: &TempDir) -> SecurityConfig {
fn create_security_mgr(temp_dir: &TempDir) -> SecurityManager {
let path = temp_dir.path().join("cipher_file");
let mut cipher_file = File::create(&path).unwrap();
cipher_file.write_all(b"ACFFDBCC").unwrap();
cipher_file.sync_all().unwrap();
let mut security_cfg = SecurityConfig::default();
security_cfg.cipher_file = path.to_str().unwrap().to_owned();
assert_eq!(file_exists(&security_cfg.cipher_file), true);
security_cfg
SecurityManager::new(&security_cfg).unwrap()
}

fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_cfg: &SecurityConfig) {
fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_mgr: &SecurityManager) {
let temp_dir = TempDir::new("_test_sst_writer").unwrap();

let cfg = DbConfig::default();
let mut db_opts = cfg.build_opt();
if !security_cfg.cipher_file.is_empty() {
let env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, None).unwrap();
let cipher_file = security_mgr.cipher_file();
if !cipher_file.is_empty() {
let env = encrypted_env_from_cipher_file(&cipher_file, None).unwrap();
db_opts.set_env(env);
}
let cache = BlockCacheConfig::default().build_shared_cache();
Expand All @@ -465,7 +470,7 @@ mod tests {

let n = 10;
let commit_ts = 10;
let mut w = SSTWriter::new(&cfg, &security_cfg, temp_dir.path().to_str().unwrap()).unwrap();
let mut w = SSTWriter::new(&cfg, &security_mgr, temp_dir.path().to_str().unwrap()).unwrap();

// Write some keys.
let value = vec![1u8; value_size];
Expand Down
3 changes: 3 additions & 0 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ quick_error! {
ResourceTemporarilyUnavailable(msg: String) {
display("{}", msg)
}
Security(msg: String) {
display("{}", msg)
}
}
}

Expand Down
Loading

0 comments on commit 6020574

Please sign in to comment.