Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support TLS #40

Merged
merged 2 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ audit: pre-audit
clean:
cargo clean

check:
cargo check --no-default-features --features "${ENABLE_FEATURES}"

build:
cargo build --no-default-features --features "${ENABLE_FEATURES}"

Expand Down
5 changes: 5 additions & 0 deletions etc/tikv-importer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]
[rocksdb.writecf]
compression-per-level = ["lz4", "no", "no", "no", "no", "no", "lz4"]

[security]
## The path for TLS certificates. Empty string means disabling secure connections.
# ca-path = ""
# cert-path = ""
# key-path = ""

[import]
# the directory to store importing kv data.
Expand Down
19 changes: 13 additions & 6 deletions src/import/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use kvproto::pdpb::OperatorStatus;
use kvproto::tikvpb::TikvClient;

use pd_client::{Config as PdConfig, Error as PdError, PdClient, RegionInfo, RpcClient};
use txn_types::Key;
use tikv_util::collections::{HashMap, HashMapEntry};
use tikv_util::security::SecurityManager;
use txn_types::Key;

use super::common::*;
use super::{Error, Result};
Expand Down Expand Up @@ -61,13 +61,18 @@ pub struct Client {
env: Arc<Environment>,
channels: Mutex<HashMap<u64, Channel>>,
min_available_ratio: f64,
security_mgr: Arc<SecurityManager>,
}

impl Client {
pub fn new(pd_addr: &str, cq_count: usize, min_available_ratio: f64) -> Result<Client> {
pub fn new(
pd_addr: &str,
cq_count: usize,
min_available_ratio: f64,
security_mgr: Arc<SecurityManager>,
) -> Result<Client> {
let cfg = PdConfig::new(vec![pd_addr.to_owned()]);
let sec_mgr = SecurityManager::default();
let rpc_client = RpcClient::new(&cfg, Arc::new(sec_mgr))?;
let rpc_client = RpcClient::new(&cfg, security_mgr.clone())?;
let env = EnvBuilder::new()
.name_prefix("import-client")
.cq_count(cq_count)
Expand All @@ -77,6 +82,7 @@ impl Client {
env: Arc::new(env),
channels: Mutex::new(HashMap::default()),
min_available_ratio,
security_mgr,
})
}

Expand All @@ -93,8 +99,8 @@ impl Client {
HashMapEntry::Occupied(e) => Ok(e.get().clone()),
HashMapEntry::Vacant(e) => {
let store = self.pd.get_store(store_id)?;
let builder = ChannelBuilder::new(Arc::clone(&self.env));
let channel = builder.connect(store.get_address());
let builder = ChannelBuilder::new(self.env.clone());
let channel = self.security_mgr.connect(builder, store.get_address());
Ok(e.insert(channel).clone())
}
}
Expand Down Expand Up @@ -171,6 +177,7 @@ impl Clone for Client {
env: Arc::clone(&self.env),
channels: Mutex::new(HashMap::default()),
min_available_ratio: self.min_available_ratio,
security_mgr: self.security_mgr.clone(),
}
}
}
Expand Down
55 changes: 30 additions & 25 deletions src/import/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use std::sync::Arc;
use crc::crc32::{self, Hasher32};
use uuid::Uuid;

use kvproto::import_kvpb::*;
use kvproto::import_kvpb::mutation::Op as MutationOp;
use kvproto::import_kvpb::*;
use kvproto::import_sstpb::*;

use engine::rocks::util::{new_engine_opt, CFOptions};
Expand All @@ -28,30 +28,30 @@ use tikv::raftstore::coprocessor::properties::{
IndexHandle, RangeProperties, RangePropertiesCollectorFactory, SizeProperties,
};
use tikv::storage::mvcc::{Write, WriteType};
use txn_types::{is_short_value, Key, TimeStamp};
use tikv_util::config::MB;
use txn_types::{is_short_value, Key, TimeStamp};

use super::common::*;
use super::Result;
use crate::import::stream::SSTFile;
use engine::rocks::util::security::encrypted_env_from_cipher_file;
use tikv_util::security::SecurityConfig;
use tikv_util::security::SecurityManager;

/// Engine wraps rocksdb::DB with customized options to support efficient bulk
/// write.
pub struct Engine {
db: Arc<DB>,
uuid: Uuid,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
}

impl Engine {
pub fn new<P: AsRef<Path>>(
path: P,
uuid: Uuid,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<Engine> {
let db = {
let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&db_cfg);
Expand All @@ -61,7 +61,7 @@ impl Engine {
db: Arc::new(db),
uuid,
db_cfg,
security_cfg,
security_mgr,
})
}

Expand Down Expand Up @@ -113,7 +113,7 @@ impl Engine {
}

pub fn new_sst_writer(&self) -> Result<SSTWriter> {
SSTWriter::new(&self.db_cfg, &self.security_cfg, self.db.path())
SSTWriter::new(&self.db_cfg, &self.security_mgr, self.db.path())
}

pub fn get_size_properties(&self) -> Result<SizeProperties> {
Expand Down Expand Up @@ -259,12 +259,13 @@ pub struct SSTWriter {
}

impl SSTWriter {
pub fn new(db_cfg: &DbConfig, security_cfg: &SecurityConfig, path: &str) -> Result<SSTWriter> {
pub fn new(db_cfg: &DbConfig, security_mgr: &SecurityManager, path: &str) -> Result<SSTWriter> {
let mut env = Arc::new(Env::new_mem());
let mut base_env = None;
if !security_cfg.cipher_file.is_empty() {
let cipher_file = security_mgr.cipher_file();
if !cipher_file.is_empty() {
base_env = Some(Arc::clone(&env));
env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, Some(env))?;
env = encrypted_env_from_cipher_file(&cipher_file, Some(env))?;
}
let uuid = Uuid::new_v4().to_string();
// Placeholder. SstFileWriter don't actually use block cache.
Expand Down Expand Up @@ -425,16 +426,19 @@ mod tests {
RngCore, SeedableRng,
};
use tikv::raftstore::store::RegionSnapshot;
use tikv::storage::mvcc::MvccReader;
use tikv::storage::config::BlockCacheConfig;
use tikv_util::file::file_exists;
use tikv::storage::mvcc::MvccReader;
use tikv_util::{
file::file_exists,
security::{SecurityConfig, SecurityManager},
};

fn new_engine() -> (TempDir, Engine) {
let dir = TempDir::new("test_import_engine").unwrap();
let uuid = Uuid::new_v4();
let db_cfg = DbConfig::default();
let security_cfg = SecurityConfig::default();
let engine = Engine::new(dir.path(), uuid, db_cfg, security_cfg).unwrap();
let security_mgr = Arc::default();
let engine = Engine::new(dir.path(), uuid, db_cfg, security_mgr).unwrap();
(dir, engine)
}

Expand Down Expand Up @@ -499,33 +503,34 @@ mod tests {

#[test]
fn test_sst_writer() {
test_sst_writer_with(1, &[CF_WRITE], &SecurityConfig::default());
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityConfig::default());
test_sst_writer_with(1, &[CF_WRITE], &SecurityManager::default());
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &SecurityManager::default());

let temp_dir = TempDir::new("/tmp/encrypted_env_from_cipher_file").unwrap();
let security_cfg = create_security_cfg(&temp_dir);
test_sst_writer_with(1, &[CF_WRITE], &security_cfg);
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_cfg);
let security_mgr = create_security_mgr(&temp_dir);
test_sst_writer_with(1, &[CF_WRITE], &security_mgr);
test_sst_writer_with(1024, &[CF_DEFAULT, CF_WRITE], &security_mgr);
}

fn create_security_cfg(temp_dir: &TempDir) -> SecurityConfig {
fn create_security_mgr(temp_dir: &TempDir) -> SecurityManager {
let path = temp_dir.path().join("cipher_file");
let mut cipher_file = File::create(&path).unwrap();
cipher_file.write_all(b"ACFFDBCC").unwrap();
cipher_file.sync_all().unwrap();
let mut security_cfg = SecurityConfig::default();
security_cfg.cipher_file = path.to_str().unwrap().to_owned();
assert_eq!(file_exists(&security_cfg.cipher_file), true);
security_cfg
SecurityManager::new(&security_cfg).unwrap()
}

fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_cfg: &SecurityConfig) {
fn test_sst_writer_with(value_size: usize, cf_names: &[&str], security_mgr: &SecurityManager) {
let temp_dir = TempDir::new("_test_sst_writer").unwrap();

let cfg = DbConfig::default();
let mut db_opts = cfg.build_opt();
if !security_cfg.cipher_file.is_empty() {
let env = encrypted_env_from_cipher_file(&security_cfg.cipher_file, None).unwrap();
let cipher_file = security_mgr.cipher_file();
if !cipher_file.is_empty() {
let env = encrypted_env_from_cipher_file(&cipher_file, None).unwrap();
db_opts.set_env(env);
}
let cache = BlockCacheConfig::default().build_shared_cache();
Expand All @@ -535,7 +540,7 @@ mod tests {

let n = 10;
let commit_ts = 10;
let mut w = SSTWriter::new(&cfg, &security_cfg, temp_dir.path().to_str().unwrap()).unwrap();
let mut w = SSTWriter::new(&cfg, &security_mgr, temp_dir.path().to_str().unwrap()).unwrap();

// Write some keys.
let value = vec![1u8; value_size];
Expand Down
3 changes: 3 additions & 0 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ quick_error! {
ResourceTemporarilyUnavailable(msg: String) {
display("{}", msg)
}
Security(msg: String) {
display("{}", msg)
}
}
}

Expand Down
38 changes: 22 additions & 16 deletions src/import/kv_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::client::*;
use super::engine::*;
use super::import::*;
use super::{Config, Error, Result};
use tikv_util::security::SecurityConfig;
use tikv_util::security::SecurityManager;

pub struct Inner {
engines: HashMap<Uuid, Arc<EngineFile>>,
Expand All @@ -27,18 +27,24 @@ pub struct KVImporter {
cfg: Config,
dir: EngineDir,
inner: Mutex<Inner>,
pub(super) security_mgr: Arc<SecurityManager>,
}

impl KVImporter {
pub fn new(cfg: Config, db_cfg: DbConfig, security_cfg: SecurityConfig) -> Result<KVImporter> {
let dir = EngineDir::new(&cfg.import_dir, db_cfg, security_cfg)?;
pub fn new(
cfg: Config,
db_cfg: DbConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<KVImporter> {
let dir = EngineDir::new(&cfg.import_dir, db_cfg, security_mgr.clone())?;
Ok(KVImporter {
cfg,
dir,
inner: Mutex::new(Inner {
engines: HashMap::default(),
import_jobs: HashMap::default(),
}),
security_mgr,
})
}

Expand Down Expand Up @@ -117,6 +123,7 @@ impl KVImporter {
pd_addr,
self.cfg.num_import_jobs,
self.cfg.min_available_ratio,
self.security_mgr.clone(),
)?;
let job = {
let mut inner = self.inner.lock().unwrap();
Expand Down Expand Up @@ -186,7 +193,7 @@ impl KVImporter {
/// is completed, the files are stored in `$root/$uuid`.
pub struct EngineDir {
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
root_dir: PathBuf,
temp_dir: PathBuf,
}
Expand All @@ -197,7 +204,7 @@ impl EngineDir {
fn new<P: AsRef<Path>>(
root: P,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<EngineDir> {
let root_dir = root.as_ref().to_owned();
let temp_dir = root_dir.join(Self::TEMP_DIR);
Expand All @@ -207,7 +214,7 @@ impl EngineDir {
fs::create_dir_all(&temp_dir)?;
Ok(EngineDir {
db_cfg,
security_cfg,
security_mgr,
root_dir,
temp_dir,
})
Expand All @@ -229,7 +236,7 @@ impl EngineDir {
if path.save.exists() {
return Err(Error::FileExists(path.save));
}
EngineFile::new(uuid, path, self.db_cfg.clone(), self.security_cfg.clone())
EngineFile::new(uuid, path, self.db_cfg.clone(), self.security_mgr.clone())
}

/// Creates an engine from `$root/$uuid` for importing data.
Expand All @@ -239,7 +246,7 @@ impl EngineDir {
&path.save,
uuid,
self.db_cfg.clone(),
self.security_cfg.clone(),
self.security_mgr.clone(),
)
}

Expand Down Expand Up @@ -287,9 +294,9 @@ impl EngineFile {
uuid: Uuid,
path: EnginePath,
db_cfg: DbConfig,
security_cfg: SecurityConfig,
security_mgr: Arc<SecurityManager>,
) -> Result<EngineFile> {
let engine = Engine::new(&path.temp, uuid, db_cfg, security_cfg)?;
let engine = Engine::new(&path.temp, uuid, db_cfg, security_mgr)?;
Ok(EngineFile {
uuid,
path,
Expand Down Expand Up @@ -356,8 +363,7 @@ mod tests {

let mut cfg = Config::default();
cfg.import_dir = temp_dir.path().to_str().unwrap().to_owned();
let importer =
KVImporter::new(cfg, DbConfig::default(), SecurityConfig::default()).unwrap();
let importer = KVImporter::new(cfg, DbConfig::default(), Arc::default()).unwrap();

let uuid = Uuid::new_v4();
// Can not bind to an unopened engine.
Expand All @@ -380,7 +386,7 @@ mod tests {

let uuid = Uuid::new_v4();
let db_cfg = DbConfig::default();
let security_cfg = SecurityConfig::default();
let security_mgr = Arc::<SecurityManager>::default();
let path = EnginePath {
save: temp_dir.path().join("save"),
temp: temp_dir.path().join("temp"),
Expand All @@ -389,10 +395,10 @@ mod tests {
// Test close.
{
let mut f =
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).unwrap();
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).unwrap();
// Cannot create the same file again.
assert!(
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).is_err()
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).is_err()
);
assert!(path.temp.exists());
assert!(!path.save.exists());
Expand All @@ -405,7 +411,7 @@ mod tests {
// Test cleanup.
{
let f =
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_cfg.clone()).unwrap();
EngineFile::new(uuid, path.clone(), db_cfg.clone(), security_mgr.clone()).unwrap();
assert!(path.temp.exists());
assert!(!path.save.exists());
drop(f);
Expand Down
Loading