Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Moved dict_id to IPC-specific IO (#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 28, 2021
1 parent f07cc2c commit d14ae86
Show file tree
Hide file tree
Showing 51 changed files with 2,085 additions and 1,882 deletions.
16 changes: 9 additions & 7 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::fs::File;
use std::sync::Arc;
use std::{collections::HashMap, convert::TryFrom, io::Read};
use std::{collections::HashMap, io::Read};

use arrow2::io::ipc::IpcField;
use arrow2::{
datatypes::{DataType, Schema},
error::Result,
io::{
json_integration::{to_record_batch, ArrowJson},
json_integration::read,
json_integration::ArrowJson,
parquet::write::{
write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
},
Expand All @@ -19,7 +21,7 @@ use clap::{App, Arg};
use flate2::read::GzDecoder;

/// Read gzipped JSON file
fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>) {
fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec<RecordBatch>) {
let path = format!(
"../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
version, file_name
Expand All @@ -32,7 +34,7 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>)
let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();

let schema = serde_json::to_value(arrow_json.schema).unwrap();
let schema = Schema::try_from(&schema).unwrap();
let (schema, ipc_fields) = read::deserialize_schema(&schema).unwrap();

// read dictionaries
let mut dictionaries = HashMap::new();
Expand All @@ -46,11 +48,11 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>)
let batches = arrow_json
.batches
.iter()
.map(|batch| to_record_batch(&schema, batch, &dictionaries))
.map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
.collect::<Result<Vec<_>>>()
.unwrap();

(schema, batches)
(schema, ipc_fields, batches)
}

fn main() -> Result<()> {
Expand Down Expand Up @@ -106,7 +108,7 @@ fn main() -> Result<()> {
.collect::<Vec<_>>()
});

let (schema, batches) = read_gzip_json("1.0.0-littleendian", json_file);
let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file);

let schema = if let Some(projection) = &projection {
let fields = schema
Expand Down
4 changes: 2 additions & 2 deletions examples/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, &schema, options)?;
let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;

let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

writer.write(&batch)?;
writer.write(&batch, None)?;

Ok(writer.into_inner())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/ipc_file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result
let file = File::create(path)?;

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(file, schema, options)?;
let mut writer = write::FileWriter::try_new(file, schema, None, options)?;

for batch in batches {
writer.write(batch)?
writer.write(batch, None)?
}
writer.finish()
}
Expand Down
2 changes: 1 addition & 1 deletion examples/json_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<RecordBatch> {

// deserialize `rows` into a `RecordBatch`. This is CPU-intensive, has no IO,
// and can be performed on a different thread pool via a channel.
read::deserialize(&rows, fields)
read::deserialize(rows, fields)
}

fn main() -> Result<()> {
Expand Down
9 changes: 5 additions & 4 deletions integration-testing/src/bin/arrow-file-to-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ fn main() -> Result<()> {
let filename = &args[1];
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(f, metadata, None);
let schema = reader.schema();
let mut reader = read::FileReader::new(f, metadata.clone(), None);

let options = write::WriteOptions { compression: None };
let mut writer = write::StreamWriter::try_new(std::io::stdout(), schema, options)?;
let mut writer = write::StreamWriter::new(std::io::stdout(), options);

writer.start(&metadata.schema, &metadata.ipc_schema.fields)?;

reader.try_for_each(|batch| {
let batch = batch?;
writer.write(&batch)
writer.write(&batch, &metadata.ipc_schema.fields)
})?;
writer.finish()?;

Expand Down
42 changes: 12 additions & 30 deletions integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fs::File;

use arrow2::io::json_integration::ArrowJson;
use clap::{App, Arg};

use arrow2::io::ipc::read;
use arrow2::io::ipc::write;
use arrow2::{
error::{ArrowError, Result},
io::json_integration::*,
io::json_integration::write as json_write,
};
use arrow_integration_testing::read_json_file;

Expand Down Expand Up @@ -82,10 +66,15 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>

let arrow_file = File::create(arrow_name)?;
let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(arrow_file, &json_file.schema, options)?;
let mut writer = write::FileWriter::try_new(
arrow_file,
&json_file.schema,
Some(json_file.fields),
options,
)?;

for b in json_file.batches {
writer.write(&b)?;
writer.write(&b, None)?;
}

writer.finish()?;
Expand All @@ -100,19 +89,12 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata, None);
let reader = read::FileReader::new(arrow_file, metadata.clone(), None);

let mut fields: Vec<ArrowJsonField> = vec![];
for f in reader.schema().fields() {
fields.push(ArrowJsonField::from(f));
}
let schema = ArrowJsonSchema {
fields,
metadata: None,
};
let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields);

let batches = reader
.map(|batch| Ok(from_record_batch(&batch?)))
.map(|batch| Ok(json_write::from_record_batch(&batch?)))
.collect::<Result<Vec<_>>>()?;

let arrow_json = ArrowJson {
Expand Down
12 changes: 8 additions & 4 deletions integration-testing/src/bin/arrow-stream-to-file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ use arrow2::io::ipc::write;
fn main() -> Result<()> {
let mut reader = io::stdin();
let metadata = read::read_stream_metadata(&mut reader)?;
let mut arrow_stream_reader = read::StreamReader::new(reader, metadata);
let schema = arrow_stream_reader.schema();
let mut arrow_stream_reader = read::StreamReader::new(reader, metadata.clone());

let writer = io::stdout();

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, schema, options)?;
let mut writer = write::FileWriter::try_new(
writer,
&metadata.schema,
Some(metadata.ipc_schema.fields),
options,
)?;

arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap(), None))?;
writer.finish()?;

Ok(())
Expand Down
Loading

0 comments on commit d14ae86

Please sign in to comment.