Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: limit cdc event fetching speed to reduce RocksDB read load #15849

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ pub struct Endpoint<T, E, S> {
workers: Runtime,
scan_concurrency_semaphore: Arc<Semaphore>,
scan_speed_limiter: Limiter,
fetch_speed_limiter: Limiter,
max_scan_batch_bytes: usize,
max_scan_batch_size: usize,
sink_memory_quota: Arc<MemoryQuota>,
Expand Down Expand Up @@ -439,11 +440,16 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,
let scan_concurrency_semaphore =
Arc::new(Semaphore::new(config.incremental_scan_concurrency));
let old_value_cache = OldValueCache::new(config.old_value_cache_memory_quota);
let speed_limiter = Limiter::new(if config.incremental_scan_speed_limit.0 > 0 {
let scan_speed_limiter = Limiter::new(if config.incremental_scan_speed_limit.0 > 0 {
config.incremental_scan_speed_limit.0 as f64
} else {
f64::INFINITY
});
let fetch_speed_limiter = Limiter::new(if config.incremental_fetch_speed_limit.0 > 0 {
config.incremental_fetch_speed_limit.0 as f64
} else {
f64::INFINITY
});

CDC_SINK_CAP.set(sink_memory_quota.capacity() as i64);
// For scan efficiency, the scan batch bytes should be around 1MB.
Expand All @@ -469,7 +475,8 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,
pd_client,
tso_worker,
timer: SteadyTimer::default(),
scan_speed_limiter: speed_limiter,
scan_speed_limiter,
fetch_speed_limiter,
max_scan_batch_bytes,
max_scan_batch_size,
config: config.clone(),
Expand Down Expand Up @@ -550,6 +557,15 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,

self.scan_speed_limiter.set_speed_limit(new_speed_limit);
}
if change.get("incremental_fetch_speed_limit").is_some() {
let new_speed_limit = if self.config.incremental_fetch_speed_limit.0 > 0 {
self.config.incremental_fetch_speed_limit.0 as f64
} else {
f64::INFINITY
};

self.fetch_speed_limiter.set_speed_limit(new_speed_limit);
}
}

pub fn set_max_scan_batch_size(&mut self, max_scan_batch_size: usize) {
Expand Down Expand Up @@ -793,7 +809,8 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,
sink: conn.get_sink().clone(),
request_id: request.get_request_id(),
downstream_state,
speed_limiter: self.scan_speed_limiter.clone(),
scan_speed_limiter: self.scan_speed_limiter.clone(),
fetch_speed_limiter: self.fetch_speed_limiter.clone(),
max_scan_batch_bytes: self.max_scan_batch_bytes,
max_scan_batch_size: self.max_scan_batch_size,
observe_id,
Expand Down Expand Up @@ -1769,6 +1786,33 @@ mod tests {
< f64::EPSILON
);
}

// Modify incremental_fetch_speed_limit.
{
let mut updated_cfg = cfg.clone();
{
updated_cfg.incremental_fetch_speed_limit = ReadableSize::mb(2048);
}
let diff = cfg.diff(&updated_cfg);

assert_eq!(
ep.config.incremental_fetch_speed_limit,
ReadableSize::mb(512)
);
assert!(
(ep.fetch_speed_limiter.speed_limit() - ReadableSize::mb(512).0 as f64).abs()
< f64::EPSILON
);
ep.run(Task::ChangeConfig(diff));
assert_eq!(
ep.config.incremental_fetch_speed_limit,
ReadableSize::mb(2048)
);
assert!(
(ep.fetch_speed_limiter.speed_limit() - ReadableSize::mb(2048).0 as f64).abs()
< f64::EPSILON
);
}
}

#[test]
Expand Down
28 changes: 17 additions & 11 deletions components/cdc/src/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ pub(crate) struct Initializer<E> {
pub(crate) request_id: u64,
pub(crate) checkpoint_ts: TimeStamp,

pub(crate) speed_limiter: Limiter,
pub(crate) scan_speed_limiter: Limiter,
pub(crate) fetch_speed_limiter: Limiter,

pub(crate) max_scan_batch_bytes: usize,
pub(crate) max_scan_batch_size: usize,

Expand Down Expand Up @@ -404,16 +406,14 @@ impl<E: KvEngine> Initializer<E> {
perf_delta,
} = self.do_scan(scanner, old_value_cursors, &mut entries)?;

CDC_SCAN_BYTES.inc_by(emit as _);
TLS_CDC_PERF_STATS.with(|x| *x.borrow_mut() += perf_delta);
tls_flush_perf_stats();
let require = if let Some(bytes) = disk_read {
if let Some(bytes) = disk_read {
CDC_SCAN_DISK_READ_BYTES.inc_by(bytes as _);
bytes
} else {
perf_delta.block_read_byte as usize
};
self.speed_limiter.consume(require).await;
self.scan_speed_limiter.consume(bytes).await;
}
CDC_SCAN_BYTES.inc_by(emit as _);
self.fetch_speed_limiter.consume(emit as _).await;

if let Some(resolver) = resolver {
// Track the locks.
Expand Down Expand Up @@ -624,7 +624,8 @@ mod tests {
}

fn mock_initializer(
speed_limit: usize,
scan_limit: usize,
fetch_limit: usize,
buffer: usize,
engine: Option<RocksEngine>,
kv_api: ChangeDataRequestKvApi,
Expand Down Expand Up @@ -665,7 +666,8 @@ mod tests {
conn_id: ConnId::new(),
request_id: 0,
checkpoint_ts: 1.into(),
speed_limiter: Limiter::new(speed_limit as _),
scan_speed_limiter: Limiter::new(scan_limit as _),
fetch_speed_limiter: Limiter::new(fetch_limit as _),
max_scan_batch_bytes: 1024 * 1024,
max_scan_batch_size: 1024,
build_resolver: true,
Expand Down Expand Up @@ -717,6 +719,7 @@ mod tests {
// Buffer must be large enough to unblock async incremental scan.
let buffer = 1000;
let (mut worker, pool, mut initializer, rx, mut drain) = mock_initializer(
total_bytes,
total_bytes,
buffer,
engine.kv_engine(),
Expand Down Expand Up @@ -832,6 +835,7 @@ mod tests {
// Buffer must be large enough to unblock async incremental scan.
let buffer = 1000;
let (mut worker, pool, mut initializer, _rx, mut drain) = mock_initializer(
total_bytes,
total_bytes,
buffer,
engine.kv_engine(),
Expand Down Expand Up @@ -914,6 +918,7 @@ mod tests {
// Do incremental scan with different `hint_min_ts` values.
for checkpoint_ts in [200, 100, 150] {
let (mut worker, pool, mut initializer, _rx, mut drain) = mock_initializer(
usize::MAX,
usize::MAX,
1000,
engine.kv_engine(),
Expand Down Expand Up @@ -979,6 +984,7 @@ mod tests {
let total_bytes = 1;
let buffer = 1;
let (mut worker, _pool, mut initializer, rx, _drain) = mock_initializer(
total_bytes,
total_bytes,
buffer,
None,
Expand Down Expand Up @@ -1034,7 +1040,7 @@ mod tests {
let total_bytes = 1;
let buffer = 1;
let (mut worker, pool, mut initializer, _rx, _drain) =
mock_initializer(total_bytes, buffer, None, kv_api, false);
mock_initializer(total_bytes, total_bytes, buffer, None, kv_api, false);

let change_cmd = ChangeObserver::from_cdc(1, ObserveHandle::new());
let raft_router = CdcRaftRouter(MockRaftStoreRouter::new());
Expand Down
5 changes: 5 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2940,7 +2940,11 @@ pub struct CdcConfig {
#[online_config(skip)]
pub incremental_scan_threads: usize,
pub incremental_scan_concurrency: usize,
/// Limit scan speed based on disk I/O traffic.
pub incremental_scan_speed_limit: ReadableSize,
/// Limit scan speed based on memory accesing traffic.
#[doc(hidden)]
pub incremental_fetch_speed_limit: ReadableSize,
/// `TsFilter` can increase speed and decrease resource usage when
/// incremental content is much less than total content. However in
/// other cases, `TsFilter` can make performance worse because it needs
Expand Down Expand Up @@ -2979,6 +2983,7 @@ impl Default for CdcConfig {
// TiCDC requires a SSD, the typical write speed of SSD
// is more than 500MB/s, so 128MB/s is enough.
incremental_scan_speed_limit: ReadableSize::mb(128),
incremental_fetch_speed_limit: ReadableSize::mb(512),
incremental_scan_ts_filter_ratio: 0.2,
tso_worker_threads: 1,
// 512MB memory for CDC sink.
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ fn test_serde_custom_tikv_config() {
incremental_scan_threads: 3,
incremental_scan_concurrency: 4,
incremental_scan_speed_limit: ReadableSize(7),
incremental_fetch_speed_limit: ReadableSize(8),
incremental_scan_ts_filter_ratio: 0.7,
tso_worker_threads: 2,
old_value_cache_memory_quota: ReadableSize::mb(14),
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ hibernate-regions-compatible = false
incremental-scan-threads = 3
incremental-scan-concurrency = 4
incremental-scan-speed-limit = 7
incremental-fetch-speed-limit = 8
incremental-scan-ts-filter-ratio = 0.7
tso-worker-threads = 2
old-value-cache-memory-quota = "14MB"
Expand Down