diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs
index 757a239be91a9..41f42ce1d00be 100644
--- a/src/query/pipeline/core/src/processors/processor.rs
+++ b/src/query/pipeline/core/src/processors/processor.rs
@@ -22,7 +22,6 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::ColumnId;
 use common_expression::Expr;
-use common_expression::RemoteExpr;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use minitrace::prelude::*;
@@ -106,7 +105,7 @@ pub trait Processor: Send {
     fn add_runtime_filters(&mut self, _filters: HashMap<ColumnId, Expr>) -> Result<()> {
         Err(ErrorCode::Unimplemented(format!(
             "{} can't add runtime filters",
-            self.name
+            self.name()
         )))
     }
 }
diff --git a/src/query/pipeline/sources/src/sync_source.rs b/src/query/pipeline/sources/src/sync_source.rs
index 630193eac3b03..8c4db1597dacf 100644
--- a/src/query/pipeline/sources/src/sync_source.rs
+++ b/src/query/pipeline/sources/src/sync_source.rs
@@ -117,6 +117,10 @@ impl<T: 'static + SyncSource> Processor for SyncSourcer<T> {
         match self.inner.generate()? {
             None => self.is_finish = true,
             Some(data_block) => {
+                if data_block.is_empty() && data_block.get_meta().is_none() {
+                    // The part was pruned by a runtime filter: emit nothing for it,
+                    return Ok(());
+                }
                 let progress_values = ProgressValues {
                     rows: data_block.num_rows(),
                     bytes: data_block.memory_size(),
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
index 08ca9c9ea75d9..7de4fd12aacdd 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
@@ -51,6 +51,7 @@ use parking_lot::RwLock;
 use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
 use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
 use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
+use crate::pipelines::processors::transforms::hash_join::util::inlist_filter;
 use crate::pipelines::processors::HashJoinDesc;
 use crate::sessions::QueryContext;
 
@@ -266,81 +267,17 @@ impl HashJoinState {
             .iter()
             .zip(self.hash_join_desc.probe_keys.iter())
         {
-            // Only support key is a column
-            if let Expr::ColumnRef {
-                span,
-                id,
-                data_type,
-                display_name,
-            } = probe_key
-            {
-                let column_id: usize = self.hash_join_desc.probe_schema.fields[*id]
-                    .name()
-                    .parse()
-                    .unwrap();
-                let raw_probe_key = RawExpr::ColumnRef {
-                    span: span.clone(),
-                    id: column_id,
-                    data_type: data_type.clone(),
-                    display_name: display_name.clone(),
-                };
-                let mut columns = Vec::with_capacity(data_blocks.len());
-                for block in data_blocks.iter() {
-                    if block.num_columns() == 0 {
-                        continue;
-                    }
-                    let evaluator = Evaluator::new(block, &func_ctx, &BUILTIN_FUNCTIONS);
-                    let column = evaluator
-                        .run(build_key)?
-                        .convert_to_full_column(build_key.data_type(), block.num_rows());
-                    columns.push(column);
-                }
-                // Generate inlist using build column
-                let build_key_column = Column::concat_columns(columns.into_iter())?;
-                let mut list = Vec::with_capacity(build_key_column.len());
-                for value in build_key_column.iter() {
-                    list.push(RawExpr::Constant {
-                        span: None,
-                        scalar: value.to_owned(),
-                    })
-                }
-                let array = RawExpr::FunctionCall {
-                    span: None,
-                    name: "array".to_string(),
-                    params: vec![],
-                    args: list,
-                };
-                let distinct_list = RawExpr::FunctionCall {
-                    span: None,
-                    name: "array_distinct".to_string(),
-                    params: vec![],
-                    args: vec![array],
-                };
-
-                let args = vec![distinct_list, raw_probe_key];
-                // Make contain function
-                let contain_func = RawExpr::FunctionCall {
-                    span: None,
-                    name: "contains".to_string(),
-                    params: vec![],
-                    args,
-                };
-                runtime_filters.insert(
-                    *id as ColumnId,
-                    contain_func
-                        .type_check(self.hash_join_desc.probe_schema.as_ref())?
-                        .project_column_ref(|index| {
-                            self.hash_join_desc
-                                .probe_schema
-                                .index_of(&index.to_string())
-                                .unwrap()
-                        }),
-                );
+            if let Some(filter) = inlist_filter(
+                &func_ctx,
+                &self.hash_join_desc.probe_schema,
+                build_key,
+                probe_key,
+                data_blocks,
+            )? {
+                runtime_filters.insert(filter.0, filter.1);
             }
         }
-
         data_blocks.clear();
-
         Ok(())
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs
index a6aabf31c0b01..f793496441368 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs
@@ -12,9 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use common_exception::Result;
+use common_expression::Column;
+use common_expression::ColumnId;
+use common_expression::DataBlock;
 use common_expression::DataField;
 use common_expression::DataSchemaRef;
 use common_expression::DataSchemaRefExt;
+use common_expression::Evaluator;
+use common_expression::Expr;
+use common_expression::FunctionContext;
+use common_expression::RawExpr;
+use common_functions::BUILTIN_FUNCTIONS;
+use common_sql::TypeCheck;
 
 pub(crate) fn build_schema_wrap_nullable(build_schema: &DataSchemaRef) -> DataSchemaRef {
     let mut nullable_field = Vec::with_capacity(build_schema.fields().len());
@@ -37,3 +47,78 @@ pub(crate) fn probe_schema_wrap_nullable(probe_schema: &DataSchemaRef) -> DataSc
     }
     DataSchemaRefExt::create(nullable_field)
 }
+
+// Construct an IN-list runtime filter (`probe_key IN (distinct build keys)`) from the build-side blocks.
+pub(crate) fn inlist_filter(
+    func_ctx: &FunctionContext,
+    probe_schema: &DataSchemaRef,
+    build_key: &Expr,
+    probe_key: &Expr,
+    build_blocks: &[DataBlock],
+) -> Result<Option<(ColumnId, Expr)>> {
+    // Currently only a bare column is supported as the probe key; more forms will come later,
+    // e.g. t1.a + 1 = t2.a, or t1.a + t1.b = t2.a (the left side being the probe side).
+    if let Expr::ColumnRef {
+        span,
+        id,
+        data_type,
+        display_name,
+    } = probe_key
+    {
+        let column_id: usize = probe_schema.fields[*id].name().parse().unwrap();
+        let raw_probe_key = RawExpr::ColumnRef {
+            span: span.clone(),
+            id: column_id,
+            data_type: data_type.clone(),
+            display_name: display_name.clone(),
+        };
+        let mut columns = Vec::with_capacity(build_blocks.len());
+        for block in build_blocks.iter() {
+            if block.num_columns() == 0 {
+                continue;
+            }
+            let evaluator = Evaluator::new(block, &func_ctx, &BUILTIN_FUNCTIONS);
+            let column = evaluator
+                .run(build_key)?
+                .convert_to_full_column(build_key.data_type(), block.num_rows());
+            columns.push(column);
+        }
+        // Build the IN list from the concatenated build-key column, one constant per row.
+        let build_key_column = Column::concat_columns(columns.into_iter())?;
+        let mut list = Vec::with_capacity(build_key_column.len());
+        for value in build_key_column.iter() {
+            list.push(RawExpr::Constant {
+                span: None,
+                scalar: value.to_owned(),
+            })
+        }
+        let array = RawExpr::FunctionCall {
+            span: None,
+            name: "array".to_string(),
+            params: vec![],
+            args: list,
+        };
+        let distinct_list = RawExpr::FunctionCall {
+            span: None,
+            name: "array_distinct".to_string(),
+            params: vec![],
+            args: vec![array],
+        };
+
+        let args = vec![distinct_list, raw_probe_key];
+        // Wrap the distinct list and the probe key in a `contains` function call.
+        let contain_func = RawExpr::FunctionCall {
+            span: None,
+            name: "contains".to_string(),
+            params: vec![],
+            args,
+        };
+        return Ok(Some((
+            *id as ColumnId,
+            contain_func
+                .type_check(probe_schema.as_ref())?
+                .project_column_ref(|index| probe_schema.index_of(&index.to_string()).unwrap()),
+        )));
+    }
+    Ok(None)
+}
diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
index a8138386c895c..21f982bf3d279 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
@@ -31,6 +31,7 @@ use common_pipeline_core::processors::Processor;
 use common_pipeline_core::processors::ProcessorPtr;
 use common_pipeline_sources::SyncSource;
 use common_pipeline_sources::SyncSourcer;
+use log::info;
 
 use super::parquet_data_source::DataSource;
 use crate::fuse_part::FusePartInfo;
@@ -114,7 +115,7 @@ impl SyncSource for ReadParquetDataSource<true> {
                     &self.runtime_filters,
                     &self.partitions.ctx.get_function_context()?,
                 )? {
-                    return Ok(None);
+                    return Ok(Some(DataBlock::empty()));
                 }
 
                 if let Some(index_reader) = self.index_reader.as_ref() {
diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs
index 8a271bfe7cb36..222a3d9a69a4b 100644
--- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs
+++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs
@@ -22,6 +22,7 @@ use common_expression::Expr;
 use common_expression::FunctionContext;
 use common_expression::Scalar;
 use common_functions::BUILTIN_FUNCTIONS;
+use log::info;
 use storages_common_index::statistics_to_domain;
 
 use crate::FusePartInfo;
@@ -36,7 +37,7 @@ pub fn runtime_filter_pruner(
     }
 
     let part = FusePartInfo::from_part(part)?;
-    Ok(filters.iter().any(|(id, filter)| {
+    let pruned = filters.iter().any(|(id, filter)| {
         let column_refs = filter.column_refs();
         // Currently only support filter with one column(probe key).
         debug_assert!(column_refs.len() == 1);
@@ -54,15 +55,22 @@ pub fn runtime_filter_pruner(
                     func_ctx,
                     &BUILTIN_FUNCTIONS,
                 );
-                matches!(new_expr, Expr::Constant {
+                return matches!(new_expr, Expr::Constant {
                     scalar: Scalar::Boolean(false),
                     ..
-                })
-            } else {
-                false
+                });
             }
-        } else {
-            false
         }
-    }))
+        false
+    });
+
+    if pruned {
+        info!(
+            "Pruned partition with {:?} rows by runtime filter",
+            part.nums_rows
+        );
+        return Ok(true);
+    }
+
+    Ok(false)
 }