feat: streaming json document/array data (#2494)
tychoish committed Feb 1, 2024
1 parent a7cec68 commit 4fdd828
Showing 13 changed files with 568 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/datasources/Cargo.toml
@@ -73,6 +73,7 @@ lance = { git = "https://github.com/universalmind303/lance", rev = "ffd4ac6ee2c6
bson = "2.9.0"
scylla = { version = "0.11.1" }
glob = "0.3.1"
indexmap = "2.1.0"

# SSH tunnels
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
2 changes: 1 addition & 1 deletion crates/datasources/src/bson/stream.rs
@@ -43,7 +43,7 @@ impl BsonStream {
        let stream_schema = schema.clone();

        let stream = docs
-            .chunks(100)
+            .chunks(1000)
            .map(move |results| Self::convert_chunk(results, stream_schema.clone()))
            .boxed();

33 changes: 33 additions & 0 deletions crates/datasources/src/json/errors.rs
@@ -0,0 +1,33 @@
use datafusion_ext::errors::ExtensionError;

use crate::object_store::errors::ObjectStoreSourceError;

#[derive(Debug, thiserror::Error)]
pub enum JsonError {
    #[error("Unsupported json type: {0}")]
    UnsupportedType(&'static str),

    #[error(transparent)]
    SerdeJson(#[from] serde_json::Error),

    #[error("no objects found {0}")]
    NotFound(String),

    #[error(transparent)]
    ObjectStoreSource(#[from] ObjectStoreSourceError),

    #[error(transparent)]
    ObjectStore(#[from] object_store::Error),

    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),

    #[error(transparent)]
    Datafusion(#[from] datafusion::error::DataFusionError),
}

impl From<JsonError> for ExtensionError {
    fn from(e: JsonError) -> Self {
        ExtensionError::String(e.to_string())
    }
}
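The From impl above is what lets JSON failures bubble up through the extension boundary with the ? operator. A minimal sketch of that pattern, not part of this commit (the helper function names are invented for illustration):

fn parse_sample() -> Result<(), JsonError> {
    // An incomplete document fails and becomes JsonError::SerdeJson via #[from].
    serde_json::from_slice::<serde_json::Value>(b"{")?;
    Ok(())
}

fn extension_entry_point() -> Result<(), ExtensionError> {
    // JsonError converts into ExtensionError::String through the impl above.
    parse_sample()?;
    Ok(())
}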
3 changes: 3 additions & 0 deletions crates/datasources/src/json/mod.rs
@@ -0,0 +1,3 @@
mod errors;
mod stream;
pub mod table;
145 changes: 145 additions & 0 deletions crates/datasources/src/json/stream.rs
@@ -0,0 +1,145 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::json::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use object_store::{ObjectMeta, ObjectStore};
use serde_json::{Map, Value};

use crate::json::errors::JsonError;
use crate::json::table::push_unwind_json_values;

pub type SendableCheckedRecordBatchStream =
    Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;

pub struct JsonStream {
    schema: Arc<Schema>,
    stream: SendableCheckedRecordBatchStream,
}

impl Stream for JsonStream {
    type Item = Result<RecordBatch, DataFusionError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    }
}

impl RecordBatchStream for JsonStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

pub struct JsonPartitionStream {
    schema: Arc<Schema>,
    stream: Mutex<Option<SendableCheckedRecordBatchStream>>,
}

impl PartitionStream for JsonPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let partition = self
            .stream
            .lock()
            .unwrap()
            .take()
            .expect("stream can only be used once")
            .boxed();

        Box::pin(JsonStream {
            schema: self.schema.clone(),
            stream: partition,
        })
    }
}

impl JsonPartitionStream {
    pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
        let stream_schema = schema.clone();
        let stream = futures::stream::iter(chunk)
            .chunks(1000)
            .map(move |objs| {
                let mut decoder = ReaderBuilder::new(stream_schema.to_owned()).build_decoder()?;
                decoder
                    .serialize(&objs)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                Ok(decoder.flush()?.unwrap())
            })
            .boxed();

        Self {
            schema: schema.clone(),
            stream: Mutex::new(Some(stream)),
        }
    }
}

pub(crate) struct LazyJsonPartitionStream {
    schema: Arc<Schema>,
    store: Arc<dyn ObjectStore>,
    obj: ObjectMeta,
}

impl PartitionStream for LazyJsonPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let stream_schema = self.schema.to_owned();
        let store = self.store.clone();
        let obj = self.obj.clone();

        Box::pin(JsonStream {
            schema: self.schema.clone(),
            stream: futures::stream::once(async move {
                futures::stream::iter(match Self::build(stream_schema, store, obj).await {
                    Ok(batches) => batches,
                    Err(e) => vec![Err(DataFusionError::External(Box::new(e)))],
                })
            })
            .flatten()
            .boxed(),
        })
    }
}

impl LazyJsonPartitionStream {
    pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
        Self { schema, store, obj }
    }

    async fn build(
        schema: Arc<Schema>,
        store: Arc<dyn ObjectStore>,
        obj: ObjectMeta,
    ) -> Result<Vec<Result<RecordBatch, DataFusionError>>, JsonError> {
        let mut data = Vec::new();
        push_unwind_json_values(
            &mut data,
            serde_json::from_slice::<Value>(&store.get(&obj.location).await?.bytes().await?),
        )?;

        Ok(data
            .chunks(1000)
            .map(|chunk| {
                let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
                decoder
                    .serialize(chunk)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                Ok(decoder.flush()?.unwrap())
            })
            .collect())
    }
}
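As context, a sketch of how these partition streams are meant to be consumed, mirroring what table.rs does later in this diff; the schema, rows, and function name are illustrative and not part of the commit:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::streaming::StreamingTable;
use datafusion::physical_plan::streaming::PartitionStream;
use serde_json::{Map, Value};

use crate::json::stream::JsonPartitionStream;

// Hypothetical helper; it would have to live inside the datasources crate,
// since the stream module is private to crate::json.
fn one_partition_table() -> Result<StreamingTable, Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let objs: Vec<Map<String, Value>> = vec![
        serde_json::from_str(r#"{"a": 1}"#)?,
        serde_json::from_str(r#"{"a": 2}"#)?,
    ];
    // Eagerly parsed objects become one partition; execute() hands its
    // record batches to DataFusion exactly once.
    let partitions: Vec<Arc<dyn PartitionStream>> =
        vec![Arc::new(JsonPartitionStream::new(schema.clone(), objs))];
    Ok(StreamingTable::try_new(schema, partitions)?)
}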
158 changes: 158 additions & 0 deletions crates/datasources/src/json/table.rs
@@ -0,0 +1,158 @@
use std::sync::Arc;
use std::vec::Vec;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::streaming::PartitionStream;
use object_store::ObjectStore;
use serde_json::{Map, Value};

use crate::common::url::DatasourceUrl;
use crate::json::errors::JsonError;
use crate::json::stream::{JsonPartitionStream, LazyJsonPartitionStream};
use crate::object_store::generic::GenericStoreAccess;
use crate::object_store::ObjStoreAccess;

pub async fn json_streaming_table(
    store_access: GenericStoreAccess,
    source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, JsonError> {
    let path = source_url.path();

    let store = store_access.create_store()?;

    // assume that the file type is a glob and see if there are
    // more files...
    let mut list = store_access.list_globbed(&store, path.as_ref()).await?;

    if list.is_empty() {
        return Err(JsonError::NotFound(path.into_owned()));
    }

    // for consistent results, particularly for the sample, always
    // sort by location
    list.sort_by(|a, b| a.location.cmp(&b.location));

    let mut data = Vec::new();
    {
        let first_obj = list
            .pop()
            .ok_or_else(|| JsonError::NotFound(path.into_owned()))?;
        let blob = store
            .get(&first_obj.location)
            .await?
            .bytes()
            .await?
            .to_vec();

        push_unwind_json_values(
            &mut data,
            serde_json::from_slice::<serde_json::Value>(&blob),
        )?;
    }

    let mut field_set = indexmap::IndexMap::<String, DataType>::new();
    for obj in &data {
        for (key, value) in obj.into_iter() {
            let typ = type_for_value(value);
            match field_set.get(key) {
                Some(v) => match widen_type(v, typ) {
                    Some(wider) => field_set.insert(key.to_string(), wider),
                    None => None,
                },
                None => field_set.insert(key.to_string(), typ),
            };
        }
    }
    let schema = Arc::new(Schema::new(
        field_set
            .into_iter()
            .map(|(k, v)| Field::new(k, v, true))
            .collect::<Vec<_>>(),
    ));

    let mut streams = Vec::<Arc<dyn PartitionStream>>::with_capacity(list.len());
    streams.push(Arc::new(JsonPartitionStream::new(schema.clone(), data)));
    for obj in list {
        streams.push(Arc::new(LazyJsonPartitionStream::new(
            schema.clone(),
            store.clone(),
            obj,
        )));
    }

    Ok(Arc::new(StreamingTable::try_new(schema.clone(), streams)?))
}

pub(crate) fn push_unwind_json_values(
    data: &mut Vec<Map<String, Value>>,
    val: Result<Value, serde_json::Error>,
) -> Result<(), JsonError> {
    match val? {
        Value::Array(vals) => {
            for v in vals {
                match v {
                    Value::Object(doc) => data.push(doc),
                    Value::Null => data.push(Map::new()),
                    _ => {
                        return Err(JsonError::UnsupportedType(
                            "only objects and arrays of objects are supported",
                        ))
                    }
                }
            }
        }
        Value::Object(doc) => data.push(doc),
        Value::Null => data.push(Map::new()),
        _ => {
            return Err(JsonError::UnsupportedType(
                "only objects and arrays of objects are supported",
            ))
        }
    };
    Ok(())
}

fn widen_type(left: &DataType, right: DataType) -> Option<DataType> {
    match (left, right) {
        (&DataType::Null, right) => Some(right),
        (&DataType::Int64 | &DataType::UInt64, DataType::Float64) => Some(DataType::Float64),
        _ => None,
    }
}

fn type_for_value(value: &Value) -> DataType {
    match value {
        Value::Array(v) => {
            if v.is_empty() {
                DataType::List(Arc::new(Field::new("", DataType::Null, true)))
            } else {
                DataType::List(Arc::new(Field::new(
                    "item",
                    type_for_value(v.first().unwrap()),
                    true,
                )))
            }
        }
        Value::String(_) => DataType::Utf8,
        Value::Bool(_) => DataType::Boolean,
        Value::Null => DataType::Null,
        Value::Number(n) => {
            if n.is_i64() {
                DataType::Int64
            } else if n.is_u64() {
                DataType::UInt64
            } else {
                DataType::Float64
            }
        }
        Value::Object(obj) => {
            let mut fields = Vec::with_capacity(obj.len());
            for (k, v) in obj.iter() {
                fields.push(Field::new(k, type_for_value(v), true))
            }
            DataType::Struct(fields.into())
        }
    }
}
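To make the schema-sampling rules concrete, here is a sketch of the widening behavior written as a test that could sit in this file; it is not part of the commit, and since the helpers are module-private a test module is the natural place to exercise them:

#[cfg(test)]
mod inference_sketch {
    use super::*;
    use serde_json::json;

    #[test]
    fn ints_widen_to_floats() {
        // 1 samples as Int64, 2.5 as Float64; the pair widens to Float64.
        assert_eq!(type_for_value(&json!(1)), DataType::Int64);
        assert_eq!(type_for_value(&json!(2.5)), DataType::Float64);
        assert_eq!(
            widen_type(&DataType::Int64, DataType::Float64),
            Some(DataType::Float64)
        );
        // Incompatible pairs do not widen, so the first sampled type wins
        // in json_streaming_table's field_set loop.
        assert_eq!(widen_type(&DataType::Utf8, DataType::Int64), None);
    }
}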
1 change: 1 addition & 0 deletions crates/datasources/src/lib.rs
@@ -6,6 +6,7 @@ pub mod clickhouse;
pub mod common;
pub mod debug;
pub mod excel;
pub mod json;
pub mod lake;
pub mod lance;
pub mod mongodb;