Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace memory alloc/dealloc by thread #366

Merged
merged 27 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proxy_components/engine_store_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ openssl-vendored = [
"openssl/vendored"
]

jemalloc = ["proxy_ffi/jemalloc"]
external-jemalloc = ["proxy_ffi/external-jemalloc"]

[dependencies]
batch-system = { workspace = true, default-features = false }
bitflags = "1.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
}

pub fn on_region_changed(&self, ob_region: &Region, e: RegionChangeEvent, r: StateRole) {
self.engine_store_server_helper
.maybe_jemalloc_register_alloc();
self.engine_store_server_helper
.directly_report_jemalloc_alloc();
let region_id = ob_region.get_id();
if e == RegionChangeEvent::Destroy {
info!(
Expand Down Expand Up @@ -104,6 +108,10 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
}

pub fn on_role_change(&self, ob_region: &Region, r: &RoleChange) {
self.engine_store_server_helper
.maybe_jemalloc_register_alloc();
self.engine_store_server_helper
.directly_report_jemalloc_alloc();
let region_id = ob_region.get_id();
let is_replicated = !r.initialized;
let is_fap_enabled = if let Some(b) = self.engine.proxy_ext.config_set.as_ref() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pub fn maybe_collect_states(
}

pub fn collect_all_states(cluster_ext: &ClusterExt, region_id: u64) -> HashMap<u64, States> {
maybe_collect_states(cluster_ext, region_id, None);
let prev_state = maybe_collect_states(cluster_ext, region_id, None);
assert_eq!(
prev_state.len(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{cell::RefCell, pin::Pin, sync::atomic::Ordering};
use std::{
cell::RefCell,
pin::Pin,
sync::{atomic::Ordering, Mutex},
};

use engine_store_ffi::TiFlashEngine;

Expand All @@ -18,6 +22,23 @@ use super::{
};
use crate::mock_cluster;

#[derive(Clone)]
pub struct ThreadInfoJealloc {
pub allocated_ptr: u64,
pub deallocated_ptr: u64,
}

impl ThreadInfoJealloc {
pub fn allocated(&self) -> u64 {
unsafe { *(self.allocated_ptr as *const u64) }
}
pub fn deallocated(&self) -> u64 {
unsafe { *(self.deallocated_ptr as *const u64) }
}
pub fn remaining(&self) -> i64 {
self.allocated() as i64 - self.deallocated() as i64
}
}
pub struct EngineStoreServer {
pub id: u64,
// TODO engines maybe changed into TabletRegistry?
Expand All @@ -28,6 +49,7 @@ pub struct EngineStoreServer {
pub page_storage: MockPageStorage,
// (region_id, peer_id) -> MockRegion
pub tmp_fap_regions: HashMap<RegionId, Box<MockRegion>>,
pub thread_info_map: Mutex<HashMap<String, ThreadInfoJealloc>>,
}

impl EngineStoreServer {
Expand All @@ -40,6 +62,7 @@ impl EngineStoreServer {
region_states: RefCell::new(Default::default()),
page_storage: Default::default(),
tmp_fap_regions: Default::default(),
thread_info_map: Default::default(),
}
}

Expand Down Expand Up @@ -369,6 +392,8 @@ pub fn gen_engine_store_server_helper(
fn_query_fap_snapshot_state: Some(ffi_query_fap_snapshot_state),
fn_kvstore_region_exists: Some(ffi_kvstore_region_exists),
fn_clear_fap_snapshot: Some(ffi_clear_fap_snapshot),
fn_report_thread_allocate_info: Some(ffi_report_thread_allocate_info),
fn_report_thread_allocate_batch: Some(ffi_report_thread_allocate_batch),
ps: PageStorageInterfaces {
fn_create_write_batch: Some(ffi_mockps_create_write_batch),
fn_wb_put_page: Some(ffi_mockps_wb_put_page),
Expand Down Expand Up @@ -609,3 +634,49 @@ unsafe extern "C" fn ffi_get_lock_by_key(
},
}
}

unsafe extern "C" fn ffi_report_thread_allocate_info(
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
_: u64,
name: interfaces_ffi::BaseBuffView,
t: interfaces_ffi::ReportThreadAllocateInfoType,
value: u64,
) {
let store = into_engine_store_server_wrap(arg1);
let tn = std::str::from_utf8(name.to_slice()).unwrap().to_string();
match (*store.engine_store_server)
.thread_info_map
.lock()
.expect("poisoned")
.entry(tn)
{
std::collections::hash_map::Entry::Occupied(mut o) => {
if t == interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr {
o.get_mut().allocated_ptr = value;
} else if t == interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr {
o.get_mut().deallocated_ptr = value;
}
}
std::collections::hash_map::Entry::Vacant(v) => {
if t == interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr {
v.insert(ThreadInfoJealloc {
allocated_ptr: value,
deallocated_ptr: 0,
});
} else if t == interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr {
v.insert(ThreadInfoJealloc {
allocated_ptr: 0,
deallocated_ptr: value,
});
}
}
}
}

unsafe extern "C" fn ffi_report_thread_allocate_batch(
_: *mut interfaces_ffi::EngineStoreServerWrap,
_: u64,
_name: interfaces_ffi::BaseBuffView,
_data: interfaces_ffi::ReportThreadAllocateInfoBatch,
) {
}
3 changes: 3 additions & 0 deletions proxy_components/proxy_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ test-engines-rocksdb = [
test-engines-panic = [
"engine_test/test-engines-panic",
]
jemalloc = []

# TODO use encryption/openssl-vendored if later supports
openssl-vendored = [
"openssl/vendored"
]

external-jemalloc = []

[dependencies]
encryption = { workspace = true, default-features = false }
openssl = { workspace = true } # TODO only for feature
Expand Down
110 changes: 108 additions & 2 deletions proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
use std::pin::Pin;
use std::{cell::RefCell, pin::Pin};

use kvproto::{kvrpcpb, metapb, raft_cmdpb};

Expand Down Expand Up @@ -34,6 +34,13 @@ pub fn gen_engine_store_server_helper(
unsafe { &(*(engine_store_server_helper as *const EngineStoreServerHelper)) }
}

thread_local! {
pub static JEMALLOC_REGISTERED: RefCell<bool> = RefCell::new(false);
pub static JEMALLOC_TNAME: RefCell<(String, u64)> = RefCell::new(Default::default());
pub static JEMALLOC_ALLOCP: RefCell<*mut u64> = RefCell::new(std::ptr::null_mut());
pub static JEMALLOC_DEALLOCP: RefCell<*mut u64> = RefCell::new(std::ptr::null_mut());
}

/// # Safety
/// The lifetime of `engine_store_server_helper` is definitely longer than
/// `ENGINE_STORE_SERVER_HELPER_PTR`.
Expand All @@ -49,6 +56,85 @@ pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, pt
}

impl EngineStoreServerHelper {
pub fn maybe_jemalloc_register_alloc(&self) {
JEMALLOC_REGISTERED.with(|b| {
if !*b.borrow() {
unsafe {
let ptr_alloc: u64 = crate::jemalloc_utils::get_allocatep_on_thread_start();
let ptr_dealloc: u64 = crate::jemalloc_utils::get_deallocatep_on_thread_start();
let thread_name = std::thread::current().name().unwrap_or("").to_string();
let thread_id: u64 = std::thread::current().id().as_u64().into();
(self.fn_report_thread_allocate_info.into_inner())(
self.inner,
thread_id,
BaseBuffView::from(thread_name.as_bytes()),
interfaces_ffi::ReportThreadAllocateInfoType::Reset,
0,
);
(self.fn_report_thread_allocate_info.into_inner())(
self.inner,
thread_id,
BaseBuffView::from(thread_name.as_bytes()),
interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr,
ptr_alloc,
);
(self.fn_report_thread_allocate_info.into_inner())(
self.inner,
thread_id,
BaseBuffView::from(thread_name.as_bytes()),
interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr,
ptr_dealloc,
);

// Some threads are not everlasting, so we don't want TiFlash to directly access
// the pointer.
JEMALLOC_TNAME.with(|p| {
*p.borrow_mut() = (thread_name, thread_id);
});
if ptr_alloc != 0 {
JEMALLOC_ALLOCP.with(|p| {
*p.borrow_mut() = ptr_alloc as *mut u64;
});
}
if ptr_dealloc != 0 {
JEMALLOC_DEALLOCP.with(|p| {
*p.borrow_mut() = ptr_dealloc as *mut u64;
});
}
}
*(b.borrow_mut()) = true;
}
});
}

pub fn directly_report_jemalloc_alloc(&self) {
JEMALLOC_TNAME.with(|thread_info| unsafe {
let a = JEMALLOC_ALLOCP.with(|p| {
let p = *p.borrow_mut();
if p.is_null() {
return 0;
}
*p
});
let d = JEMALLOC_DEALLOCP.with(|p| {
let p = *p.borrow_mut();
if p.is_null() {
return 0;
}
*p
});
(self.fn_report_thread_allocate_batch.into_inner())(
self.inner,
thread_info.borrow().1,
BaseBuffView::from(thread_info.borrow().0.as_bytes()),
interfaces_ffi::ReportThreadAllocateInfoBatch {
alloc: a,
dealloc: d,
},
);
});
}

pub fn gc_raw_cpp_ptr(&self, ptr: *mut ::std::os::raw::c_void, tp: RawCppPtrType) {
debug_assert!(self.fn_gc_raw_cpp_ptr.is_some());
unsafe {
Expand Down Expand Up @@ -82,6 +168,7 @@ impl EngineStoreServerHelper {

pub fn handle_compute_store_stats(&self) -> StoreStats {
debug_assert!(self.fn_handle_compute_store_stats.is_some());
self.maybe_jemalloc_register_alloc();
unsafe { (self.fn_handle_compute_store_stats.into_inner())(self.inner) }
}

Expand All @@ -91,16 +178,22 @@ impl EngineStoreServerHelper {
header: RaftCmdHeader,
) -> EngineStoreApplyRes {
debug_assert!(self.fn_handle_write_raft_cmd.is_some());
self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe { (self.fn_handle_write_raft_cmd.into_inner())(self.inner, cmds.gen_view(), header) }
}

pub fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus {
debug_assert!(self.fn_handle_get_engine_store_server_status.is_some());
self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe { (self.fn_handle_get_engine_store_server_status.into_inner())(self.inner) }
}

pub fn handle_set_proxy(&self, proxy: *const RaftStoreProxyFFIHelper) {
debug_assert!(self.fn_atomic_update_proxy.is_some());
self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe { (self.fn_atomic_update_proxy.into_inner())(self.inner, proxy as *mut _) }
}

Expand Down Expand Up @@ -129,7 +222,8 @@ impl EngineStoreServerHelper {
header: RaftCmdHeader,
) -> EngineStoreApplyRes {
debug_assert!(self.fn_handle_admin_raft_cmd.is_some());

self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe {
let req = ProtoMsgBaseBuff::new(req);
let resp = ProtoMsgBaseBuff::new(resp);
Expand Down Expand Up @@ -158,6 +252,8 @@ impl EngineStoreServerHelper {
term: u64,
) -> bool {
debug_assert!(self.fn_try_flush_data.is_some());
self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
// TODO(proactive flush)
unsafe {
(self.fn_try_flush_data.into_inner())(
Expand Down Expand Up @@ -187,6 +283,8 @@ impl EngineStoreServerHelper {
) -> RawCppPtr {
debug_assert!(self.fn_pre_handle_snapshot.is_some());

self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
let snaps_view = into_sst_views(snaps);
unsafe {
let region = ProtoMsgBaseBuff::new(region);
Expand All @@ -203,13 +301,17 @@ impl EngineStoreServerHelper {

pub fn apply_pre_handled_snapshot(&self, snap: RawCppPtr) {
debug_assert!(self.fn_apply_pre_handled_snapshot.is_some());
self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe {
(self.fn_apply_pre_handled_snapshot.into_inner())(self.inner, snap.ptr, snap.type_)
}
}

pub fn abort_pre_handle_snapshot(&self, region_id: u64, peer_id: u64) {
debug_assert!(self.fn_abort_pre_handle_snapshot.is_some());
self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe { (self.fn_abort_pre_handle_snapshot.into_inner())(self.inner, region_id, peer_id) }
}

Expand Down Expand Up @@ -277,6 +379,8 @@ impl EngineStoreServerHelper {
) -> EngineStoreApplyRes {
debug_assert!(self.fn_handle_ingest_sst.is_some());

self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
let snaps_view = into_sst_views(snaps);
unsafe {
(self.fn_handle_ingest_sst.into_inner())(
Expand All @@ -290,6 +394,8 @@ impl EngineStoreServerHelper {
pub fn handle_destroy(&self, region_id: u64) {
debug_assert!(self.fn_handle_destroy.is_some());

self.maybe_jemalloc_register_alloc();
self.directly_report_jemalloc_alloc();
unsafe {
(self.fn_handle_destroy.into_inner())(self.inner, region_id);
}
Expand Down
Loading
Loading