Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): use exist_table_id to filter exist key when compaction #3038

Merged
merged 5 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ message CompactTask {
repeated common.ParallelUnitMapping vnode_mappings = 11;
// compaction group the task belongs to
uint64 compaction_group_id = 12;
// ids of tables that still exist; compaction uses this list to drop keys of dropped tables
repeated uint32 existing_table_ids = 13;
}

message LevelHandler {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl CompactStatus {
} else {
ret.split_ranges
};

let compact_task = CompactTask {
input_ssts: vec![ret.select_level, ret.target_level],
splits,
Expand Down Expand Up @@ -157,6 +158,7 @@ impl CompactStatus {
task_status: false,
vnode_mappings: vec![],
compaction_group_id,
existing_table_ids: vec![],
};
Some(compact_task)
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ mod tests {
task_status: false,
vnode_mappings: vec![],
compaction_group_id: StaticCompactionGroupId::SharedBuffer.into(),
existing_table_ids: vec![],
}
}

Expand Down
46 changes: 32 additions & 14 deletions src/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::manager::{IdCategory, MetaSrvEnv};
use crate::model::{MetadataModel, ValTransaction, VarTransaction, Worker};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::{MetaStore, Transaction};
use crate::stream::FragmentManagerRef;

// Update to states are performed as follow:
// - Initialize ValTransaction for the meta state to update
Expand All @@ -72,6 +73,9 @@ pub struct HummockManager<S: MetaStore> {
compaction_scheduler: parking_lot::RwLock<Option<CompactionRequestChannelRef>>,
// TODO: refactor to remove this field
config: Arc<CompactionConfig>,

// for compaction to get some info (e.g. existing_table_ids)
fragment_manager: FragmentManagerRef<S>,
}

pub type HummockManagerRef<S> = Arc<HummockManager<S>>;
Expand Down Expand Up @@ -161,6 +165,7 @@ where
cluster_manager: ClusterManagerRef<S>,
metrics: Arc<MetaMetrics>,
compaction_group_manager: CompactionGroupManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
) -> Result<HummockManager<S>> {
let config = compaction_group_manager.config().clone();
let instance = HummockManager {
Expand All @@ -172,6 +177,7 @@ where
compaction_group_manager,
compaction_scheduler: parking_lot::RwLock::new(None),
config: Arc::new(config),
fragment_manager,
};

instance.load_meta_store_state().await?;
Expand Down Expand Up @@ -573,6 +579,7 @@ where
task_id as HummockCompactionTaskId,
compaction_group_id,
);
let existing_table_ids_from_meta = self.fragment_manager.existing_table_ids().await?;
let ret = match compact_task {
None => Ok(None),
Some(mut compact_task) => {
Expand All @@ -590,22 +597,28 @@ where
.flat_map(|v| v.snapshot_id.clone())
.fold(max_committed_epoch, std::cmp::min)
};

// to get all relational table_id from sst_info
let table_ids = compact_task
.input_ssts
.iter()
.flat_map(|level| {
level
.table_infos
.iter()
.flat_map(|sst_info| {
sst_info.vnode_bitmaps.iter().map(|bitmap| bitmap.table_id)
})
.collect_vec()
})
.collect::<HashSet<u32>>();

if compact_task.target_level != 0 {
let table_ids = compact_task
.input_ssts
.iter()
.flat_map(|level| {
level
.table_infos
.iter()
.flat_map(|sst_info| {
sst_info.vnode_bitmaps.iter().map(|bitmap| bitmap.table_id)
})
.collect_vec()
})
.collect::<HashSet<u32>>();
compact_task.vnode_mappings.reserve_exact(table_ids.len());
for table_id in table_ids {
}

for table_id in table_ids {
if compact_task.target_level != 0 {
if let Some(vnode_mapping) = self
.env
.hash_mapping_manager()
Expand All @@ -620,6 +633,11 @@ where
compact_task.vnode_mappings.push(compressed_mapping);
}
}

// record the table ids that still exist in meta, so the compactor can drop keys of dropped tables
if existing_table_ids_from_meta.contains(&table_id) {
compact_task.existing_table_ids.push(table_id);
}
}

commit_multi_var!(self, None, compact_status)?;
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::hummock::{HummockManager, HummockManagerRef};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::MetaMetrics;
use crate::storage::{MemStore, MetaStore};
use crate::stream::FragmentManager;

pub async fn add_test_tables<S>(
hummock_manager: &HummockManager<S>,
Expand Down Expand Up @@ -166,12 +167,14 @@ pub async fn setup_compute_env(
CompactionGroupManager::new_with_config(env.clone(), config.clone())
.await
.unwrap();
let fragment_manager = Arc::new(FragmentManager::new(env.clone()).await.unwrap());
let hummock_manager = Arc::new(
HummockManager::new(
env.clone(),
cluster_manager.clone(),
Arc::new(MetaMetrics::new()),
Arc::new(compaction_group_manager),
fragment_manager.clone(),
)
.await
.unwrap(),
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
cluster_manager.clone(),
meta_metrics.clone(),
compaction_group_manager.clone(),
fragment_manager.clone(),
)
.await
.unwrap(),
Expand Down
17 changes: 17 additions & 0 deletions src/meta/src/stream/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,4 +466,21 @@ where
}
Ok(())
}

/// Collects the ids of all tables that currently exist:
/// the `table_ref_id` of every source / materialized view, plus the
/// `internal_table_id` of every stateful executor.
pub async fn existing_table_ids(&self) -> Result<HashSet<u32>> {
    // Hold the read guard explicitly for the duration of the scan.
    let guard = self.core.read().await;
    let mut result_set = HashSet::default();
    for (table_ref_id, table_fragments) in &guard.table_fragments {
        result_set.insert(table_ref_id.table_id());
        // Stateful executors keep state under separate internal table ids.
        result_set.extend(table_fragments.internal_table_ids());
    }
    Ok(result_set)
}
}
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ mod tests {
cluster_manager.clone(),
meta_metrics.clone(),
compaction_group_manager.clone(),
fragment_manager.clone(),
)
.await?,
);
Expand Down
87 changes: 81 additions & 6 deletions src/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -26,7 +26,7 @@ use risingwave_common::config::StorageConfig;
use risingwave_common::util::compress::decompress_data;
use risingwave_hummock_sdk::compact::compact_task_to_string;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{get_epoch, Epoch, FullKey};
use risingwave_hummock_sdk::key::{get_epoch, get_table_id, Epoch, FullKey};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{HummockSSTableId, VersionedComparator};
use risingwave_pb::common::VNodeBitmap;
Expand Down Expand Up @@ -92,6 +92,38 @@ pub struct CompactorContext {
pub compaction_executor: Option<Arc<CompactionExecutor>>,
}

/// Decides, per key, whether compaction should retain it.
trait CompactionFilter {
    /// Returns `true` to keep the key, `false` to drop it.
    /// The default implementation keeps every key.
    fn filter(&self, _key: &[u8]) -> bool {
        true
    }
}

/// No-op filter: relies on the trait's default `filter`, which keeps every key.
/// Used for compactions that must not drop anything (e.g. shared-buffer uploads).
pub struct DummyCompactionFilter;
impl CompactionFilter for DummyCompactionFilter {}

/// Filter that drops keys belonging to tables which no longer exist,
/// reclaiming the state of dropped materialized views during compaction.
#[derive(Clone)]
pub struct StateCleanUpCompactionFilter {
    // Ids of tables that are still alive; keys of any other table are dropped.
    existing_table_ids: HashSet<u32>,
}

impl StateCleanUpCompactionFilter {
    /// Builds a filter that keeps only keys whose table id is in `table_id_set`.
    fn new(table_id_set: HashSet<u32>) -> Self {
        Self {
            existing_table_ids: table_id_set,
        }
    }
}

impl CompactionFilter for StateCleanUpCompactionFilter {
    /// Keeps a key when it carries no table-id prefix (it cannot be
    /// attributed to a dropped table), or when its table still exists.
    fn filter(&self, key: &[u8]) -> bool {
        // `map_or` replaces the verbose Option match: None => keep,
        // Some(id) => keep only if the table is still alive.
        get_table_id(key).map_or(true, |table_id| self.existing_table_ids.contains(&table_id))
    }
}

#[derive(Clone)]
/// Implementation of Hummock compaction.
pub struct Compactor {
Expand Down Expand Up @@ -161,6 +193,7 @@ impl Compactor {
// VNode mappings are not required when compacting shared buffer to L0
vnode_mappings: vec![],
compaction_group_id: StaticCompactionGroupId::StateDefault.into(),
existing_table_ids: vec![],
};

let sstable_store = context.sstable_store.clone();
Expand All @@ -187,6 +220,7 @@ impl Compactor {
.await? as BoxedForwardHummockIterator;
let vnode2unit = vnode2unit.clone();
let compaction_executor = compactor.context.compaction_executor.as_ref().cloned();

let split_task = async move {
compactor
.compact_key_range(split_index, iter, vnode2unit)
Expand Down Expand Up @@ -347,14 +381,18 @@ impl Compactor {
);
}

let compaction_filter =
StateCleanUpCompactionFilter::new(HashSet::from_iter(compact_task.existing_table_ids));

for (split_index, _) in compact_task.splits.iter().enumerate() {
let compactor = compactor.clone();
let vnode2unit = vnode2unit.clone();
let compaction_executor = compactor.context.compaction_executor.as_ref().cloned();
let filter = compaction_filter.clone();
let split_task = async move {
let merge_iter = compactor.build_sst_iter().await?;
compactor
.compact_key_range(split_index, merge_iter, vnode2unit)
.compact_key_range_with_filter(split_index, merge_iter, vnode2unit, filter)
.await
};
let rx = match Compactor::request_execution(compaction_executor, split_task) {
Expand Down Expand Up @@ -447,11 +485,12 @@ impl Compactor {

/// Compact the given key range and merge iterator.
/// Upon a successful return, the built SSTs are already uploaded to object store.
async fn compact_key_range(
async fn compact_key_range_impl(
&self,
split_index: usize,
iter: BoxedForwardHummockIterator,
vnode2unit: Arc<HashMap<u32, Vec<u32>>>,
compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactOutput> {
let split = self.compact_task.splits[split_index].clone();
let kr = KeyRange {
Expand Down Expand Up @@ -482,12 +521,14 @@ impl Compactor {
} else {
self.context.stats.compact_sst_duration.start_timer()
};

Compactor::compact_and_build_sst(
&mut builder,
kr,
iter,
!self.compact_task.is_target_ultimate_and_leveling,
self.compact_task.watermark,
compaction_filter,
)
.await?;
let builder_len = builder.len();
Expand Down Expand Up @@ -538,6 +579,28 @@ impl Compactor {
Ok((split_index, ssts))
}

async fn compact_key_range(
&self,
split_index: usize,
iter: BoxedForwardHummockIterator,
vnode2unit: Arc<HashMap<u32, Vec<u32>>>,
) -> HummockResult<CompactOutput> {
let dummy_compaction_filter = DummyCompactionFilter {};
self.compact_key_range_impl(split_index, iter, vnode2unit, dummy_compaction_filter)
.await
}

/// Compacts the given split, dropping every key rejected by
/// `compaction_filter` (e.g. keys of tables that have been dropped).
/// Thin wrapper over `compact_key_range_impl`.
async fn compact_key_range_with_filter(
    &self,
    split_index: usize,
    iter: BoxedForwardHummockIterator,
    vnode2unit: Arc<HashMap<u32, Vec<u32>>>,
    compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactOutput> {
    self.compact_key_range_impl(split_index, iter, vnode2unit, compaction_filter)
        .await
}

/// Build the merge iterator based on the given input ssts.
async fn build_sst_iter(&self) -> HummockResult<BoxedForwardHummockIterator> {
let mut table_iters: Vec<BoxedForwardHummockIterator> = Vec::new();
Expand Down Expand Up @@ -722,6 +785,7 @@ impl Compactor {
mut iter: BoxedForwardHummockIterator,
has_user_key_overlap: bool,
watermark: Epoch,
compaction_filter: impl CompactionFilter,
) -> HummockResult<()>
where
B: Clone + Fn() -> F,
Expand Down Expand Up @@ -767,14 +831,25 @@ impl Compactor {

// Among keys with same user key, only retain keys which satisfy `epoch` >= `watermark`,
// and the latest key which satisfies `epoch` < `watermark`
let mut drop = false;
if epoch < watermark {
skip_key = BytesMut::from(iter_key);
if iter.value().is_delete() && !has_user_key_overlap {
iter.next().await?;
continue;
drop = true;
}
}

// In our design, the frontend never accesses keys that have been deleted, so when the
// compaction filter rejects a key (i.e. its materialized view has been dropped) we can
// drop it without considering its epoch.
if !drop && !compaction_filter.filter(iter_key) {
drop = true;
}

if drop {
iter.next().await?;
continue;
}

// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(FullKey::from_slice(iter_key), iter.value(), is_new_user_key)
Expand Down
Loading