Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: add file lock to work around port conflict (#7355) #7447

Merged
merged 11 commits into from
Apr 23, 2020
79 changes: 74 additions & 5 deletions cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use raftstore::{
};
use std::{
convert::TryFrom,
fmt,
fs::File,
env, fmt,
fs::{self, File},
net::SocketAddr,
path::{Path, PathBuf},
sync::{Arc, Mutex},
thread::JoinHandle,
Expand Down Expand Up @@ -92,6 +93,7 @@ pub fn run_tikv(config: TiKvConfig) {

let _m = Monitor::default();

tikv.check_conflict_addr();
tikv.init_fs();
tikv.init_yatp();
tikv.init_encryption();
Expand Down Expand Up @@ -126,7 +128,7 @@ struct TiKVServer {
region_info_accessor: RegionInfoAccessor,
coprocessor_host: Option<CoprocessorHost>,
to_stop: Vec<Box<dyn Stop>>,
lock_file: Option<File>,
lock_files: Vec<File>,
}

struct Engines {
Expand Down Expand Up @@ -185,7 +187,7 @@ impl TiKVServer {
region_info_accessor,
coprocessor_host,
to_stop: vec![Box::new(resolve_worker)],
lock_file: None,
lock_files: vec![],
}
}

Expand Down Expand Up @@ -249,6 +251,45 @@ impl TiKVServer {
pd_client
}

fn check_conflict_addr(&mut self) {
let cur_addr: SocketAddr = self
.config
.server
.addr
.parse()
.expect("failed to parse into a socket address");
let cur_ip = cur_addr.ip();
let cur_port = cur_addr.port();
let lock_dir = get_lock_dir();

let search_base = env::temp_dir().join(&lock_dir);
std::fs::create_dir_all(&search_base)
.unwrap_or_else(|_| panic!("create {} failed", search_base.display()));

for result in fs::read_dir(&search_base).unwrap() {
if let Ok(entry) = result {
if !entry.file_type().unwrap().is_file() {
continue;
}
let file_path = entry.path();
let file_name = file_path.file_name().unwrap().to_str().unwrap();
if let Ok(addr) = file_name.replace('_', ":").parse::<SocketAddr>() {
let ip = addr.ip();
let port = addr.port();
if cur_port == port
&& (cur_ip == ip || cur_ip.is_unspecified() || ip.is_unspecified())
{
let _ = try_lock_conflict_addr(file_path);
}
}
}
}

let cur_path = search_base.join(cur_addr.to_string().replace(':', "_"));
let cur_file = try_lock_conflict_addr(cur_path);
self.lock_files.push(cur_file);
}

fn init_fs(&mut self) {
let lock_path = self.store_path.join(Path::new("LOCK"));

Expand All @@ -260,7 +301,7 @@ impl TiKVServer {
self.store_path.display()
);
}
self.lock_file = Some(f);
self.lock_files.push(f);

if tikv_util::panic_mark_file_exists(&self.config.storage.data_dir) {
fatal!(
Expand Down Expand Up @@ -813,6 +854,34 @@ fn check_system_config(config: &TiKvConfig) {
}
}

fn try_lock_conflict_addr<P: AsRef<Path>>(path: P) -> File {
let f = File::create(path.as_ref()).unwrap_or_else(|e| {
fatal!(
"failed to create lock at {}: {}",
path.as_ref().display(),
e
)
});

if f.try_lock_exclusive().is_err() {
fatal!(
"{} already in use, maybe another instance is binding with this address.",
path.as_ref().file_name().unwrap().to_str().unwrap()
);
}
f
}

#[cfg(unix)]
fn get_lock_dir() -> String {
format!("{}_TIKV_LOCK_FILES", unsafe { libc::getuid() })
}

#[cfg(not(unix))]
fn get_lock_dir() -> String {
"TIKV_LOCK_FILES".to_owned()
}

/// A small trait for components which can be trivially stopped. Lets us keep
/// a list of these in `TiKV`, rather than storing each component individually.
trait Stop {
Expand Down