Skip to content

Commit b6fcbee

Browse files
committed
fix create tables
1 parent 4a9817b commit b6fcbee

File tree

1 file changed

+74
-12
lines changed

1 file changed

+74
-12
lines changed

crates/catalog/s3tables/src/catalog.rs

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
66
use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
77
use aws_sdk_s3tables::operation::get_table::GetTableOutput;
88
use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
9+
use aws_sdk_s3tables::types::OpenTableFormat;
910
use iceberg::io::FileIO;
1011
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
1112
use iceberg::table::Table;
@@ -263,12 +264,14 @@ impl Catalog for S3TablesCatalog {
263264
/// We have to get this random warehouse location after the table is created.
264265
///
265266
/// This function can return an error in the following situations:
267+
/// - If the location of the table is set by user, identified by a specific
268+
/// `DataInvalid` variant.
266269
/// - Errors from the underlying database creation process, converted using
267270
/// `from_aws_sdk_error`.
268271
async fn create_table(
269272
&self,
270273
namespace: &NamespaceIdent,
271-
creation: TableCreation,
274+
mut creation: TableCreation,
272275
) -> Result<Table> {
273276
let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
274277

@@ -278,27 +281,41 @@ impl Catalog for S3TablesCatalog {
278281
.create_table()
279282
.table_bucket_arn(self.config.table_bucket_arn.clone())
280283
.namespace(namespace.to_url_string())
284+
.format(OpenTableFormat::Iceberg)
281285
.name(table_ident.name())
282286
.send()
283287
.await
284288
.map_err(from_aws_sdk_error)?;
285289

286-
// get warehouse location
287-
let get_resp: GetTableOutput = self
288-
.s3tables_client
289-
.get_table()
290-
.table_bucket_arn(self.config.table_bucket_arn.clone())
291-
.namespace(namespace.to_url_string())
292-
.name(table_ident.name())
293-
.send()
294-
.await
295-
.map_err(from_aws_sdk_error)?;
290+
// prepare metadata location. the warehouse location is generated by s3tables catalog,
291+
// which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
292+
let metadata_location = match &creation.location {
293+
Some(_) => {
294+
return Err(Error::new(
295+
ErrorKind::DataInvalid,
296+
"The location of the table is generated by s3tables catalog, can't be set by user.",
297+
));
298+
}
299+
None => {
300+
let get_resp: GetTableOutput = self
301+
.s3tables_client
302+
.get_table()
303+
.table_bucket_arn(self.config.table_bucket_arn.clone())
304+
.namespace(namespace.to_url_string())
305+
.name(table_ident.name())
306+
.send()
307+
.await
308+
.map_err(from_aws_sdk_error)?;
309+
let warehouse_location = get_resp.warehouse_location().to_string();
310+
create_metadata_location(warehouse_location, 0)?
311+
}
312+
};
296313

297314
// write metadata to file
315+
creation.location = Some(metadata_location.clone());
298316
let metadata = TableMetadataBuilder::from_table_creation(creation)?
299317
.build()?
300318
.metadata;
301-
let metadata_location = create_metadata_location(get_resp.warehouse_location(), 0)?;
302319
self.file_io
303320
.new_output(&metadata_location)?
304321
.write(serde_json::to_vec(&metadata)?.into())
@@ -460,6 +477,8 @@ where T: std::fmt::Debug {
460477

461478
#[cfg(test)]
462479
mod tests {
480+
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
481+
463482
use super::*;
464483

465484
async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
@@ -541,4 +560,47 @@ mod tests {
541560
catalog.drop_namespace(&namespace).await.unwrap();
542561
assert!(!catalog.namespace_exists(&namespace).await.unwrap());
543562
}
563+
564+
#[tokio::test]
565+
async fn test_s3tables_create_delete_table() {
566+
let catalog = match load_s3tables_catalog_from_env().await {
567+
Ok(Some(catalog)) => catalog,
568+
Ok(None) => return,
569+
Err(e) => panic!("Error loading catalog: {}", e),
570+
};
571+
572+
let creation = {
573+
let schema = Schema::builder()
574+
.with_schema_id(0)
575+
.with_fields(vec![
576+
NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
577+
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
578+
])
579+
.build()
580+
.unwrap();
581+
TableCreation::builder()
582+
.name("test_s3tables_create_delete_table".to_string())
583+
.properties(HashMap::new())
584+
.schema(schema)
585+
.build()
586+
};
587+
588+
let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
589+
let table_ident = TableIdent::new(
590+
namespace.clone(),
591+
"test_s3tables_create_delete_table".to_string(),
592+
);
593+
catalog.drop_namespace(&namespace).await.ok();
594+
catalog.drop_table(&table_ident).await.ok();
595+
596+
catalog
597+
.create_namespace(&namespace, HashMap::new())
598+
.await
599+
.unwrap();
600+
catalog.create_table(&namespace, creation).await.unwrap();
601+
assert!(catalog.table_exists(&table_ident).await.unwrap());
602+
catalog.drop_table(&table_ident).await.unwrap();
603+
assert!(!catalog.table_exists(&table_ident).await.unwrap());
604+
catalog.drop_namespace(&namespace).await.unwrap();
605+
}
544606
}

0 commit comments

Comments
 (0)