diff --git a/crates/datafusion_ext/src/errors.rs b/crates/datafusion_ext/src/errors.rs index 1be6d2d56..0fb5fa160 100644 --- a/crates/datafusion_ext/src/errors.rs +++ b/crates/datafusion_ext/src/errors.rs @@ -40,4 +40,13 @@ pub enum ExtensionError { ListingErrBoxed(#[from] Box), } +impl ExtensionError { + pub fn access(err: E) -> Self + where + E: std::error::Error + Send + Sync + 'static, + { + Self::Access(Box::new(err)) + } +} + pub type Result = std::result::Result; diff --git a/crates/datasources/src/lake/iceberg/table.rs b/crates/datasources/src/lake/iceberg/table.rs index 3e62bf08a..735a15d95 100644 --- a/crates/datasources/src/lake/iceberg/table.rs +++ b/crates/datasources/src/lake/iceberg/table.rs @@ -89,28 +89,26 @@ impl TableState { async fn open(location: DatasourceUrl, store: Arc) -> Result { // Get table version. // TODO: Handle not finding a version hint. - let version = { - let path = format_object_path(&location, "metadata/version-hint.text")?; - let path = ObjectPath::parse(path)?; - let bs = store.get(&path).await?.bytes().await?; - let s = String::from_utf8(bs.to_vec()).map_err(|e| { - IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e)) - })?; - - s.parse::().map_err(|e| { - IcebergError::DataInvalid(format!("Expected version hint to be a number: {}", e)) - })? + let path = format_object_path(&location, "metadata/version-hint.text")?; + let path = ObjectPath::parse(path)?; + let bs = store.get(&path).await?.bytes().await?; + let version_contents = String::from_utf8(bs.to_vec()).map_err(|e| { + IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e)) + })?; + // Read the first line of the `version-hint.text` file. + let first_line = if let Some((first_line, _)) = version_contents.split_once('\n') { + first_line + } else { + version_contents.as_str() }; + let version = first_line.trim(); // Read metadata. 
- let metadata = { - let path = format_object_path(&location, format!("metadata/v{version}.metadata.json"))?; - let bs = store.get(&path).await?.bytes().await?; - let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| { - IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e)) - })?; - metadata - }; + let path = format_object_path(&location, format!("metadata/v{version}.metadata.json"))?; + let bs = store.get(&path).await?.bytes().await?; + let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| { + IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e)) + })?; let resolver = PathResolver::from_metadata(&metadata); diff --git a/crates/sqlbuiltins/src/functions/table/iceberg.rs b/crates/sqlbuiltins/src/functions/table/iceberg.rs index 7997f720e..f018a0cac 100644 --- a/crates/sqlbuiltins/src/functions/table/iceberg.rs +++ b/crates/sqlbuiltins/src/functions/table/iceberg.rs @@ -1,30 +1,3 @@ -mod data_files; -mod scan; -mod snapshots; - -use std::collections::HashMap; -use std::sync::Arc; - -use super::table_location_and_opts; -use async_trait::async_trait; -pub(crate) use data_files::*; -use datafusion::arrow::array::{Int32Builder, Int64Builder, StringBuilder, UInt64Builder}; -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::{MemTable, TableProvider}; -use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; -use datasources::lake::iceberg::table::IcebergTable; -use datasources::lake::storage_options_into_object_store; -use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; -pub(crate) use scan::*; -pub(crate) use snapshots::*; - -use super::TableFunc; - -fn box_err(err: E) -> ExtensionError -where - E: std::error::Error + Send + Sync + 'static, -{ - ExtensionError::Access(Box::new(err)) -} +pub mod data_files; +pub mod 
scan; +pub mod snapshots; diff --git a/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs b/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs index e2d631a4e..b918d2f72 100644 --- a/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs +++ b/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs @@ -1,6 +1,25 @@ -use super::*; +use std::{collections::HashMap, sync::Arc}; -use crate::functions::ConstBuiltinFunction; +use async_trait::async_trait; +use datafusion::{ + arrow::{ + array::{Int64Builder, StringBuilder, UInt64Builder}, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + }, + datasource::{MemTable, TableProvider}, +}; +use datafusion_ext::{ + errors::{ExtensionError, Result}, + functions::{FuncParamValue, TableFuncContextProvider}, +}; +use datasources::lake::{iceberg::table::IcebergTable, storage_options_into_object_store}; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::functions::{ + table::{table_location_and_opts, TableFunc}, + ConstBuiltinFunction, +}; /// Scan data file metadata for the current snapshot of an iceberg table. Will /// not attempt to read data files. 
@@ -28,10 +47,16 @@ impl TableFunc for IcebergDataFiles { ) -> Result> { let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; - let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; - let table = IcebergTable::open(loc, store).await.map_err(box_err)?; + let store = + storage_options_into_object_store(&loc, &opts).map_err(ExtensionError::access)?; + let table = IcebergTable::open(loc, store) + .await + .map_err(ExtensionError::access)?; - let manifests = table.read_manifests().await.map_err(box_err)?; + let manifests = table + .read_manifests() + .await + .map_err(ExtensionError::access)?; let schema = Arc::new(Schema::new(vec![ Field::new("manifest_index", DataType::UInt64, false), diff --git a/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs b/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs index 6653ab188..adbf7392d 100644 --- a/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs +++ b/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs @@ -1,6 +1,18 @@ -use super::*; +use std::{collections::HashMap, sync::Arc}; -use crate::functions::ConstBuiltinFunction; +use async_trait::async_trait; +use datafusion::datasource::TableProvider; +use datafusion_ext::{ + errors::{ExtensionError, Result}, + functions::{FuncParamValue, TableFuncContextProvider}, +}; +use datasources::lake::{iceberg::table::IcebergTable, storage_options_into_object_store}; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::functions::{ + table::{table_location_and_opts, TableFunc}, + ConstBuiltinFunction, +}; /// Scan an iceberg table. 
#[derive(Debug, Clone, Copy)] @@ -29,11 +41,12 @@ impl TableFunc for IcebergScan { // TODO: Reduce duplication let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; - let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; + let store = + storage_options_into_object_store(&loc, &opts).map_err(ExtensionError::access)?; let table = IcebergTable::open(loc.clone(), store) .await - .map_err(box_err)?; - let reader = table.table_reader().await.map_err(box_err)?; + .map_err(ExtensionError::access)?; + let reader = table.table_reader().await.map_err(ExtensionError::access)?; Ok(reader) } diff --git a/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs b/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs index 9980ac9c6..d2dbf7314 100644 --- a/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs +++ b/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs @@ -1,6 +1,25 @@ -use super::*; +use std::{collections::HashMap, sync::Arc}; -use crate::functions::ConstBuiltinFunction; +use async_trait::async_trait; +use datafusion::{ + arrow::{ + array::{Int32Builder, Int64Builder, StringBuilder}, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + }, + datasource::{MemTable, TableProvider}, +}; +use datafusion_ext::{ + errors::{ExtensionError, Result}, + functions::{FuncParamValue, TableFuncContextProvider}, +}; +use datasources::lake::{iceberg::table::IcebergTable, storage_options_into_object_store}; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::functions::{ + table::{table_location_and_opts, TableFunc}, + ConstBuiltinFunction, +}; /// Scan snapshot information for an iceberg tables. Will not attempt to read /// data files. 
@@ -28,8 +47,11 @@ impl TableFunc for IcebergSnapshots { ) -> Result> { let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; - let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; - let table = IcebergTable::open(loc, store).await.map_err(box_err)?; + let store = + storage_options_into_object_store(&loc, &opts).map_err(ExtensionError::access)?; + let table = IcebergTable::open(loc, store) + .await + .map_err(ExtensionError::access)?; let snapshots = &table.metadata().snapshots; diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs index 61362fa4c..856184dd2 100644 --- a/crates/sqlbuiltins/src/functions/table/mod.rs +++ b/crates/sqlbuiltins/src/functions/table/mod.rs @@ -30,7 +30,7 @@ use self::bigquery::ReadBigQuery; use self::delta::DeltaScan; use self::excel::ExcelScan; use self::generate_series::GenerateSeries; -use self::iceberg::{IcebergDataFiles, IcebergScan, IcebergSnapshots}; +use self::iceberg::{data_files::IcebergDataFiles, scan::IcebergScan, snapshots::IcebergSnapshots}; use self::lance::LanceScan; use self::mongo::ReadMongoDb; use self::mysql::ReadMysql; diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/data/.00000-1-be6e61d5-99c3-497e-9ed4-0bd7e131f18e-00001.parquet.crc b/testdata/iceberg/tables/lineitem_simple_longversion/data/.00000-1-be6e61d5-99c3-497e-9ed4-0bd7e131f18e-00001.parquet.crc new file mode 100644 index 000000000..8ec027877 Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/data/.00000-1-be6e61d5-99c3-497e-9ed4-0bd7e131f18e-00001.parquet.crc differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/data/00000-1-be6e61d5-99c3-497e-9ed4-0bd7e131f18e-00001.parquet b/testdata/iceberg/tables/lineitem_simple_longversion/data/00000-1-be6e61d5-99c3-497e-9ed4-0bd7e131f18e-00001.parquet new file mode 100644 index 000000000..9191c9f6b Binary files /dev/null and 
b/testdata/iceberg/tables/lineitem_simple_longversion/data/00000-1-be6e61d5-99c3-497e-9ed4-0bd7e131f18e-00001.parquet differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.84151bf4-ec53-4513-ace5-b94e197e6162-m0.avro.crc b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.84151bf4-ec53-4513-ace5-b94e197e6162-m0.avro.crc new file mode 100644 index 000000000..2691e68f9 Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.84151bf4-ec53-4513-ace5-b94e197e6162-m0.avro.crc differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro.crc b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro.crc new file mode 100644 index 000000000..8b510f6bd Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro.crc differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.v1.metadata.json.crc b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.v1.metadata.json.crc new file mode 100644 index 000000000..e7e2abe28 Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.v1.metadata.json.crc differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.version-hint.text.crc b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.version-hint.text.crc new file mode 100644 index 000000000..04ea5726b Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/.version-hint.text.crc differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/84151bf4-ec53-4513-ace5-b94e197e6162-m0.avro b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/84151bf4-ec53-4513-ace5-b94e197e6162-m0.avro new file 
mode 100644 index 000000000..8a3f0ea07 Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/84151bf4-ec53-4513-ace5-b94e197e6162-m0.avro differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro new file mode 100644 index 000000000..5e36741b4 Binary files /dev/null and b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro differ diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/v100000000000.metadata.json b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/v100000000000.metadata.json new file mode 100644 index 000000000..6919dece3 --- /dev/null +++ b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/v100000000000.metadata.json @@ -0,0 +1,143 @@ +{ + "format-version" : 2, + "table-uuid" : "6f4d8a47-2611-4f70-b018-defcbfc6377e", + "location" : "./iceberg/tables/lineitem_simple", + "last-sequence-number" : 1, + "last-updated-ms" : 1690903621972, + "last-column-id" : 16, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "l_orderkey", + "required" : false, + "type" : "long" + }, { + "id" : 2, + "name" : "l_partkey", + "required" : false, + "type" : "long" + }, { + "id" : 3, + "name" : "l_suppkey", + "required" : false, + "type" : "long" + }, { + "id" : 4, + "name" : "l_linenumber", + "required" : false, + "type" : "int" + }, { + "id" : 5, + "name" : "l_quantity", + "required" : false, + "type" : "decimal(15, 2)" + }, { + "id" : 6, + "name" : "l_extendedprice", + "required" : false, + "type" : "decimal(15, 2)" + }, { + "id" : 7, + "name" : "l_discount", + "required" : false, + "type" : "decimal(15, 2)" + }, { + "id" : 
8, + "name" : "l_tax", + "required" : false, + "type" : "decimal(15, 2)" + }, { + "id" : 9, + "name" : "l_returnflag", + "required" : false, + "type" : "string" + }, { + "id" : 10, + "name" : "l_linestatus", + "required" : false, + "type" : "string" + }, { + "id" : 11, + "name" : "l_shipdate", + "required" : false, + "type" : "date" + }, { + "id" : 12, + "name" : "l_commitdate", + "required" : false, + "type" : "date" + }, { + "id" : 13, + "name" : "l_receiptdate", + "required" : false, + "type" : "date" + }, { + "id" : 14, + "name" : "l_shipinstruct", + "required" : false, + "type" : "string" + }, { + "id" : 15, + "name" : "l_shipmode", + "required" : false, + "type" : "string" + }, { + "id" : 16, + "name" : "l_comment", + "required" : false, + "type" : "string" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "sean", + "write.update.mode" : "merge-on-read" + }, + "current-snapshot-id" : 7051076103797751626, + "refs" : { + "main" : { + "snapshot-id" : 7051076103797751626, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 7051076103797751626, + "timestamp-ms" : 1690903621972, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1690903619201", + "added-data-files" : "1", + "added-records" : "1000", + "added-files-size" : "37204", + "changed-partition-count" : "1", + "total-records" : "1000", + "total-files-size" : "37204", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "iceberg/tables/lineitem_simple/metadata/snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1690903621972, + "snapshot-id" : 
7051076103797751626 + } ], + "metadata-log" : [ ] +} \ No newline at end of file diff --git a/testdata/iceberg/tables/lineitem_simple_longversion/metadata/version-hint.text b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/version-hint.text new file mode 100644 index 000000000..8ce5fb1fd --- /dev/null +++ b/testdata/iceberg/tables/lineitem_simple_longversion/metadata/version-hint.text @@ -0,0 +1 @@ +100000000000 \ No newline at end of file diff --git a/testdata/sqllogictests_iceberg/local.slt b/testdata/sqllogictests_iceberg/local.slt index c8ec7ef00..62537d384 100644 --- a/testdata/sqllogictests_iceberg/local.slt +++ b/testdata/sqllogictests_iceberg/local.slt @@ -94,3 +94,11 @@ REG AIR 314 SHIP 316 TRUCK 264 +# We should be able to query a table whose version number exceeds the i32 limit. +# +# See: https://github.com/GlareDB/glaredb/issues/2277 +query T +select count(*) = 1000 + from iceberg_scan('../../testdata/iceberg/tables/lineitem_simple_longversion'); +---- +t