@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
-    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
-    TableIdent,
+    Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
 };
 use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow};
 use sqlx::{Any, AnyPool, Row, Transaction};
@@ -702,11 +701,7 @@ impl Catalog for SqlCatalog {
         let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
             .build()?
             .metadata;
-        let tbl_metadata_location = format!(
-            "{}/metadata/0-{}.metadata.json",
-            location.clone(),
-            Uuid::new_v4()
-        );
+        let tbl_metadata_location = metadata_path(&location, Uuid::new_v4());

         let file = self.fileio.new_output(&tbl_metadata_location)?;
         file.write(serde_json::to_vec(&tbl_metadata)?.into())
@@ -769,23 +764,96 @@ impl Catalog for SqlCatalog {
         Ok(())
     }

-    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Updating a table is not supported yet",
-        ))
+    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
+        let identifier = commit.identifier().clone();
+        if !self.table_exists(&identifier).await? {
+            return no_such_table_err(&identifier);
+        }
+
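+        // Split the commit into its requirements and the metadata updates to apply.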
+        let requirements = commit.take_requirements();
+        let table_updates = commit.take_updates();
+
+        let table = self.load_table(&identifier).await?;
+        let mut update_table_metadata_builder =
+            TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None);
+
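+        // Apply each requested update to a builder seeded with the table's current metadata.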
+        for table_update in table_updates {
+            update_table_metadata_builder = table_update.apply(update_table_metadata_builder)?;
+        }
+
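+        // Validate the commit requirements against the currently loaded metadata.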
+        for table_requirement in requirements {
+            table_requirement.check(Some(table.metadata()))?;
+        }
+
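+        // Write the updated metadata to a new metadata file under the table location.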
+        let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4());
+        let file = self.fileio.new_output(&new_table_meta_location)?;
+        let update_table_metadata = update_table_metadata_builder.build()?;
+        file.write(serde_json::to_vec(&update_table_metadata.metadata)?.into())
+            .await?;
+
+        let update = format!(
+            "UPDATE {CATALOG_TABLE_NAME}
+             SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+             AND {CATALOG_FIELD_TABLE_NAME} = ?
+             AND (
+                 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+                 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+             )"
+        );
+
+        let namespace_name = identifier.namespace().join(".");
+        let args: Vec<Option<&str>> = vec![
+            Some(&new_table_meta_location),
+            Some(&self.name),
+            Some(&namespace_name),
+            Some(identifier.name()),
+        ];
+
+        let update_result = self.execute(&update, args, None).await?;
+        if update_result.rows_affected() != 1 {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!(
+                    "Failed to update Table {:?} from Catalog {:?}",
+                    identifier, &self.name,
+                ),
+            ));
+        }
+
+        Ok(Table::builder()
+            .file_io(self.fileio.clone())
+            .identifier(identifier)
+            .metadata_location(new_table_meta_location)
+            .metadata(update_table_metadata.metadata)
+            .build()?)
     }
 }

+/// Generate the metadata path for a table
+#[inline]
+pub fn metadata_path(meta_data_location: &str, uuid: Uuid) -> String {
+    format!("{}/metadata/0-{}.metadata.json", meta_data_location, uuid)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
     use std::hash::Hash;
+    use iceberg::spec::SnapshotReference;
+    use iceberg::spec::SnapshotRetention;
+    use iceberg::spec::MAIN_BRANCH;

     use iceberg::io::FileIOBuilder;
-    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+    use iceberg::spec::{
+        NestedField, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, SortOrder, Type,
+    };
     use iceberg::table::Table;
-    use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
+    use iceberg::{
+        Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
+        TableRequirement, TableUpdate,
+    };
     use itertools::Itertools;
     use regex::Regex;
     use sqlx::migrate::MigrateDatabase;
@@ -1770,4 +1838,232 @@ mod tests {
17701838 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\" a\" ]), name: \" tbl1\" }"
17711839 ) ;
17721840 }
1841+
1842+ #[ tokio:: test]
1843+ async fn test_update_table_throws_error_if_table_not_exist ( ) {
1844+ let warehouse_loc = temp_path ( ) ;
1845+ let catalog = new_sql_catalog ( warehouse_loc. clone ( ) ) . await ;
1846+ let namespace_ident = NamespaceIdent :: new ( "a" . into ( ) ) ;
1847+ let table_name = "tbl1" ;
1848+ let table_ident = TableIdent :: new ( namespace_ident. clone ( ) , table_name. into ( ) ) ;
1849+ create_namespace ( & catalog, & namespace_ident) . await ;
1850+ let table_commit = TableCommit :: builder ( )
1851+ . ident ( table_ident. clone ( ) )
1852+ . updates ( vec ! [ ] )
1853+ . requirements ( vec ! [ ] )
1854+ . build ( ) ;
1855+ let err = catalog
1856+ . update_table ( table_commit)
1857+ . await
1858+ . unwrap_err ( )
1859+ . to_string ( ) ;
1860+ assert_eq ! (
1861+ err,
1862+ "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\" a\" ]), name: \" tbl1\" }"
1863+ ) ;
1864+ }
1865+
1866+ #[ tokio:: test]
1867+ async fn test_update_table_add_snapshot ( ) {
1868+ let warehouse_loc = temp_path ( ) ;
1869+ let catalog = new_sql_catalog ( warehouse_loc. clone ( ) ) . await ;
1870+ let namespace_ident = NamespaceIdent :: new ( "a" . into ( ) ) ;
1871+ create_namespace ( & catalog, & namespace_ident) . await ;
1872+
1873+ let table_name = "abc" ;
1874+ let location = warehouse_loc. clone ( ) ;
1875+ let table_creation = TableCreation :: builder ( )
1876+ . name ( table_name. into ( ) )
1877+ . location ( location. clone ( ) )
1878+ . schema ( simple_table_schema ( ) )
1879+ . build ( ) ;
1880+
1881+ let expected_table_ident = TableIdent :: new ( namespace_ident. clone ( ) , table_name. into ( ) ) ;
1882+
1883+ assert_table_eq (
1884+ & catalog
1885+ . create_table ( & namespace_ident, table_creation)
1886+ . await
1887+ . unwrap ( ) ,
1888+ & expected_table_ident,
1889+ & simple_table_schema ( ) ,
1890+ ) ;
1891+
1892+ let table = catalog. load_table ( & expected_table_ident) . await . unwrap ( ) ;
1893+ assert_table_eq ( & table, & expected_table_ident, & simple_table_schema ( ) ) ;
1894+
1895+ let table_snapshots_iter = table. metadata ( ) . snapshots ( ) ;
1896+ assert_eq ! ( 0 , table_snapshots_iter. count( ) ) ;
1897+
1898+ // Add snapshot
1899+ let record = r#"
1900+ {
1901+ "snapshot-id": 3051729675574597004,
1902+ "sequence-number": 10,
1903+ "timestamp-ms": 9992191116217,
1904+ "summary": {
1905+ "operation": "append"
1906+ },
1907+ "manifest-list": "s3://b/wh/.../s1.avro",
1908+ "schema-id": 0
1909+ }
1910+ "# ;
1911+
1912+ let snapshot = serde_json:: from_str :: < Snapshot > ( record) . unwrap ( ) ;
1913+ let table_update = TableUpdate :: AddSnapshot {
1914+ snapshot : snapshot. clone ( ) ,
1915+ } ;
1916+ let requirements = vec ! [ ] ;
1917+ let table_commit = TableCommit :: builder ( )
1918+ . ident ( expected_table_ident. clone ( ) )
1919+ . updates ( vec ! [ table_update] )
1920+ . requirements ( requirements)
1921+ . build ( ) ;
1922+ let table = catalog. update_table ( table_commit) . await . unwrap ( ) ;
1923+ let snapshot_vec = table. metadata ( ) . snapshots ( ) . collect_vec ( ) ;
1924+ assert_eq ! ( 1 , snapshot_vec. len( ) ) ;
1925+ let snapshot = & snapshot_vec[ 0 ] ;
1926+ assert_eq ! ( snapshot. snapshot_id( ) , 3051729675574597004 ) ;
1927+ assert_eq ! ( snapshot. timestamp_ms( ) , 9992191116217 ) ;
1928+ assert_eq ! ( snapshot. sequence_number( ) , 10 ) ;
1929+ assert_eq ! ( snapshot. schema_id( ) . unwrap( ) , 0 ) ;
1930+ assert_eq ! ( snapshot. manifest_list( ) , "s3://b/wh/.../s1.avro" ) ;
1931+ assert_eq ! ( snapshot. summary( ) . operation, Operation :: Append ) ;
1932+ assert_eq ! ( snapshot. summary( ) . additional_properties, HashMap :: new( ) ) ;
1933+
1934+ // Add another snapshot
1935+ // Add snapshot
1936+ let record = r#"
1937+ {
1938+ "snapshot-id": 3051729675574597005,
1939+ "sequence-number": 11,
1940+ "timestamp-ms": 9992191117217,
1941+ "summary": {
1942+ "operation": "append"
1943+ },
1944+ "manifest-list": "s3://b/wh/.../s2.avro",
1945+ "schema-id": 0
1946+ }
1947+ "# ;
1948+ let snapshot = serde_json:: from_str :: < Snapshot > ( record) . unwrap ( ) ;
1949+ let table_update = TableUpdate :: AddSnapshot {
1950+ snapshot : snapshot. clone ( ) ,
1951+ } ;
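+        // The "main" branch was never set above, so this requirement cannot hold and the commit should fail.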
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: Some(3051729675574597004),
+        };
+        let requirements = vec![requirement];
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update])
+            .requirements(requirements)
+            .build();
+        assert!(catalog.update_table(table_commit).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_update_table_set_snapshot_ref() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        let table_name = "abc";
+        let location = warehouse_loc.clone();
+        let table_creation = TableCreation::builder()
+            .name(table_name.into())
+            .location(location.clone())
+            .schema(simple_table_schema())
+            .build();
+
+        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+
+        assert_table_eq(
+            &catalog
+                .create_table(&namespace_ident, table_creation)
+                .await
+                .unwrap(),
+            &expected_table_ident,
+            &simple_table_schema(),
+        );
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+        let table_snapshots_iter = table.metadata().snapshots();
+        assert_eq!(0, table_snapshots_iter.count());
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+        let table_snapshots_iter = table.metadata().snapshots();
+        assert_eq!(0, table_snapshots_iter.count());
+
+        // Add snapshot
+        let record = r#"
+        {
+            "snapshot-id": 3051729675574597004,
+            "sequence-number": 10,
+            "timestamp-ms": 9992191116217,
+            "summary": {
+                "operation": "append"
+            },
+            "manifest-list": "s3://b/wh/.../s1.avro",
+            "schema-id": 0
+        }
+        "#;
+
+        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
+        let table_update = TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        };
+        let requirements = vec![];
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update])
+            .requirements(requirements)
+            .build();
+        let table = catalog.update_table(table_commit).await.unwrap();
+        let snapshot_vec = table.metadata().snapshots().collect_vec();
+        assert_eq!(1, snapshot_vec.len());
+        let snapshot = &snapshot_vec[0];
+        assert_eq!(snapshot.snapshot_id(), 3051729675574597004);
+        assert_eq!(snapshot.timestamp_ms(), 9992191116217);
+        assert_eq!(snapshot.sequence_number(), 10);
+        assert_eq!(snapshot.schema_id().unwrap(), 0);
+        assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro");
+        assert_eq!(snapshot.summary().operation, Operation::Append);
+        assert_eq!(snapshot.summary().additional_properties, HashMap::new());
+
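+        // Point the main branch at the snapshot that was just committed.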
+        let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef {
+            ref_name: MAIN_BRANCH.to_string(),
+            reference: SnapshotReference {
+                snapshot_id: snapshot.snapshot_id(),
+                retention: SnapshotRetention::Branch {
+                    min_snapshots_to_keep: Some(10),
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            },
+        };
+
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update_set_snapshot_ref])
+            .requirements(vec![])
+            .build();
+        let table = catalog.update_table(table_commit).await.unwrap();
+        let snapshot_refs_map = table.metadata().snapshot_refs();
+        assert_eq!(1, snapshot_refs_map.len());
+        let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap();
+        let expected_snapshot_ref = SnapshotReference {
+            snapshot_id: 3051729675574597004,
+            retention: SnapshotRetention::Branch {
+                min_snapshots_to_keep: Some(10),
+                max_snapshot_age_ms: None,
+                max_ref_age_ms: None,
+            },
+        };
+        assert_eq!(snapshot_ref, &expected_snapshot_ref);
+    }
 }