Move common code used for testing code into datafusion/test_utils #3961

Merged 1 commit on Oct 27, 2022
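This change renames the internal `fuzz-utils` crate to `test-utils` and moves it to a shared location so that both the datafusion core fuzz tests and the benchmarks can depend on it. The parquet_filter_pushdown benchmark drops its inline access-log generator (`BatchBuilder` and `Generator`) in favor of the shared `AccessLogGenerator`. For downstream test code the change is essentially an import rename, for example:

use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; // previously fuzz_utils::{...}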
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -46,4 +46,5 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
snmalloc-rs = { version = "0.3", optional = true }
structopt = { version = "0.3", default-features = false }
test-utils = { path = "../test-utils/" }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
231 changes: 14 additions & 217 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,12 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{
Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
UInt16Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::util::pretty;
use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
@@ -41,14 +36,12 @@ use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::fs::File;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use structopt::StructOpt;
use test_utils::AccessLogGenerator;

#[cfg(feature = "snmalloc")]
#[global_allocator]
@@ -96,13 +89,14 @@ async fn main() -> Result<()> {

let path = opt.path.join("logs.parquet");

let (object_store_url, object_meta) =
let (schema, object_store_url, object_meta) =
gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;

run_benchmarks(
&mut ctx,
object_store_url.clone(),
object_meta.clone(),
schema,
object_store_url,
object_meta,
opt.iterations,
opt.debug,
)
@@ -120,6 +114,7 @@ struct ParquetScanOptions {

async fn run_benchmarks(
ctx: &mut SessionContext,
schema: SchemaRef,
object_store_url: ObjectStoreUrl,
object_meta: ObjectMeta,
iterations: usize,
@@ -179,6 +174,7 @@ async fn run_benchmarks(
let start = Instant::now();
let rows = exec_scan(
ctx,
schema.clone(),
object_store_url.clone(),
object_meta.clone(),
filter_expr.clone(),
@@ -201,14 +197,13 @@

async fn exec_scan(
ctx: &SessionContext,
schema: SchemaRef,
object_store_url: ObjectStoreUrl,
object_meta: ObjectMeta,
filter: Expr,
scan_options: ParquetScanOptions,
debug: bool,
) -> Result<usize> {
let schema = BatchBuilder::schema();

let ParquetScanOptions {
pushdown_filters,
reorder_filters,
@@ -263,8 +258,8 @@ fn gen_data(
scale_factor: f32,
page_size: Option<usize>,
row_group_size: Option<usize>,
) -> Result<(ObjectStoreUrl, ObjectMeta)> {
let generator = Generator::new();
) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
let generator = AccessLogGenerator::new();

let file = File::create(&path).unwrap();

@@ -280,9 +275,9 @@ props_builder = props_builder.set_max_row_group_size(s);
props_builder = props_builder.set_max_row_group_size(s);
}

let schema = generator.schema();
let mut writer =
ArrowWriter::try_new(file, generator.schema.clone(), Some(props_builder.build()))
.unwrap();
ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();

let mut num_rows = 0;

@@ -311,203 +306,5 @@ fn gen_data(
size,
};

Ok((object_store_url, object_meta))
}

#[derive(Default)]
struct BatchBuilder {
service: StringDictionaryBuilder<Int32Type>,
host: StringDictionaryBuilder<Int32Type>,
pod: StringDictionaryBuilder<Int32Type>,
container: StringDictionaryBuilder<Int32Type>,
image: StringDictionaryBuilder<Int32Type>,
time: TimestampNanosecondBuilder,
client_addr: StringBuilder,
request_duration: Int32Builder,
request_user_agent: StringBuilder,
request_method: StringBuilder,
request_host: StringBuilder,
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
}

impl BatchBuilder {
fn schema() -> SchemaRef {
let utf8_dict =
|| DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

Arc::new(Schema::new(vec![
Field::new("service", utf8_dict(), true),
Field::new("host", utf8_dict(), false),
Field::new("pod", utf8_dict(), false),
Field::new("container", utf8_dict(), false),
Field::new("image", utf8_dict(), false),
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("client_addr", DataType::Utf8, true),
Field::new("request_duration_ns", DataType::Int32, false),
Field::new("request_user_agent", DataType::Utf8, true),
Field::new("request_method", DataType::Utf8, true),
Field::new("request_host", DataType::Utf8, true),
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
]))
}

fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
let num_pods = rng.gen_range(1..15);
let pods = generate_sorted_strings(rng, num_pods, 30..40);
for pod in pods {
for container_idx in 0..rng.gen_range(1..3) {
let container = format!("{}_container_{}", service, container_idx);
let image = format!(
"{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9",
container
);

let num_entries = rng.gen_range(1024..8192);
for i in 0..num_entries {
let time = i as i64 * 1024;
self.append_row(rng, host, &pod, service, &container, &image, time);
}
}
}
}

#[allow(clippy::too_many_arguments)]
fn append_row(
&mut self,
rng: &mut StdRng,
host: &str,
pod: &str,
service: &str,
container: &str,
image: &str,
time: i64,
) {
let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
let status = &[200, 204, 400, 503, 403];

self.service.append(service).unwrap();
self.host.append(host).unwrap();
self.pod.append(pod).unwrap();
self.container.append(container).unwrap();
self.image.append(image).unwrap();
self.time.append_value(time);

self.client_addr.append_value(format!(
"{}.{}.{}.{}",
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>()
));
self.request_duration.append_value(rng.gen());
self.request_user_agent
.append_value(random_string(rng, 20..100));
self.request_method
.append_value(methods[rng.gen_range(0..methods.len())]);
self.request_host
.append_value(format!("https://{}.mydomain.com", service));

self.request_bytes
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_bytes
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
}

fn finish(mut self, schema: SchemaRef) -> RecordBatch {
RecordBatch::try_new(
schema,
vec![
Arc::new(self.service.finish()),
Arc::new(self.host.finish()),
Arc::new(self.pod.finish()),
Arc::new(self.container.finish()),
Arc::new(self.image.finish()),
Arc::new(self.time.finish()),
Arc::new(self.client_addr.finish()),
Arc::new(self.request_duration.finish()),
Arc::new(self.request_user_agent.finish()),
Arc::new(self.request_method.finish()),
Arc::new(self.request_host.finish()),
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
],
)
.unwrap()
}
}

fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
let len = rng.gen_range(len_range);
(0..len)
.map(|_| rng.gen_range(b'a'..=b'z') as char)
.collect::<String>()
}

fn generate_sorted_strings(
rng: &mut StdRng,
count: usize,
str_len: Range<usize>,
) -> Vec<String> {
let mut strings: Vec<_> = (0..count)
.map(|_| random_string(rng, str_len.clone()))
.collect();

strings.sort_unstable();
strings
}

/// Generates sorted RecordBatch with an access log style schema for a single host
#[derive(Debug)]
struct Generator {
schema: SchemaRef,
rng: StdRng,
host_idx: usize,
}

impl Generator {
fn new() -> Self {
let seed = [
1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0,
0, 0, 0, 5, 0, 0, 0, 0, 0,
];

Self {
schema: BatchBuilder::schema(),
host_idx: 0,
rng: StdRng::from_seed(seed),
}
}
}

impl Iterator for Generator {
type Item = RecordBatch;

fn next(&mut self) -> Option<Self::Item> {
let mut builder = BatchBuilder::default();

let host = format!(
"i-{:016x}.ec2.internal",
self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928
);
self.host_idx += 1;

for service in &["frontend", "backend", "database", "cache"] {
if self.rng.gen_bool(0.5) {
continue;
}
builder.append(&mut self.rng, &host, service);
}
Some(builder.finish(Arc::clone(&self.schema)))
}
Ok((schema, object_store_url, object_meta))
}
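For context, a minimal sketch of how the benchmark generates its Parquet data after this change. It assumes the shared AccessLogGenerator keeps the `schema()` accessor and the `Iterator<Item = RecordBatch>` shape of the removed local `Generator`; the `take(3)` batch limit is illustrative only, not taken from this PR.

use parquet::arrow::ArrowWriter;
use std::fs::File;
use test_utils::AccessLogGenerator;

// Sketch only: write a few generated access-log batches to a Parquet file with
// default writer properties (the benchmark configures page and row-group sizes).
fn write_sample_logs(path: &std::path::Path) {
    let generator = AccessLogGenerator::new();
    let schema = generator.schema();
    let file = File::create(path).unwrap();
    let mut writer = ArrowWriter::try_new(file, schema, None).unwrap();
    for batch in generator.take(3) {
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}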
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -105,8 +105,8 @@ csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
rstest = "0.15.0"
test-utils = { path = "../../test-utils" }

[[bench]]
harness = false
2 changes: 1 addition & 1 deletion datafusion/core/tests/join_fuzz.rs
@@ -31,7 +31,7 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion_expr::JoinType;

use datafusion::prelude::{SessionConfig, SessionContext};
use fuzz_utils::add_empty_batches;
use test_utils::add_empty_batches;

#[tokio::test]
async fn test_inner_join_1k() {
2 changes: 1 addition & 1 deletion datafusion/core/tests/merge_fuzz.rs
@@ -30,8 +30,8 @@ use datafusion::physical_plan::{
sorts::sort_preserving_merge::SortPreservingMergeExec,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::{prelude::StdRng, Rng, SeedableRng};
use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};

#[tokio::test]
async fn test_merge_2() {
2 changes: 1 addition & 1 deletion datafusion/core/tests/order_spill_fuzz.rs
@@ -29,10 +29,10 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;
use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/order.rs
@@ -16,7 +16,7 @@
// under the License.

use super::*;
use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec};
use test_utils::{batches_to_vec, partitions_to_sorted_vec};

#[tokio::test]
async fn test_sort_unprojected_col() -> Result<()> {
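For orientation, a rough sketch of how these shared helpers are typically combined in the fuzz tests above. The helper signatures are assumed here (they live in the renamed `test-utils` crate and are not shown in this diff): `partitions_to_sorted_vec` flattens and sorts the input partitions, `batches_to_vec` flattens the already-sorted output batches, and the test passes when the two agree.

use arrow::record_batch::RecordBatch;
use test_utils::{batches_to_vec, partitions_to_sorted_vec};

// Sketch only: compare sorted output against the expected ordering of the inputs.
fn assert_sorted(input_partitions: &[Vec<RecordBatch>], output: &[RecordBatch]) {
    let expected = partitions_to_sorted_vec(input_partitions);
    let actual = batches_to_vec(output);
    assert_eq!(expected, actual);
}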
@@ -16,7 +16,7 @@
# under the License.

[package]
name = "fuzz-utils"
name = "test-utils"
version = "0.1.0"
edition = "2021"
