Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): deprecate branched sst #16698

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@

message BranchedObject {
uint64 object_id = 1;
uint64 sst_id = 2;
repeated uint64 sst_id = 2;

Check failure on line 892 in proto/hummock.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" on message "BranchedObject" changed label from "optional" to "repeated".
// Compaction group id the SST belongs to.
uint64 compaction_group_id = 3;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
#[primary_key(object_id, sst_id)] // TODO: is this correct?
#[primary_key(object_id, compaction_group_id)]
struct RwHummockBranchedObject {
object_id: i64,
sst_id: i64,
compaction_group_id: i64,
sst_id: Vec<i64>,
}

#[system_catalog(table, "rw_catalog.rw_hummock_branched_objects")]
Expand All @@ -33,7 +33,7 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockBranchedObje
.into_iter()
.map(|o| RwHummockBranchedObject {
object_id: o.object_id as _,
sst_id: o.sst_id as _,
sst_id: o.sst_id.into_iter().map(|id| id as _).collect(),
compaction_group_id: o.compaction_group_id as _,
})
.collect();
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tracing::warn;
use crate::hummock::error::Result;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{read_lock, write_lock};
use crate::hummock::metrics_utils::trigger_gc_stat;
use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
use crate::hummock::HummockManager;

#[derive(Default)]
Expand Down Expand Up @@ -235,6 +235,7 @@ impl HummockManager {

let min_pinned_version_id = versioning.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
trigger_split_stat(&self.metrics, &versioning.current_version);
drop(versioning_guard);
timer.observe_duration();
self.metrics
Expand Down
105 changes: 10 additions & 95 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use function_name::named;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
build_version_delta_after_version, get_compaction_group_ids, get_compaction_group_ssts,
get_member_table_ids, try_get_compaction_group_id_by_table_id, TableGroupInfo,
build_version_delta_after_version, get_compaction_group_ids, get_member_table_ids,
try_get_compaction_group_id_by_table_id, TableGroupInfo,
};
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::CompactionGroupId;
Expand All @@ -33,7 +33,7 @@ use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_c
use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask;
use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange,
GroupDelta, GroupDestroy, GroupMetaChange,
};
use thiserror_ext::AsReport;
use tokio::sync::{OnceCell, RwLock};
Expand All @@ -43,9 +43,7 @@ use crate::hummock::compaction::compaction_config::{
validate_compaction_config, CompactionConfigBuilder,
};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::{
commit_multi_var, create_trx_wrapper, drop_sst, read_lock, HummockManager,
};
use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, HummockManager};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
Expand Down Expand Up @@ -304,12 +302,6 @@ impl HummockManager {
.push(TableId::new(*table_id));
}

// Remove empty group, GC SSTs and remove metric.
let mut branched_ssts = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut versioning.branched_ssts)
);
let groups_to_remove = modified_groups
.into_iter()
.filter_map(|(group_id, member_count)| {
Expand All @@ -321,12 +313,6 @@ impl HummockManager {
})
.collect_vec();
for group_id in &groups_to_remove {
// We don't bother to add IntraLevelDelta to remove SSTs from group, because the entire
// group is to be removed.
// However, we need to take care of SST GC for the removed group.
for (object_id, sst_id) in get_compaction_group_ssts(current_version, *group_id) {
drop_sst(&mut branched_ssts, *group_id, object_id, sst_id);
}
let group_deltas = &mut new_version_delta
.group_deltas
.entry(*group_id)
Expand All @@ -350,7 +336,6 @@ impl HummockManager {
remove_compaction_group_in_sst_stat(&self.metrics, *group_id, max_level);
}
versioning.current_version = current_version;
branched_ssts.commit_memory();

self.notify_last_version_delta(versioning);

Expand Down Expand Up @@ -434,7 +419,6 @@ impl HummockManager {
.move_state_table_to_compaction_group(
parent_group_id,
table_ids,
None,
self.env.opts.partition_vnode_count,
)
.await?;
Expand All @@ -453,7 +437,6 @@ impl HummockManager {
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
target_group_id: Option<CompactionGroupId>,
partition_vnode_count: u32,
) -> Result<(CompactionGroupId, BTreeMap<StateTableId, u32>)> {
let mut table_to_partition = BTreeMap::default();
Expand Down Expand Up @@ -485,18 +468,6 @@ impl HummockManager {
parent_group_id
)));
}
if let Some(compaction_group_id) = target_group_id {
if !versioning.check_branched_sst_in_target_group(
&table_ids,
&parent_group_id,
&compaction_group_id,
) {
return Err(Error::CompactionGroup(format!(
"invalid split attempt for group {}: we shall wait some time for parent group and target group could compact stale sst files",
parent_group_id
)));
}
}
let mut new_version_delta = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapEntryTransactionWrapper,
Expand All @@ -514,44 +485,8 @@ impl HummockManager {
),
)
.await?;
let mut new_group = None;
let target_compaction_group_id = match target_group_id {
Some(compaction_group_id) => {
match current_version.levels.get(&compaction_group_id) {
Some(group) => {
for table_id in &table_ids {
if group.member_table_ids.contains(table_id) {
return Err(Error::CompactionGroup(format!(
"table {} already exist in group {}",
*table_id, compaction_group_id,
)));
}
}
}
None => {
return Err(Error::CompactionGroup(format!(
"target group {} does not exist",
compaction_group_id,
)));
}
}
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupTableChange(GroupTableChange {
table_ids: table_ids.to_vec(),
origin_group_id: parent_group_id,
target_group_id: compaction_group_id,
new_sst_start_id,
version: CompatibilityVersion::NoTrivialSplit as i32,
})),
});
compaction_group_id
}
None => {
let (new_group, target_compaction_group_id) = {
{
// All NewCompactionGroup pairs are mapped to one new compaction group.
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
// The new config will be persisted later.
Expand All @@ -578,7 +513,6 @@ impl HummockManager {
},
);

new_group = Some((new_compaction_group_id, config));
new_version_delta.group_deltas.insert(
parent_group_id,
GroupDeltas {
Expand All @@ -590,20 +524,15 @@ impl HummockManager {
}],
},
);
new_compaction_group_id
((new_compaction_group_id, config), new_compaction_group_id)
}
};

let mut current_version = versioning.current_version.clone();
let sst_split_info = current_version.apply_version_delta(&new_version_delta);

// `branched_ssts` only commit in memory, so `TXN` make no difference.
let mut branched_ssts = create_trx_wrapper!(
self.meta_store_ref(),
BTreeMapTransactionWrapper,
BTreeMapTransaction::new(&mut versioning.branched_ssts)
);
if let Some((new_compaction_group_id, config)) = new_group {
let (new_compaction_group_id, config) = new_group;
{
let mut compaction_group_manager = self.compaction_group_manager.write().await;
let insert = create_trx_wrapper!(
self.meta_store_ref(),
Expand All @@ -622,27 +551,13 @@ impl HummockManager {
for table_id in table_ids {
table_to_partition.insert(table_id, partition_vnode_count);
}
} else {
commit_multi_var!(self.meta_store_ref(), new_version_delta)?;
}
versioning.current_version = current_version;
// Updates SST split info
let mut changed_sst_ids: HashSet<u64> = HashSet::default();
for (object_id, sst_id, parent_old_sst_id, parent_new_sst_id) in sst_split_info {
for (_, _, parent_old_sst_id, _) in sst_split_info {
changed_sst_ids.insert(parent_old_sst_id);
match branched_ssts.get_mut(object_id) {
Some(mut entry) => {
entry.insert(parent_group_id, parent_new_sst_id);
entry.insert(target_compaction_group_id, sst_id);
}
None => {
let mut groups = HashMap::from_iter([(target_compaction_group_id, sst_id)]);
groups.insert(parent_group_id, parent_new_sst_id);
branched_ssts.insert(object_id, groups);
}
}
}
branched_ssts.commit_memory();
self.notify_last_version_delta(versioning);
drop(versioning_guard);
let mut canceled_tasks = vec![];
Expand Down
Loading
Loading