Skip to content

Commit

Permalink
rename deletepart to mutationpart
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Dec 22, 2022
1 parent fcbdf30 commit 8bcdc4b
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/query/storages/fuse/fuse/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use common_sql::evaluator::Evaluator;
use common_storages_table_meta::meta::Location;
use common_storages_table_meta::meta::TableSnapshot;

use crate::operations::mutation::DeletionPartInfo;
use crate::operations::mutation::DeletionSource;
use crate::operations::mutation::DeletionTransform;
use crate::operations::mutation::MutationPartInfo;
use crate::operations::mutation::MutationSink;
use crate::pipelines::processors::port::InputPort;
use crate::pipelines::processors::port::OutputPort;
Expand Down Expand Up @@ -186,7 +186,7 @@ impl FuseTable {
index_stats
.into_iter()
.zip(inner_parts.partitions.into_iter())
.map(|((a, b), c)| DeletionPartInfo::create(a, b, c))
.map(|((a, b), c)| MutationPartInfo::create(a, b, c))
.collect(),
);
ctx.try_set_partitions(parts)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ use opendal::Operator;

use super::deletion_meta::Deletion;
use super::deletion_meta::DeletionSourceMeta;
use super::deletion_part::DeletionPartInfo;
use crate::io::write_data;
use crate::io::BlockReader;
use crate::io::TableMetaLocationGenerator;
use crate::operations::mutation::MutationPartInfo;
use crate::operations::util;
use crate::operations::BloomIndexState;
use crate::pipelines::processors::port::OutputPort;
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Processor for DeletionSource {
async fn async_process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, State::Finish) {
State::ReadData(Some(part)) => {
let deletion_part = DeletionPartInfo::from_part(&part)?;
let deletion_part = MutationPartInfo::from_part(&part)?;
self.index = deletion_part.index;
self.origin_stats = deletion_part.cluster_stats.clone();
let part = deletion_part.inner_part.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
// limitations under the License.

mod deletion_meta;
mod deletion_part;
mod deletion_source;
mod deletion_transform;

pub use deletion_meta::Deletion;
pub use deletion_part::DeletionPartInfo;
pub use deletion_source::DeletionSource;
pub use deletion_transform::DeletionTransform;
3 changes: 2 additions & 1 deletion src/query/storages/fuse/fuse/src/operations/mutation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod base_mutator;
mod compact;
mod deletion;
pub mod mutation_meta;
mod mutation_part;
pub mod mutation_sink;
pub mod recluster_mutator;
mod update;
Expand All @@ -30,9 +31,9 @@ pub use compact::MergeSegmentsTransform;
pub use compact::SegmentCompactMutator;
pub use compact::SegmentCompactionState;
pub use compact::SegmentCompactor;
pub use deletion::DeletionPartInfo;
pub use deletion::DeletionSource;
pub use deletion::DeletionTransform;
pub use mutation_meta::MutationMeta;
pub use mutation_part::MutationPartInfo;
pub use mutation_sink::MutationSink;
pub use recluster_mutator::ReclusterMutator;
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ use common_storages_table_meta::meta::ClusterStatistics;
use crate::pruning::BlockIndex;

#[derive(serde::Serialize, serde::Deserialize, PartialEq)]
pub struct DeletionPartInfo {
pub struct MutationPartInfo {
pub index: BlockIndex,
pub cluster_stats: Option<ClusterStatistics>,
pub inner_part: PartInfoPtr,
}

#[typetag::serde(name = "deletion")]
impl PartInfo for DeletionPartInfo {
#[typetag::serde(name = "mutation")]
impl PartInfo for MutationPartInfo {
fn as_any(&self) -> &dyn Any {
self
}

fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
match info.as_any().downcast_ref::<DeletionPartInfo>() {
match info.as_any().downcast_ref::<MutationPartInfo>() {
None => false,
Some(other) => self == other,
}
Expand All @@ -48,24 +48,24 @@ impl PartInfo for DeletionPartInfo {
}
}

impl DeletionPartInfo {
impl MutationPartInfo {
pub fn create(
index: BlockIndex,
cluster_stats: Option<ClusterStatistics>,
inner_part: PartInfoPtr,
) -> PartInfoPtr {
Arc::new(Box::new(DeletionPartInfo {
Arc::new(Box::new(MutationPartInfo {
index,
cluster_stats,
inner_part,
}))
}

pub fn from_part(info: &PartInfoPtr) -> Result<&DeletionPartInfo> {
match info.as_any().downcast_ref::<DeletionPartInfo>() {
pub fn from_part(info: &PartInfoPtr) -> Result<&MutationPartInfo> {
match info.as_any().downcast_ref::<MutationPartInfo>() {
Some(part_ref) => Ok(part_ref),
None => Err(ErrorCode::Internal(
"Cannot downcast from PartInfo to DeletionPartInfo.",
"Cannot downcast from PartInfo to MutationPartInfo.",
)),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod update_source;
mod update_source;
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.


use std::any::Any;
use std::ops::Not;
use std::sync::Arc;

use common_catalog::plan::PartInfoPtr;
use common_catalog::table_context::TableContext;
use common_datablocks::serialize_data_blocks;
use common_datablocks::serialize_to_parquet;
use common_datablocks::DataBlock;
use common_datavalues::BooleanColumn;
use common_datavalues::ColumnRef;
Expand All @@ -35,6 +34,7 @@ use opendal::Operator;
use crate::io::write_data;
use crate::io::BlockReader;
use crate::io::TableMetaLocationGenerator;
use crate::operations::mutation::MutationPartInfo;
use crate::operations::util;
use crate::operations::BloomIndexState;
use crate::pipelines::processors::port::OutputPort;
Expand All @@ -57,8 +57,11 @@ pub struct UpdateSource {
state: State,
ctx: Arc<dyn TableContext>,
output: Arc<OutputPort>,
}

index: BlockIndex,
cluster_stats_gen: ClusterStatsGenerator,
origin_stats: Option<ClusterStatistics>,
}

#[async_trait::async_trait]
impl Processor for UpdateSource {
Expand Down Expand Up @@ -117,6 +120,14 @@ impl Processor for UpdateSource {

async fn async_process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, State::Finish) {
State::ReadData(Some(part)) => {
let part = MutationPartInfo::from_part(&part)?;
self.index = part.index;
self.origin_stats = part.cluster_stats.clone();
let inner_part = part.inner_part.clone();
// let chunks = self.block_reader.read_columns_data(inner_part.clone()).await?;
// self.state = State::FilterData(inner_part, chunks);
}
_ => return Err(ErrorCode::Internal("It's a bug.")),
}
Ok(())
Expand Down

0 comments on commit 8bcdc4b

Please sign in to comment.