diff --git a/src/query/storages/fuse/fuse/src/operations/delete.rs b/src/query/storages/fuse/fuse/src/operations/delete.rs index 4c63e440fec16..cfc5340bc803c 100644 --- a/src/query/storages/fuse/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/fuse/src/operations/delete.rs @@ -32,9 +32,9 @@ use common_sql::evaluator::Evaluator; use common_storages_table_meta::meta::Location; use common_storages_table_meta::meta::TableSnapshot; -use crate::operations::mutation::DeletionPartInfo; use crate::operations::mutation::DeletionSource; use crate::operations::mutation::DeletionTransform; +use crate::operations::mutation::MutationPartInfo; use crate::operations::mutation::MutationSink; use crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::port::OutputPort; @@ -186,7 +186,7 @@ impl FuseTable { index_stats .into_iter() .zip(inner_parts.partitions.into_iter()) - .map(|((a, b), c)| DeletionPartInfo::create(a, b, c)) + .map(|((a, b), c)| MutationPartInfo::create(a, b, c)) .collect(), ); ctx.try_set_partitions(parts)?; diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_source.rs b/src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_source.rs index 394c74aa52207..297a8a8137954 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_source.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_source.rs @@ -33,10 +33,10 @@ use opendal::Operator; use super::deletion_meta::Deletion; use super::deletion_meta::DeletionSourceMeta; -use super::deletion_part::DeletionPartInfo; use crate::io::write_data; use crate::io::BlockReader; use crate::io::TableMetaLocationGenerator; +use crate::operations::mutation::MutationPartInfo; use crate::operations::util; use crate::operations::BloomIndexState; use crate::pipelines::processors::port::OutputPort; @@ -294,7 +294,7 @@ impl Processor for DeletionSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { State::ReadData(Some(part)) => { - let deletion_part = DeletionPartInfo::from_part(&part)?; + let deletion_part = MutationPartInfo::from_part(&part)?; self.index = deletion_part.index; self.origin_stats = deletion_part.cluster_stats.clone(); let part = deletion_part.inner_part.clone(); diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/deletion/mod.rs b/src/query/storages/fuse/fuse/src/operations/mutation/deletion/mod.rs index c6c06846a3f71..d4efa10b16229 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/deletion/mod.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/deletion/mod.rs @@ -13,11 +13,9 @@ // limitations under the License. mod deletion_meta; -mod deletion_part; mod deletion_source; mod deletion_transform; pub use deletion_meta::Deletion; -pub use deletion_part::DeletionPartInfo; pub use deletion_source::DeletionSource; pub use deletion_transform::DeletionTransform; diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/fuse/src/operations/mutation/mod.rs index d3e82f3f6743c..8694e0f6e57cd 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/mod.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/mod.rs @@ -17,6 +17,7 @@ pub mod base_mutator; mod compact; mod deletion; pub mod mutation_meta; +mod mutation_part; pub mod mutation_sink; pub mod recluster_mutator; mod update; @@ -30,9 +31,9 @@ pub use compact::MergeSegmentsTransform; pub use compact::SegmentCompactMutator; pub use compact::SegmentCompactionState; pub use compact::SegmentCompactor; -pub use deletion::DeletionPartInfo; pub use deletion::DeletionSource; pub use deletion::DeletionTransform; pub use mutation_meta::MutationMeta; +pub use mutation_part::MutationPartInfo; pub use mutation_sink::MutationSink; pub use recluster_mutator::ReclusterMutator; diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_part.rs b/src/query/storages/fuse/fuse/src/operations/mutation/mutation_part.rs similarity index 79% rename from src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_part.rs rename to src/query/storages/fuse/fuse/src/operations/mutation/mutation_part.rs index bb2fda70d67db..2dad0c5b12211 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/deletion/deletion_part.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/mutation_part.rs @@ -24,20 +24,20 @@ use common_storages_table_meta::meta::ClusterStatistics; use crate::pruning::BlockIndex; #[derive(serde::Serialize, serde::Deserialize, PartialEq)] -pub struct DeletionPartInfo { +pub struct MutationPartInfo { pub index: BlockIndex, pub cluster_stats: Option, pub inner_part: PartInfoPtr, } -#[typetag::serde(name = "deletion")] -impl PartInfo for DeletionPartInfo { +#[typetag::serde(name = "mutation")] +impl PartInfo for MutationPartInfo { fn as_any(&self) -> &dyn Any { self } fn equals(&self, info: &Box) -> bool { - match info.as_any().downcast_ref::() { + match info.as_any().downcast_ref::() { None => false, Some(other) => self == other, } @@ -48,24 +48,24 @@ impl PartInfo for DeletionPartInfo { } } -impl DeletionPartInfo { +impl MutationPartInfo { pub fn create( index: BlockIndex, cluster_stats: Option, inner_part: PartInfoPtr, ) -> PartInfoPtr { - Arc::new(Box::new(DeletionPartInfo { + Arc::new(Box::new(MutationPartInfo { index, cluster_stats, inner_part, })) } - pub fn from_part(info: &PartInfoPtr) -> Result<&DeletionPartInfo> { - match info.as_any().downcast_ref::() { + pub fn from_part(info: &PartInfoPtr) -> Result<&MutationPartInfo> { + match info.as_any().downcast_ref::() { Some(part_ref) => Ok(part_ref), None => Err(ErrorCode::Internal( - "Cannot downcast from PartInfo to DeletionPartInfo.", + "Cannot downcast from PartInfo to MutationPartInfo.", )), } } diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/update/mod.rs b/src/query/storages/fuse/fuse/src/operations/mutation/update/mod.rs index 373fe6fbb6365..25ddbfd26af47 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/update/mod.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/update/mod.rs @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod update_source; \ No newline at end of file +mod update_source; diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/update/update_source.rs b/src/query/storages/fuse/fuse/src/operations/mutation/update/update_source.rs index 43ca564b3ba27..1255e61ab03e4 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/update/update_source.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/update/update_source.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. - use std::any::Any; use std::ops::Not; use std::sync::Arc; use common_catalog::plan::PartInfoPtr; use common_catalog::table_context::TableContext; -use common_datablocks::serialize_data_blocks; +use common_datablocks::serialize_to_parquet; use common_datablocks::DataBlock; use common_datavalues::BooleanColumn; use common_datavalues::ColumnRef; @@ -35,6 +34,7 @@ use opendal::Operator; use crate::io::write_data; use crate::io::BlockReader; use crate::io::TableMetaLocationGenerator; +use crate::operations::mutation::MutationPartInfo; use crate::operations::util; use crate::operations::BloomIndexState; use crate::pipelines::processors::port::OutputPort; @@ -57,8 +57,11 @@ pub struct UpdateSource { state: State, ctx: Arc, output: Arc, -} + index: BlockIndex, + cluster_stats_gen: ClusterStatsGenerator, + origin_stats: Option, +} #[async_trait::async_trait] impl Processor for UpdateSource { @@ -117,6 +120,14 @@ impl Processor for UpdateSource { async fn async_process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::Finish) { + State::ReadData(Some(part)) => { + let part = MutationPartInfo::from_part(&part)?; + self.index = part.index; + self.origin_stats = part.cluster_stats.clone(); + let inner_part = part.inner_part.clone(); + // let chunks = self.block_reader.read_columns_data(inner_part.clone()).await?; + // self.state = State::FilterData(inner_part, chunks); + } _ => return Err(ErrorCode::Internal("It's a bug.")), } Ok(())