Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: replace all ProstXxx with PbXxx #8621

Merged
merged 2 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/batch/benches/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::expr_node::Type::{
ConstantValue as TConstValue, Equal, InputRef, Modulus,
Expand Down Expand Up @@ -53,7 +53,7 @@ fn create_filter_executor(chunk_size: usize, chunk_num: usize) -> BoxedExecutor
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(2)).as_ref()),
})),
};
Expand All @@ -76,7 +76,7 @@ fn create_filter_executor(chunk_size: usize, chunk_num: usize) -> BoxedExecutor
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(0)).as_ref()),
})),
};
Expand Down
8 changes: 4 additions & 4 deletions src/batch/benches/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_common::{enable_jemalloc_on_linux, hash};
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::expr_node::Type::{
ConstantValue as TConstValue, GreaterThan, InputRef, Modulus,
Expand Down Expand Up @@ -58,7 +58,7 @@ fn create_hash_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(123)).as_ref()),
})),
};
Expand Down Expand Up @@ -88,7 +88,7 @@ fn create_hash_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(456)).as_ref()),
})),
};
Expand Down Expand Up @@ -141,7 +141,7 @@ fn create_hash_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(100)).as_ref()),
})),
};
Expand Down
6 changes: 3 additions & 3 deletions src/batch/benches/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::expr_node::Type::{
ConstantValue as TConstValue, Equal, InputRef, Modulus,
Expand Down Expand Up @@ -68,7 +68,7 @@ fn create_nested_loop_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(2)).as_ref()),
})),
};
Expand All @@ -79,7 +79,7 @@ fn create_nested_loop_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(3)).as_ref()),
})),
};
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::Type::InputRef;
use risingwave_pb::expr::expr_node::{RexNode, Type};
use risingwave_pb::expr::{ExprNode, FunctionCall};
Expand Down Expand Up @@ -242,7 +242,7 @@ mod tests {
}],
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(
Some(ScalarImpl::List(ListValue::new(vec![Some(
2.to_scalar_value(),
Expand Down
14 changes: 7 additions & 7 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::select_all;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;
use risingwave_pb::batch_plan::PbExchangeSource;
use risingwave_pb::plan_common::Field as NodeField;
use risingwave_rpc_client::ComputeClientPoolRef;

Expand All @@ -36,7 +36,7 @@ use super::BatchTaskMetricsWithTaskLabels;
use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor};

pub struct GenericExchangeExecutor<CS, C> {
proto_sources: Vec<ProstExchangeSource>,
proto_sources: Vec<PbExchangeSource>,
/// Mock-able CreateSource.
source_creators: Vec<CS>,
context: C,
Expand All @@ -56,7 +56,7 @@ pub trait CreateSource: Send {
async fn create_source(
&self,
context: impl BatchTaskContext,
prost_source: &ProstExchangeSource,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl>;
}

Expand All @@ -76,7 +76,7 @@ impl CreateSource for DefaultCreateSource {
async fn create_source(
&self,
context: impl BatchTaskContext,
prost_source: &ProstExchangeSource,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl> {
let peer_addr = prost_source.get_host()?.into();
let task_output_id = prost_source.get_task_output_id()?;
Expand Down Expand Up @@ -127,7 +127,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
)?;

ensure!(!node.get_sources().is_empty());
let proto_sources: Vec<ProstExchangeSource> = node.get_sources().to_vec();
let proto_sources: Vec<PbExchangeSource> = node.get_sources().to_vec();
let source_creators =
vec![DefaultCreateSource::new(source.context().client_pool()); proto_sources.len()];

Expand Down Expand Up @@ -189,7 +189,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec

#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn data_chunk_stream(
prost_source: ProstExchangeSource,
prost_source: PbExchangeSource,
source_creator: CS,
context: C,
metrics: Option<BatchTaskMetricsWithTaskLabels>,
Expand Down Expand Up @@ -264,7 +264,7 @@ mod tests {
let chunks = vec![Some(chunk); 100];
let fake_exchange_source = FakeExchangeSource::new(chunks);
let fake_create_source = FakeCreateSource::new(fake_exchange_source);
proto_sources.push(ProstExchangeSource::default());
proto_sources.push(PbExchangeSource::default());
source_creators.push(fake_create_source);
}

Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ mod tests {
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as ProstDataType;
use risingwave_pb::data::PbDataType;
use risingwave_pb::expr::agg_call::Type;
use risingwave_pb::expr::{AggCall, InputRef};

Expand Down Expand Up @@ -309,12 +309,12 @@ mod tests {
r#type: Type::Sum as i32,
args: vec![InputRef {
index: 2,
r#type: Some(ProstDataType {
r#type: Some(PbDataType {
type_name: TypeName::Int32 as i32,
..Default::default()
}),
}],
return_type: Some(ProstDataType {
return_type: Some(PbDataType {
type_name: TypeName::Int64 as i32,
..Default::default()
}),
Expand Down Expand Up @@ -382,7 +382,7 @@ mod tests {
let agg_call = AggCall {
r#type: Type::Count as i32,
args: vec![],
return_type: Some(ProstDataType {
return_type: Some(PbDataType {
type_name: TypeName::Int64 as i32,
..Default::default()
}),
Expand Down
12 changes: 6 additions & 6 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
ExchangeInfo, ExchangeNode, ExchangeSource as ProstExchangeSource, LocalExecutePlan,
PlanFragment, PlanNode, RowSeqScanNode, TaskId as ProstTaskId, TaskOutputId,
ExchangeInfo, ExchangeNode, PbExchangeSource, LocalExecutePlan,
PlanFragment, PlanNode, RowSeqScanNode, PbTaskId, TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
use risingwave_pb::plan_common::StorageTableDesc;
Expand Down Expand Up @@ -113,8 +113,8 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
Ok(row_seq_scan_node)
}

/// Creates the `ProstExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<ProstExchangeSource> {
/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
let worker = self.pu_to_worker_mapping.get(id).ok_or_else(|| {
internal_error("No worker node found for the given parallel unit id.")
})?;
Expand All @@ -134,9 +134,9 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
epoch: Some(self.epoch.clone()),
};

let prost_exchange_source = ProstExchangeSource {
let prost_exchange_source = PbExchangeSource {
task_output_id: Some(TaskOutputId {
task_id: Some(ProstTaskId {
task_id: Some(PbTaskId {
// FIXME: We should replace this random generated uuid to current query_id for
// better dashboard. However, due to the lack of info of
// stage_id and task_id, we can not do it now. Now just make sure it will not
Expand Down
22 changes: 11 additions & 11 deletions src/batch/src/executor/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common::error::Result;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::plan_common::JoinType as JoinTypeProst;
use risingwave_pb::plan_common::JoinType as JoinTypePb;

use crate::error::BatchError;

Expand Down Expand Up @@ -61,17 +61,17 @@ impl JoinType {
)
}

pub fn from_prost(prost: JoinTypeProst) -> Self {
pub fn from_prost(prost: JoinTypePb) -> Self {
match prost {
JoinTypeProst::Inner => JoinType::Inner,
JoinTypeProst::LeftOuter => JoinType::LeftOuter,
JoinTypeProst::LeftSemi => JoinType::LeftSemi,
JoinTypeProst::LeftAnti => JoinType::LeftAnti,
JoinTypeProst::RightOuter => JoinType::RightOuter,
JoinTypeProst::RightSemi => JoinType::RightSemi,
JoinTypeProst::RightAnti => JoinType::RightAnti,
JoinTypeProst::FullOuter => JoinType::FullOuter,
JoinTypeProst::Unspecified => unreachable!(),
JoinTypePb::Inner => JoinType::Inner,
JoinTypePb::LeftOuter => JoinType::LeftOuter,
JoinTypePb::LeftSemi => JoinType::LeftSemi,
JoinTypePb::LeftAnti => JoinType::LeftAnti,
JoinTypePb::RightOuter => JoinType::RightOuter,
JoinTypePb::RightSemi => JoinType::RightSemi,
JoinTypePb::RightAnti => JoinType::RightAnti,
JoinTypePb::FullOuter => JoinType::FullOuter,
JoinTypePb::Unspecified => unreachable!(),
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/merge_sort_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;
use risingwave_pb::batch_plan::PbExchangeSource;

use crate::exchange_source::ExchangeSourceImpl;
use crate::executor::{
Expand All @@ -41,7 +41,7 @@ pub struct MergeSortExchangeExecutorImpl<CS, C> {
source_inputs: Vec<Option<DataChunk>>,
column_orders: Arc<Vec<ColumnOrder>>,
min_heap: BinaryHeap<HeapElem>,
proto_sources: Vec<ProstExchangeSource>,
proto_sources: Vec<PbExchangeSource>,
sources: Vec<ExchangeSourceImpl>, // impl
/// Mock-able CreateSource.
source_creators: Vec<CS>,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
let column_orders = Arc::new(column_orders);

let exchange_node = sort_merge_node.get_exchange()?;
let proto_sources: Vec<ProstExchangeSource> = exchange_node.get_sources().to_vec();
let proto_sources: Vec<PbExchangeSource> = exchange_node.get_sources().to_vec();
let source_creators =
vec![DefaultCreateSource::new(source.context().client_pool()); proto_sources.len()];
ensure!(!exchange_node.get_sources().is_empty());
Expand Down Expand Up @@ -253,11 +253,11 @@ mod tests {
let fake_exchange_source = FakeExchangeSource::new(vec![Some(chunk)]);
let fake_create_source = FakeCreateSource::new(fake_exchange_source);

let mut proto_sources: Vec<ProstExchangeSource> = vec![];
let mut proto_sources: Vec<PbExchangeSource> = vec![];
let mut source_creators = vec![];
let num_sources = 2;
for _ in 0..num_sources {
proto_sources.push(ProstExchangeSource::default());
proto_sources.push(PbExchangeSource::default());
source_creators.push(fake_create_source.clone());
}
let column_orders = Arc::new(vec![ColumnOrder {
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::util::select_all;
use risingwave_common::util::sort_util::{Direction, OrderType};
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, ScanRange as ProstScanRange};
use risingwave_pb::batch_plan::{scan_range, PbScanRange};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl ScanRange {

/// Create a scan range from the prost representation.
pub fn new(
scan_range: ProstScanRange,
scan_range: PbScanRange,
mut pk_types: impl Iterator<Item = DataType>,
) -> Result<Self> {
let pk_prefix = OwnedRow::new(
Expand Down
Loading