Skip to content

Commit

Permalink
perf(parser): do to_ascii_lowercase only once (risingwavelabs#8718)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored Mar 23, 2023
1 parent 669087e commit 3a0ee3d
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 183 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ steps:
# files: "*-junit.xml"
# format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 18
retry: *auto-retry

- label: "misc check"
Expand Down
90 changes: 10 additions & 80 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,86 +462,16 @@ mod test {

/// Builds the fixed test schema used by the Avro parser tests.
///
/// Uses `SourceColumnDesc::simple`, which derives the non-essential fields
/// (`is_row_id: false`, `is_meta: false`, empty `fields`) and precomputes the
/// lowercase name once, instead of spelling out every struct field by hand.
/// Column ids 0..=9 are assigned in declaration order.
fn build_rw_columns() -> Vec<SourceColumnDesc> {
    vec![
        SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
        SourceColumnDesc::simple("sequence_id", DataType::Int64, ColumnId::from(1)),
        SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(2)),
        SourceColumnDesc::simple("score", DataType::Float32, ColumnId::from(3)),
        SourceColumnDesc::simple("avg_score", DataType::Float64, ColumnId::from(4)),
        SourceColumnDesc::simple("is_lasted", DataType::Boolean, ColumnId::from(5)),
        SourceColumnDesc::simple("entrance_date", DataType::Date, ColumnId::from(6)),
        SourceColumnDesc::simple("birthday", DataType::Timestamp, ColumnId::from(7)),
        SourceColumnDesc::simple("anniversary", DataType::Timestamp, ColumnId::from(8)),
        SourceColumnDesc::simple("passed", DataType::Interval, ColumnId::from(9)),
    ]
}

Expand Down
13 changes: 6 additions & 7 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl CanalJsonParser {
writer.insert(|column| {
cannal_simd_json_parse_value(
&column.data_type,
v.get(column.name.to_ascii_lowercase().as_str()),
v.get(column.name_in_lower_case.as_str()),
)
})
})
Expand Down Expand Up @@ -135,15 +135,14 @@ impl CanalJsonParser {
// in origin canal, old only contains the changed columns but data
// contains all columns.
// in ticdc, old contains all fields
let col_name_lc = column.name.to_ascii_lowercase();
let before_value = before
.get(col_name_lc.as_str())
.or_else(|| after.get(col_name_lc.as_str()));
let col_name_lc = column.name_in_lower_case.as_str();
let before_value =
before.get(col_name_lc).or_else(|| after.get(col_name_lc));
let before =
cannal_simd_json_parse_value(&column.data_type, before_value)?;
let after = cannal_simd_json_parse_value(
&column.data_type,
after.get(col_name_lc.as_str()),
after.get(col_name_lc),
)?;
Ok((before, after))
})
Expand All @@ -169,7 +168,7 @@ impl CanalJsonParser {
writer.delete(|column| {
cannal_simd_json_parse_value(
&column.data_type,
v.get(column.name.to_ascii_lowercase().as_str()),
v.get(column.name_in_lower_case.as_str()),
)
})
})
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ impl DebeziumJsonParser {
writer.update(|column| {
let before = simd_json_parse_value(
&column.data_type,
before.get(column.name.to_ascii_lowercase().as_str()),
before.get(column.name_in_lower_case.as_str()),
)?;
let after = simd_json_parse_value(
&column.data_type,
after.get(column.name.to_ascii_lowercase().as_str()),
after.get(column.name_in_lower_case.as_str()),
)?;

Ok((before, after))
Expand All @@ -120,7 +120,7 @@ impl DebeziumJsonParser {
writer.insert(|column| {
simd_json_parse_value(
&column.data_type,
after.get(column.name.to_ascii_lowercase().as_str()),
after.get(column.name_in_lower_case.as_str()),
)
.map_err(Into::into)
})
Expand All @@ -138,7 +138,7 @@ impl DebeziumJsonParser {
writer.delete(|column| {
simd_json_parse_value(
&column.data_type,
before.get(column.name.to_ascii_lowercase().as_str()),
before.get(column.name_in_lower_case.as_str()),
)
.map_err(Into::into)
})
Expand Down
15 changes: 6 additions & 9 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,11 @@ impl JsonParser {
writer: &mut SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
writer.insert(|desc| {
simd_json_parse_value(
&desc.data_type,
value.get(desc.name.to_ascii_lowercase().as_str()),
)
.map_err(|e| {
tracing::error!("failed to process value ({}): {}", value, e);
e.into()
})
simd_json_parse_value(&desc.data_type, value.get(desc.name_in_lower_case.as_str()))
.map_err(|e| {
tracing::error!("failed to process value ({}): {}", value, e);
e.into()
})
})
}

Expand Down Expand Up @@ -119,7 +116,7 @@ impl JsonParser {
let fill_fn = |desc: &SourceColumnDesc| {
simd_json_parse_value(
&desc.data_type,
value.get(desc.name.to_ascii_lowercase().as_str()),
value.get(desc.name_in_lower_case.as_str()),
)
.map_err(|e| {
tracing::error!(
Expand Down
13 changes: 5 additions & 8 deletions src/connector/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl MaxwellParser {
writer.insert(|column| {
simd_json_parse_value(
&column.data_type,
after.get(column.name.to_ascii_lowercase().as_str()),
after.get(column.name_in_lower_case.as_str()),
)
.map_err(Into::into)
})
Expand All @@ -90,13 +90,10 @@ impl MaxwellParser {

writer.update(|column| {
// old only contains the changed columns but data contains all columns.
let col_name_lc = column.name.to_ascii_lowercase();
let before_value = before
.get(col_name_lc.as_str())
.or_else(|| after.get(col_name_lc.as_str()));
let col_name_lc = column.name_in_lower_case.as_str();
let before_value = before.get(col_name_lc).or_else(|| after.get(col_name_lc));
let before = simd_json_parse_value(&column.data_type, before_value)?;
let after =
simd_json_parse_value(&column.data_type, after.get(col_name_lc.as_str()))?;
let after = simd_json_parse_value(&column.data_type, after.get(col_name_lc))?;
Ok((before, after))
})
}
Expand All @@ -107,7 +104,7 @@ impl MaxwellParser {
writer.delete(|column| {
simd_json_parse_value(
&column.data_type,
before.get(column.name.to_ascii_lowercase().as_str()),
before.get(column.name_in_lower_case.as_str()),
)
.map_err(Into::into)
})
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SourceStreamChunkRowWriter<'_> {
&mut self,
mut f: impl FnMut(&SourceColumnDesc) -> Result<A::Output>,
) -> Result<WriteGuard> {
let mut modify_col = vec![];
let mut modify_col = Vec::with_capacity(self.descs.len());
self.descs
.iter()
.zip_eq(self.builders.iter_mut())
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/source/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::types::DataType;
#[derive(Clone, Debug)]
pub struct SourceColumnDesc {
pub name: String,
pub name_in_lower_case: String,
pub data_type: DataType,
pub column_id: ColumnId,
pub fields: Vec<ColumnDesc>,
Expand All @@ -39,8 +40,11 @@ impl SourceColumnDesc {
!matches!(data_type, DataType::List { .. } | DataType::Struct(..)),
"called `SourceColumnDesc::simple` with a composite type."
);
let name = name.into();
let name_in_lower_case = name.to_ascii_lowercase();
Self {
name: name.into(),
name,
name_in_lower_case,
data_type,
column_id,
fields: vec![],
Expand All @@ -60,6 +64,7 @@ impl From<&ColumnDesc> for SourceColumnDesc {
let is_meta = c.name.starts_with("_rw_kafka_timestamp");
Self {
name: c.name.clone(),
name_in_lower_case: c.name.to_ascii_lowercase(),
data_type: c.data_type.clone(),
column_id: c.column_id,
fields: c.field_descs.clone(),
Expand Down
81 changes: 9 additions & 72 deletions src/source/benches/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,78 +53,15 @@ fn generate_all_json() -> Vec<Vec<u8>> {

fn get_descs() -> Vec<SourceColumnDesc> {
vec![
SourceColumnDesc {
name: "i32".to_string(),
data_type: DataType::Int32,
column_id: ColumnId::from(0),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "bool".to_string(),
data_type: DataType::Boolean,
column_id: ColumnId::from(2),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "i16".to_string(),
data_type: DataType::Int16,
column_id: ColumnId::from(3),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "i64".to_string(),
data_type: DataType::Int64,
column_id: ColumnId::from(4),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "f32".to_string(),
data_type: DataType::Float32,
column_id: ColumnId::from(5),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "f64".to_string(),
data_type: DataType::Float64,
column_id: ColumnId::from(6),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "varchar".to_string(),
data_type: DataType::Varchar,
column_id: ColumnId::from(7),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "date".to_string(),
data_type: DataType::Date,
column_id: ColumnId::from(8),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc {
name: "timestamp".to_string(),
data_type: DataType::Timestamp,
column_id: ColumnId::from(9),
is_row_id: false,
is_meta: false,
fields: vec![],
},
SourceColumnDesc::simple("i32", DataType::Int32, ColumnId::from(0)),
SourceColumnDesc::simple("bool", DataType::Boolean, ColumnId::from(2)),
SourceColumnDesc::simple("i16", DataType::Int16, ColumnId::from(3)),
SourceColumnDesc::simple("i64", DataType::Int64, ColumnId::from(4)),
SourceColumnDesc::simple("f32", DataType::Float32, ColumnId::from(5)),
SourceColumnDesc::simple("f64", DataType::Float64, ColumnId::from(6)),
SourceColumnDesc::simple("varchar", DataType::Varchar, ColumnId::from(7)),
SourceColumnDesc::simple("date", DataType::Date, ColumnId::from(8)),
SourceColumnDesc::simple("timestamp", DataType::Timestamp, ColumnId::from(9)),
]
}

Expand Down

0 comments on commit 3a0ee3d

Please sign in to comment.