Skip to content

Commit d63d2d8

Browse files
committed
fix create_table
1 parent dab8e51 commit d63d2d8

File tree

2 files changed

+150
-15
lines changed

2 files changed

+150
-15
lines changed

crates/catalog/s3tables/src/catalog.rs

Lines changed: 147 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,18 @@ impl S3TablesCatalog {
6262

6363
#[async_trait]
6464
impl Catalog for S3TablesCatalog {
65+
/// List namespaces from s3tables catalog.
66+
///
67+
/// S3Tables doesn't support nested namespaces. If parent is provided, it will
68+
/// return an empty list.
6569
async fn list_namespaces(
6670
&self,
6771
parent: Option<&NamespaceIdent>,
6872
) -> Result<Vec<NamespaceIdent>> {
73+
if parent.is_some() {
74+
return Ok(vec![]);
75+
}
76+
6977
let mut result = Vec::new();
7078
let mut continuation_token = None;
7179
loop {
@@ -91,6 +99,22 @@ impl Catalog for S3TablesCatalog {
9199
Ok(result)
92100
}
93101

102+
/// Creates a new namespace with the given identifier and properties.
103+
///
104+
/// Attempts to create a namespace defined by the `namespace`. The `properties`
105+
/// parameter is ignored.
106+
///
107+
/// The following naming rules apply to namespaces:
108+
///
109+
/// - Names must be between 3 (min) and 63 (max) characters long.
110+
/// - Names can consist only of lowercase letters, numbers, and underscores (_).
111+
/// - Names must begin and end with a letter or number.
112+
/// - Names must not contain hyphens (-) or periods (.).
113+
///
114+
/// This function can return an error in the following situations:
115+
///
116+
/// - Errors from the underlying database creation process, converted using
117+
/// `from_aws_sdk_error`.
94118
async fn create_namespace(
95119
&self,
96120
namespace: &NamespaceIdent,
@@ -108,6 +132,15 @@ impl Catalog for S3TablesCatalog {
108132
))
109133
}
110134

135+
/// Retrieves a namespace by its identifier.
136+
///
137+
/// Validates the given namespace identifier and then queries the
138+
/// underlying database client to fetch the corresponding namespace data.
139+
/// Constructs a `Namespace` object with the retrieved data and returns it.
140+
///
141+
/// This function can return an error in any of the following situations:
142+
/// - If there is an error querying the database, returned by
143+
/// `from_aws_sdk_error`.
111144
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
112145
let req = self
113146
.s3tables_client
@@ -122,6 +155,18 @@ impl Catalog for S3TablesCatalog {
122155
))
123156
}
124157

158+
/// Checks if a namespace exists within the s3tables catalog.
159+
///
160+
/// Validates the namespace identifier by querying the s3tables catalog
161+
/// to determine if the specified namespace exists.
162+
///
163+
/// # Returns
164+
/// A `Result<bool>` indicating the outcome of the check:
165+
/// - `Ok(true)` if the namespace exists.
166+
/// - `Ok(false)` if the namespace does not exist, identified by a specific
167+
/// `IsNotFoundException` variant.
168+
/// - `Err(...)` if an error occurs during validation or the s3tables catalog
169+
/// query, with the error encapsulating the issue.
125170
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
126171
let req = self
127172
.s3tables_client
@@ -140,6 +185,10 @@ impl Catalog for S3TablesCatalog {
140185
}
141186
}
142187

188+
/// Updates the properties of an existing namespace.
189+
///
190+
/// S3Tables doesn't support updating namespace properties, so this function
191+
/// will always return an error.
143192
async fn update_namespace(
144193
&self,
145194
namespace: &NamespaceIdent,
@@ -151,6 +200,14 @@ impl Catalog for S3TablesCatalog {
151200
))
152201
}
153202

203+
/// Drops an existing namespace from the s3tables catalog.
204+
///
205+
/// Validates the namespace identifier and then deletes the corresponding
206+
/// namespace from the s3tables catalog.
207+
///
208+
/// This function can return an error in the following situations:
209+
/// - Errors from the underlying database deletion process, converted using
210+
/// `from_aws_sdk_error`.
154211
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
155212
let req = self
156213
.s3tables_client
@@ -161,6 +218,14 @@ impl Catalog for S3TablesCatalog {
161218
Ok(())
162219
}
163220

221+
/// Lists all tables within a given namespace.
222+
///
223+
/// Retrieves all tables associated with the specified namespace and returns
224+
/// their identifiers.
225+
///
226+
/// This function can return an error in the following situations:
227+
/// - Errors from the underlying database query process, converted using
228+
/// `from_aws_sdk_error`.
164229
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
165230
let mut result = Vec::new();
166231
let mut continuation_token = None;
@@ -188,37 +253,68 @@ impl Catalog for S3TablesCatalog {
188253
Ok(result)
189254
}
190255

256+
/// Creates a new table within a specified namespace.
257+
///
258+
/// Attempts to create a table defined by the `creation` parameter. The metadata
259+
/// location is generated by the s3tables catalog, looks like:
260+
///
261+
/// s3://{RANDOM WAREHOUSE LOCATION}/metadata/00087-{UUID}.metadata.json
262+
///
263+
/// We have to get this random warehouse location after the table is created.
264+
///
265+
/// This function can return an error in the following situations:
266+
/// - Errors from the underlying database creation process, converted using
267+
/// `from_aws_sdk_error`.
191268
async fn create_table(
192269
&self,
193270
namespace: &NamespaceIdent,
194271
creation: TableCreation,
195272
) -> Result<Table> {
196273
let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
197274

275+
// create table
276+
let create_resp: CreateTableOutput = self
277+
.s3tables_client
278+
.create_table()
279+
.table_bucket_arn(self.config.table_bucket_arn.clone())
280+
.namespace(namespace.to_url_string())
281+
.name(table_ident.name())
282+
.send()
283+
.await
284+
.map_err(from_aws_sdk_error)?;
285+
286+
// get warehouse location
287+
let get_resp: GetTableOutput = self
288+
.s3tables_client
289+
.get_table()
290+
.table_bucket_arn(self.config.table_bucket_arn.clone())
291+
.namespace(namespace.to_url_string())
292+
.name(table_ident.name())
293+
.send()
294+
.await
295+
.map_err(from_aws_sdk_error)?;
296+
297+
// write metadata to file
198298
let metadata = TableMetadataBuilder::from_table_creation(creation)?
199299
.build()?
200300
.metadata;
201-
let metadata_location =
202-
create_metadata_location(namespace.to_url_string(), table_ident.name(), 0)?;
301+
let metadata_location = create_metadata_location(
302+
get_resp.warehouse_location(),
303+
get_resp.version_token().parse::<i32>().unwrap(),
304+
)?;
203305
self.file_io
204306
.new_output(&metadata_location)?
205307
.write(serde_json::to_vec(&metadata)?.into())
206308
.await?;
207309

208-
self.s3tables_client
209-
.create_table()
210-
.table_bucket_arn(self.config.table_bucket_arn.clone())
211-
.namespace(namespace.to_url_string())
212-
.name(table_ident.name())
213-
.send()
214-
.await
215-
.map_err(from_aws_sdk_error)?;
310+
// update metadata location
216311
self.s3tables_client
217312
.update_table_metadata_location()
218313
.table_bucket_arn(self.config.table_bucket_arn.clone())
219314
.namespace(namespace.to_url_string())
220315
.name(table_ident.name())
221316
.metadata_location(metadata_location.clone())
317+
.version_token(create_resp.version_token())
222318
.send()
223319
.await
224320
.map_err(from_aws_sdk_error)?;
@@ -232,6 +328,16 @@ impl Catalog for S3TablesCatalog {
232328
Ok(table)
233329
}
234330

331+
/// Loads an existing table from the s3tables catalog.
332+
///
333+
/// Retrieves the metadata location of the specified table and constructs a
334+
/// `Table` object with the retrieved metadata.
335+
///
336+
/// This function can return an error in the following situations:
337+
/// - If the table does not have a metadata location, identified by a specific
338+
/// `Unexpected` variant.
339+
/// - Errors from the underlying database query process, converted using
340+
/// `from_aws_sdk_error`.
235341
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
236342
let req = self
237343
.s3tables_client
@@ -263,6 +369,14 @@ impl Catalog for S3TablesCatalog {
263369
Ok(table)
264370
}
265371

372+
/// Drops an existing table from the s3tables catalog.
373+
///
374+
/// Validates the table identifier and then deletes the corresponding
375+
/// table from the s3tables catalog.
376+
///
377+
/// This function can return an error in the following situations:
378+
/// - Errors from the underlying database deletion process, converted using
379+
/// `from_aws_sdk_error`.
266380
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
267381
let req = self
268382
.s3tables_client
@@ -274,6 +388,18 @@ impl Catalog for S3TablesCatalog {
274388
Ok(())
275389
}
276390

391+
/// Checks if a table exists within the s3tables catalog.
392+
///
393+
/// Validates the table identifier by querying the s3tables catalog
394+
/// to determine if the specified table exists.
395+
///
396+
/// # Returns
397+
/// A `Result<bool>` indicating the outcome of the check:
398+
/// - `Ok(true)` if the table exists.
399+
/// - `Ok(false)` if the table does not exist, identified by a specific
400+
/// `IsNotFoundException` variant.
401+
/// - `Err(...)` if an error occurs during validation or the s3tables catalog
402+
/// query, with the error encapsulating the issue.
277403
async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
278404
let req = self
279405
.s3tables_client
@@ -293,6 +419,14 @@ impl Catalog for S3TablesCatalog {
293419
}
294420
}
295421

422+
/// Renames an existing table within the s3tables catalog.
423+
///
424+
/// Validates the source and destination table identifiers and then renames
425+
/// the source table to the destination table.
426+
///
427+
/// This function can return an error in the following situations:
428+
/// - Errors from the underlying database renaming process, converted using
429+
/// `from_aws_sdk_error`.
296430
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
297431
let req = self
298432
.s3tables_client
@@ -306,6 +440,9 @@ impl Catalog for S3TablesCatalog {
306440
Ok(())
307441
}
308442

443+
/// Updates an existing table within the s3tables catalog.
444+
///
445+
/// This function is still in development and will always return an error.
309446
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
310447
Err(Error::new(
311448
ErrorKind::FeatureUnsupported,

crates/catalog/s3tables/src/utils.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ pub(crate) async fn create_sdk_config(
5757

5858
/// Create metadata location from `location` and `version`
5959
pub(crate) fn create_metadata_location(
60-
namespace: impl AsRef<str>,
61-
table_name: impl AsRef<str>,
60+
warehouse_location: impl AsRef<str>,
6261
version: i32,
6362
) -> Result<String> {
6463
if version < 0 {
@@ -74,9 +73,8 @@ pub(crate) fn create_metadata_location(
7473
let version = format!("{:0>5}", version);
7574
let id = Uuid::new_v4();
7675
let metadata_location = format!(
77-
"{}/{}/metadata/{}-{}.metadata.json",
78-
namespace.as_ref(),
79-
table_name.as_ref(),
76+
"{}/metadata/{}-{}.metadata.json",
77+
warehouse_location.as_ref(),
8078
version,
8179
id
8280
);

0 commit comments

Comments
 (0)