diff --git a/analytic_engine/src/instance/write_worker.rs b/analytic_engine/src/instance/write_worker.rs index 8439fa1749..ab3955af78 100644 --- a/analytic_engine/src/instance/write_worker.rs +++ b/analytic_engine/src/instance/write_worker.rs @@ -304,7 +304,7 @@ impl WorkerLocal { worker_num: usize, ) -> Result<()> { let worker_id = self.data.as_ref().id; - if table_id % worker_num != worker_id { + if choose_worker(table_id, worker_num) != worker_id { return DataNotLegal { table: table_name, worker_id, @@ -666,7 +666,7 @@ impl WriteGroup { /// /// Returns the WriteHandle of the worker pub fn choose_worker(&self, table_id: TableId) -> WriteHandle { - let index = table_id.as_u64() as usize % self.worker_datas.len(); + let index = choose_worker(table_id.as_u64() as usize, self.worker_datas.len()); let worker_data = self.worker_datas[index].clone(); WriteHandle { worker_data } @@ -989,7 +989,12 @@ impl WriteWorker { self.local.data.background_notify.notified().await; } } - +/// Centralize the logic of choosing worker for table into one place. +/// Choose worker by modulo total worker_num +#[inline] +pub fn choose_worker(table_id: usize, worker_num: usize) -> usize { + table_id % worker_num +} #[cfg(test)] pub mod tests { use common_util::runtime; diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index a2ab694289..757ff6d9c6 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -29,7 +29,7 @@ use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::CreateTableRequest, table::TableId}; use crate::{ - instance::write_worker::{WorkerLocal, WriteHandle}, + instance::write_worker::{choose_worker, WorkerLocal, WriteHandle}, memtable::{ factory::{FactoryRef as MemTableFactoryRef, Options as MemTableOptions}, skiplist::factory::SkiplistMemTableFactory, @@ -568,7 +568,7 @@ impl TableDataSet { ) -> Option { self.table_datas .values() - .filter(|t| t.id.as_u64() as usize % worker_num == worker_index) + .filter(|t| choose_worker(t.id.as_u64() as usize, worker_num) == worker_index) .max_by_key(|t| t.memtable_memory_usage()) .cloned() }