Skip to content

Commit a4882d7

Browse files
committed
feat: Add rest catalog support for cli
1 parent 31b0c0a commit a4882d7

File tree

6 files changed

+198
-83
lines changed

6 files changed

+198
-83
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/catalog/rest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
5050
mockito = { workspace = true }
5151
port_scanner = { workspace = true }
5252
tokio = { workspace = true }
53+
toml = { workspace = true }

crates/catalog/rest/src/catalog.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use itertools::Itertools;
3131
use reqwest::header::{
3232
HeaderMap, HeaderName, HeaderValue, {self},
3333
};
34+
use serde_derive::Deserialize;
3435
use reqwest::{Client, Method, StatusCode, Url};
3536
use tokio::sync::OnceCell;
3637
use typed_builder::TypedBuilder;
@@ -49,7 +50,7 @@ const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
4950
const PATH_V1: &str = "v1";
5051

5152
/// Rest catalog configuration.
52-
#[derive(Clone, Debug, TypedBuilder)]
53+
#[derive(Clone, Debug, TypedBuilder, Deserialize, PartialEq)]
5354
pub struct RestCatalogConfig {
5455
uri: String,
5556

@@ -59,6 +60,7 @@ pub struct RestCatalogConfig {
5960
#[builder(default)]
6061
props: HashMap<String, String>,
6162

63+
#[serde(skip)]
6264
#[builder(default)]
6365
client: Option<Client>,
6466
}
@@ -2257,4 +2259,33 @@ mod tests {
22572259
config_mock.assert_async().await;
22582260
update_table_mock.assert_async().await;
22592261
}
2262+
2263+
#[test]
2264+
fn test_config_parse_toml() {
2265+
let config_str = r#"
2266+
uri = "http://localhost:8080"
2267+
warehouse = "s3://demo"
2268+
[props]
2269+
"s3.endpoint" = "http://localhost:9000"
2270+
"s3.access_key_id" = "kfc_crazy_thursday"
2271+
"#;
2272+
2273+
let toml_table: toml::Table = toml::de::from_str(config_str).unwrap();
2274+
let config: RestCatalogConfig = toml_table.try_into().unwrap();
2275+
2276+
assert_eq!(config.uri, "http://localhost:8080");
2277+
assert_eq!(config.warehouse, Some("s3://demo".to_string()));
2278+
2279+
let expect_props = HashMap::from([
2280+
(
2281+
"s3.endpoint".to_string(),
2282+
"http://localhost:9000".to_string(),
2283+
),
2284+
(
2285+
"s3.access_key_id".to_string(),
2286+
"kfc_crazy_thursday".to_string(),
2287+
),
2288+
]);
2289+
assert_eq!(expect_props, config.props);
2290+
}
22602291
}

crates/integrations/cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dirs = {workspace = true}
4141
stacker = {workspace = true}
4242
mimalloc = {workspace = true}
4343
fs-err = {workspace = true}
44+
serde = {workspace = true}
4445

4546
[package.metadata.cargo-machete]
4647
# These dependencies are added to ensure minimal dependency version

crates/integrations/cli/src/catalog.rs

Lines changed: 125 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -20,112 +20,92 @@ use std::collections::HashMap;
2020
use std::path::Path;
2121
use std::sync::Arc;
2222

23-
use anyhow::anyhow;
23+
use anyhow::{anyhow, Context};
2424
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
2525
use fs_err::read_to_string;
2626
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
2727
use iceberg_datafusion::IcebergCatalogProvider;
28+
use serde::Deserialize;
2829
use toml::{Table as TomlTable, Value};
2930

3031
const CONFIG_NAME_CATALOGS: &str = "catalogs";
3132

32-
#[derive(Debug)]
33-
pub struct IcebergCatalogList {
34-
catalogs: HashMap<String, Arc<IcebergCatalogProvider>>,
33+
#[derive(Deserialize, Debug, PartialEq)]
34+
#[serde(tag = "type")]
35+
pub enum CatalogConfig {
36+
#[serde(rename = "rest")]
37+
Rest(RestCatalogConfig),
3538
}
3639

37-
impl IcebergCatalogList {
38-
pub async fn parse(path: &Path) -> anyhow::Result<Self> {
39-
let toml_table: TomlTable = toml::from_str(&read_to_string(path)?)?;
40-
Self::parse_table(&toml_table).await
40+
#[derive(Deserialize, Debug, PartialEq)]
41+
pub struct CatalogConfigDef {
42+
name: String,
43+
#[serde(flatten)]
44+
config: CatalogConfig,
45+
}
46+
47+
impl CatalogConfigDef {
48+
async fn try_into_catalog(self) -> anyhow::Result<Arc<IcebergCatalogProvider>> {
49+
match self.config {
50+
CatalogConfig::Rest(config) => {
51+
let catalog = RestCatalog::new(config);
52+
Ok(Arc::new(
53+
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
54+
))
55+
}
56+
}
4157
}
4258

43-
pub async fn parse_table(configs: &TomlTable) -> anyhow::Result<Self> {
44-
if let Value::Array(catalogs_config) =
45-
configs.get(CONFIG_NAME_CATALOGS).ok_or_else(|| {
46-
anyhow::Error::msg(format!("{CONFIG_NAME_CATALOGS} entry not found in config"))
47-
})?
48-
{
49-
let mut catalogs = HashMap::with_capacity(catalogs_config.len());
50-
for config in catalogs_config {
51-
if let Value::Table(table_config) = config {
52-
let (name, catalog_provider) =
53-
IcebergCatalogList::parse_one(table_config).await?;
54-
catalogs.insert(name, catalog_provider);
59+
pub fn parse(root: &TomlTable) -> anyhow::Result<HashMap<String, Self>> {
60+
if let Value::Array(catalog_configs) = root.get(CONFIG_NAME_CATALOGS).ok_or_else(|| {
61+
anyhow::Error::msg(format!("{CONFIG_NAME_CATALOGS} entry not found in config"))
62+
})? {
63+
let mut catalogs = HashMap::with_capacity(catalog_configs.len());
64+
for value in catalog_configs {
65+
if let Value::Table(table) = value {
66+
let catalog: CatalogConfigDef = table.clone().try_into()?;
67+
if catalogs.contains_key(&catalog.name) {
68+
return Err(anyhow!("Duplicate catalog name: {}", catalog.name));
69+
}
70+
catalogs.insert(catalog.name.clone(), catalog);
5571
} else {
5672
return Err(anyhow!("{CONFIG_NAME_CATALOGS} entry must be a table"));
5773
}
5874
}
59-
Ok(Self { catalogs })
75+
Ok(catalogs)
6076
} else {
6177
Err(anyhow!("{CONFIG_NAME_CATALOGS} must be an array of table!"))
6278
}
6379
}
80+
}
6481

65-
async fn parse_one(
66-
config: &TomlTable,
67-
) -> anyhow::Result<(String, Arc<IcebergCatalogProvider>)> {
68-
let name = config
69-
.get("name")
70-
.ok_or_else(|| anyhow::anyhow!("name not found for catalog"))?
71-
.as_str()
72-
.ok_or_else(|| anyhow::anyhow!("name is not string"))?;
73-
74-
let r#type = config
75-
.get("type")
76-
.ok_or_else(|| anyhow::anyhow!("type not found for catalog"))?
77-
.as_str()
78-
.ok_or_else(|| anyhow::anyhow!("type is not string"))?;
79-
80-
if r#type != "rest" {
81-
return Err(anyhow::anyhow!("Only rest catalog is supported for now!"));
82-
}
82+
impl TryFrom<TomlTable> for CatalogConfigDef {
83+
type Error = anyhow::Error;
8384

84-
let catalog_config = config
85-
.get("config")
86-
.ok_or_else(|| anyhow::anyhow!("config not found for catalog {name}"))?
87-
.as_table()
88-
.ok_or_else(|| anyhow::anyhow!("config is not table for catalog {name}"))?;
89-
90-
let uri = catalog_config
91-
.get("uri")
92-
.ok_or_else(|| anyhow::anyhow!("uri not found for catalog {name}"))?
93-
.as_str()
94-
.ok_or_else(|| anyhow::anyhow!("uri is not string"))?;
95-
96-
let warehouse = catalog_config
97-
.get("warehouse")
98-
.ok_or_else(|| anyhow::anyhow!("warehouse not found for catalog {name}"))?
99-
.as_str()
100-
.ok_or_else(|| anyhow::anyhow!("warehouse is not string for catalog {name}"))?;
101-
102-
let props_table = catalog_config
103-
.get("props")
104-
.ok_or_else(|| anyhow::anyhow!("props not found for catalog {name}"))?
105-
.as_table()
106-
.ok_or_else(|| anyhow::anyhow!("props is not table for catalog {name}"))?;
107-
108-
let mut props = HashMap::with_capacity(props_table.len());
109-
for (key, value) in props_table {
110-
let value_str = value
111-
.as_str()
112-
.ok_or_else(|| anyhow::anyhow!("props {key} is not string"))?;
113-
props.insert(key.to_string(), value_str.to_string());
85+
fn try_from(table: TomlTable) -> Result<Self, Self::Error> {
86+
table
87+
.try_into::<CatalogConfigDef>()
88+
.with_context(|| "Failed to parse catalog config".to_string())
89+
}
90+
}
91+
92+
#[derive(Debug)]
93+
pub struct IcebergCatalogList {
94+
catalogs: HashMap<String, Arc<IcebergCatalogProvider>>,
95+
}
96+
97+
impl IcebergCatalogList {
98+
pub async fn parse(path: &Path) -> anyhow::Result<Self> {
99+
let root_config: TomlTable = toml::from_str(&read_to_string(path)?)?;
100+
let catalog_configs = CatalogConfigDef::parse(&root_config)?;
101+
102+
let mut catalogs = HashMap::with_capacity(catalog_configs.len());
103+
for (name, config) in catalog_configs {
104+
let catalog = config.try_into_catalog().await?;
105+
catalogs.insert(name, catalog);
114106
}
115107

116-
let rest_catalog_config = RestCatalogConfig::builder()
117-
.uri(uri.to_string())
118-
.warehouse(warehouse.to_string())
119-
.props(props)
120-
.build();
121-
122-
Ok((
123-
name.to_string(),
124-
Arc::new(
125-
IcebergCatalogProvider::try_new(Arc::new(RestCatalog::new(rest_catalog_config)))
126-
.await?,
127-
),
128-
))
108+
Ok(Self { catalogs })
129109
}
130110
}
131111

@@ -153,3 +133,66 @@ impl CatalogProviderList for IcebergCatalogList {
153133
.map(|c| c.clone() as Arc<dyn CatalogProvider>)
154134
}
155135
}
136+
137+
#[cfg(test)]
138+
mod tests {
139+
use std::collections::HashMap;
140+
141+
use fs_err::read_to_string;
142+
use iceberg_catalog_rest::RestCatalogConfig;
143+
use toml::Table as TomlTable;
144+
145+
use crate::{CatalogConfig, CatalogConfigDef};
146+
147+
#[test]
148+
fn test_parse_config() {
149+
let config_file_path = format!("{}/testdata/catalogs.toml", env!("CARGO_MANIFEST_DIR"));
150+
151+
let root_config: TomlTable =
152+
toml::from_str(&read_to_string(config_file_path).unwrap()).unwrap();
153+
154+
let catalog_configs = CatalogConfigDef::parse(&root_config).unwrap();
155+
156+
assert_eq!(catalog_configs.len(), 2);
157+
158+
let catalog1 = catalog_configs.get("demo").unwrap();
159+
let expected_catalog1 = CatalogConfigDef {
160+
name: "demo".to_string(),
161+
config: CatalogConfig::Rest(
162+
RestCatalogConfig::builder()
163+
.uri("http://localhost:8080".to_string())
164+
.warehouse("s3://iceberg-demo".to_string())
165+
.props(HashMap::from([
166+
(
167+
"s3.endpoint".to_string(),
168+
"http://localhost:9000".to_string(),
169+
),
170+
("s3.access_key_id".to_string(), "admin".to_string()),
171+
]))
172+
.build(),
173+
),
174+
};
175+
176+
assert_eq!(catalog1, &expected_catalog1);
177+
178+
let catalog2 = catalog_configs.get("demo2").unwrap();
179+
let expected_catalog2 = CatalogConfigDef {
180+
name: "demo2".to_string(),
181+
config: CatalogConfig::Rest(
182+
RestCatalogConfig::builder()
183+
.uri("http://localhost2:8080".to_string())
184+
.warehouse("s3://iceberg-demo2".to_string())
185+
.props(HashMap::from([
186+
(
187+
"s3.endpoint".to_string(),
188+
"http://localhost2:9090".to_string(),
189+
),
190+
("s3.access_key_id".to_string(), "admin2".to_string()),
191+
]))
192+
.build(),
193+
),
194+
};
195+
196+
assert_eq!(catalog2, &expected_catalog2);
197+
}
198+
}

crates/integrations/cli/testdata/catalogs.toml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[[catalogs]]
19+
name = "demo"
20+
type = "rest"
21+
uri = "http://localhost:8080"
22+
warehouse = "s3://iceberg-demo"
23+
24+
[catalogs.props]
25+
"s3.endpoint" = "http://localhost:9000"
26+
"s3.access_key_id" = "admin"
27+
28+
29+
[[catalogs]]
30+
name = "demo2"
31+
type = "rest"
32+
uri = "http://localhost2:8080"
33+
warehouse = "s3://iceberg-demo2"
34+
35+
[catalogs.props]
36+
"s3.endpoint" = "http://localhost2:9090"
37+
"s3.access_key_id" = "admin2"

0 commit comments

Comments
 (0)