Skip to content

Commit

Permalink
cherry-pick to release-3.1 (#53)
Browse files Browse the repository at this point in the history
* Cherry-pick TLS support to release-3.1 (#44)

* *: support TLS (#40)

* *: support TLS

Signed-off-by: kennytm <kennytm@gmail.com>

* tests/integrations: make the entire test run on TLS connection

also added a test to ensure non-TLS-client <-> TLS-server fails.

Signed-off-by: kennytm <kennytm@gmail.com>

* tests: fix build failure

Signed-off-by: kennytm <kennytm@gmail.com>

Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
3pointer and kennytm authored Apr 2, 2020
1 parent ed4d6f9 commit ced8e5b
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 61 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ audit: pre-audit
clean:
cargo clean

check:
cargo check --no-default-features --features "${ENABLE_FEATURES}"

build:
cargo build --no-default-features --features "${ENABLE_FEATURES}"

Expand Down
5 changes: 5 additions & 0 deletions etc/tikv-importer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]
[rocksdb.writecf]
compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]

[security]
## The path for TLS certificates. Empty string means disabling secure connections.
# ca-path = ""
# cert-path = ""
# key-path = ""

[import]
# the directory to store importing kv data.
Expand Down
17 changes: 12 additions & 5 deletions src/import/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,18 @@ pub struct Client {
env: Arc<Environment>,
channels: Mutex<HashMap<u64, Channel>>,
min_available_ratio: f64,
security_mgr: Arc<SecurityManager>,
}

impl Client {
pub fn new(pd_addr: &str, cq_count: usize, min_available_ratio: f64) -> Result<Client> {
pub fn new(
pd_addr: &str,
cq_count: usize,
min_available_ratio: f64,
security_mgr: Arc<SecurityManager>,
) -> Result<Client> {
let cfg = PdConfig::new(vec![pd_addr.to_owned()]);
let sec_mgr = SecurityManager::default();
let rpc_client = RpcClient::new(&cfg, Arc::new(sec_mgr))?;
let rpc_client = RpcClient::new(&cfg, security_mgr.clone())?;
let env = EnvBuilder::new()
.name_prefix("import-client")
.cq_count(cq_count)
Expand All @@ -78,6 +83,7 @@ impl Client {
env: Arc::new(env),
channels: Mutex::new(HashMap::default()),
min_available_ratio,
security_mgr,
})
}

Expand All @@ -94,8 +100,8 @@ impl Client {
HashMapEntry::Occupied(e) => Ok(e.get().clone()),
HashMapEntry::Vacant(e) => {
let store = self.pd.get_store(store_id)?;
let builder = ChannelBuilder::new(Arc::clone(&self.env));
let channel = builder.connect(store.get_address());
let builder = ChannelBuilder::new(self.env.clone());
let channel = self.security_mgr.connect(builder, store.get_address());
Ok(e.insert(channel).clone())
}
}
Expand Down Expand Up @@ -172,6 +178,7 @@ impl Clone for Client {
env: Arc::clone(&self.env),
channels: Mutex::new(HashMap::default()),
min_available_ratio: self.min_available_ratio,
security_mgr: self.security_mgr.clone(),
}
}
}
Expand Down
49 changes: 27 additions & 22 deletions src/import/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ use super::common::*;
use super::Result;
use crate::import::stream::SSTFile;
use engine::rocks::util::security::encrypted_env_from_cipher_file;
use tikv_util::security::SecurityConfig;
use tikv_util::security::SecurityManager;

/// Engine wraps rocksdb::DB with customized options to support efficient bulk
/// write.
pub struct Engine {
db: Arc<DB>,
uuid: Uuid,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
}

impl Engine {
pub fn new<P: AsRef<Path>>(
path: P,
uuid: Uuid,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<Engine> {
let db = {
let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&db_cfg);
Expand All @@ -60,7 +60,7 @@ impl Engine {
db: Arc::new(db),
uuid,
db_cfg,
security_cfg,
security_mgr,
})
}

Expand Down Expand Up @@ -96,7 +96,7 @@ impl Engine {
}

pub fn new_sst_writer(&self) -> Result<SSTWriter> {
SSTWriter::new(&self.db_cfg, &self.security_cfg, self.db.path())
SSTWriter::new(&self.db_cfg, &self.security_mgr, self.db.path())
}

pub fn get_size_properties(&self) -> Result<SizeProperties> {
Expand Down Expand Up @@ -221,12 +221,13 @@ pub struct SSTWriter {
}

impl SSTWriter {
pub fn new(db_cfg: &DbConfig, security_cfg: &SecurityConfig, path: &str) -> Result<SSTWriter> {
pub fn new(db_cfg: &DbConfig, security_mgr: &SecurityManager, path: &str) -> Result<SSTWriter> {
let mut env = Arc::new(Env::new_mem());
let mut base_env = None;
if !security_cfg.cipher_file.is_empty() {
let cipher_file = security_mgr.cipher_file();
if !cipher_file.is_empty() {
base_env = Some(Arc::clone(&env));
env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, Some(env))?;
env = encrypted_env_from_cipher_file(&cipher_file, Some(env))?;
}
let uuid = Uuid::new_v4().to_string();
// Placeholder. SstFileWriter don't actually use block cache.
Expand Down Expand Up @@ -384,14 +385,17 @@ mod tests {
use tikv::raftstore::store::RegionSnapshot;
use tikv::storage::mvcc::MvccReader;
use tikv::storage::BlockCacheConfig;
use tikv_util::file::file_exists;
use tikv_util::{
file::file_exists,
security::{SecurityConfig, SecurityManager},
};

fn new_engine() -> (TempDir, Engine) {
let dir = TempDir::new("test_import_engine").unwrap();
let uuid = Uuid::new_v4();
let db_cfg = DbConfig::default();
let security_cfg = SecurityConfig::default();
let engine = Engine::new(dir.path(), uuid, db_cfg, security_cfg).unwrap();
let security_mgr = Arc::default();
let engine = Engine::new(dir.path(), uuid, db_cfg, security_mgr).unwrap();
(dir, engine)
}

Expand Down Expand Up @@ -429,33 +433,34 @@ mod tests {

#[test]
fn test_sst_writer() {
test_sst_writer_with(1, &[CF_WRITE], &SecurityConfig::default());
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityConfig::default());
test_sst_writer_with(1, &[CF_WRITE], &SecurityManager::default());
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityManager::default());

let temp_dir = TempDir::new("/tmp/encrypted_env_from_cipher_file").unwrap();
let security_cfg = create_security_cfg(&temp_dir);
test_sst_writer_with(1, &[CF_WRITE], &security_cfg);
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_cfg);
let security_mgr = create_security_mgr(&temp_dir);
test_sst_writer_with(1, &[CF_WRITE], &security_mgr);
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_mgr);
}

fn create_security_cfg(temp_dir: &TempDir) -> SecurityConfig {
fn create_security_mgr(temp_dir: &TempDir) -> SecurityManager {
let path = temp_dir.path().join("cipher_file");
let mut cipher_file = File::create(&path).unwrap();
cipher_file.write_all(b"ACFFDBCC").unwrap();
cipher_file.sync_all().unwrap();
let mut security_cfg = SecurityConfig::default();
security_cfg.cipher_file = path.to_str().unwrap().to_owned();
assert_eq!(file_exists(&security_cfg.cipher_file), true);
security_cfg
SecurityManager::new(&security_cfg).unwrap()
}

fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_cfg: &SecurityConfig) {
fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_mgr: &SecurityManager) {
let temp_dir = TempDir::new("_test_sst_writer").unwrap();

let cfg = DbConfig::default();
let mut db_opts = cfg.build_opt();
if !security_cfg.cipher_file.is_empty() {
let env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, None).unwrap();
let cipher_file = security_mgr.cipher_file();
if !cipher_file.is_empty() {
let env = encrypted_env_from_cipher_file(&cipher_file, None).unwrap();
db_opts.set_env(env);
}
let cache = BlockCacheConfig::default().build_shared_cache();
Expand All @@ -465,7 +470,7 @@ mod tests {

let n = 10;
let commit_ts = 10;
let mut w = SSTWriter::new(&cfg, &security_cfg, temp_dir.path().to_str().unwrap()).unwrap();
let mut w = SSTWriter::new(&cfg, &security_mgr, temp_dir.path().to_str().unwrap()).unwrap();

// Write some keys.
let value = vec![1u8; value_size];
Expand Down
3 changes: 3 additions & 0 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ quick_error! {
ResourceTemporarilyUnavailable(msg: String) {
display("{}", msg)
}
Security(msg: String) {
display("{}", msg)
}
}
}

Expand Down
38 changes: 22 additions & 16 deletions src/import/kv_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::client::*;
use super::engine::*;
use super::import::*;
use super::{Config, Error, Result};
use tikv_util::security::SecurityConfig;
use tikv_util::security::SecurityManager;

pub struct Inner {
engines: HashMap<Uuid, Arc<EngineFile>>,
Expand All @@ -27,18 +27,24 @@ pub struct KVImporter {
cfg: Config,
dir: EngineDir,
inner: Mutex<Inner>,
pub(super) security_mgr: Arc<SecurityManager>,
}

impl KVImporter {
pub fn new(cfg: Config, db_cfg: DbConfig, security_cfg: SecurityConfig) -> Result<KVImporter> {
let dir = EngineDir::new(&cfg.import_dir, db_cfg, security_cfg)?;
pub fn new(
cfg: Config,
db_cfg: DbConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<KVImporter> {
let dir = EngineDir::new(&cfg.import_dir, db_cfg, security_mgr.clone())?;
Ok(KVImporter {
cfg,
dir,
inner: Mutex::new(Inner {
engines: HashMap::default(),
import_jobs: HashMap::default(),
}),
security_mgr,
})
}

Expand Down Expand Up @@ -117,6 +123,7 @@ impl KVImporter {
pd_addr,
self.cfg.num_import_jobs,
self.cfg.min_available_ratio,
self.security_mgr.clone(),
)?;
let job = {
let mut inner = self.inner.lock().unwrap();
Expand Down Expand Up @@ -186,7 +193,7 @@ impl KVImporter {
/// is completed, the files are stored in `$root/$uuid`.
pub struct EngineDir {
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
root_dir: PathBuf,
temp_dir: PathBuf,
}
Expand All @@ -197,7 +204,7 @@ impl EngineDir {
fn new<P: AsRef<Path>>(
root: P,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<EngineDir> {
let root_dir = root.as_ref().to_owned();
let temp_dir = root_dir.join(Self::TEMP_DIR);
Expand All @@ -207,7 +214,7 @@ impl EngineDir {
fs::create_dir_all(&temp_dir)?;
Ok(EngineDir {
db_cfg,
security_cfg,
security_mgr,
root_dir,
temp_dir,
})
Expand All @@ -229,7 +236,7 @@ impl EngineDir {
if path.save.exists() {
return Err(Error::FileExists(path.save));
}
EngineFile::new(uuid, path, self.db_cfg.clone(), self.security_cfg.clone())
EngineFile::new(uuid, path, self.db_cfg.clone(), self.security_mgr.clone())
}

/// Creates an engine from `$root/$uuid` for importing data.
Expand All @@ -239,7 +246,7 @@ impl EngineDir {
&path.save,
uuid,
self.db_cfg.clone(),
self.security_cfg.clone(),
self.security_mgr.clone(),
)
}

Expand Down Expand Up @@ -287,9 +294,9 @@ impl EngineFile {
uuid: Uuid,
path: EnginePath,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<EngineFile> {
let engine = Engine::new(&path.temp, uuid, db_cfg, security_cfg)?;
let engine = Engine::new(&path.temp, uuid, db_cfg, security_mgr)?;
Ok(EngineFile {
uuid,
path,
Expand Down Expand Up @@ -351,8 +358,7 @@ mod tests {

let mut cfg = Config::default();
cfg.import_dir = temp_dir.path().to_str().unwrap().to_owned();
let importer =
KVImporter::new(cfg, DbConfig::default(), SecurityConfig::default()).unwrap();
let importer = KVImporter::new(cfg, DbConfig::default(), Arc::default()).unwrap();

let uuid = Uuid::new_v4();
// Can not bind to an unopened engine.
Expand All @@ -372,7 +378,7 @@ mod tests {

let uuid = Uuid::new_v4();
let db_cfg = DbConfig::default();
let security_cfg = SecurityConfig::default();
let security_mgr = Arc::<SecurityManager>::default();
let path = EnginePath {
save: temp_dir.path().join("save"),
temp: temp_dir.path().join("temp"),
Expand All @@ -381,10 +387,10 @@ mod tests {
// Test close.
{
let mut f =
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).unwrap();
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).unwrap();
// Cannot create the same file again.
assert!(
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).is_err()
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).is_err()
);
assert!(path.temp.exists());
assert!(!path.save.exists());
Expand All @@ -397,7 +403,7 @@ mod tests {
// Test cleanup.
{
let f =
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).unwrap();
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).unwrap();
assert!(path.temp.exists());
assert!(!path.save.exists());
drop(f);
Expand Down
14 changes: 10 additions & 4 deletions src/import/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use grpcio::{ChannelBuilder, EnvBuilder, Server as GrpcServer, ServerBuilder};
use kvproto::import_kvpb_grpc::create_import_kv;

use tikv_util::thd_name;
use tikv_util::{security::SecurityManager, thd_name};

use super::{ImportKVService, KVImporter, TiKvConfig};

Expand All @@ -22,10 +22,12 @@ impl ImportKVServer {
let cfg = &tikv.server;
let addr = SocketAddr::from_str(&cfg.addr).unwrap();

let security_mgr = Arc::new(SecurityManager::new(&tikv.security).unwrap());

let importer = KVImporter::new(
tikv.import.clone(),
tikv.rocksdb.clone(),
tikv.security.clone(),
security_mgr.clone(),
)
.unwrap();
let import_service = ImportKVService::new(tikv.import.clone(), Arc::new(importer));
Expand All @@ -44,8 +46,12 @@ impl ImportKVServer {
.max_receive_message_len(-1)
.build_args();

let grpc_server = ServerBuilder::new(Arc::clone(&env))
.bind(format!("{}", addr.ip()), addr.port())
let grpc_server = security_mgr
.bind(
ServerBuilder::new(env.clone()),
&addr.ip().to_string(),
addr.port(),
)
.channel_args(channel_args)
.register_service(create_import_kv(import_service))
.build()
Expand Down
Loading

0 comments on commit ced8e5b

Please sign in to comment.