Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataflow-types: proptest for protobuf support DataflowDescription. #12353

Merged
merged 2 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/dataflow-types/src/client/controller/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use anyhow::anyhow;
use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use mz_repr::proto::TryFromProtoError;
use proptest::prelude::{Arbitrary, BoxedStrategy, Just};
use proptest::strategy::Strategy;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::MutableAntichain;
Expand Down Expand Up @@ -169,6 +171,23 @@ impl TryFrom<ProtoCollectionMetadata> for CollectionMetadata {
}
}

impl Arbitrary for CollectionMetadata {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
// TODO (#12359): derive Arbitrary after CollectionMetadata
// gains proper protobuf support.
Just(CollectionMetadata {
persist_location: PersistLocation {
blob_uri: "".to_string(),
consensus_uri: "".to_string(),
},
})
.boxed()
}
}

/// Controller state maintained for each storage instance.
#[derive(Debug)]
pub struct StorageControllerState<T, S = mz_stash::Sqlite> {
Expand Down
138 changes: 126 additions & 12 deletions src/dataflow-types/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;

use proptest::prelude::{any, Arbitrary};
use proptest::strategy::{BoxedStrategy, Strategy};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::frontier::Antichain;

use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
use mz_repr::proto::{FromProtoIfSome, ProtoRepr, TryFromProtoError, TryIntoIfSome};
use mz_repr::proto::{any_uuid, FromProtoIfSome, ProtoRepr, TryFromProtoError, TryIntoIfSome};
use mz_repr::{Diff, GlobalId, RelationType, Row};

use crate::client::controller::storage::CollectionMetadata;
use crate::types::sinks::SinkDesc;
use crate::types::sources::SourceDesc;
use crate::Plan;

include!(concat!(env!("OUT_DIR"), "/mz_dataflow_types.types.rs"));

Expand Down Expand Up @@ -83,7 +87,7 @@ pub struct Update<T = mz_repr::Timestamp> {
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;

/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
pub id: GlobalId,
pub plan: P,
Expand Down Expand Up @@ -116,7 +120,6 @@ impl TryFrom<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {
/// projection to the records as they emerge.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SourceInstanceDesc<M> {
// TODO: have the proptest strategy be `any_source_desc_stub()`
/// A description of the source to construct.
pub description: crate::types::sources::SourceDesc,
/// Arguments for this instantiation of the source.
Expand All @@ -126,6 +129,24 @@ pub struct SourceInstanceDesc<M> {
pub storage_metadata: M,
}

impl Arbitrary for SourceInstanceDesc<CollectionMetadata> {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<SourceInstanceArguments>(),
any::<CollectionMetadata>(),
)
.prop_map(|(arguments, storage_metadata)| SourceInstanceDesc {
description: crate::sources::any_source_desc_stub(),
arguments,
storage_metadata,
})
.boxed()
}
}

impl From<&SourceInstanceDesc<CollectionMetadata>> for ProtoSourceInstanceDesc {
fn from(x: &SourceInstanceDesc<CollectionMetadata>) -> Self {
ProtoSourceInstanceDesc {
Expand Down Expand Up @@ -153,7 +174,7 @@ impl TryFrom<ProtoSourceInstanceDesc> for SourceInstanceDesc<CollectionMetadata>
}

/// Per-source construction arguments.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Arbitrary, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SourceInstanceArguments {
/// Optional linear operators that can be applied record-by-record.
pub operators: Option<crate::LinearOperator>,
Expand Down Expand Up @@ -229,6 +250,66 @@ pub struct DataflowDescription<P, S = (), T = mz_repr::Timestamp> {
pub id: uuid::Uuid,
}

fn any_source_import() -> impl Strategy<Value = (GlobalId, SourceInstanceDesc<CollectionMetadata>)>
{
(
any::<GlobalId>(),
any::<SourceInstanceDesc<CollectionMetadata>>(),
)
}

proptest::prop_compose! {
fn any_dataflow_index()(
id in any::<GlobalId>(),
index in any::<IndexDesc>(),
typ in any::<RelationType>()
) -> (GlobalId, (IndexDesc, RelationType)) {
(id, (index, typ))
}
}

proptest::prop_compose! {
fn any_dataflow_description()(
source_imports in proptest::collection::vec(any_source_import(), 1..3),
index_imports in proptest::collection::vec(any_dataflow_index(), 1..3),
objects_to_build in proptest::collection::vec(any::<BuildDesc<Plan>>(), 1..3),
index_exports in proptest::collection::vec(any_dataflow_index(), 1..3),
sink_ids in proptest::collection::vec(any::<GlobalId>(), 1..3),
as_of_some in any::<bool>(),
as_of in proptest::collection::vec(any::<u64>(), 1..5),
debug_name in ".*",
id in any_uuid(),
) -> DataflowDescription<Plan, CollectionMetadata, mz_repr::Timestamp> {
DataflowDescription {
source_imports: BTreeMap::from_iter(source_imports.into_iter()),
index_imports: BTreeMap::from_iter(index_imports.into_iter()),
objects_to_build,
index_exports: BTreeMap::from_iter(index_exports.into_iter()),
sink_exports: BTreeMap::from_iter(
sink_ids
.into_iter()
.map(|id| (id, crate::sinks::any_sink_desc_stub())),
),
as_of: if as_of_some {
Some(Antichain::from(as_of))
} else {
None
},
debug_name,
id,
}
}
}

impl Arbitrary for DataflowDescription<Plan, CollectionMetadata, mz_repr::Timestamp> {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
any_dataflow_description().boxed()
}
}

impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
/// Creates a new dataflow description with a human-readable name.
pub fn new(name: String) -> Self {
Expand Down Expand Up @@ -596,7 +677,6 @@ pub mod sources {
use mz_persist_client::read::ReadHandle;
use mz_persist_client::{PersistLocation, ShardId};
use mz_persist_types::Codec64;
use proptest::strategy::{Just, Strategy};
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp;
Expand Down Expand Up @@ -1386,15 +1466,14 @@ pub mod sources {
}

/// A stub for generating arbitrary [SourceDesc].
/// Produces the simplest possible instance of it.
#[allow(dead_code)]
fn any_source_desc_stub() -> impl Strategy<Value = SourceDesc> {
Just(SourceDesc {
/// Currently only produces the simplest instance of one.
pub(super) fn any_source_desc_stub() -> SourceDesc {
SourceDesc {
connector: SourceConnector::Local {
timeline: Timeline::EpochMilliseconds,
},
desc: RelationDesc::empty(),
})
}
}

/// A `SourceConnector` describes how data is produced for a source, be
Expand Down Expand Up @@ -1994,6 +2073,21 @@ pub mod sinks {
pub as_of: SinkAsOf<T>,
}

/// A stub for generating arbitrary [SinkDesc].
/// Currently only produces the simplest instance of one.
pub(super) fn any_sink_desc_stub() -> SinkDesc<mz_repr::Timestamp> {
SinkDesc {
from: GlobalId::Explain,
from_desc: RelationDesc::empty(),
connector: SinkConnector::Tail(TailSinkConnector {}),
envelope: None,
as_of: SinkAsOf {
frontier: Antichain::new(),
strict: false,
},
}
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
Debezium,
Expand Down Expand Up @@ -2161,11 +2255,12 @@ pub mod sinks {

/// An index storing processed updates so they can be queried
/// or reused in other computations
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
/// Identity of the collection the index is on.
pub on_id: GlobalId,
/// Expressions to be arranged, in order of decreasing primacy.
#[proptest(strategy = "proptest::collection::vec(any::<MirScalarExpr>(), 1..3)")]
pub key: Vec<MirScalarExpr>,
}

Expand Down Expand Up @@ -2206,9 +2301,10 @@ impl TryFrom<ProtoIndexDesc> for IndexDesc {
/// applied, and columns not in projection can then be overwritten with
/// default values. This allows the projection to avoid capturing columns
/// used by the predicates but not otherwise required.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct LinearOperator {
/// Rows that do not pass all predicates may be discarded.
#[proptest(strategy = "proptest::collection::vec(any::<MirScalarExpr>(), 0..2)")]
pub predicates: Vec<MirScalarExpr>,
/// Columns not present in `projection` may be replaced with
/// default values.
Expand Down Expand Up @@ -2250,3 +2346,21 @@ impl LinearOperator {
self.predicates.is_empty() && self.projection.iter().copied().eq(0..arity)
}
}

#[cfg(test)]
mod tests {
use super::*;
use mz_repr::proto::protobuf_roundtrip;
use proptest::prelude::*;

proptest! {
#![proptest_config(ProptestConfig::with_cases(32))]

#[test]
fn dataflow_description_protobuf_roundtrip(expect in any::<DataflowDescription<Plan, CollectionMetadata, mz_repr::Timestamp>>()) {
let actual = protobuf_roundtrip::<_, ProtoDataflowDescription>(&expect);
assert!(actual.is_ok());
assert_eq!(actual.unwrap(), expect);
}
}
}