Skip to content

Commit

Permalink
Make row its own crate so that it is accessible from physical-expr
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen committed Apr 20, 2022
1 parent ec3543b commit 1d9b227
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 133 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"datafusion/jit",
"datafusion/physical-expr",
"datafusion/proto",
"datafusion/row",
"datafusion-examples",
"benchmarks",
"ballista/rust/client",
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jit = ["datafusion-jit"]
pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
# Used to enable row format experiment
row = []
row = ["datafusion-row"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/regex_expressions"]

Expand All @@ -64,6 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
datafusion-expr = { path = "../expr", version = "7.0.0" }
datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
datafusion-row = { path = "../row", version = "7.0.0", optional = true }
futures = "0.3"
hashbrown = { version = "0.12", features = ["raw"] }
lazy_static = { version = "^1.4.0" }
Expand Down Expand Up @@ -125,3 +126,7 @@ name = "parquet_query_sql"
harness = false
name = "jit"
required-features = ["row", "jit"]

[[test]]
name = "row"
required-features = ["row"]
6 changes: 3 additions & 3 deletions datafusion/core/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::jit::writer::bench_write_batch_jit;
use datafusion::row::writer::bench_write_batch;
use datafusion::row::RowType;
use datafusion_row::jit::writer::bench_write_batch_jit;
use datafusion_row::writer::bench_write_batch;
use datafusion_row::RowType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,6 @@ pub use datafusion_physical_expr as physical_expr;
#[cfg(feature = "jit")]
pub use datafusion_jit as jit;

#[cfg(feature = "row")]
pub mod row;

pub mod from_slice;

#[cfg(test)]
Expand Down
108 changes: 108 additions & 0 deletions datafusion/core/tests/row.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::local_unpartitioned_file;
use datafusion::error::Result;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion_data_access::object_store::local::{
local_object_reader, local_object_reader_stream,
};
use datafusion_row::layout::RowType::{Compact, WordAligned};
use datafusion_row::reader::read_as_batch;
use datafusion_row::writer::write_batch_unchecked;
use std::sync::Arc;

/// Round-trips the single batch scanned from `alltypes_plain.parquet`
/// (projection indices 0 through 9) through the `Compact` row layout and
/// asserts that the decoded batch equals the original one.
#[tokio::test]
async fn test_with_parquet() -> Result<()> {
    let ctx = SessionContext::new();
    let task_ctx = ctx.task_ctx();
    let columns = Some((0..10).collect::<Vec<usize>>());
    let plan = get_exec("alltypes_plain.parquet", &columns, None).await?;
    let schema = plan.schema();

    let batches = collect(plan, task_ctx).await?;
    assert_eq!(1, batches.len());
    let batch = &batches[0];

    // Fixed-size byte buffer handed to the row writer and then to the reader.
    let mut buffer = vec![0; 20480];
    let offsets =
        write_batch_unchecked(&mut buffer, 0, batch, 0, schema.clone(), Compact);
    let decoded = read_as_batch(&buffer, schema, &offsets, Compact)?;
    assert_eq!(*batch, decoded);

    Ok(())
}

/// Round-trips the single batch scanned from `alltypes_plain.parquet`
/// (projection indices 0 through 7) through the `WordAligned` row layout and
/// asserts that the decoded batch equals the original one.
#[tokio::test]
async fn test_with_parquet_word_aligned() -> Result<()> {
    let ctx = SessionContext::new();
    let task_ctx = ctx.task_ctx();
    let columns = Some((0..8).collect::<Vec<usize>>());
    let plan = get_exec("alltypes_plain.parquet", &columns, None).await?;
    let schema = plan.schema();

    let batches = collect(plan, task_ctx).await?;
    assert_eq!(1, batches.len());
    let batch = &batches[0];

    // Fixed-size byte buffer handed to the row writer and then to the reader.
    let mut buffer = vec![0; 20480];
    let offsets = write_batch_unchecked(
        &mut buffer,
        0,
        batch,
        0,
        schema.clone(),
        WordAligned,
    );
    let decoded = read_as_batch(&buffer, schema, &offsets, WordAligned)?;
    assert_eq!(*batch, decoded);

    Ok(())
}

/// Builds a Parquet execution plan for `file_name`, looked up inside the
/// directory returned by `datafusion::test_util::parquet_test_data()`.
///
/// A clone of `projection` and the given `limit` are placed into the
/// `FileScanConfig`; no table partition columns and no filters are supplied.
///
/// # Panics
///
/// Panics if schema inference or statistics inference fails.
///
/// # Errors
///
/// Returns the error produced by `create_physical_plan`, if any.
async fn get_exec(
    file_name: &str,
    projection: &Option<Vec<usize>>,
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let testdata = datafusion::test_util::parquet_test_data();
    let path = format!("{}/{}", testdata, file_name);
    let parquet = ParquetFormat::default();

    // Schema first: statistics inference below takes the inferred schema.
    let schema_reader = local_object_reader_stream(vec![path.clone()]);
    let file_schema = parquet
        .infer_schema(schema_reader)
        .await
        .expect("Schema inference");

    let stats_reader = local_object_reader(path.clone());
    let statistics = parquet
        .infer_stats(stats_reader, file_schema.clone())
        .await
        .expect("Stats inference");

    // One file group holding the one local file.
    let file_groups = vec![vec![local_unpartitioned_file(path)]];

    let scan_config = FileScanConfig {
        object_store: Arc::new(LocalFileSystem {}),
        file_schema,
        file_groups,
        statistics,
        projection: projection.clone(),
        limit,
        table_partition_cols: vec![],
    };

    parquet.create_physical_plan(scan_config, &[]).await
}
45 changes: 45 additions & 0 deletions datafusion/row/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-row"
description = "Row backed by raw bytes for DataFusion query engine"
version = "7.0.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
readme = "../README.md"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
edition = "2021"
rust-version = "1.59"

[lib]
name = "datafusion_row"
path = "src/lib.rs"

[features]
# Used to enable JIT code generation
jit = ["datafusion-jit"]

[dependencies]
arrow = { version = "11.1" }
datafusion-common = { path = "../common", version = "7.0.0" }
datafusion-expr = { path = "../expr", version = "7.0.0" }
datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
paste = "^1.0"
rand = "0.8"
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ fn fn_name<T>(f: T) -> &'static str {

#[cfg(test)]
mod tests {
use crate::error::Result;
use crate::row::jit::reader::read_as_batch_jit;
use crate::row::jit::writer::write_batch_unchecked_jit;
use crate::row::layout::RowType::{Compact, WordAligned};
use crate::jit::reader::read_as_batch_jit;
use crate::jit::writer::write_batch_unchecked_jit;
use crate::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_common::Result;
use datafusion_jit::api::Assembler;
use std::sync::Arc;
use DataType::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

//! Accessing row from raw bytes with JIT
use crate::error::{DataFusionError, Result};
use crate::jit::fn_name;
use crate::layout::RowType;
use crate::reader::RowReader;
use crate::reader::*;
use crate::reg_fn;
use crate::row::jit::fn_name;
use crate::row::layout::RowType;
use crate::row::reader::RowReader;
use crate::row::reader::*;
use crate::row::MutableRecordBatch;
use crate::MutableRecordBatch;
use arrow::array::ArrayBuilder;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_jit::api::Assembler;
use datafusion_jit::api::GeneratedFunction;
use datafusion_jit::ast::{I64, PTR};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

//! Reusable JIT version of row writer backed by Vec<u8> to stitch attributes together
use crate::error::Result;
use crate::jit::fn_name;
use crate::layout::RowType;
use crate::reg_fn;
use crate::row::jit::fn_name;
use crate::row::layout::RowType;
use crate::row::schema_null_free;
use crate::row::writer::RowWriter;
use crate::row::writer::*;
use crate::schema_null_free;
use crate::writer::RowWriter;
use crate::writer::*;
use arrow::array::Array;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_jit::api::CodeBlock;
use datafusion_jit::api::{Assembler, GeneratedFunction};
use datafusion_jit::ast::Expr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Various row layouts for different use cases
use crate::row::schema_null_free;
use crate::schema_null_free;
use arrow::datatypes::{DataType, Schema};
use arrow::util::bit_util::{ceil, round_upto_power_of_2};

Expand All @@ -41,7 +41,6 @@ pub enum RowType {
#[derive(Debug)]
pub(crate) struct RowLayout {
/// Type of the layout
#[allow(dead_code)]
row_type: RowType,
/// If a row is null free according to its schema
pub(crate) null_free: bool,
Expand Down
Loading

0 comments on commit 1d9b227

Please sign in to comment.