Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic reads #132

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
hard_tabs = true
imports_layout = "Horizontal"
merge_imports = true
imports_granularity = "Crate"
fn_args_layout = "Compressed"
use_field_init_shorthand = true

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ maintenance = { status = "actively-developed" }
constellation = ["bincode", "constellation-rs", "serde_traitobject"]
aws = ["amadeus-aws"]
commoncrawl = ["amadeus-commoncrawl"]
parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
parquet = ["amadeus-parquet", "amadeus-derive/parquet", "amadeus-serde", "amadeus-derive/serde"]
postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
csv = ["amadeus-serde", "amadeus-derive/serde"]
json = ["amadeus-serde", "amadeus-derive/serde"]
Expand Down
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
)), None)
.await?;

let top_pages = rows
Expand All @@ -130,27 +130,27 @@ This is typed, so faster, and it goes an analytics step further also, prints top
```rust
use amadeus::prelude::*;
use std::error::Error;
use amadeus::helpers::{FilterNullsAndDoubleUnwrap, GetFieldFromValue};
use amadeus::amadeus_parquet::get_row_predicate;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None, None)?;

let column_name = "uri".to_string();

let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
)), get_row_predicate(vec![column_name.clone()]))
.await?;

let top_pages = rows
.par_stream()
.map(|row: Result<Value, _>| {
let row = row.ok()?.into_group().ok()?;
row.get("uri")?.clone().into_url().ok()
})
.filter(|row| row.is_some())
.map(Option::unwrap)
.get_field_from_value::<Option<String>>(column_name.clone())
.filter_nulls_and_double_unwrap()
.most_frequent(&pool, 100, 0.99, 0.002)
.await;

Expand All @@ -176,7 +176,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
)), None)
.await?;

// Note: this isn't yet implemented!
Expand Down Expand Up @@ -222,7 +222,7 @@ fn main() -> Result<(), Box<dyn Error>> {
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
)), None)
.await?;

let top_pages = rows
Expand Down
6 changes: 4 additions & 2 deletions amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.4.2")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![cfg_attr(nightly, feature(min_type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
Expand All @@ -26,7 +26,9 @@
clippy::must_use_candidate,
clippy::type_repetition_in_bounds,
clippy::filter_map,
clippy::missing_errors_doc
clippy::missing_errors_doc,
clippy::missing_panics_doc,
clippy::let_underscore_drop
)]
#![deny(unsafe_code)]

Expand Down
6 changes: 4 additions & 2 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.4.2")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![cfg_attr(nightly, feature(min_type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
Expand All @@ -22,7 +22,9 @@
#![allow(
clippy::doc_markdown,
clippy::inline_always,
clippy::missing_errors_doc
clippy::missing_errors_doc,
clippy::missing_panics_doc,
clippy::let_underscore_drop
)]
#![deny(unsafe_code)]

Expand Down
5 changes: 1 addition & 4 deletions amadeus-commoncrawl/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ impl<'a> fmt::Debug for Record<'a> {
// write!(form, "\n").unwrap();
// }
writeln!(form, "Content Length:{}", self.content.len()).unwrap();
let s = match str::from_utf8(self.content) {
Ok(s) => s,
Err(_) => "Could not convert",
};
let s = str::from_utf8(self.content).unwrap_or("Could not convert");
writeln!(form, "Content :{:?}", s).unwrap();
writeln!(form)
}
Expand Down
7 changes: 3 additions & 4 deletions amadeus-core/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl OsString {
pub fn to_string_lossy(&self) -> String {
self.buf.to_string_lossy()
}
pub fn display<'a>(&'a self) -> impl fmt::Display + 'a {
pub fn display(&self) -> impl fmt::Display + '_ {
struct Display<'a>(&'a OsString);
impl<'a> fmt::Display for Display<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -160,15 +160,14 @@ impl PathBuf {
pub fn iter<'a>(&'a self) -> impl Iterator<Item = &OsString> + 'a {
self.components.iter()
}
pub fn display<'a>(&'a self) -> impl fmt::Display + 'a {
pub fn display(&self) -> impl fmt::Display + '_ {
struct Display<'a>(&'a PathBuf);
impl<'a> fmt::Display for Display<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut res: fmt::Result = self
.0
.iter()
.map(|component| write!(f, "{}/", component.to_string_lossy()))
.collect();
.try_for_each(|component| write!(f, "{}/", component.to_string_lossy()));
if let Some(file_name) = self.0.file_name() {
res = res.and_then(|()| write!(f, "{}", file_name.to_string_lossy()));
}
Expand Down
8 changes: 6 additions & 2 deletions amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/).

#![doc(html_root_url = "https://docs.rs/amadeus-core/0.4.2")]
#![cfg_attr(nightly, feature(unboxed_closures))]
#![cfg_attr(nightly, feature(unboxed_closures, fn_traits))]
#![recursion_limit = "25600"]
#![warn(
// missing_copy_implementations,
Expand All @@ -33,7 +33,11 @@
clippy::default_trait_access,
clippy::filter_map,
clippy::wildcard_imports,
clippy::needless_pass_by_value
clippy::needless_pass_by_value,
clippy::unnecessary_wraps,
clippy::missing_panics_doc,
clippy::let_underscore_drop,
clippy::unnested_or_patterns
)]
#![deny(unsafe_code)]

Expand Down
6 changes: 5 additions & 1 deletion amadeus-core/src/par_sink/combiner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#![allow(unused_imports, clippy::single_component_path_imports, clippy::option_if_let_else)]
#![allow(
unused_imports,
clippy::single_component_path_imports,
clippy::option_if_let_else
)]

use super::FolderSync;

Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_sink/folder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(unused_imports,clippy::single_component_path_imports)]
#![allow(unused_imports, clippy::single_component_path_imports)]

use derive_new::new;
use educe::Educe;
Expand Down
10 changes: 9 additions & 1 deletion amadeus-core/src/par_sink/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
#![allow(non_snake_case, clippy::type_complexity, irrefutable_let_patterns, clippy::new_without_default, unused_mut, unreachable_code, clippy::too_many_arguments)]
#![allow(
non_snake_case,
clippy::type_complexity,
irrefutable_let_patterns,
clippy::new_without_default,
unused_mut,
unreachable_code,
clippy::too_many_arguments
)]

use derive_new::new;
use futures::{pin_mut, ready, stream, Stream, StreamExt};
Expand Down
8 changes: 4 additions & 4 deletions amadeus-core/src/par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::{future, pin_mut, stream::StreamExt as _, Stream};
use indexmap::IndexMap;
use serde_closure::{traits, FnOnce};
use std::{
cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}, vec
cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}
};

use super::{par_pipe::*, par_sink::*};
Expand Down Expand Up @@ -434,7 +434,7 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel
let self_ = self;
pin_mut!(self_);
// TODO: don't buffer tasks before sending. requires changes to ThreadPool
let mut tasks = (0..pool.threads()).map(|_| vec![]).collect::<Vec<_>>();
let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::<Vec<_>>();
let mut allocated = 0;
'a: loop {
for i in 0..tasks.len() {
Expand Down Expand Up @@ -597,7 +597,7 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream
let self_ = self;
pin_mut!(self_);
// TODO: don't buffer tasks before sending. requires changes to ProcessPool
let mut tasks = (0..pool.processes()).map(|_| vec![]).collect::<Vec<_>>();
let mut tasks = (0..pool.processes()).map(|_| Vec::new()).collect::<Vec<_>>();
let mut allocated = 0;
'a: loop {
for i in 0..tasks.len() {
Expand Down Expand Up @@ -647,7 +647,7 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream
pool.spawn(FnOnce!(move |pool: &P::ThreadPool| {
let mut process_tasks = tasks.into_iter();

let mut tasks = (0..pool.threads()).map(|_| vec![]).collect::<Vec<_>>();
let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::<Vec<_>>();
let mut allocated = 0;
'a: loop {
for i in 0..tasks.len() {
Expand Down
6 changes: 2 additions & 4 deletions amadeus-core/src/pipe/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ where
if let Some(s) = self_.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.poll_next(cx)) {
break Some(item);
} else {
self_.next.set(None);
}
self_.next.set(None);
} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
self_.next.set(Some(self_.f.call_mut((s,))));
} else {
Expand All @@ -61,9 +60,8 @@ where
if let Some(s) = self_.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.poll_next(cx)) {
break Some(item);
} else {
self_.next.set(None);
}
self_.next.set(None);
} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
self_.next.set(Some(self_.f.call_mut((s,))));
} else {
Expand Down
6 changes: 2 additions & 4 deletions amadeus-core/src/pipe/flat_map_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ where
if let Some(s) = self_.next.as_mut() {
if let Some(item) = s.next() {
break Some(item);
} else {
*self_.next = None;
}
*self_.next = None;
} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
*self_.next = Some(self_.f.call_mut((s,)));
} else {
Expand All @@ -60,9 +59,8 @@ where
if let Some(s) = self_.next.as_mut() {
if let Some(item) = s.next() {
break Some(item);
} else {
*self_.next = None;
}
*self_.next = None;
} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
*self_.next = Some(self_.f.call_mut((s,)));
} else {
Expand Down
6 changes: 2 additions & 4 deletions amadeus-core/src/pipe/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ where
if let Some(s) = self_.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.poll_next(cx)) {
break Some(item);
} else {
self_.next.set(None);
}
self_.next.set(None);
} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
self_.next.set(Some(s));
} else {
Expand All @@ -57,9 +56,8 @@ where
if let Some(s) = self_.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.poll_next(cx)) {
break Some(item);
} else {
self_.next.set(None);
}
self_.next.set(None);
} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
self_.next.set(Some(s));
} else {
Expand Down
9 changes: 9 additions & 0 deletions amadeus-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ fn impl_struct(
};
});

let predicate_derives = if cfg!(feature = "serde") {
Some(quote! {
#[derive(Clone, Debug, __::Serialize, __::Deserialize)]
})
} else {
None
};

parquet_derives = Some(quote! {
#visibility struct #schema_name #impl_generics #where_clause_with_parquet_data {
#(#field_names1: <#field_types1 as __::ParquetData>::Schema,)*
Expand Down Expand Up @@ -300,6 +308,7 @@ fn impl_struct(
#visibility struct #reader_name #impl_generics #where_clause_with_parquet_data {
#(#field_names1: <#field_types1 as __::ParquetData>::Reader,)*
}
#predicate_derives
#visibility struct #predicate_name #impl_generics #where_clause_with_parquet_data {
#(#field_names1: __::Option<<#field_types1 as __::ParquetData>::Predicate>,)*
}
Expand Down
2 changes: 1 addition & 1 deletion amadeus-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ educe = "0.4"
flate2 = { version = "1.0.2", features = ["rust_backend"], default-features = false }
futures = "0.3"
fxhash = "0.2"
hashlink = "0.5"
hashlink = { version = "0.6.1-alpha.0", features = ["serde_impl"], git = "https://github.com/robinbernon/hashlink", branch = "generic_hashmap_deserialization" }
lz-fear = "0.1"
num-bigint = "0.3"
quick-error = "1.2.2"
Expand Down
4 changes: 2 additions & 2 deletions amadeus-parquet/src/internal/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ macro_rules! statistics_enum_func {
Statistics::Double(ref typed) => typed.$func(),
Statistics::ByteArray(ref typed) => typed.$func(),
Statistics::FixedLenByteArray(ref typed) => typed.$func(),
}
}};
}
}};
}

/// Converts Thrift definition into `Statistics`.
Expand Down
6 changes: 4 additions & 2 deletions amadeus-parquet/src/internal/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod schemas;
mod triplet;
pub mod types;

use serde::{Deserialize, Serialize};
use std::{
collections::HashMap, fmt::{self, Debug}
};
Expand All @@ -65,7 +66,8 @@ pub use schemas::RootSchema;
mod predicate {
/// This is for forward compatibility when Predicate pushdown and dynamic schemas are
/// implemented.
#[derive(Clone, Debug)]
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Predicate;
}
pub(crate) use self::predicate::Predicate;
Expand Down Expand Up @@ -154,7 +156,7 @@ pub trait ParquetData: Data + Sized {
// Clone + PartialEq + Debug + 'static
type Schema: Schema;
type Reader: Reader<Item = Self>;
type Predicate;
type Predicate: Clone + Debug + Serialize + for<'de> Deserialize<'de> + Send + 'static;

/// Parse a [`Type`] into `Self::Schema`, using `repetition` instead of
/// `Type::get_basic_info().repetition()`. A `repetition` of `None` denotes a root
Expand Down
Loading