feat: streaming json document/array data #2494

Merged: 19 commits, Jan 30, 2024
29 changes: 15 additions & 14 deletions Cargo.lock


1 change: 1 addition & 0 deletions crates/datasources/Cargo.toml
@@ -73,6 +73,7 @@ lance = { git = "https://github.com/universalmind303/lance", rev = "ffd4ac6ee2c6
bson = "2.7.0"
scylla = { version = "0.11.1" }
glob = "0.3.1"
indexmap = "2.1.0"

# SSH tunnels
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
33 changes: 33 additions & 0 deletions crates/datasources/src/json/errors.rs
@@ -0,0 +1,33 @@
use datafusion_ext::errors::ExtensionError;

use crate::object_store::errors::ObjectStoreSourceError;

#[derive(Debug, thiserror::Error)]
pub enum JsonError {
    #[error("Unsupported json type: {0}")]
    UnsupportedType(&'static str),

    #[error(transparent)]
    SerdeJson(#[from] serde_json::Error),

    #[error("no objects found {0}")]
    NotFound(String),

    #[error(transparent)]
    ObjectStoreSource(#[from] ObjectStoreSourceError),

    #[error(transparent)]
    ObjectStore(#[from] object_store::Error),

    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),

    #[error(transparent)]
    Datafusion(#[from] datafusion::error::DataFusionError),
}

impl From<JsonError> for ExtensionError {
    fn from(e: JsonError) -> Self {
        ExtensionError::String(e.to_string())
    }
}
3 changes: 3 additions & 0 deletions crates/datasources/src/json/mod.rs
@@ -0,0 +1,3 @@
mod errors;
mod stream;
pub mod table;
83 changes: 83 additions & 0 deletions crates/datasources/src/json/stream.rs
@@ -0,0 +1,83 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::json::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use serde_json::{Map, Value};

type SendableCheckedRecordBatchStream =
    Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;

pub struct JsonStream {
    schema: Arc<Schema>,
    stream: SendableCheckedRecordBatchStream,
}

impl Stream for JsonStream {
    type Item = Result<RecordBatch, DataFusionError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    }
}

impl RecordBatchStream for JsonStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

pub struct JsonPartitionStream {
    schema: Arc<Schema>,
    stream: Mutex<Option<SendableCheckedRecordBatchStream>>,
}

impl PartitionStream for JsonPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let partition = self
            .stream
            .lock()
            .unwrap()
            .take()
            .expect("stream to only be called once")
            .boxed();

        Box::pin(JsonStream {
            schema: self.schema.clone(),
            stream: partition,
        })
    }
}

impl JsonPartitionStream {
    pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
        let stream_schema = schema.clone();
        let stream = futures::stream::iter(chunk)
            .chunks(25)
Contributor: Why 25?

Contributor Author: It's less than 100; other numbers could be good too.

Contributor Author: Do you have a suggestion?

Contributor: I guess I don't understand why we chunk it, then chunk it again?

Contributor Author: These streaming tables work by collecting a number of streams (from files, though in this case we will have read them all into memory, because we need/want to get the schema, and do that losslessly). Then the stream produces record batches. The first chunk gives us the partitions; the second gives us the size of each record batch. Following your earlier comment, I made the record batch size 1000 and the stream size 10,000.

            .map(move |objs| {
                let mut decoder =
                    ReaderBuilder::new(stream_schema.clone().to_owned()).build_decoder()?;
                decoder
                    .serialize(&objs)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                Ok(decoder.flush()?.unwrap())
            })
            .boxed();

        Self {
            schema: schema.clone(),
            stream: Mutex::new(Some(stream)),
        }
    }
}
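
For readers following the chunk-size thread above: the outer chunking (in table.rs, over the collected documents) decides how many JsonPartitionStream partitions the table exposes, while the inner .chunks(..) here decides how many documents go into each RecordBatch. The sketch below shows the decode step a single inner chunk goes through, using arrow's JSON Decoder the same way the .map closure does; the schema, field names, and sample documents are invented for illustration and are not part of the PR.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::json::ReaderBuilder;
use serde_json::{json, Map, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A schema like the one table.rs infers by unioning document keys.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    // One inner chunk of documents, i.e. the rows of a single record batch.
    let docs: Vec<Map<String, Value>> = vec![
        json!({"a": 1, "b": "x"}).as_object().unwrap().clone(),
        json!({"a": 2}).as_object().unwrap().clone(), // missing keys decode as nulls
    ];

    // Same decode path as the .map closure: serialize the serde_json maps,
    // then flush a RecordBatch out of the decoder.
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    decoder.serialize(&docs)?;
    let batch = decoder.flush()?.expect("serialized rows produce a batch");
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```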
135 changes: 135 additions & 0 deletions crates/datasources/src/json/table.rs
@@ -0,0 +1,135 @@
use std::sync::Arc;
use std::vec::Vec;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::streaming::PartitionStream;
use serde_json::{Map, Value};

use super::stream::JsonPartitionStream;
use crate::common::url::DatasourceUrl;
use crate::json::errors::JsonError;
use crate::object_store::generic::GenericStoreAccess;
use crate::object_store::ObjStoreAccess;

pub async fn json_streaming_table(
    store_access: GenericStoreAccess,
    source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, JsonError> {
    let path = source_url.path();

    let store = store_access.create_store()?;

    // assume that the file type is a glob and see if there are
    // more files...
    let mut list = store_access.list_globbed(&store, path.as_ref()).await?;

    if list.is_empty() {
        return Err(JsonError::NotFound(path.into_owned()));
    }

    // for consistent results, particularly for the sample, always
    // sort by location
    list.sort_by(|a, b| a.location.cmp(&b.location));

    let mut data = Vec::new();
    for obj in list {
        let blob = store.get(&obj.location).await?.bytes().await?.to_vec();
        let dejson = serde_json::from_slice::<serde_json::Value>(blob.as_slice())?.to_owned();
        push_unwind_json_values(&mut data, dejson)?;
    }

    let mut field_set = indexmap::IndexMap::<String, DataType>::new();
    for obj in &data {
        for key in obj.keys() {
Contributor: Doesn't this iterate over all rows? We are already iterating over it when it's passed into arrow's decoder. It seems reasonable that we could deserialize the json in a single pass instead of two.

Contributor Author: The loop on 37 iterates over all files/urls (in the case of a glob) and in most cases will only run once.

push_unwind_... doesn't iterate over the document; it just builds the list of documents, erroring if we get something that isn't usable (a scalar). This is either a single document or an array of documents, and it unwinds the array when needed. We could extract the schema in this function, but:

  • I think the code would get more complicated, and this is pretty easy to understand.
  • In a future where you might specify a schema to the table provider, the code in its current form is easier to modify in that direction.
  • It would be easy to end up with code where you were iterating less but doing more allocations to temporary data structures to keep track as you go (particularly if you have multiple files), and I feel like two loops are likely to be faster than one loop with a lot of allocation pressure.
  • My baseline assumption is that it's always going to be relatively small amounts of data (a few megs tops).
  • In the error case you end up building a schema that you're never going to use, and errors might end up being pretty common just because of malformed REST APIs.

Contributor, quoting the above: "my baseline assumption is that it's always going to be relatively small amounts of data (a few megs tops)". I don't think we should be making any assumptions about the size of the data, especially considering the globbing support. I could see someone trying to read in a directory of hundreds or thousands of small/medium json files.

Contributor Author: There are a couple of assumptions here:

  • The primary use case will be pulling information off of HTTP endpoints; it's just convenient to use the object store stuff. We want to say "point glaredb at an endpoint and you get a SQL table." Files with arbitrary data are fine too, but that's sort of something that comes for free.
  • There are a lot of things that are kind of inefficient here because I want the experience in the above use case to not lose data (e.g. drop something from later documents because it's not in the schema). When we let folks pass schema clauses into functions and external tables, we can update these functions and also provide schema inference, and then we should change the features.

Unless we want to shelve this for a while while we figure out how to handle the schema, either all the data have to be in memory or we have to parse all the files twice. I opted for the former based on the assumptions about the use case. I'm not opposed to addressing the "many small json files" or "data size larger than (I dunno) a gig" use cases, but I'm also fine if we punt on those, as there are workarounds (use polars, or convert to ndjson), and it would not be difficult to add schema inference, an explicit schema, and/or lazy file reading to this implementation.

Contributor (@universalmind303), Jan 26, 2024: I mostly agree with these assumptions, but I do think there is another path forward (particularly for the multi-file case). I don't think the assumption that all the data have to be in memory is correct.

At any given time, we only need to hold a single file in memory. All of our file-based readers expect the same schema across multiple files/globs, so we can apply that same logic here and only use the first file to extract the schema. From there, we can defer fetching and parsing of the remaining files into the streams.

This would be especially useful for (eventual) limit pushdowns. If you only want limit 1000 out of thousands of files and each file has 1000 rows, you could stop after the first file, saving n-1 cycles (both IO and CPU).

Contributor Author: The streaming table handling basically pushes down limits implicitly, because the streams are iterated lazily.

I implemented the "all of the first file, nothing more" schema inference. I think this will be ok for now.

The globbing definitely doesn't work (or at least I couldn't provoke it to work); it's probably a regression, but the bson code is subject to the same flaw, and we're not doing anything except using the library functions, which at least for now renders the multi-file use case somewhat moot.

We can dig into this next week.

            if field_set.contains_key(key) {
                continue;
            }
            field_set.insert(key.to_string(), type_for_value(obj.get(key).unwrap()));
        }
    }

    let schema = Arc::new(Schema::new(
        field_set
            .iter()
            .map(|(k, v)| Field::new(k, v.to_owned(), true))
            .collect::<Vec<_>>(),
    ));

    let chunks = data
        .chunks(100)
        .map(|chunk| -> Arc<dyn PartitionStream> {
            Arc::new(JsonPartitionStream::new(
                schema.clone(),
                chunk.to_vec().to_owned(),
            ))
        })
        .collect::<Vec<_>>();

    Ok(Arc::new(StreamingTable::try_new(schema.clone(), chunks)?))
}

fn push_unwind_json_values(
    data: &mut Vec<Map<String, Value>>,
    val: Value,
) -> Result<(), JsonError> {
    match val {
        Value::Array(vals) => {
            for v in vals {
                match v {
                    Value::Object(doc) => data.push(doc),
                    Value::Null => data.push(Map::new()),
                    _ => {
                        return Err(JsonError::UnsupportedType(
                            "only objects and arrays of objects are supported",
                        ))
                    }
                }
            }
        }
        Value::Object(doc) => data.push(doc),
        Value::Null => data.push(Map::new()),
        _ => {
            return Err(JsonError::UnsupportedType(
                "only objects and arrays of objects are supported",
            ))
        }
    };
    Ok(())
}
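
To pin down the behavior discussed in the review thread above (arrays unwound into one entry per object, nulls mapped to empty documents, scalars rejected), a test along these lines could sit at the bottom of table.rs. This is a hedged sketch, not part of the PR; it assumes push_unwind_json_values stays private to this module so a #[cfg(test)] block can call it directly.

```rust
#[cfg(test)]
mod tests {
    use serde_json::{json, Map, Value};

    use super::push_unwind_json_values;

    #[test]
    fn unwinds_arrays_and_rejects_scalars() {
        let mut data: Vec<Map<String, Value>> = Vec::new();

        // An array of documents is flattened into one entry per document,
        // and a null element becomes an empty document.
        push_unwind_json_values(&mut data, json!([{"a": 1}, {"b": 2}, null])).unwrap();
        assert_eq!(data.len(), 3);
        assert!(data[2].is_empty());

        // A bare scalar is neither a document nor an array of documents.
        assert!(push_unwind_json_values(&mut data, json!(42)).is_err());
    }
}
```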

fn type_for_value(value: &Value) -> DataType {
    match value {
        Value::Array(v) => {
            if v.is_empty() {
                DataType::List(Arc::new(Field::new("", DataType::Null, true)))
            } else {
                DataType::List(Arc::new(Field::new(
                    "",
                    type_for_value(v.first().unwrap()),
Contributor (@universalmind303), Jan 26, 2024: This'll definitely cause some issues later on. I made this same mistake in the polars json parser, and we ended up getting a bunch of issues from parsing errors due to schema mismatches. We ended up needing to infer the super type for the values in the array.

For example, an array of [null, "some string", "some other string"] will result in List<Null> instead of List<Utf8>. Similarly, an array of [0, 1.0, 2.2] will get parsed as List<Int64> instead of List<Float64>.

Contributor: At a minimum, we should error out if the values in the array are of different types.

Contributor Author: Yeah, I thought of this and decided not to care initially. There are a lot of edge cases that we'll continue to not handle very well ([null, "foo", 1], plus arrays of objects, ...).

I can do something that replaces nulls with something that isn't a null (that'd cover a lot of cases); sort of everything else gets much hairier.
                    true,
                )))
            }
        }
        Value::String(_) => DataType::Utf8,
        Value::Bool(_) => DataType::Boolean,
        Value::Null => DataType::Null,
        Value::Number(n) => {
            if n.is_i64() {
                DataType::Int64
            } else if n.is_u64() {
                DataType::UInt64
            } else {
                DataType::Float64
            }
        }
        Value::Object(obj) => {
            let mut fields = Vec::with_capacity(obj.len());
            for (k, v) in obj.iter() {
                fields.push(Field::new(k, type_for_value(v), true))
            }
            DataType::Struct(fields.into())
        }
    }
}
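
On the List element-type thread above: one possible shape for the super-type inference the reviewer suggests is sketched below. It is not code from the PR; the supertype and element_type helpers are hypothetical, and the widening rules are deliberately partial, just enough to make [null, "some string"] infer Utf8 and [0, 1.0, 2.2] infer Float64.

```rust
use datafusion::arrow::datatypes::DataType;
use serde_json::Value;

// Hypothetical helper: widen two inferred types to a common super type.
fn supertype(a: DataType, b: DataType) -> DataType {
    use DataType::*;
    match (a, b) {
        (x, y) if x == y => x,
        // Nulls defer to the other side, so [null, "s"] infers Utf8 rather than Null.
        (Null, other) | (other, Null) => other,
        // Mixed integer/float arrays widen to Float64, e.g. [0, 1.0, 2.2].
        (Int64, Float64) | (Float64, Int64) | (UInt64, Float64) | (Float64, UInt64) => Float64,
        // Anything else falls back to strings rather than erroring.
        _ => Utf8,
    }
}

// Hypothetical replacement for the first-element lookup: fold the inferred
// type of every element instead of trusting only the first one.
fn element_type(values: &[Value], type_for_value: impl Fn(&Value) -> DataType) -> DataType {
    values
        .iter()
        .map(|v| type_for_value(v))
        .fold(DataType::Null, supertype)
}
```

With something like this, the Value::Array arm of type_for_value could build its Field from element_type(v, type_for_value) instead of type_for_value(v.first().unwrap()), which is exactly the List<Null> versus List<Utf8> case the reviewer calls out.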