Simplified IPC
jorgecarleitao committed Jan 2, 2022
1 parent 9b4c4fd commit 99ca2c8
Showing 11 changed files with 89 additions and 90 deletions.
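
The change running through every file is the move from RecordBatch to Columns<Arc<dyn Array>> (aliased as Chunk in the flight client test). As an illustrative sketch only — Columns::new is assumed here and does not appear in this commit; the len()/columns() accessors do appear in the hunks below:

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::columns::Columns;

// the alias the flight client integration test introduces for the new container
type Chunk = Columns<Arc<dyn Array>>;

fn main() {
    let ints = Int32Array::from_slice(&[1, 2, 3]);
    // Columns::new is an assumption; the commit itself only consumes existing chunks
    let chunk: Chunk = Columns::new(vec![Arc::new(ints) as Arc<dyn Array>]);

    // row count and column count, replacing RecordBatch::num_rows / num_columns / column(i)
    assert_eq!(chunk.len(), 3);
    assert_eq!(chunk.columns().len(), 1);
}
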
13 changes: 10 additions & 3 deletions integration-testing/src/bin/arrow-json-integration-test.rs
@@ -91,10 +91,17 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata.clone(), None);

+let names = metadata
+.schema
+.fields
+.iter()
+.map(|f| f.name())
+.collect::<Vec<_>>();

let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields);

let batches = reader
-.map(|batch| Ok(json_write::from_record_batch(&batch?)))
+.map(|batch| Ok(json_write::serialize_columns(&batch?, &names)))
.collect::<Result<Vec<_>>>()?;

let arrow_json = ArrowJson {
@@ -121,10 +128,10 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata, None);
-let arrow_schema = reader.schema().as_ref().to_owned();
+let arrow_schema = reader.schema();

// compare schemas
-if json_file.schema != arrow_schema {
+if &json_file.schema != arrow_schema {
return Err(ArrowError::InvalidArgumentError(format!(
"Schemas do not match. JSON: {:?}. Arrow: {:?}",
json_file.schema, arrow_schema
42 changes: 22 additions & 20 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -18,6 +18,8 @@
use crate::{read_json_file, ArrowFile};

use arrow2::{
+array::Array,
+columns::Columns,
datatypes::*,
io::ipc::{
read::{self, Dictionaries},
@@ -27,7 +29,6 @@ use arrow2::{
flight::{self, deserialize_batch, serialize_batch},
ipc::IpcField,
},
-record_batch::RecordBatch,
};
use arrow_format::flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
@@ -45,6 +46,8 @@ type Result<T = (), E = Error> = std::result::Result<T, E>;

type Client = FlightServiceClient<tonic::transport::Channel>;

+type Chunk = Columns<Arc<dyn Array>>;

pub async fn run_scenario(host: &str, port: &str, path: &str) -> Result {
let url = format!("http://{}:{}", host, port);

@@ -75,7 +78,7 @@ pub async fn run_scenario(host: &str, port: &str, path: &str) -> Result {
batches.clone(),
)
.await?;
-verify_data(client, descriptor, schema, &ipc_schema, &batches).await?;
+verify_data(client, descriptor, &schema, &ipc_schema, &batches).await?;

Ok(())
}
@@ -85,7 +88,7 @@ async fn upload_data(
schema: &Schema,
fields: &[IpcField],
descriptor: FlightDescriptor,
-original_data: Vec<RecordBatch>,
+original_data: Vec<Chunk>,
) -> Result {
let (mut upload_tx, upload_rx) = mpsc::channel(10);

@@ -140,7 +143,7 @@ async fn upload_data(
async fn send_batch(
upload_tx: &mut mpsc::Sender<FlightData>,
metadata: &[u8],
-batch: &RecordBatch,
+batch: &Chunk,
fields: &[IpcField],
options: &write::WriteOptions,
) -> Result {
@@ -159,9 +162,9 @@ async fn send_batch(
async fn verify_data(
mut client: Client,
descriptor: FlightDescriptor,
-expected_schema: SchemaRef,
+expected_schema: &Schema,
ipc_schema: &IpcSchema,
-expected_data: &[RecordBatch],
+expected_data: &[Chunk],
) -> Result {
let resp = client.get_flight_info(Request::new(descriptor)).await?;
let info = resp.into_inner();
@@ -184,7 +187,7 @@ async fn verify_data(
location,
ticket.clone(),
expected_data,
-expected_schema.clone(),
+expected_schema,
ipc_schema,
)
.await?;
@@ -197,8 +200,8 @@ async fn verify_data(
async fn consume_flight_location(
location: Location,
ticket: Ticket,
-expected_data: &[RecordBatch],
-schema: SchemaRef,
+expected_data: &[Chunk],
+schema: &Schema,
ipc_schema: &IpcSchema,
) -> Result {
let mut location = location;
@@ -231,21 +234,20 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

-let actual_batch = deserialize_batch(&data, schema.clone(), ipc_schema, &dictionaries)
+let actual_batch = deserialize_batch(&data, schema.fields(), ipc_schema, &dictionaries)
.expect("Unable to convert flight data to Arrow batch");

-assert_eq!(expected_batch.schema(), actual_batch.schema());
-assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
-assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
-let schema = expected_batch.schema();
-for i in 0..expected_batch.num_columns() {
+assert_eq!(expected_batch.columns().len(), actual_batch.columns().len());
+assert_eq!(expected_batch.len(), actual_batch.len());
+for (i, (expected, actual)) in expected_batch
+.columns()
+.iter()
+.zip(actual_batch.columns().iter())
+.enumerate()
+{
let field = schema.field(i);
let field_name = field.name();

-let expected_data = expected_batch.column(i);
-let actual_data = actual_batch.column(i);
-
-assert_eq!(expected_data, actual_data, "Data for field {}", field_name);
+assert_eq!(expected, actual, "Data for field {}", field_name);
}
}

27 changes: 12 additions & 15 deletions integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -19,6 +19,8 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;

+use arrow2::array::Array;
+use arrow2::columns::Columns;
use arrow2::io::flight::{deserialize_schemas, serialize_batch, serialize_schema};
use arrow2::io::ipc::read::Dictionaries;
use arrow2::io::ipc::IpcSchema;
@@ -28,9 +30,7 @@ use arrow_format::flight::service::flight_service_server::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;

-use arrow2::{
-datatypes::*, io::flight::serialize_schema_to_info, io::ipc, record_batch::RecordBatch,
-};
+use arrow2::{datatypes::*, io::flight::serialize_schema_to_info, io::ipc};

use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
use tokio::sync::Mutex;
@@ -62,7 +62,7 @@ pub async fn scenario_setup(port: &str) -> Result {
struct IntegrationDataset {
schema: Schema,
ipc_schema: IpcSchema,
-chunks: Vec<RecordBatch>,
+chunks: Vec<Columns<Arc<dyn Array>>>,
}

#[derive(Clone, Default)]
@@ -173,7 +173,7 @@ impl FlightService for FlightServiceImpl {

let endpoint = self.endpoint_from_path(&path[0]);

-let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();
+let total_records: usize = flight.chunks.iter().map(|chunk| chunk.len()).sum();

let schema = serialize_schema_to_info(&flight.schema, &flight.ipc_schema.fields)
.expect(
@@ -218,7 +218,6 @@ impl FlightService for FlightServiceImpl {

let (schema, ipc_schema) = deserialize_schemas(&flight_data.data_header)
.map_err(|e| Status::invalid_argument(format!("Invalid schema: {:?}", e)))?;
-let schema_ref = Arc::new(schema.clone());

let (response_tx, response_rx) = mpsc::channel(10);

@@ -228,11 +227,10 @@ impl FlightService for FlightServiceImpl {
let mut error_tx = response_tx.clone();
if let Err(e) = save_uploaded_chunks(
uploaded_chunks,
-schema_ref,
+schema,
ipc_schema,
input_stream,
response_tx,
-schema,
key,
)
.await
@@ -280,10 +278,10 @@ async fn send_app_metadata(
async fn record_batch_from_message(
message: Message<'_>,
data_body: &[u8],
-schema_ref: Arc<Schema>,
+fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
-) -> Result<RecordBatch, Status> {
+) -> Result<Columns<Arc<dyn Array>>, Status> {
let ipc_batch = message
.header_as_record_batch()
.ok_or_else(|| Status::internal("Could not parse message header as record batch"))?;
@@ -292,7 +290,7 @@ async fn record_batch_from_message(

let arrow_batch_result = ipc::read::read_record_batch(
ipc_batch,
-schema_ref,
+fields,
ipc_schema,
None,
dictionaries,
@@ -326,11 +324,10 @@ async fn dictionary_from_message(

async fn save_uploaded_chunks(
uploaded_chunks: Arc<Mutex<HashMap<String, IntegrationDataset>>>,
-schema_ref: Arc<Schema>,
+schema: Schema,
ipc_schema: IpcSchema,
mut input_stream: Streaming<FlightData>,
mut response_tx: mpsc::Sender<Result<PutResult, Status>>,
-schema: Schema,
key: String,
) -> Result<(), Status> {
let mut chunks = vec![];
@@ -354,7 +351,7 @@ async fn save_uploaded_chunks(
let batch = record_batch_from_message(
message,
&data.data_body,
-schema_ref.clone(),
+schema.fields(),
&ipc_schema,
&mut dictionaries,
)
@@ -366,7 +363,7 @@ async fn save_uploaded_chunks(
dictionary_from_message(
message,
&data.data_body,
-schema_ref.fields(),
+schema.fields(),
&ipc_schema,
&mut dictionaries,
)
21 changes: 13 additions & 8 deletions integration-testing/src/lib.rs
@@ -17,17 +17,19 @@

//! Common code used in the integration test binaries
+use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use serde_json::Value;

+use arrow2::columns::Columns;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::json_integration::{read, ArrowJsonBatch, ArrowJsonDictionaryBatch};
-use arrow2::record_batch::RecordBatch;

use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
+use std::sync::Arc;

/// The expected username for the basic auth integration test.
pub const AUTH_USERNAME: &str = "arrow";
@@ -43,7 +45,7 @@ pub struct ArrowFile {
// we can evolve this into a concrete Arrow type
// this is temporarily not being read from
pub _dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
-pub batches: Vec<RecordBatch>,
+pub batches: Vec<Columns<Arc<dyn Array>>>,
}

pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
@@ -66,12 +68,15 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
}
}

-let mut batches = vec![];
-for b in arrow_json["batches"].as_array().unwrap() {
-let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
-let batch = read::to_record_batch(&schema, &fields, &json_batch, &dictionaries)?;
-batches.push(batch);
-}
+let batches = arrow_json["batches"]
+.as_array()
+.unwrap()
+.iter()
+.map(|b| {
+let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
+read::deserialize_columns(&schema, &fields, &json_batch, &dictionaries)
+})
+.collect::<Result<_>>()?;
Ok(ArrowFile {
schema,
fields,
6 changes: 3 additions & 3 deletions src/io/flight/mod.rs
@@ -15,7 +15,7 @@ use crate::{

use super::ipc::{IpcField, IpcSchema};

-/// Serializes a [`Columns`] to a vector of [`FlightData`] representing the serialized dictionaries
+/// Serializes [`Columns`] to a vector of [`FlightData`] representing the serialized dictionaries
/// and a [`FlightData`] representing the batch.
pub fn serialize_batch(
columns: &Columns<Arc<dyn Array>>,
@@ -102,7 +102,7 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
/// Deserializes [`FlightData`] to [`Columns`].
pub fn deserialize_batch(
data: &FlightData,
-schema: Arc<Schema>,
+fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &read::Dictionaries,
) -> Result<Columns<Arc<dyn Array>>> {
@@ -123,7 +123,7 @@
.map(|batch| {
read::read_record_batch(
batch,
-schema.clone(),
+fields,
ipc_schema,
None,
dictionaries,
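
As an illustrative sketch of the reworked flight deserialization above — the fields-based signature and the types are taken from the hunks in this commit, while the helper name and its caller are assumed:

use std::sync::Arc;

use arrow2::array::Array;
use arrow2::columns::Columns;
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::flight::deserialize_batch;
use arrow2::io::ipc::read::Dictionaries;
use arrow2::io::ipc::IpcSchema;
use arrow_format::flight::data::FlightData;

// Instead of an Arc<Schema>, the deserializer now borrows the schema's fields;
// `schema`, `ipc_schema` and `dictionaries` are assumed to come from
// `deserialize_schemas` and the dictionary messages, as in the integration tests.
fn flight_data_to_chunk(
    data: &FlightData,
    schema: &Schema,
    ipc_schema: &IpcSchema,
    dictionaries: &Dictionaries,
) -> Result<Columns<Arc<dyn Array>>> {
    deserialize_batch(data, schema.fields(), ipc_schema, dictionaries)
}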