diff --git a/Makefile b/Makefile
index a1be2ef..33bbc0d 100644
--- a/Makefile
+++ b/Makefile
@@ -62,6 +62,9 @@ audit: pre-audit
 clean:
 	cargo clean
 
+check:
+	cargo check --no-default-features --features "${ENABLE_FEATURES}"
+
 build:
 	cargo build --no-default-features --features "${ENABLE_FEATURES}"
diff --git a/etc/tikv-importer.toml b/etc/tikv-importer.toml
index 19d78f9..401343e 100644
--- a/etc/tikv-importer.toml
+++ b/etc/tikv-importer.toml
@@ -39,6 +39,11 @@ compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]
 
 [rocksdb.writecf]
 compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]
+[security]
+## The path for TLS certificates. Empty string means disabling secure connections.
+# ca-path = ""
+# cert-path = ""
+# key-path = ""
 
 [import]
 # the directory to store importing kv data.
diff --git a/src/import/client.rs b/src/import/client.rs
index a12b789..0035d6f 100644
--- a/src/import/client.rs
+++ b/src/import/client.rs
@@ -15,9 +15,9 @@ use kvproto::pdpb::OperatorStatus;
 use kvproto::tikvpb::TikvClient;
 use pd_client::{Config as PdConfig, Error as PdError, PdClient, RegionInfo, RpcClient};
-use txn_types::Key;
 use tikv_util::collections::{HashMap, HashMapEntry};
 use tikv_util::security::SecurityManager;
+use txn_types::Key;
 
 use super::common::*;
 use super::{Error, Result};
@@ -61,13 +61,18 @@ pub struct Client {
     env: Arc<Environment>,
     channels: Mutex<HashMap<u64, Channel>>,
     min_available_ratio: f64,
+    security_mgr: Arc<SecurityManager>,
 }
 
 impl Client {
-    pub fn new(pd_addr: &str, cq_count: usize, min_available_ratio: f64) -> Result<Client> {
+    pub fn new(
+        pd_addr: &str,
+        cq_count: usize,
+        min_available_ratio: f64,
+        security_mgr: Arc<SecurityManager>,
+    ) -> Result<Client> {
         let cfg = PdConfig::new(vec![pd_addr.to_owned()]);
-        let sec_mgr = SecurityManager::default();
-        let rpc_client = RpcClient::new(&cfg, Arc::new(sec_mgr))?;
+        let rpc_client = RpcClient::new(&cfg, security_mgr.clone())?;
         let env = EnvBuilder::new()
             .name_prefix("import-client")
             .cq_count(cq_count)
@@ -77,6 +82,7 @@ impl Client {
             env: Arc::new(env),
             channels: Mutex::new(HashMap::default()),
             min_available_ratio,
+            security_mgr,
         })
     }
 
@@ -93,8 +99,8 @@ impl Client {
             HashMapEntry::Occupied(e) => Ok(e.get().clone()),
             HashMapEntry::Vacant(e) => {
                 let store = self.pd.get_store(store_id)?;
-                let builder = ChannelBuilder::new(Arc::clone(&self.env));
-                let channel = builder.connect(store.get_address());
+                let builder = ChannelBuilder::new(self.env.clone());
+                let channel = self.security_mgr.connect(builder, store.get_address());
                 Ok(e.insert(channel).clone())
             }
         }
@@ -171,6 +177,7 @@ impl Clone for Client {
             env: Arc::clone(&self.env),
             channels: Mutex::new(HashMap::default()),
             min_available_ratio: self.min_available_ratio,
+            security_mgr: self.security_mgr.clone(),
         }
     }
 }
diff --git a/src/import/engine.rs b/src/import/engine.rs
index c079277..dae43aa 100644
--- a/src/import/engine.rs
+++ b/src/import/engine.rs
@@ -11,8 +11,8 @@ use std::sync::Arc;
 use crc::crc32::{self, Hasher32};
 use uuid::Uuid;
 
-use kvproto::import_kvpb::*;
 use kvproto::import_kvpb::mutation::Op as MutationOp;
+use kvproto::import_kvpb::*;
 use kvproto::import_sstpb::*;
 
 use engine::rocks::util::{new_engine_opt, CFOptions};
@@ -28,14 +28,14 @@ use tikv::raftstore::coprocessor::properties::{
     IndexHandle, RangeProperties, RangePropertiesCollectorFactory, SizeProperties,
 };
 use tikv::storage::mvcc::{Write, WriteType};
-use txn_types::{is_short_value, Key, TimeStamp};
 use tikv_util::config::MB;
+use txn_types::{is_short_value, Key, TimeStamp};
 
 use super::common::*;
 use super::Result;
 use crate::import::stream::SSTFile;
 use engine::rocks::util::security::encrypted_env_from_cipher_file;
-use tikv_util::security::SecurityConfig;
+use tikv_util::security::SecurityManager;
 
 /// Engine wraps rocksdb::DB with customized options to support efficient bulk
 /// write.
@@ -43,7 +43,7 @@ pub struct Engine {
     db: Arc<DB>,
     uuid: Uuid,
     db_cfg: DbConfig,
-    security_cfg: SecurityConfig,
+    security_mgr: Arc<SecurityManager>,
 }
 
 impl Engine {
@@ -51,7 +51,7 @@ impl Engine {
         path: P,
         uuid: Uuid,
         db_cfg: DbConfig,
-        security_cfg: SecurityConfig,
+        security_mgr: Arc<SecurityManager>,
     ) -> Result<Engine> {
         let db = {
             let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&db_cfg);
@@ -61,7 +61,7 @@ impl Engine {
             db: Arc::new(db),
             uuid,
             db_cfg,
-            security_cfg,
+            security_mgr,
         })
     }
 
@@ -113,7 +113,7 @@ impl Engine {
     }
 
     pub fn new_sst_writer(&self) -> Result<SSTWriter> {
-        SSTWriter::new(&self.db_cfg, &self.security_cfg, self.db.path())
+        SSTWriter::new(&self.db_cfg, &self.security_mgr, self.db.path())
     }
 
     pub fn get_size_properties(&self) -> Result<SizeProperties> {
@@ -259,12 +259,13 @@ pub struct SSTWriter {
 }
 
 impl SSTWriter {
-    pub fn new(db_cfg: &DbConfig, security_cfg: &SecurityConfig, path: &str) -> Result<SSTWriter> {
+    pub fn new(db_cfg: &DbConfig, security_mgr: &SecurityManager, path: &str) -> Result<SSTWriter> {
         let mut env = Arc::new(Env::new_mem());
         let mut base_env = None;
-        if !security_cfg.cipher_file.is_empty() {
+        let cipher_file = security_mgr.cipher_file();
+        if !cipher_file.is_empty() {
             base_env = Some(Arc::clone(&env));
-            env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, Some(env))?;
+            env = encrypted_env_from_cipher_file(&cipher_file, Some(env))?;
         }
         let uuid = Uuid::new_v4().to_string();
         // Placeholder. SstFileWriter don't actually use block cache.
@@ -425,16 +426,19 @@ mod tests {
         RngCore, SeedableRng,
     };
     use tikv::raftstore::store::RegionSnapshot;
-    use tikv::storage::mvcc::MvccReader;
     use tikv::storage::config::BlockCacheConfig;
-    use tikv_util::file::file_exists;
+    use tikv::storage::mvcc::MvccReader;
+    use tikv_util::{
+        file::file_exists,
+        security::{SecurityConfig, SecurityManager},
+    };
 
     fn new_engine() -> (TempDir, Engine) {
         let dir = TempDir::new("test_import_engine").unwrap();
         let uuid = Uuid::new_v4();
         let db_cfg = DbConfig::default();
-        let security_cfg = SecurityConfig::default();
-        let engine = Engine::new(dir.path(), uuid, db_cfg, security_cfg).unwrap();
+        let security_mgr = Arc::default();
+        let engine = Engine::new(dir.path(), uuid, db_cfg, security_mgr).unwrap();
         (dir, engine)
     }
 
@@ -499,16 +503,16 @@ mod tests {
 
     #[test]
     fn test_sst_writer() {
-        test_sst_writer_with(1, &[CF_WRITE], &SecurityConfig::default());
-        test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityConfig::default());
+        test_sst_writer_with(1, &[CF_WRITE], &SecurityManager::default());
+        test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityManager::default());
 
         let temp_dir = TempDir::new("/tmp/encrypted_env_from_cipher_file").unwrap();
-        let security_cfg = create_security_cfg(&temp_dir);
-        test_sst_writer_with(1, &[CF_WRITE], &security_cfg);
-        test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_cfg);
+        let security_mgr = create_security_mgr(&temp_dir);
+        test_sst_writer_with(1, &[CF_WRITE], &security_mgr);
+        test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_mgr);
     }
 
-    fn create_security_cfg(temp_dir: &TempDir) -> SecurityConfig {
+    fn create_security_mgr(temp_dir: &TempDir) -> SecurityManager {
         let path = temp_dir.path().join("cipher_file");
         let mut cipher_file = File::create(&path).unwrap();
         cipher_file.write_all(b"ACFFDBCC").unwrap();
 
         let mut security_cfg = SecurityConfig::default();
         security_cfg.cipher_file = path.to_str().unwrap().to_owned();
         assert_eq!(file_exists(&security_cfg.cipher_file), true);
-        security_cfg
+        SecurityManager::new(&security_cfg).unwrap()
     }
 
-    fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_cfg: &SecurityConfig) {
+    fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_mgr: &SecurityManager) {
         let temp_dir = TempDir::new("_test_sst_writer").unwrap();
 
         let cfg = DbConfig::default();
         let mut db_opts = cfg.build_opt();
-        if !security_cfg.cipher_file.is_empty() {
-            let env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, None).unwrap();
+        let cipher_file = security_mgr.cipher_file();
+        if !cipher_file.is_empty() {
+            let env = encrypted_env_from_cipher_file(&cipher_file, None).unwrap();
             db_opts.set_env(env);
         }
         let cache = BlockCacheConfig::default().build_shared_cache();
@@ -535,7 +540,7 @@ mod tests {
         let n = 10;
         let commit_ts = 10;
-        let mut w = SSTWriter::new(&cfg, &security_cfg, temp_dir.path().to_str().unwrap()).unwrap();
+        let mut w = SSTWriter::new(&cfg, &security_mgr, temp_dir.path().to_str().unwrap()).unwrap();
 
         // Write some keys.
         let value = vec![1u8; value_size];
diff --git a/src/import/errors.rs b/src/import/errors.rs
index 09db4a6..19e78a4 100644
--- a/src/import/errors.rs
+++ b/src/import/errors.rs
@@ -107,6 +107,9 @@ quick_error! {
         ResourceTemporarilyUnavailable(msg: String) {
             display("{}", msg)
         }
+        Security(msg: String) {
+            display("{}", msg)
+        }
     }
 }
diff --git a/src/import/kv_importer.rs b/src/import/kv_importer.rs
index aaab662..1c7f2f9 100644
--- a/src/import/kv_importer.rs
+++ b/src/import/kv_importer.rs
@@ -15,7 +15,7 @@ use super::client::*;
 use super::engine::*;
 use super::import::*;
 use super::{Config, Error, Result};
-use tikv_util::security::SecurityConfig;
+use tikv_util::security::SecurityManager;
 
 pub struct Inner {
     engines: HashMap<Uuid, Arc<EngineFile>>,
@@ -27,11 +27,16 @@ pub struct KVImporter {
     cfg: Config,
     dir: EngineDir,
     inner: Mutex<Inner>,
+    pub(super) security_mgr: Arc<SecurityManager>,
 }
 
 impl KVImporter {
-    pub fn new(cfg: Config, db_cfg: DbConfig, security_cfg: SecurityConfig) -> Result<KVImporter> {
-        let dir = EngineDir::new(&cfg.import_dir, db_cfg, security_cfg)?;
+    pub fn new(
+        cfg: Config,
+        db_cfg: DbConfig,
+        security_mgr: Arc<SecurityManager>,
+    ) -> Result<KVImporter> {
+        let dir = EngineDir::new(&cfg.import_dir, db_cfg, security_mgr.clone())?;
         Ok(KVImporter {
             cfg,
             dir,
@@ -39,6 +44,7 @@ impl KVImporter {
                 engines: HashMap::default(),
                 import_jobs: HashMap::default(),
             }),
+            security_mgr,
         })
     }
 
@@ -117,6 +123,7 @@ impl KVImporter {
             pd_addr,
             self.cfg.num_import_jobs,
             self.cfg.min_available_ratio,
+            self.security_mgr.clone(),
         )?;
         let job = {
             let mut inner = self.inner.lock().unwrap();
@@ -186,7 +193,7 @@ impl KVImporter {
 /// is completed, the files are stored in `$root/$uuid`.
 pub struct EngineDir {
     db_cfg: DbConfig,
-    security_cfg: SecurityConfig,
+    security_mgr: Arc<SecurityManager>,
     root_dir: PathBuf,
     temp_dir: PathBuf,
 }
@@ -197,7 +204,7 @@ impl EngineDir {
     fn new<P: AsRef<Path>>(
         root: P,
         db_cfg: DbConfig,
-        security_cfg: SecurityConfig,
+        security_mgr: Arc<SecurityManager>,
     ) -> Result<EngineDir> {
         let root_dir = root.as_ref().to_owned();
         let temp_dir = root_dir.join(Self::TEMP_DIR);
@@ -207,7 +214,7 @@ impl EngineDir {
         fs::create_dir_all(&temp_dir)?;
         Ok(EngineDir {
             db_cfg,
-            security_cfg,
+            security_mgr,
             root_dir,
             temp_dir,
         })
@@ -229,7 +236,7 @@ impl EngineDir {
         if path.save.exists() {
             return Err(Error::FileExists(path.save));
         }
-        EngineFile::new(uuid, path, self.db_cfg.clone(), self.security_cfg.clone())
+        EngineFile::new(uuid, path, self.db_cfg.clone(), self.security_mgr.clone())
     }
 
     /// Creates an engine from `$root/$uuid` for importing data.
@@ -239,7 +246,7 @@ impl EngineDir {
             &path.save,
             uuid,
             self.db_cfg.clone(),
-            self.security_cfg.clone(),
+            self.security_mgr.clone(),
         )
     }
 
@@ -287,9 +294,9 @@ impl EngineFile {
         uuid: Uuid,
         path: EnginePath,
         db_cfg: DbConfig,
-        security_cfg: SecurityConfig,
+        security_mgr: Arc<SecurityManager>,
     ) -> Result<EngineFile> {
-        let engine = Engine::new(&path.temp, uuid, db_cfg, security_cfg)?;
+        let engine = Engine::new(&path.temp, uuid, db_cfg, security_mgr)?;
         Ok(EngineFile {
             uuid,
             path,
@@ -356,8 +363,7 @@ mod tests {
         let mut cfg = Config::default();
         cfg.import_dir = temp_dir.path().to_str().unwrap().to_owned();
 
-        let importer =
-            KVImporter::new(cfg, DbConfig::default(), SecurityConfig::default()).unwrap();
+        let importer = KVImporter::new(cfg, DbConfig::default(), Arc::default()).unwrap();
 
         let uuid = Uuid::new_v4();
         // Can not bind to an unopened engine.
@@ -380,7 +386,7 @@ mod tests {
 
         let uuid = Uuid::new_v4();
         let db_cfg = DbConfig::default();
-        let security_cfg = SecurityConfig::default();
+        let security_mgr = Arc::<SecurityManager>::default();
         let path = EnginePath {
             save: temp_dir.path().join("save"),
             temp: temp_dir.path().join("temp"),
@@ -389,10 +395,10 @@ mod tests {
         // Test close.
         {
             let mut f =
-                EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).unwrap();
+                EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).unwrap();
             // Cannot create the same file again.
             assert!(
-                EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).is_err()
+                EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).is_err()
             );
             assert!(path.temp.exists());
             assert!(!path.save.exists());
@@ -405,7 +411,7 @@ mod tests {
         // Test cleanup.
         {
             let f =
-                EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).unwrap();
+                EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).unwrap();
             assert!(path.temp.exists());
             assert!(!path.save.exists());
             drop(f);
diff --git a/src/import/kv_server.rs b/src/import/kv_server.rs
index f4d36a8..3fe1e5c 100644
--- a/src/import/kv_server.rs
+++ b/src/import/kv_server.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use grpcio::{ChannelBuilder, EnvBuilder, Server as GrpcServer, ServerBuilder};
 use kvproto::import_kvpb::create_import_kv;
-use tikv_util::thd_name;
+use tikv_util::{security::SecurityManager, thd_name};
 
 use super::{ImportKVService, KVImporter, TiKvConfig};
 
@@ -22,10 +22,12 @@ impl ImportKVServer {
         let cfg = &tikv.server;
         let addr = SocketAddr::from_str(&cfg.addr).unwrap();
 
+        let security_mgr = Arc::new(SecurityManager::new(&tikv.security).unwrap());
+
         let importer = KVImporter::new(
             tikv.import.clone(),
             tikv.rocksdb.clone(),
-            tikv.security.clone(),
+            security_mgr.clone(),
         )
         .unwrap();
         let import_service = ImportKVService::new(tikv.import.clone(), Arc::new(importer));
@@ -44,8 +46,12 @@ impl ImportKVServer {
             .max_receive_message_len(-1)
             .build_args();
 
-        let grpc_server = ServerBuilder::new(Arc::clone(&env))
-            .bind(format!("{}", addr.ip()), addr.port())
+        let grpc_server = security_mgr
+            .bind(
+                ServerBuilder::new(env.clone()),
+                &addr.ip().to_string(),
+                addr.port(),
+            )
             .channel_args(channel_args)
             .register_service(create_import_kv(import_service))
             .build()
diff --git a/src/import/kv_service.rs b/src/import/kv_service.rs
index 48846c9..18b1e02 100644
--- a/src/import/kv_service.rs
+++ b/src/import/kv_service.rs
@@ -9,8 +9,8 @@ use grpcio::{ClientStreamingSink, RequestStream, RpcContext, UnarySink};
 use kvproto::import_kvpb::*;
 use uuid::Uuid;
 
-use txn_types::Key;
 use tikv_util::time::Instant;
+use txn_types::Key;
 
 use super::client::*;
 use super::metrics::{self, *};
@@ -71,11 +71,12 @@ impl ImportKv for ImportKVService {
         let label = "switch_mode";
         let timer = Instant::now_coarse();
         let min_available_ratio = self.cfg.min_available_ratio;
+        let security_mgr = self.importer.security_mgr.clone();
 
         ctx.spawn(
             self.threads
                 .spawn_fn(move || {
-                    let client = Client::new(req.get_pd_addr(), 1, min_available_ratio)?;
+                    let client = Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)?;
                     match client.switch_cluster(req.get_request()) {
                         Ok(_) => {
                             info!("switch cluster"; "req" => ?req.get_request());
@@ -294,6 +295,7 @@ impl ImportKv for ImportKVService {
         let label = "compact_cluster";
         let timer = Instant::now_coarse();
         let min_available_ratio = self.cfg.min_available_ratio;
+        let security_mgr = self.importer.security_mgr.clone();
 
         let mut compact = req.get_request().clone();
         if compact.has_range() {
@@ -311,7 +313,8 @@ impl ImportKv for ImportKVService {
         ctx.spawn(
             self.threads
                 .spawn_fn(move || {
-                    let client = Client::new(req.get_pd_addr(), 1, min_available_ratio)?;
+                    let client =
+                        Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)?;
                     match client.compact_cluster(&compact) {
                         Ok(_) => {
                             info!("compact cluster"; "req" => ?compact);
diff --git a/src/import/prepare.rs b/src/import/prepare.rs
index 2c25e55..80e2283 100644
--- a/src/import/prepare.rs
+++ b/src/import/prepare.rs
@@ -328,7 +328,6 @@ mod tests {
 
     use tikv::config::DbConfig;
     use txn_types::Key;
-    use tikv_util::security::SecurityConfig;
 
     fn new_encoded_key(k: &[u8]) -> Vec<u8> {
         if k.is_empty() {
@@ -343,8 +342,8 @@ mod tests {
         let dir = TempDir::new("test_import_prepare_job").unwrap();
         let uuid = Uuid::new_v4();
         let db_cfg = DbConfig::default();
-        let security_cfg = SecurityConfig::default();
-        let engine = Arc::new(Engine::new(dir.path(), uuid, db_cfg, security_cfg).unwrap());
+        let security_mgr = Arc::default();
+        let engine = Arc::new(Engine::new(dir.path(), uuid, db_cfg, security_mgr).unwrap());
 
         // Generate entries to prepare.
         let (n, m) = (4, 4);
diff --git a/src/import/stream.rs b/src/import/stream.rs
index b13942a..630aee1 100644
--- a/src/import/stream.rs
+++ b/src/import/stream.rs
@@ -204,7 +204,6 @@ mod tests {
     use tempdir::TempDir;
 
     use tikv::config::DbConfig;
-    use tikv_util::security::SecurityConfig;
    use txn_types::{Key, TimeStamp};
 
     fn open_db<P: AsRef<Path>>(path: P) -> Arc<DB> {
@@ -370,8 +369,8 @@ mod tests {
         let dir = TempDir::new("test_import_sst_file_stream").unwrap();
         let uuid = Uuid::new_v4();
         let db_cfg = DbConfig::default();
-        let security_cfg = SecurityConfig::default();
-        let engine = Arc::new(Engine::new(dir.path(), uuid, db_cfg, security_cfg).unwrap());
+        let security_mgr = Arc::default();
+        let engine = Arc::new(Engine::new(dir.path(), uuid, db_cfg, security_mgr).unwrap());
 
         for i in 0..16 {
             let k = Key::from_raw(&[i]).append_ts(TimeStamp::zero());
@@ -451,10 +450,16 @@ mod tests {
         let mut stream = SSTFileStream::new(cfg, client, engine, sst_range, finished_ranges);
         for (start, end, range_end) in expected_ranges {
             let (range, ssts) = stream.next().unwrap().unwrap();
-            let start = Key::from_raw(&[start]).append_ts(TimeStamp::zero()).into_encoded();
-            let end = Key::from_raw(&[end]).append_ts(TimeStamp::zero()).into_encoded();
+            let start = Key::from_raw(&[start])
+                .append_ts(TimeStamp::zero())
+                .into_encoded();
+            let end = Key::from_raw(&[end])
+                .append_ts(TimeStamp::zero())
+                .into_encoded();
             let range_end = match range_end {
-                Some(v) => Key::from_raw(&[v]).append_ts(TimeStamp::zero()).into_encoded(),
+                Some(v) => Key::from_raw(&[v])
+                    .append_ts(TimeStamp::zero())
+                    .into_encoded(),
                 None => RANGE_MAX.to_owned(),
             };
             assert_eq!(range.get_start(), start.as_slice());
diff --git a/tests/integrations/import/kv_service.rs b/tests/integrations/import/kv_service.rs
index c47cd7a..05ee6a3 100644
--- a/tests/integrations/import/kv_service.rs
+++ b/tests/integrations/import/kv_service.rs
@@ -8,27 +8,35 @@ use futures::{stream, Future, Stream};
 use tempdir::TempDir;
 use uuid::Uuid;
 
-use grpcio::{ChannelBuilder, Environment, Result, WriteFlags};
-use kvproto::import_kvpb::*;
+use grpcio::{ChannelBuilder, Environment, Result, RpcStatusCode, WriteFlags};
 use kvproto::import_kvpb::mutation::Op as MutationOp;
+use kvproto::import_kvpb::*;
 
-use test_util::retry;
+use test_util::{new_security_cfg, retry};
 use tikv_importer::import::{ImportKVServer, TiKvConfig};
+use tikv_util::security::SecurityManager;
 
-fn new_kv_server() -> (ImportKVServer, ImportKvClient, TempDir) {
+fn new_kv_server(enable_client_tls: bool) -> (ImportKVServer, ImportKvClient, TempDir) {
     let temp_dir = TempDir::new("test_import_kv_server").unwrap();
 
     let mut cfg = TiKvConfig::default();
     cfg.server.addr = "127.0.0.1:0".to_owned();
     cfg.import.import_dir = temp_dir.path().to_str().unwrap().to_owned();
+    cfg.security = new_security_cfg();
     let server = ImportKVServer::new(&cfg);
 
     let ch = {
         let env = Arc::new(Environment::new(1));
         let addr = server.bind_addrs().first().unwrap();
-        ChannelBuilder::new(env)
-            .keepalive_timeout(Duration::from_secs(60))
-            .connect(&format!("{}:{}", addr.0, addr.1))
+        let addr = format!("{}:{}", addr.0, addr.1);
+        let builder = ChannelBuilder::new(env).keepalive_timeout(Duration::from_secs(60));
+        if enable_client_tls {
+            SecurityManager::new(&cfg.security)
+                .unwrap()
+                .connect(builder, &addr)
+        } else {
+            builder.connect(&addr)
+        }
     };
     let client = ImportKvClient::new(ch);
 
@@ -37,9 +45,20 @@ fn new_kv_server() -> (ImportKVServer, ImportKvClient, TempDir) {
     (server, client, temp_dir)
 }
 
+#[test]
+fn test_kv_service_without_tls() {
+    let (mut server, client, _) = new_kv_server(false);
+    server.start();
+
+    match retry!(client.get_version(&GetVersionRequest::default())) {
+        Err(grpcio::Error::RpcFailure(status)) if status.status == RpcStatusCode::UNAVAILABLE => {}
+        other => panic!("unexpected {:?}", other),
+    }
+}
+
 #[test]
 fn test_kv_service() {
-    let (mut server, client, _) = new_kv_server();
+    let (mut server, client, _) = new_kv_server(true);
     server.start();
 
     let resp = retry!(client.get_version(&GetVersionRequest::default())).unwrap();
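
For reference, the client-side pattern this patch threads through the importer (build a SecurityManager from a SecurityConfig, then let it decide whether a channel gets TLS credentials) can be exercised in isolation roughly as follows. This is a minimal sketch rather than part of the change: the certificate paths and address are placeholders, and it relies only on the APIs already used above (SecurityConfig, SecurityManager::new, SecurityManager::connect, and grpcio's EnvBuilder/ChannelBuilder).

use std::sync::Arc;

use grpcio::{ChannelBuilder, EnvBuilder};
use tikv_util::security::{SecurityConfig, SecurityManager};

fn main() {
    // Placeholder paths; a real deployment points these at the cluster CA and
    // the importer's own certificate/key pair. Leaving them empty keeps the
    // connection insecure, matching the comment added to etc/tikv-importer.toml.
    let mut cfg = SecurityConfig::default();
    cfg.ca_path = "/path/to/ca.pem".to_owned();
    cfg.cert_path = "/path/to/importer.pem".to_owned();
    cfg.key_path = "/path/to/importer-key.pem".to_owned();

    let security_mgr = Arc::new(SecurityManager::new(&cfg).expect("invalid security config"));

    // Same shape as Client::resolve and the integration test above: the
    // manager either wraps the builder with TLS credentials or connects plainly.
    let env = Arc::new(EnvBuilder::new().cq_count(1).build());
    let builder = ChannelBuilder::new(env);
    let _channel = security_mgr.connect(builder, "127.0.0.1:8287");
}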