Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't parse version-hint as a number for iceberg tables #2281

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/datafusion_ext/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,13 @@ pub enum ExtensionError {
ListingErrBoxed(#[from] Box<dyn std::error::Error + Sync + Send>),
}

impl ExtensionError {
    /// Builds an [`ExtensionError::Access`] from any standard error value.
    ///
    /// The error is moved onto the heap and stored in the `Access` variant.
    /// Because this is a plain function taking the error by value, it can be
    /// passed directly to `Result::map_err`, e.g.
    /// `.map_err(ExtensionError::access)`.
    pub fn access<E: std::error::Error + Send + Sync + 'static>(err: E) -> Self {
        let boxed = Box::new(err);
        Self::Access(boxed)
    }
}

pub type Result<T, E = ExtensionError> = std::result::Result<T, E>;
36 changes: 17 additions & 19 deletions crates/datasources/src/lake/iceberg/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,26 @@ impl TableState {
async fn open(location: DatasourceUrl, store: Arc<dyn ObjectStore>) -> Result<TableState> {
// Get table version.
// TODO: Handle not finding a version hint.
let version = {
let path = format_object_path(&location, "metadata/version-hint.text")?;
let path = ObjectPath::parse(path)?;
let bs = store.get(&path).await?.bytes().await?;
let s = String::from_utf8(bs.to_vec()).map_err(|e| {
IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e))
})?;

s.parse::<i32>().map_err(|e| {
IcebergError::DataInvalid(format!("Expected version hint to be a number: {}", e))
})?
let path = format_object_path(&location, "metadata/version-hint.text")?;
let path = ObjectPath::parse(path)?;
let bs = store.get(&path).await?.bytes().await?;
let version_contents = String::from_utf8(bs.to_vec()).map_err(|e| {
IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e))
})?;
// Read the first line of the `version-hint.text` file.
let first_line = if let Some((first_line, _)) = version_contents.split_once('\n') {
first_line
} else {
version_contents.as_str()
};
let version = first_line.trim();

// Read metadata.
let metadata = {
let path = format_object_path(&location, format!("metadata/v{version}.metadata.json"))?;
let bs = store.get(&path).await?.bytes().await?;
let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| {
IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e))
})?;
metadata
};
let path = format_object_path(&location, format!("metadata/v{version}.metadata.json"))?;
let bs = store.get(&path).await?.bytes().await?;
let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| {
IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e))
})?;

let resolver = PathResolver::from_metadata(&metadata);

Expand Down
33 changes: 3 additions & 30 deletions crates/sqlbuiltins/src/functions/table/iceberg.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,3 @@
mod data_files;
mod scan;
mod snapshots;

use std::collections::HashMap;
use std::sync::Arc;

use super::table_location_and_opts;
use async_trait::async_trait;
pub(crate) use data_files::*;
use datafusion::arrow::array::{Int32Builder, Int64Builder, StringBuilder, UInt64Builder};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion_ext::errors::{ExtensionError, Result};
use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider};
use datasources::lake::iceberg::table::IcebergTable;
use datasources::lake::storage_options_into_object_store;
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
pub(crate) use scan::*;
pub(crate) use snapshots::*;

use super::TableFunc;

fn box_err<E>(err: E) -> ExtensionError
where
E: std::error::Error + Send + Sync + 'static,
{
ExtensionError::Access(Box::new(err))
}
pub mod data_files;
pub mod scan;
pub mod snapshots;
35 changes: 30 additions & 5 deletions crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
use super::*;
use std::{collections::HashMap, sync::Arc};

use crate::functions::ConstBuiltinFunction;
use async_trait::async_trait;
use datafusion::{
arrow::{
array::{Int64Builder, StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
},
datasource::{MemTable, TableProvider},
};
use datafusion_ext::{
errors::{ExtensionError, Result},
functions::{FuncParamValue, TableFuncContextProvider},
};
use datasources::lake::{iceberg::table::IcebergTable, storage_options_into_object_store};
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};

use crate::functions::{
table::{table_location_and_opts, TableFunc},
ConstBuiltinFunction,
};

/// Scan data file metadata for the current snapshot of an iceberg table. Will
/// not attempt to read data files.
Expand Down Expand Up @@ -28,10 +47,16 @@ impl TableFunc for IcebergDataFiles {
) -> Result<Arc<dyn TableProvider>> {
let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?;

let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?;
let table = IcebergTable::open(loc, store).await.map_err(box_err)?;
let store =
storage_options_into_object_store(&loc, &opts).map_err(ExtensionError::access)?;
let table = IcebergTable::open(loc, store)
.await
.map_err(ExtensionError::access)?;

let manifests = table.read_manifests().await.map_err(box_err)?;
let manifests = table
.read_manifests()
.await
.map_err(ExtensionError::access)?;

let schema = Arc::new(Schema::new(vec![
Field::new("manifest_index", DataType::UInt64, false),
Expand Down
23 changes: 18 additions & 5 deletions crates/sqlbuiltins/src/functions/table/iceberg/scan.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
use super::*;
use std::{collections::HashMap, sync::Arc};

use crate::functions::ConstBuiltinFunction;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use datafusion_ext::{
errors::{ExtensionError, Result},
functions::{FuncParamValue, TableFuncContextProvider},
};
use datasources::lake::{iceberg::table::IcebergTable, storage_options_into_object_store};
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};

use crate::functions::{
table::{table_location_and_opts, TableFunc},
ConstBuiltinFunction,
};

/// Scan an iceberg table.
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -29,11 +41,12 @@ impl TableFunc for IcebergScan {
// TODO: Reduce duplication
let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?;

let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?;
let store =
storage_options_into_object_store(&loc, &opts).map_err(ExtensionError::access)?;
let table = IcebergTable::open(loc.clone(), store)
.await
.map_err(box_err)?;
let reader = table.table_reader().await.map_err(box_err)?;
.map_err(ExtensionError::access)?;
let reader = table.table_reader().await.map_err(ExtensionError::access)?;

Ok(reader)
}
Expand Down
30 changes: 26 additions & 4 deletions crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
use super::*;
use std::{collections::HashMap, sync::Arc};

use crate::functions::ConstBuiltinFunction;
use async_trait::async_trait;
use datafusion::{
arrow::{
array::{Int32Builder, Int64Builder, StringBuilder},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
},
datasource::{MemTable, TableProvider},
};
use datafusion_ext::{
errors::{ExtensionError, Result},
functions::{FuncParamValue, TableFuncContextProvider},
};
use datasources::lake::{iceberg::table::IcebergTable, storage_options_into_object_store};
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};

use crate::functions::{
table::{table_location_and_opts, TableFunc},
ConstBuiltinFunction,
};

/// Scan snapshot information for an iceberg table. Will not attempt to read
/// data files.
Expand Down Expand Up @@ -28,8 +47,11 @@ impl TableFunc for IcebergSnapshots {
) -> Result<Arc<dyn TableProvider>> {
let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?;

let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?;
let table = IcebergTable::open(loc, store).await.map_err(box_err)?;
let store =
storage_options_into_object_store(&loc, &opts).map_err(ExtensionError::access)?;
let table = IcebergTable::open(loc, store)
.await
.map_err(ExtensionError::access)?;

let snapshots = &table.metadata().snapshots;

Expand Down
2 changes: 1 addition & 1 deletion crates/sqlbuiltins/src/functions/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use self::bigquery::ReadBigQuery;
use self::delta::DeltaScan;
use self::excel::ExcelScan;
use self::generate_series::GenerateSeries;
use self::iceberg::{IcebergDataFiles, IcebergScan, IcebergSnapshots};
use self::iceberg::{data_files::IcebergDataFiles, scan::IcebergScan, snapshots::IcebergSnapshots};
use self::lance::LanceScan;
use self::mongo::ReadMongoDb;
use self::mysql::ReadMysql;
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
{
"format-version" : 2,
"table-uuid" : "6f4d8a47-2611-4f70-b018-defcbfc6377e",
"location" : "./iceberg/tables/lineitem_simple",
"last-sequence-number" : 1,
"last-updated-ms" : 1690903621972,
"last-column-id" : 16,
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "l_orderkey",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "l_partkey",
"required" : false,
"type" : "long"
}, {
"id" : 3,
"name" : "l_suppkey",
"required" : false,
"type" : "long"
}, {
"id" : 4,
"name" : "l_linenumber",
"required" : false,
"type" : "int"
}, {
"id" : 5,
"name" : "l_quantity",
"required" : false,
"type" : "decimal(15, 2)"
}, {
"id" : 6,
"name" : "l_extendedprice",
"required" : false,
"type" : "decimal(15, 2)"
}, {
"id" : 7,
"name" : "l_discount",
"required" : false,
"type" : "decimal(15, 2)"
}, {
"id" : 8,
"name" : "l_tax",
"required" : false,
"type" : "decimal(15, 2)"
}, {
"id" : 9,
"name" : "l_returnflag",
"required" : false,
"type" : "string"
}, {
"id" : 10,
"name" : "l_linestatus",
"required" : false,
"type" : "string"
}, {
"id" : 11,
"name" : "l_shipdate",
"required" : false,
"type" : "date"
}, {
"id" : 12,
"name" : "l_commitdate",
"required" : false,
"type" : "date"
}, {
"id" : 13,
"name" : "l_receiptdate",
"required" : false,
"type" : "date"
}, {
"id" : 14,
"name" : "l_shipinstruct",
"required" : false,
"type" : "string"
}, {
"id" : 15,
"name" : "l_shipmode",
"required" : false,
"type" : "string"
}, {
"id" : 16,
"name" : "l_comment",
"required" : false,
"type" : "string"
} ]
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ ]
} ],
"last-partition-id" : 999,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "sean",
"write.update.mode" : "merge-on-read"
},
"current-snapshot-id" : 7051076103797751626,
"refs" : {
"main" : {
"snapshot-id" : 7051076103797751626,
"type" : "branch"
}
},
"snapshots" : [ {
"sequence-number" : 1,
"snapshot-id" : 7051076103797751626,
"timestamp-ms" : 1690903621972,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1690903619201",
"added-data-files" : "1",
"added-records" : "1000",
"added-files-size" : "37204",
"changed-partition-count" : "1",
"total-records" : "1000",
"total-files-size" : "37204",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "iceberg/tables/lineitem_simple/metadata/snap-7051076103797751626-1-84151bf4-ec53-4513-ace5-b94e197e6162.avro",
"schema-id" : 0
} ],
"statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1690903621972,
"snapshot-id" : 7051076103797751626
} ],
"metadata-log" : [ ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
100000000000
8 changes: 8 additions & 0 deletions testdata/sqllogictests_iceberg/local.slt
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,11 @@ REG AIR 314
SHIP 316
TRUCK 264

# We should be able to query a table whose version number exceeds the i32 limit.
#
# See: https://github.com/GlareDB/glaredb/issues/2277
query T
select count(*) = 1000
from iceberg_scan('../../testdata/iceberg/tables/lineitem_simple_longversion');
----
t
Loading