fix more
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao committed Dec 9, 2024
1 parent 4f81021 commit ad9ed16
Showing 12 changed files with 76 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml

@@ -301,7 +301,7 @@ redundant_explicit_links = "allow"
 lto = "off"
 # use parallel frontend to speed up build
 # TODO: may consider applying to release/production profile as well
-rustflags = ["-Z", "threads=8"]
+# rustflags = ["-Z", "threads=8"]
 
 [profile.release]
 debug = "full"
4 changes: 2 additions & 2 deletions src/common/src/catalog/mod.rs

@@ -575,8 +575,8 @@ impl Engine {
 
     pub fn debug_to_string(self) -> String {
         match self {
-            Engine::Hummock => "Hummock".to_string(),
-            Engine::Iceberg => "Iceberg".to_string(),
+            Engine::Hummock => "Hummock".to_owned(),
+            Engine::Iceberg => "Iceberg".to_owned(),
        }
    }
 }
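Context for the recurring replacements below: nearly every hunk in this commit swaps `.to_string()` for `.to_owned()` on string literals and other `&str` values. A minimal sketch of the distinction, assuming the usual motivation of clippy's `str_to_string` restriction lint (the commit message itself does not say):

    fn main() {
        let s: &str = "Hummock";

        // `to_owned()` uses the `ToOwned` trait: "give me the owned version
        // of this borrow" -- for `&str`, that is a `String`.
        let owned: String = s.to_owned();

        // `to_string()` routes through `Display`/`ToString` and produces the
        // same `String`; the lint prefers `to_owned()` because it states the
        // intent (an owned copy, not a formatting operation) directly.
        let formatted: String = s.to_string();

        assert_eq!(owned, formatted);
    }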
5 changes: 2 additions & 3 deletions src/connector/src/connector_common/iceberg/mod.rs

@@ -199,7 +199,7 @@ impl IcebergCommon {
         let enable_config_load = self.enable_config_load.unwrap_or(false);
         iceberg_configs.insert(
             "iceberg.table.io.disable_config_load".to_owned(),
-            enable_config_load.to_owned(),
+            enable_config_load.to_string(),
         );
 
         load_iceberg_base_catalog_config(&iceberg_configs)?
@@ -227,8 +227,7 @@ impl IcebergCommon {
         java_catalog_configs.insert("init-creation-stacktrace".to_owned(), "false".to_owned());
 
         if let Some(region) = &self.region {
-            java_catalog_configs
-                .insert("client.region".to_string(), region.clone().to_string());
+            java_catalog_configs.insert("client.region".to_owned(), region.clone());
        }
        if let Some(endpoint) = &self.endpoint {
            java_catalog_configs.insert("s3.endpoint".to_owned(), endpoint.clone());
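Note the one change in the opposite direction above: `enable_config_load` is a `bool`, and `to_owned()` on a `bool` just copies the `bool`, so only `to_string()` yields the `String` value the map expects. A sketch, assuming `iceberg_configs` maps `String` to `String` as the surrounding inserts suggest:

    use std::collections::HashMap;

    fn main() {
        let mut iceberg_configs: HashMap<String, String> = HashMap::new();
        let enable_config_load = false;

        // `enable_config_load.to_owned()` would yield another `bool` and
        // fail to type-check as the map's value; `to_string()` goes
        // through `Display` and yields "false".
        iceberg_configs.insert(
            "iceberg.table.io.disable_config_load".to_owned(),
            enable_config_load.to_string(),
        );

        assert_eq!(iceberg_configs["iceberg.table.io.disable_config_load"], "false");
    }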
2 changes: 1 addition & 1 deletion src/connector/src/parser/chunk_builder.rs

@@ -216,7 +216,7 @@ impl SourceStreamChunkRowWriter<'_> {
             )?))
         } else {
             Err(AccessError::Uncategorized {
-                message: "CDC metadata not found in the message".to_string(),
+                message: "CDC metadata not found in the message".to_owned(),
             })
        }
    }
@@ -141,7 +141,7 @@ mod tests {
         })
             + tail_separator;
         let text = text.into_bytes();
-        let split_id: Arc<str> = "1".to_string().into_boxed_str().into();
+        let split_id: Arc<str> = "1".to_owned().into_boxed_str().into();
         let s = text
             .chunks(N2)
             .enumerate()
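An aside on the `Arc<str>` line above (only its spelling changes in this commit): the intermediate `String` and `Box<str>` are not required, since `Arc<str>` implements `From<&str>`. A sketch:

    use std::sync::Arc;

    fn main() {
        // The route taken in the diff: &str -> String -> Box<str> -> Arc<str>.
        let split_id: Arc<str> = "1".to_owned().into_boxed_str().into();

        // Equivalent single step: copy the bytes straight into the
        // shared allocation.
        let direct: Arc<str> = Arc::from("1");

        assert_eq!(split_id, direct);
    }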
4 changes: 2 additions & 2 deletions src/connector/src/source/iceberg/mod.rs

@@ -64,10 +64,10 @@ impl IcebergProperties {
     pub async fn create_catalog_v2(&self) -> ConnectorResult<Arc<dyn CatalogV2>> {
         let mut java_catalog_props = HashMap::new();
         if let Some(jdbc_user) = self.jdbc_user.clone() {
-            java_catalog_props.insert("jdbc.user".to_string(), jdbc_user);
+            java_catalog_props.insert("jdbc.user".to_owned(), jdbc_user);
         }
         if let Some(jdbc_password) = self.jdbc_password.clone() {
-            java_catalog_props.insert("jdbc.password".to_string(), jdbc_password);
+            java_catalog_props.insert("jdbc.password".to_owned(), jdbc_password);
         }
         // TODO: support path_style_access and java_catalog_props for iceberg source
         self.common.create_catalog_v2(&java_catalog_props).await
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs

@@ -64,7 +64,7 @@ pub async fn handle_alter_streaming_rate_limit(
     {
         return Err(InvalidInputSyntax(
             "PAUSE or RESUME is invalid when the stream has pre configured ratelimit."
-                .to_string(),
+                .to_owned(),
         )
         .into());
    }
114 changes: 56 additions & 58 deletions src/frontend/src/handler/create_table.rs

@@ -1395,8 +1395,8 @@ pub async fn create_iceberg_engine_table(
 
     let meta_client = session.env().meta_client();
     let system_params = meta_client.get_system_params().await?;
-    let state_store_endpoint = system_params.state_store().to_string();
-    let data_directory = system_params.data_directory().to_string();
+    let state_store_endpoint = system_params.state_store().to_owned();
+    let data_directory = system_params.data_directory().to_owned();
     let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
 
     let (s3_region, s3_bucket, s3_endpoint, s3_ak, s3_sk) = match state_store_endpoint {
@@ -1408,7 +1408,7 @@ pub async fn create_iceberg_engine_table(
             };
             (
                 s3_region,
-                s3.strip_prefix("hummock+s3://").unwrap().to_string(),
+                s3.strip_prefix("hummock+s3://").unwrap().to_owned(),
                 None,
                 None,
                 None,
@@ -1429,11 +1429,11 @@ pub async fn create_iceberg_engine_table(
             };
             let (address, bucket) = rest.split_once('/').unwrap();
             (
-                "us-east-1".to_string(),
-                bucket.to_string(),
-                Some((endpoint_prefix.to_string() + address).to_string()),
-                Some(access_key_id.to_string()),
-                Some(secret_access_key.to_string()),
+                "us-east-1".to_owned(),
+                bucket.to_owned(),
+                Some(format!("{}{}", endpoint_prefix, address)),
+                Some(access_key_id.to_owned()),
+                Some(secret_access_key.to_owned()),
             )
         }
         _ => {
@@ -1445,31 +1445,29 @@ pub async fn create_iceberg_engine_table(
     };
 
     let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
-        ErrorCode::InternalError("failed to parse the meta store endpoint".to_string())
+        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
     })?;
-    let meta_store_backend = meta_store_endpoint.scheme().to_string();
-    let meta_store_user = meta_store_endpoint.username().to_string();
+    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
+    let meta_store_user = meta_store_endpoint.username().to_owned();
     let meta_store_password = meta_store_endpoint
         .password()
         .ok_or_else(|| {
-            ErrorCode::InternalError(
-                "failed to parse password from meta store endpoint".to_string(),
-            )
+            ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
         })?
-        .to_string();
+        .to_owned();
     let meta_store_host = meta_store_endpoint
         .host_str()
         .ok_or_else(|| {
-            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_string())
+            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
         })?
-        .to_string();
+        .to_owned();
     let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
-        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_string())
+        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
     })?;
     let meta_store_database = meta_store_endpoint
         .path()
         .trim_start_matches('/')
-        .to_string();
+        .to_owned();
 
     let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
         bail!("failed to parse meta backend: {}", meta_store_backend);
@@ -1481,16 +1479,16 @@ pub async fn create_iceberg_engine_table(
         .read_guard()
         .get_database_by_id(&table.database_id)?
         .name()
-        .to_string();
+        .to_owned();
     let rw_schema_name = session
         .env()
         .catalog_reader()
         .read_guard()
         .get_schema_by_id(&table.database_id, &table.schema_id)?
         .name()
-        .to_string();
-    let iceberg_catalog_name = rw_db_name.to_string();
-    let iceberg_database_name = rw_schema_name.to_string();
+        .clone();
+    let iceberg_catalog_name = rw_db_name.clone();
+    let iceberg_database_name = rw_schema_name.clone();
     let iceberg_table_name = table_name.0.last().unwrap().real_value();
 
     // Iceberg sinks require a primary key, if none is provided, we will use the _row_id column
@@ -1530,7 +1528,7 @@ pub async fn create_iceberg_engine_table(
 
     // For the table without primary key. We will use `_row_id` as primary key
     let sink_from = if pks.is_empty() {
-        pks = vec![ROWID_PREFIX.to_string()];
+        pks = vec![ROWID_PREFIX.to_owned()];
         let [stmt]: [_; 1] =
             Parser::parse_sql(&format!("select {}, * from {}", ROWID_PREFIX, table_name))
                 .context("unable to parse query")?
@@ -1548,7 +1546,7 @@ pub async fn create_iceberg_engine_table(
     let with_properties = WithProperties(vec![]);
     let mut sink_name = table_name.clone();
     *sink_name.0.last_mut().unwrap() = Ident::from(
-        (ICEBERG_SINK_PREFIX.to_string() + &sink_name.0.last().unwrap().real_value()).as_str(),
+        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
     );
     let create_sink_stmt = CreateSinkStatement {
         if_not_exists: false,
@@ -1593,40 +1591,40 @@ pub async fn create_iceberg_engine_table(
 
     let mut sink_handler_args = handler_args.clone();
     let mut with = BTreeMap::new();
-    with.insert("connector".to_string(), "iceberg".to_string());
+    with.insert("connector".to_owned(), "iceberg".to_owned());
 
-    with.insert("primary_key".to_string(), pks.join(","));
-    with.insert("type".to_string(), "upsert".to_string());
-    with.insert("catalog.type".to_string(), "jdbc".to_string());
-    with.insert("warehouse.path".to_string(), warehouse_path.clone());
+    with.insert("primary_key".to_owned(), pks.join(","));
+    with.insert("type".to_owned(), "upsert".to_owned());
+    with.insert("catalog.type".to_owned(), "jdbc".to_owned());
+    with.insert("warehouse.path".to_owned(), warehouse_path.clone());
     if let Some(s3_endpoint) = s3_endpoint.clone() {
-        with.insert("s3.endpoint".to_string(), s3_endpoint);
+        with.insert("s3.endpoint".to_owned(), s3_endpoint);
     }
     if let Some(s3_ak) = s3_ak.clone() {
-        with.insert("s3.access.key".to_string(), s3_ak.clone());
+        with.insert("s3.access.key".to_owned(), s3_ak.clone());
     }
     if let Some(s3_sk) = s3_sk.clone() {
-        with.insert("s3.secret.key".to_string(), s3_sk.clone());
+        with.insert("s3.secret.key".to_owned(), s3_sk.clone());
     }
-    with.insert("s3.region".to_string(), s3_region.clone());
-    with.insert("catalog.uri".to_string(), catalog_uri.clone());
-    with.insert("catalog.jdbc.user".to_string(), meta_store_user.clone());
+    with.insert("s3.region".to_owned(), s3_region.clone());
+    with.insert("catalog.uri".to_owned(), catalog_uri.clone());
+    with.insert("catalog.jdbc.user".to_owned(), meta_store_user.clone());
     with.insert(
-        "catalog.jdbc.password".to_string(),
+        "catalog.jdbc.password".to_owned(),
         meta_store_password.clone(),
     );
-    with.insert("catalog.name".to_string(), iceberg_catalog_name.clone());
-    with.insert("database.name".to_string(), iceberg_database_name.clone());
-    with.insert("table.name".to_string(), iceberg_table_name.to_string());
+    with.insert("catalog.name".to_owned(), iceberg_catalog_name.clone());
+    with.insert("database.name".to_owned(), iceberg_database_name.clone());
+    with.insert("table.name".to_owned(), iceberg_table_name.clone());
     // TODO: change the `commit_checkpoint_interval` to a configurable value
-    with.insert("commit_checkpoint_interval".to_string(), "1".to_string());
-    with.insert("create_table_if_not_exists".to_string(), "true".to_string());
-    with.insert("enable_config_load".to_string(), "true".to_string());
+    with.insert("commit_checkpoint_interval".to_owned(), "1".to_owned());
+    with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
+    with.insert("enable_config_load".to_owned(), "true".to_owned());
     sink_handler_args.with_options = WithOptions::new_with_options(with);
 
     let mut source_name = table_name.clone();
     *source_name.0.last_mut().unwrap() = Ident::from(
-        (ICEBERG_SOURCE_PREFIX.to_string() + &source_name.0.last().unwrap().real_value()).as_str(),
+        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
     );
     let create_source_stmt = CreateSourceStatement {
         temporary: false,
@@ -1643,29 +1641,29 @@ pub async fn create_iceberg_engine_table(
 
     let mut source_handler_args = handler_args.clone();
     let mut with = BTreeMap::new();
-    with.insert("connector".to_string(), "iceberg".to_string());
-    with.insert("catalog.type".to_string(), "jdbc".to_string());
-    with.insert("warehouse.path".to_string(), warehouse_path.clone());
+    with.insert("connector".to_owned(), "iceberg".to_owned());
+    with.insert("catalog.type".to_owned(), "jdbc".to_owned());
+    with.insert("warehouse.path".to_owned(), warehouse_path.clone());
     if let Some(s3_endpoint) = s3_endpoint {
-        with.insert("s3.endpoint".to_string(), s3_endpoint.clone());
+        with.insert("s3.endpoint".to_owned(), s3_endpoint.clone());
     }
     if let Some(s3_ak) = s3_ak.clone() {
-        with.insert("s3.access.key".to_string(), s3_ak.clone());
+        with.insert("s3.access.key".to_owned(), s3_ak.clone());
    }
    if let Some(s3_sk) = s3_sk.clone() {
-        with.insert("s3.secret.key".to_string(), s3_sk.clone());
+        with.insert("s3.secret.key".to_owned(), s3_sk.clone());
    }
-    with.insert("s3.region".to_string(), s3_region.clone());
-    with.insert("catalog.uri".to_string(), catalog_uri.clone());
-    with.insert("catalog.jdbc.user".to_string(), meta_store_user.clone());
+    with.insert("s3.region".to_owned(), s3_region.clone());
+    with.insert("catalog.uri".to_owned(), catalog_uri.clone());
+    with.insert("catalog.jdbc.user".to_owned(), meta_store_user.clone());
    with.insert(
-        "catalog.jdbc.password".to_string(),
+        "catalog.jdbc.password".to_owned(),
        meta_store_password.clone(),
    );
-    with.insert("catalog.name".to_string(), iceberg_catalog_name.clone());
-    with.insert("database.name".to_string(), iceberg_database_name.clone());
-    with.insert("table.name".to_string(), iceberg_table_name.to_string());
-    with.insert("enable_config_load".to_string(), "true".to_string());
+    with.insert("catalog.name".to_owned(), iceberg_catalog_name.clone());
+    with.insert("database.name".to_owned(), iceberg_database_name.clone());
+    with.insert("table.name".to_owned(), iceberg_table_name.clone());
+    with.insert("enable_config_load".to_owned(), "true".to_owned());
    source_handler_args.with_options = WithOptions::new_with_options(with);
 
    // before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
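Two patterns in this file go beyond the mechanical lint fix: the endpoint hunk replaces a double-allocation concatenation with a single `format!`, and the catalog/database names switch to `.clone()` where the source is already an owned `String`. A sketch with stand-in values (the real ones come from the parsed state store URL and the catalog reader):

    fn main() {
        let endpoint_prefix = "http://"; // stand-in value
        let address = "127.0.0.1:9301"; // stand-in value

        // Old: `+` allocates a `String`, then `.to_string()` on that
        // `String` copies the buffer a second time.
        let old = (endpoint_prefix.to_string() + address).to_string();

        // New: one formatting pass, one allocation.
        let new = format!("{}{}", endpoint_prefix, address);
        assert_eq!(old, new);

        // For values that are already owned `String`s, `.clone()` is the
        // direct spelling; `.to_string()` on a `String` is the same copy
        // reached indirectly through `Display`.
        let rw_db_name = "dev".to_owned(); // stand-in value
        let iceberg_catalog_name = rw_db_name.clone();
        assert_eq!(iceberg_catalog_name, rw_db_name);
    }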
12 changes: 6 additions & 6 deletions src/frontend/src/handler/drop_table.rs

@@ -75,7 +75,7 @@ pub async fn handle_drop_table(
         .get_source_by_name(
             db_name,
             schema_path,
-            &(ICEBERG_SOURCE_PREFIX.to_string() + &table_name),
+            &(ICEBERG_SOURCE_PREFIX.to_owned() + &table_name),
         )
         .map(|(source, _)| source.clone())
     {
@@ -92,7 +92,7 @@ pub async fn handle_drop_table(
         .get_sink_by_name(
             db_name,
             schema_path,
-            &(ICEBERG_SINK_PREFIX.to_string() + &table_name),
+            &(ICEBERG_SINK_PREFIX.to_owned() + &table_name),
         )
         .map(|(sink, _)| sink.clone())
     {
@@ -114,10 +114,10 @@ pub async fn handle_drop_table(
             ObjectName::from(match schema_name {
                 Some(ref schema) => vec![
                     Ident::from(schema.as_str()),
-                    Ident::from((ICEBERG_SINK_PREFIX.to_string() + &table_name).as_str()),
+                    Ident::from((ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str()),
                 ],
                 None => vec![Ident::from(
-                    (ICEBERG_SINK_PREFIX.to_string() + &table_name).as_str(),
+                    (ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str(),
                 )],
             }),
             true,
@@ -167,10 +167,10 @@ pub async fn handle_drop_table(
             ObjectName::from(match schema_name {
                 Some(ref schema) => vec![
                     Ident::from(schema.as_str()),
-                    Ident::from((ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str()),
+                    Ident::from((ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str()),
                 ],
                 None => vec![Ident::from(
-                    (ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(),
+                    (ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str(),
                )],
            }),
            true,
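Why the owned conversion sits on the prefix in these concatenations: `+` on strings is only defined as `String + &str`, so the left operand must be owned. A sketch with a hypothetical prefix value (the real `ICEBERG_SINK_PREFIX` constant is defined elsewhere in the codebase):

    const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_"; // hypothetical value

    fn main() {
        let table_name = "t1".to_owned();

        // `ICEBERG_SINK_PREFIX + &table_name` would not compile: there is
        // no `&str + &str`. Converting the prefix to an owned `String`
        // first enables `String + &str`.
        let qualified = ICEBERG_SINK_PREFIX.to_owned() + &table_name;

        assert_eq!(qualified, "__iceberg_sink_t1");
    }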
2 changes: 1 addition & 1 deletion src/frontend/src/handler/util.rs

@@ -326,7 +326,7 @@ pub fn get_table_catalog_by_table_name(
     let (table, schema_name) =
         reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
 
-    Ok((table.clone(), schema_name.to_string()))
+    Ok((table.clone(), schema_name.to_owned()))
 }
 
 #[cfg(test)]
4 changes: 2 additions & 2 deletions src/frontend/src/utils/with_options.rs

@@ -234,7 +234,7 @@ pub(crate) fn resolve_connection_ref_and_secret_ref(
         {
             connection_params.as_ref().map(|cp| {
                 jsonbb::json!({
-                    "connection_type": cp.connection_type().as_str_name().to_string()
+                    "connection_type": cp.connection_type().as_str_name().to_owned()
                 })
             })
         },
@@ -314,7 +314,7 @@ pub(crate) fn resolve_connection_ref_and_secret_ref(
             .any(|k| k.starts_with("aws"))
         {
             return Err(RwError::from(ErrorCode::InvalidParameterValue(
-                "Glue related options/secrets are not allowed when using schema registry connection".to_string()
+                "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
            )));
        }
    }
2 changes: 1 addition & 1 deletion src/meta/src/controller/catalog.rs

@@ -1565,7 +1565,7 @@ impl CatalogController {
         {
             pb_connection.info.as_ref().and_then(|info| match info {
                 ConnectionInfo::ConnectionParams(params) => {
-                    Some(params.connection_type().as_str_name().to_string())
+                    Some(params.connection_type().as_str_name().to_owned())
                 }
                 _ => None,
            })
