feat: change default 'json' extension handler #2797

Merged · 2 commits · Mar 19, 2024
11 changes: 8 additions & 3 deletions crates/datafusion_ext/src/planner/relation/mod.rs
@@ -243,8 +243,6 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
         .ok_or_else(|| DataFusionError::Plan(format!("strange file extension: {path}")))?
         .to_lowercase();
 
-    // TODO: We can be a bit more sophisticated here and handle compression
-    // schemes as well.
     Ok(match ext.as_str() {
         "parquet" => OwnedTableReference::Partial {
             schema: "public".into(),
@@ -254,7 +252,11 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
             schema: "public".into(),
             table: "read_csv".into(),
         },
-        "json" | "jsonl" | "ndjson" => OwnedTableReference::Partial {
+        "json" => OwnedTableReference::Partial {
+            schema: "public".into(),
+            table: "read_json".into(),
+        },
+        "ndjson" | "jsonl" => OwnedTableReference::Partial {
             schema: "public".into(),
             table: "read_ndjson".into(),
         },
@@ -270,6 +272,9 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
         if let Ok(compression_type) = ext.parse::<FileCompressionType>() {
            let ext = compression_type.get_ext();
            let path = path.trim_end_matches(ext.as_str());
+            // TODO: only parquet/ndjson/csv actually support
+            // compression, so we'll end up attempting to handle
+            // compression for some types and not others.
            infer_func_for_file(path)?
        } else {
            return Err(DataFusionError::Plan(format!(
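Net effect of the three hunks above: bare `.json` now maps to the `read_json` table function, `.ndjson`/`.jsonl` stay on `read_ndjson`, and compression suffixes are stripped and re-inferred recursively (with the caveat in the new TODO, exercised by the "compression not supported for parquet" test at the bottom of infer.slt). A minimal, self-contained sketch of the resulting rule; `TableFunc` and `infer` are illustrative stand-ins, not the crate's actual types:

// Illustrative stand-in for the inference rule above; not the real types.
#[derive(Debug, PartialEq)]
enum TableFunc {
    ReadParquet,
    ReadCsv,
    ReadJson,   // new default for bare `.json`
    ReadNdjson, // line-delimited JSON
}

fn infer(path: &str) -> Option<TableFunc> {
    let ext = path.rsplit('.').next()?.to_lowercase();
    match ext.as_str() {
        "parquet" => Some(TableFunc::ReadParquet),
        "csv" => Some(TableFunc::ReadCsv),
        "json" => Some(TableFunc::ReadJson),
        "ndjson" | "jsonl" => Some(TableFunc::ReadNdjson),
        // Known compression suffix: strip it and recurse, mirroring the
        // trim_end_matches + recursive call in the third hunk.
        "gz" | "bz2" | "xz" | "zst" => infer(path.trim_end_matches(&format!(".{ext}"))),
        _ => None,
    }
}

fn main() {
    assert_eq!(infer("userdata1.json"), Some(TableFunc::ReadJson));
    assert_eq!(infer("userdata1.ndjson.gz"), Some(TableFunc::ReadNdjson));
}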
2 changes: 1 addition & 1 deletion crates/datasources/src/json/mod.rs
@@ -1,3 +1,3 @@
-mod errors;
+pub mod errors;
 mod stream;
 pub mod table;
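This one-word visibility change is what the dispatcher below relies on: with `errors` public, sibling crates can name the JSON error type directly via `use datasources::json::errors::JsonError;`, the exact path the new `DispatchError` variant references.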
23 changes: 15 additions & 8 deletions crates/sqlexec/src/dispatch/external.rs
@@ -18,6 +18,7 @@ use datasources::common::url::DatasourceUrl;
 use datasources::debug::DebugTableType;
 use datasources::excel::table::ExcelTableProvider;
 use datasources::excel::ExcelTable;
+use datasources::json::table::json_streaming_table;
 use datasources::lake::delta::access::{load_table_direct, DeltaLakeAccessor};
 use datasources::lake::iceberg::table::IcebergTable;
 use datasources::lake::{storage_options_into_object_store, storage_options_into_store_access};
@@ -641,6 +642,9 @@ impl<'a> ExternalDispatcher<'a> {
         compression: Option<&String>,
     ) -> Result<Arc<dyn TableProvider>> {
         let path = path.as_ref();
+        // TODO: only parquet/ndjson/csv actually support compression,
+        // so we'll end up attempting to handle compression for some
+        // types and not others.
         let compression = compression
             .map(|c| c.parse::<FileCompressionType>())
             .transpose()?
@@ -669,21 +673,24 @@
                     accessor.clone().list_globbed(path).await?,
                 )
                 .await?),
-            "ndjson" | "json" => Ok(accessor
-                .clone()
-                .into_table_provider(
-                    &self.df_ctx.state(),
-                    Arc::new(JsonFormat::default().with_file_compression_type(compression)),
-                    accessor.clone().list_globbed(path).await?,
-                )
-                .await?),
             "bson" => Ok(bson_streaming_table(
                 access.clone(),
                 DatasourceUrl::try_new(path)?,
                 None,
                 Some(128),
             )
             .await?),
+            "json" => Ok(
+                json_streaming_table(access.clone(), DatasourceUrl::try_new(path)?, None).await?,
+            ),
+            "ndjson" | "jsonl" => Ok(accessor
+                .clone()
+                .into_table_provider(
+                    &self.df_ctx.state(),
+                    Arc::new(JsonFormat::default().with_file_compression_type(compression)),
+                    accessor.clone().list_globbed(path).await?,
+                )
+                .await?),
             _ => Err(DispatchError::String(
                 format!("Unsupported file type: '{}', for '{}'", file_type, path,).to_string(),
             )),
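Condensed, the dispatch change mirrors the planner change: `.json` routes to the new `json_streaming_table` (presumably so whole-document JSON, such as a top-level array, no longer goes through a line-oriented reader), while `.ndjson`/`.jsonl` keep the `JsonFormat` listing path, which is also where compression handling lives. A sketch of just the routing decision; `JsonRoute` and `route_json` are hypothetical names used only for illustration:

// Simplified stand-in for the dispatch split above.
#[derive(Debug, PartialEq)]
enum JsonRoute {
    Streaming,     // json_streaming_table: whole documents, arrays, nesting
    LineDelimited, // JsonFormat over a listing: one record per line
}

fn route_json(file_type: &str) -> Result<JsonRoute, String> {
    match file_type {
        "json" => Ok(JsonRoute::Streaming),
        "ndjson" | "jsonl" => Ok(JsonRoute::LineDelimited),
        other => Err(format!("Unsupported file type: '{other}'")),
    }
}

fn main() {
    assert_eq!(route_json("json"), Ok(JsonRoute::Streaming));
    assert_eq!(route_json("jsonl"), Ok(JsonRoute::LineDelimited));
}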
2 changes: 2 additions & 0 deletions crates/sqlexec/src/dispatch/mod.rs
@@ -78,6 +78,8 @@ pub enum DispatchError {
     #[error(transparent)]
     BsonDatasource(#[from] datasources::bson::errors::BsonError),
     #[error(transparent)]
+    JsonDatasource(#[from] datasources::json::errors::JsonError),
+    #[error(transparent)]
     ClickhouseDatasource(#[from] datasources::clickhouse::errors::ClickhouseError),
     #[error(transparent)]
     NativeDatasource(#[from] datasources::native::errors::NativeError),
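How the new variant plugs in, assuming the `thiserror` derive that the attribute syntax indicates: `#[error(transparent)]` forwards `Display` to the wrapped error, and `#[from]` generates the `From` impl that lets `?` lift a `JsonError` into a `DispatchError`. A minimal sketch of the same pattern, with `JsonIsh` and `Dispatch` as stand-in names:

// Minimal reproduction of the error-wrapping pattern (requires thiserror).
use thiserror::Error;

#[derive(Debug, Error)]
#[error("bad json: {0}")]
struct JsonIsh(String);

#[derive(Debug, Error)]
enum Dispatch {
    #[error(transparent)]
    Json(#[from] JsonIsh),
}

fn parse() -> Result<(), JsonIsh> {
    Err(JsonIsh("unexpected EOF".into()))
}

fn dispatch() -> Result<(), Dispatch> {
    parse()?; // `?` converts JsonIsh -> Dispatch via the derived From impl
    Ok(())
}

fn main() {
    println!("{}", dispatch().unwrap_err()); // prints: bad json: unexpected EOF
}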
1,000 changes: 1,000 additions & 0 deletions testdata/json/userdata1.ndjson

Large diffs are not rendered by default.
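(NDJSON encodes one JSON object per line; judging from the assertions in infer.slt below, each record looks roughly like `{"id": 1, "first_name": "Amanda", …}`, with the remaining fields not shown here.)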

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
34 changes: 20 additions & 14 deletions testdata/sqllogictests/infer.slt
@@ -14,11 +14,17 @@ select id, "./testdata/parquet/userdata1.parquet".first_name
 1 Amanda
 
 
+query I
+select count(*) from './testdata/json/userdata1.ndjson'
+----
+1000
+
 query I
 select count(*) from './testdata/json/userdata1.json'
 ----
 1000
 
+
 query IT
 select id, "./testdata/json/userdata1.json".first_name
 from './testdata/json/userdata1.json'
@@ -56,7 +62,7 @@ select count(*) from './testdata/parquet/*'
 
 #Tests for inferring table functions from compressed file formats
 
-#Tests for CSV with .gz, .bz2, .xz, .zst
+#Tests for CSV with .gz, .bz2, .xz, .zst
 #csv.gz
 query
 select count(*) from './testdata/csv/userdata1.csv.gz'
@@ -122,67 +128,67 @@ select id, "./testdata/csv/userdata1.csv.zst".first_name
 #Tests for json with .gz, .bz2, .xz, .zst
 #json.gz
 query
-select count(*) from './testdata/json/userdata1.json.gz'
+select count(*) from './testdata/json/userdata1.ndjson.gz'
 ----
 1000
 
 #json.gz
 query IT
-select id, "./testdata/json/userdata1.json.gz".first_name
-from './testdata/json/userdata1.json.gz'
+select id, "./testdata/json/userdata1.ndjson.gz".first_name
+from './testdata/json/userdata1.ndjson.gz'
 order by id
 limit 1
 ----
 1 Amanda
 
 #json.bz2
 query
-select count(*) from './testdata/json/userdata1.json.bz2'
+select count(*) from './testdata/json/userdata1.ndjson.bz2'
 ----
 1000
 
 #json.bz2
 query IT
-select id, "./testdata/json/userdata1.json.bz2".first_name
-from './testdata/json/userdata1.json.bz2'
+select id, "./testdata/json/userdata1.ndjson.bz2".first_name
+from './testdata/json/userdata1.ndjson.bz2'
 order by id
 limit 1
 ----
 1 Amanda
 
 #json.xz
 query
-select count(*) from './testdata/json/userdata1.json.xz'
+select count(*) from './testdata/json/userdata1.ndjson.xz'
 ----
 1000
 
 #json.xz
 query IT
-select id, "./testdata/json/userdata1.json.xz".first_name
-from './testdata/json/userdata1.json.xz'
+select id, "./testdata/json/userdata1.ndjson.xz".first_name
+from './testdata/json/userdata1.ndjson.xz'
 order by id
 limit 1
 ----
 1 Amanda
 
 #json.zst
 query
-select count(*) from './testdata/json/userdata1.json.zst'
+select count(*) from './testdata/json/userdata1.ndjson.zst'
 ----
 1000
 
 #json.zst
 query IT
-select id, "./testdata/json/userdata1.json.zst".first_name
-from './testdata/json/userdata1.json.zst'
+select id, "./testdata/json/userdata1.ndjson.zst".first_name
+from './testdata/json/userdata1.ndjson.zst'
 order by id
 limit 1
 ----
 1 Amanda
 
 
 
-#For infering function from parquet compressed formats .bz2, .xz, .zst, .gz
+#For infering function from parquet compressed formats .bz2, .xz, .zst, .gz
 #parquet.bz2
 statement error compression not supported for parquet
 select count(*) from './testdata/parquet/userdata1.parquet.bz2'