Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid set addr/engine-addr/version/git-hash and HackedLockManager in src/node.rs #139

Merged
merged 10 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pr-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ jobs:
export ENGINE_LABEL_VALUE=tiflash
export RUST_BACKTRACE=full
cargo check
cargo test --features compat_new_proxy --package tests --test proxy normal
cargo test --package tests --test proxy proxy
cargo test --package tests --test failpoints cases::test_normal
cargo test --package tests --test failpoints cases::test_bootstrap
cargo test --package tests --test failpoints cases::test_compact_log
Expand All @@ -72,5 +74,3 @@ jobs:
cargo test --package tests --test failpoints cases::test_merge
cargo test --package tests --test failpoints cases::test_import_service
cargo test --package tests --test failpoints cases::test_proxy_replica_read
cargo test --features compat_new_proxy --package tests --test proxy normal
cargo test --package tests --test proxy proxy
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions components/proxy_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,24 @@ use crate::fatal;
#[serde(rename_all = "kebab-case")]
pub struct ProxyConfig {
pub snap_handle_pool_size: usize,
pub engine_addr: String,
pub engine_store_version: String,
pub engine_store_git_hash: String,
}

pub const DEFAULT_ENGINE_ADDR: &str = if cfg!(feature = "failpoints") {
"127.0.0.1:20206"
} else {
""
};

impl Default for ProxyConfig {
fn default() -> Self {
ProxyConfig {
snap_handle_pool_size: 2,
engine_addr: DEFAULT_ENGINE_ADDR.to_string(),
engine_store_version: String::default(),
engine_store_git_hash: String::default(),
}
}
}
Expand Down
53 changes: 53 additions & 0 deletions components/proxy_server/src/hacked_lock_mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use tikv::{
server::{lock_manager::waiter_manager::Callback, Error, Result},
storage::{
lock_manager::{DiagnosticContext, Lock, LockManager as LockManagerTrait, WaitTimeout},
ProcessResult, StorageCallback,
},
};
use txn_types::TimeStamp;

#[derive(Copy, Clone)]
pub struct HackedLockManager {}

#[allow(dead_code)]
#[allow(unused_variables)]
impl LockManagerTrait for HackedLockManager {
fn wait_for(
&self,
start_ts: TimeStamp,
cb: StorageCallback,
pr: ProcessResult,
lock: Lock,
is_first_lock: bool,
timeout: Option<WaitTimeout>,
diag_ctx: DiagnosticContext,
) {
unimplemented!()
}

fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
) {
unimplemented!()
}

fn has_waiter(&self) -> bool {
todo!()
}

fn dump_wait_for_entries(&self, cb: Callback) {
todo!()
}
}

impl HackedLockManager {
pub fn new() -> Self {
Self {}
}
pub fn stop(&mut self) {}
}
1 change: 1 addition & 0 deletions components/proxy_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate tikv_util;

#[macro_use]
pub mod config;
pub mod hacked_lock_mgr;
pub mod proxy;
pub mod run;
pub mod setup;
Expand Down
75 changes: 49 additions & 26 deletions components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
process,
};

use clap::{App, Arg};
use clap::{App, Arg, ArgMatches};
use tikv::config::TiKvConfig;

use crate::{
Expand All @@ -18,6 +18,49 @@ use crate::{
},
};

// Not the same as TiKV
pub const TIFLASH_DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20170";
pub const TIFLASH_DEFAULT_STATUS_ADDR: &str = "127.0.0.1:20292";

fn make_tikv_config() -> TiKvConfig {
let mut default = TiKvConfig::default();
setup_default_tikv_config(&mut default);
default
}

pub fn setup_default_tikv_config(default: &mut TiKvConfig) {
default.server.addr = TIFLASH_DEFAULT_LISTENING_ADDR.to_string();
default.server.status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
default.server.advertise_status_addr = TIFLASH_DEFAULT_STATUS_ADDR.to_string();
}

pub fn gen_tikv_config(
matches: &ArgMatches,
is_config_check: bool,
unrecognized_keys: &mut Vec<String>,
) -> TiKvConfig {
matches
.value_of_os("config")
.map_or_else(make_tikv_config, |path| {
let path = Path::new(path);
TiKvConfig::from_file(
path,
if is_config_check {
Some(unrecognized_keys)
} else {
None
},
)
.unwrap_or_else(|e| {
panic!(
"invalid auto generated configuration file {}, err {}",
path.display(),
e
);
})
})
}

pub unsafe fn run_proxy(
argc: c_int,
argv: *const *const c_char,
Expand Down Expand Up @@ -223,34 +266,11 @@ pub unsafe fn run_proxy(
let mut unrecognized_keys = Vec::new();
let is_config_check = matches.is_present("config-check");

let mut config = matches
.value_of_os("config")
.map_or_else(TiKvConfig::default, |path| {
let path = Path::new(path);
TiKvConfig::from_file(
path,
if is_config_check {
Some(&mut unrecognized_keys)
} else {
None
},
)
.unwrap_or_else(|e| {
panic!(
"invalid auto generated configuration file {}, err {}",
path.display(),
e
);
})
});

check_engine_label(&matches);
overwrite_config_with_cmd_args(&mut config, &matches);
config.logger_compatible_adjust();
let mut config = gen_tikv_config(&matches, is_config_check, &mut unrecognized_keys);

let mut proxy_unrecognized_keys = Vec::new();
// Double read the same file for proxy-specific arguments.
let proxy_config =
let mut proxy_config =
matches
.value_of_os("config")
.map_or_else(crate::config::ProxyConfig::default, |path| {
Expand All @@ -271,6 +291,9 @@ pub unsafe fn run_proxy(
);
})
});
check_engine_label(&matches);
overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches);
config.logger_compatible_adjust();

// TODO(tiflash) We should later use ProxyConfig for proxy's own settings like `snap_handle_pool_size`
if is_config_check {
Expand Down
30 changes: 27 additions & 3 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ use tikv::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
lock_manager::HackedLockManager as LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand All @@ -105,7 +104,10 @@ use tikv_util::{
};
use tokio::runtime::Builder;

use crate::{config::ProxyConfig, fatal, setup::*, util::ffi_server_info};
use crate::{
config::ProxyConfig, fatal, hacked_lock_mgr::HackedLockManager as LockManager, setup::*,
util::ffi_server_info,
};

#[inline]
pub fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
Expand Down Expand Up @@ -948,7 +950,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)),
};

let storage = create_raft_storage::<_, _, _, F>(
let storage = create_raft_storage::<_, _, _, F, _>(
engines.engine.clone(),
&self.config.storage,
storage_read_pool_handle,
Expand Down Expand Up @@ -1077,6 +1079,27 @@ impl<ER: RaftEngine> TiKvServer<ER> {
.unwrap_or_else(|e| fatal!("failed to validate raftstore config {}", e));
let raft_store = Arc::new(VersionTrack::new(self.config.raft_store.clone()));
let health_service = HealthService::default();
let mut default_store = kvproto::metapb::Store::default();

if !self.proxy_config.engine_store_version.is_empty() {
breezewish marked this conversation as resolved.
Show resolved Hide resolved
default_store.set_version(self.proxy_config.engine_store_version.clone());
}
if !self.proxy_config.engine_store_git_hash.is_empty() {
default_store.set_git_hash(self.proxy_config.engine_store_git_hash.clone());
}
// addr -> store.peer_address
if self.config.server.advertise_addr.is_empty() {
default_store.set_peer_address(self.config.server.addr.clone());
} else {
default_store.set_peer_address(self.config.server.advertise_addr.clone())
}
// engine_addr -> store.addr
if !self.proxy_config.engine_addr.is_empty() {
default_store.set_address(self.proxy_config.engine_addr.clone());
} else {
panic!("engine address is empty");
}

let mut node = Node::new(
self.system.take().unwrap(),
&server_config.value().clone(),
Expand All @@ -1086,6 +1109,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
self.state.clone(),
self.background_worker.clone(),
Some(health_service.clone()),
Some(default_store),
);
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));
Expand Down
17 changes: 11 additions & 6 deletions components/proxy_server/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ pub use server::setup::{
use tikv::config::{check_critical_config, persist_config, MetricConfig, TiKvConfig};
use tikv_util::{self, config, logger};

use crate::config::ProxyConfig;
pub use crate::fatal;

#[allow(dead_code)]
pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches<'_>) {
pub fn overwrite_config_with_cmd_args(
config: &mut TiKvConfig,
proxy_config: &mut ProxyConfig,
matches: &ArgMatches<'_>,
) {
if let Some(level) = matches.value_of("log-level") {
config.log.level = logger::get_level_by_string(level).unwrap();
config.log_level = slog::Level::Info;
Expand Down Expand Up @@ -47,21 +52,21 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc
}

if let Some(engine_store_version) = matches.value_of("engine-version") {
config.server.engine_store_version = engine_store_version.to_owned();
proxy_config.engine_store_version = engine_store_version.to_owned();
}

if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") {
config.server.engine_store_git_hash = engine_store_git_hash.to_owned();
proxy_config.engine_store_git_hash = engine_store_git_hash.to_owned();
}

if config.server.engine_addr.is_empty() {
if proxy_config.engine_addr.is_empty() {
if let Some(engine_addr) = matches.value_of("engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
proxy_config.engine_addr = engine_addr.to_owned();
}
}

if let Some(engine_addr) = matches.value_of("advertise-engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
proxy_config.engine_addr = engine_addr.to_owned();
}

if let Some(data_dir) = matches.value_of("data-dir") {
Expand Down
20 changes: 11 additions & 9 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use tikv::{
config::{Config as ServerConfig, ServerConfigManager},
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
lock_manager::HackedLockManager as LockManager,
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
Expand Down Expand Up @@ -519,13 +519,14 @@ impl<ER: RaftEngine> TiKvServer<ER> {
.engine
.set_txn_extra_scheduler(Arc::new(txn_extra_scheduler));

// let lock_mgr = LockManager::new(&self.config.pessimistic_txn);
let lock_mgr = LockManager::new();
// cfg_controller.register(
// tikv::config::Module::PessimisticTxn,
// Box::new(lock_mgr.config_manager()),
// );
// lock_mgr.register_detector_role_change_observer(self.coprocessor_host.as_mut().unwrap());
// Recover TiKV's lock manager, since we don't use this crate now.
let lock_mgr = LockManager::new(&self.config.pessimistic_txn);
// let lock_mgr = LockManager::new();
cfg_controller.register(
tikv::config::Module::PessimisticTxn,
Box::new(lock_mgr.config_manager()),
);
lock_mgr.register_detector_role_change_observer(self.coprocessor_host.as_mut().unwrap());

let engines = self.engines.as_ref().unwrap();

Expand Down Expand Up @@ -613,7 +614,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
in_memory_pessimistic_lock: Arc::new(AtomicBool::new(true)),
};

let storage = create_raft_storage::<_, _, _, F>(
let storage = create_raft_storage::<_, _, _, F, _>(
engines.engine.clone(),
&self.config.storage,
storage_read_pool_handle,
Expand Down Expand Up @@ -751,6 +752,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
self.state.clone(),
self.background_worker.clone(),
Some(health_service.clone()),
None,
);
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));
Expand Down
18 changes: 0 additions & 18 deletions components/server/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,6 @@ pub fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatc
config.server.advertise_status_addr = advertise_status_addr.to_owned();
}

if let Some(engine_store_version) = matches.value_of("engine-version") {
config.server.engine_store_version = engine_store_version.to_owned();
}

if let Some(engine_store_git_hash) = matches.value_of("engine-git-hash") {
config.server.engine_store_git_hash = engine_store_git_hash.to_owned();
}

if config.server.engine_addr.is_empty() {
if let Some(engine_addr) = matches.value_of("engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
}
}

if let Some(engine_addr) = matches.value_of("advertise-engine-addr") {
config.server.engine_addr = engine_addr.to_owned();
}

if let Some(data_dir) = matches.value_of("data-dir") {
config.storage.data_dir = data_dir.to_owned();
}
Expand Down
Loading