Skip to content

Commit

Permalink
Update to latest Timely
Browse files Browse the repository at this point in the history
Bring in changes related to abomonation and reference-counted addresses.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Sep 9, 2024
1 parent 7998e65 commit 300c403
Show file tree
Hide file tree
Showing 16 changed files with 37 additions and 170 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ timely = {workspace = true}

[workspace.dependencies]
#timely = { version = "0.12", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"]}
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[features]
Expand Down
2 changes: 1 addition & 1 deletion examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn main() {
// create a source operator which will produce random edges and delete them.
timely::dataflow::operators::generic::source(scope, "RandomGraph", |mut capability, info| {

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let seed: &[_] = &[1, 2, 3, index];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
Expand Down
19 changes: 9 additions & 10 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
//! this file.

use std::time::Duration;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
/// A batch of updates that are certain to occur.
///
Expand All @@ -32,7 +31,7 @@ pub enum Message<D, T, R> {
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
/// The lower bound of times contained in this statement.
pub lower: Vec<T>,
Expand Down Expand Up @@ -310,9 +309,9 @@ pub mod source {
// Step 1: The MESSAGES operator.
let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
let address = messages_op.operator_info().address;
let activator = scope.sync_activator_for(&address);
let activator2 = scope.activator_for(&address);
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) };
let activator = scope.sync_activator_for(address.to_vec());
let activator2 = scope.activator_for(Rc::clone(&address));
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
let mut source = source_builder(activator);
let (mut updates_out, updates) = messages_op.new_output();
let (mut progress_out, progress) = messages_op.new_output();
Expand Down Expand Up @@ -582,13 +581,13 @@ pub mod sink {
// We can simply record all updates, under the presumption that they have been consolidated
// and so any record we see is in fact guaranteed to happen.
let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(stream, Pipeline);
let (mut updates_out, updates) = builder.new_output();

builder.build_reschedule(
move |_capability| {
let mut timestamps = ChangeBatch::new();
let mut timestamps = <ChangeBatch<_>>::new();
let mut send_queue = std::collections::VecDeque::new();
move |_frontiers| {
let mut output = updates_out.activate();
Expand Down Expand Up @@ -636,15 +635,15 @@ pub mod sink {

// We use a lower-level builder here to get access to the operator address, for rescheduling.
let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

// We now record the numbers of updates at each timestamp between lower and upper bounds.
// Track the advancing frontier, to know when to produce utterances.
let mut frontier = Antichain::from_elem(T::minimum());
// Track accumulated counts for timestamps.
let mut timestamps = ChangeBatch::new();
let mut timestamps = <ChangeBatch<_>>::new();
// Stash for serialized data yet to send.
let mut send_queue = std::collections::VecDeque::new();
let mut retain = Vec::new();
Expand Down
3 changes: 1 addition & 2 deletions src/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ wrapping_implementation!(std::num::Wrapping<isize>);

pub use self::present::Present;
mod present {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A zero-sized difference that indicates the presence of a record.
Expand All @@ -168,7 +167,7 @@ mod present {
/// The primary feature of this type is that it has zero size, which reduces the overhead
/// of differential dataflow's representations for settings where collections either do
/// not change, or for which records are only added (for example, derived facts in Datalog).
#[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
pub struct Present;

impl<T: Clone> super::Multiply<T> for Present {
Expand Down
9 changes: 2 additions & 7 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
//! default coordinates (which is effectively just *setting* the coordinate).

use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A sequence of timestamps, partially ordered by the product order.
///
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStamp<T> {
/// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
vector: Vec<T>,
Expand Down Expand Up @@ -118,9 +115,7 @@ impl<T: Timestamp> Refines<()> for PointStamp<T> {
use timely::progress::PathSummary;

/// Describes an action on a `PointStamp`: truncation to `length` followed by `actions`.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation
)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStampSummary<TS> {
/// Number of leading coordinates to retain.
///
Expand Down
8 changes: 4 additions & 4 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ where

let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down Expand Up @@ -439,10 +439,10 @@ where

let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let activator = Some(scope.activator_for(&info.address[..]));
let activator = Some(scope.activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If there is default exertion logic set, install it.
if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let activator = Some(stream.scope().activator_for(&info.address[..]));
let activator = Some(stream.scope().activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ where
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = arranged1.stream.scope().activations().clone();
let activator = Activator::new(&info.address[..], activations);
let activator = Activator::new(info.address, activations);

// Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
// These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
Expand Down
2 changes: 1 addition & 1 deletion src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ where
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
};

let activator = Some(trace.stream.scope().activator_for(&operator_info.address[..]));
let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If there is default exert logic set, install it.
if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
3 changes: 1 addition & 2 deletions src/trace/description.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
//! will often be a logic bug, as `since` does not advance without a corresponding advance in
//! times at which data may possibly be sent.

use abomonation_derive::Abomonation;
use timely::{PartialOrder, progress::Antichain};
use serde::{Serialize, Deserialize};

Expand All @@ -66,7 +65,7 @@ use serde::{Serialize, Deserialize};
/// frontier indicates a moment at which the times were observed. If `since` is strictly in
/// advance of `lower`, the contained times may be "advanced" to times which appear equivalent to
/// any time after `since`.
#[derive(Clone, Debug, Abomonation, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Description<Time> {
/// lower frontier of contained updates.
lower: Antichain<Time>,
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub use self::ord_neu::OrdValSpine as ValSpine;
pub use self::ord_neu::OrdKeySpine as KeySpine;

use std::borrow::{ToOwned};
use std::convert::TryInto;

use serde::{Deserialize, Serialize};

use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
Expand Down Expand Up @@ -198,11 +201,8 @@ where
type OffsetContainer = OffsetList;
}

use std::convert::TryInto;
use abomonation_derive::Abomonation;

/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)]
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
pub struct OffsetList {
/// Length of a prefix of zero elements.
pub zero_prefix: usize,
Expand Down
10 changes: 4 additions & 6 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub type PreferredSpine<K, V, T, R> = Spine<
mod val_batch {

use std::marker::PhantomData;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

Expand All @@ -111,7 +111,7 @@ mod val_batch {
use super::{Layout, Update};

/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Abomonation, Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct OrdValStorage<L: Layout> {
/// An ordered list of keys, corresponding to entries in `keys_offs`.
pub keys: L::KeyContainer,
Expand Down Expand Up @@ -159,7 +159,6 @@ mod val_batch {
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdValBatch<L: Layout> {
/// The updates themselves.
pub storage: OrdValStorage<L>,
Expand Down Expand Up @@ -671,7 +670,7 @@ mod val_batch {
mod key_batch {

use std::marker::PhantomData;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

Expand All @@ -682,7 +681,7 @@ mod key_batch {
use super::{Layout, Update};

/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Abomonation, Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct OrdKeyStorage<L: Layout> {
/// An ordered list of keys, corresponding to entries in `keys_offs`.
pub keys: L::KeyContainer,
Expand Down Expand Up @@ -720,7 +719,6 @@ mod key_batch {
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdKeyBatch<L: Layout> {
/// The updates themselves.
pub storage: OrdKeyStorage<L>,
Expand Down
11 changes: 5 additions & 6 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use std::rc::Rc;
use std::cmp::Ordering;

use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};
use timely::container::columnation::TimelyStack;

use crate::Hashable;
Expand Down Expand Up @@ -46,7 +46,7 @@ pub trait HashOrdered: Hashable { }
impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }

/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Abomonation, Default)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
/// The inner value, freely modifiable.
pub inner: T
Expand Down Expand Up @@ -86,7 +86,7 @@ mod val_batch {

use std::convert::TryInto;
use std::marker::PhantomData;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};

Expand All @@ -111,7 +111,7 @@ mod val_batch {
/// We will use the `Hashable` trait here, but any consistent hash function should work out ok.
/// We specifically want to use the highest bits of the result (we will) because the low bits have
/// likely been spent shuffling the data between workers (by key), and are likely low entropy.
#[derive(Abomonation)]
#[derive(Debug, Serialize, Deserialize)]
pub struct RhhValStorage<L: Layout>
where
<L::Target as Update>::Key: Default + HashOrdered,
Expand Down Expand Up @@ -250,8 +250,7 @@ mod val_batch {
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct RhhValBatch<L: Layout>
pub struct RhhValBatch<L: Layout>
where
<L::Target as Update>::Key: Default + HashOrdered,
{
Expand Down
Loading

0 comments on commit 300c403

Please sign in to comment.