This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Z-set revamp #64

Merged: 20 commits merged into vmware-archive:main from zset_revamp on Jun 6, 2022

Conversation

@ryzhyk (Collaborator) commented May 31, 2022

Major re-work of the Z-set implementation, API and operators based on it. There are many todos, which I will document as issues. I broke it up into commits to simplify reviewing.

ryzhyk added 16 commits May 24, 2022 09:24
Conditional formatting didn't seem to work correctly, causing git to
always skip the validation step.
This starts a complete revamp of the Z-set API and implementation.  This
work involved many iterations, so the actual commit history is not
helpful.  Instead, we remove the current implementation of Z-sets and
indexed Z-sets, along with all operators and tests that work with them,
and will add the new implementation piece by piece in subsequent commits.
Refine the `NumEntries` trait: instead of a single `num_entries` method,
it now provides `num_entries_shallow`, which returns the number of
top-level entries and is expected to be cheap (O(1)), and
`num_entries_deep`, which computes the number of entries recursively.
The latter can be too expensive to use in the fast path and is mostly
useful for profiling purposes.
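
For illustration, a pared-down version of this split might look as follows
(a sketch for exposition only; the impl for nested vectors is a made-up
example, not code from this commit):

    // Stand-in for the refined trait described above.
    trait NumEntries {
        /// Number of top-level entries; expected to be cheap (O(1)).
        fn num_entries_shallow(&self) -> usize;

        /// Number of entries counted recursively; may be too expensive for
        /// the fast path and is mostly useful for profiling.
        fn num_entries_deep(&self) -> usize;
    }

    // Example: a nested vector has `len()` top-level entries, while the deep
    // count sums the lengths of the inner vectors.
    impl<T> NumEntries for Vec<Vec<T>> {
        fn num_entries_shallow(&self) -> usize {
            self.len()
        }

        fn num_entries_deep(&self) -> usize {
            self.iter().map(Vec::len).sum()
        }
    }
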
So far, the notion of time in DBSP has been implicit: every iteration of
a circuit corresponds to a clock tick; nested circuits induce a
hierarchical clock structure.  However, we haven't had a need to
represent the value of the clock explicitly.  This is about to change
with the new trace-based representation of Z-sets.  In a trace, each
(key, value, weight) tuple in a Z-set is labelled by the time when it
was introduced.

We could use a single fixed type to represent DBSP time, e.g.,
`Vec<usize>`, where the first component of the vector is the root
circuit's clock, the second component is the level-1 nested circuit's
clock, etc.  This works because DBSP clocks are simple counters that start at 0 on
each clock reset and increment by 1 on each clock tick.  This is in
contrast to, e.g., timely dataflow, where times can come from any
partially ordered set.  However, the `Vec<usize>` representation is
wasteful in most cases, not just because it uses heap allocation, but
more importantly because we usually don't need full 64 bits to store
each clock dimension.  For instance, some of the operators that will be
introduced in future commits need to distinguish between values added
during the current epoch (i.e., the current _parent_ clock cycle) and
any previous epoch.  For this we can use a 1-bit time representation to
distinguish old and new values.  In addition, in most applications the
nested clock can never perform a very large number of iterations and
can be represented using 32 or 16 bits.

To enable these optimizations, we allow different time representations
as long as they implement the new `Timestamp` trait.  Similar to timely
dataflow, timestamps are partially ordered; moreover, similar to
differential dataflow, they form a lattice.  In addition, the `Timestamp`
trait allows incrementing and decrementing individual dimensions of the
clock.

Limitations:

- We do not currently model clock overflow.
- It is currently up to each operator to track the current time.  In the
  future, this will be the job of the circuit, so we don't need to
  duplicate this functionality across operators.
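
To give a sense of the shape of this interface, here is a pared-down sketch;
the method names and bounds are illustrative assumptions, not the actual
`Timestamp` trait added in this commit:

    // Sketch of a timestamp interface: partially ordered, lattice operations,
    // and per-dimension clock advancement.
    trait TimestampSketch: PartialOrd + Clone {
        /// The least element: all clock dimensions at 0, as after a reset.
        fn minimum() -> Self;

        /// Lattice operations: least upper bound and greatest lower bound.
        fn join(&self, other: &Self) -> Self;
        fn meet(&self, other: &Self) -> Self;

        /// Advance the clock dimension at nesting level `scope` by one tick.
        fn advance(&self, scope: usize) -> Self;
    }

    // For a single, un-nested counter the partial order is total and the
    // lattice operations are just max/min.
    impl TimestampSketch for u32 {
        fn minimum() -> Self {
            0
        }
        fn join(&self, other: &Self) -> Self {
            (*self).max(*other)
        }
        fn meet(&self, other: &Self) -> Self {
            (*self).min(*other)
        }
        fn advance(&self, _scope: usize) -> Self {
            self + 1
        }
    }
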
Z-sets and indexed Z-sets are key DBSP abstractions, whose performance
largely determines the speed and memory footprint of a DBSP circuit.
The original implementation based on hashmaps, despite reasonable
asymptotic complexity, proved highly inefficient.  The new design
introduced here borrows ideas and implementation from differential
dataflow.  It is optimized for the specific operations that
we perform on Z-sets, namely:

- Given two (indexed) Z-sets, iterate over matching keys and apply some operation
  to the associated (value, time, weight) tuples
- Merge two Z-sets

In addition, it allows space-efficient representation of the history of
a Z-set.  More on this below.

The new representation is based on two abstractions:

- Batch - an immutable collection of (key, value, time, weight) tuples.
  Ignoring the time component for now, a batch represents an indexed Z-set.
  In the special case where the value type is `()`, this is a regular
  Z-set.  (Side note: in this representation Z-sets are a special case of
  indexed Z-sets, not the other way around.)  The read interface of a batch
  (`BatchReader`) exposes a `Cursor` that allows the caller to iterate
  over keys in the batch; for each key it iterates over associated
  values, and for each value -- over (time, weight) pairs.

  The write interface allows building a batch from either an ordered or
  an unordered collection of tuples.

  Internally, the `Batch` trait is implemented by keeping all keys in an
  ordered array.  Each entry in the array points to a sorted range of values,
  stored in another array.  Each entry in the second array points to
  a range of ordered (time, weight) tuples in the third array.

  This is the most general batch representation.  Fewer nested arrays
  are needed when time, value, or both are missing.  E.g., if both
  time and value are missing, a batch can be implemented by an ordered
  array of (key, weight) tuples.  (A toy version of the general layout is
  sketched right after this list.)

- Trace.  Batches allow efficient scanning by relying on ordered arrays.
  The flip side of this representation is that modifying a batch requires
  copying its entire contents -- not something one wants to do for each
  small update.  This is why batches are considered immutable.  Mutable
  collections are represented by traces.  A trace is simply a set of
  batches.  Similar to a single batch, a trace exposes a cursor to
  iterate over (key, value, time, weight) tuples, which internally
  merges individual batch cursors.  One modifies a trace by simply
  pushing another batch to it.  Internally, the trace implementation
  merges batches to bound memory and speed overhead due to storing
  multiple copies of the same key in multiple tuples.  Merging follows
  these rules:
  1. Only merge batches of comparable sizes.
  2. The total number and sizes of batches are limited so that the total
     memory overhead does not exceed 2x.

  In addition, it is possible to perform proactive merging during slack
  time. Given sufficient slack time, the trace eventually gets compressed
  into a single batch.  We do not take advantage of this yet.
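
To make the layout concrete, here is a toy version of the general
representation and its cursor-style traversal; the flat vectors and field
names are illustrative stand-ins, not the layered types used in the actual
implementation:

    // Keys, values, and (time, weight) tuples stored in three arrays, with
    // offset arrays linking each level to a sorted range in the next one.
    struct ToyBatch<K, V, T, W> {
        /// Sorted keys; keys[i] owns vals[key_offs[i]..key_offs[i + 1]].
        keys: Vec<K>,
        key_offs: Vec<usize>, // keys.len() + 1 entries
        /// Sorted values; vals[j] owns times[val_offs[j]..val_offs[j + 1]].
        vals: Vec<V>,
        val_offs: Vec<usize>, // vals.len() + 1 entries
        /// (time, weight) pairs, grouped per value.
        times: Vec<(T, W)>,
    }

    impl<K, V, T, W> ToyBatch<K, V, T, W> {
        /// Visit every (key, value, time, weight) tuple in order, mirroring
        /// the key -> value -> (time, weight) iteration that a cursor exposes.
        fn for_each(&self, mut f: impl FnMut(&K, &V, &T, &W)) {
            for (i, key) in self.keys.iter().enumerate() {
                for j in self.key_offs[i]..self.key_offs[i + 1] {
                    let val = &self.vals[j];
                    for (time, weight) in &self.times[self.val_offs[j]..self.val_offs[j + 1]] {
                        f(key, val, time, weight);
                    }
                }
            }
        }
    }
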

By including timestamps in each tuple in a trace, we can use traces to
represent not just the current contents of a Z-set, but its entire
history.  This allows further reducing CPU and memory usage.  Consider,
for example, the implementation of nested incremental join, distinct,
and aggregate operators.  These operators integrate their input streams
in several ways:

- lifted_integral - the sum of all updates of the Z-set produced during
  the current epoch

- integral - the sum of all updates across all epochs during a specific
  iteration of the inner circuit.

- integral(lifted_integral(zset)) - the sum of all updates across all
  epochs.

Instead of computing and storing each of these integrals proactively, we
can extract them on demand from a trace.  To this end, we label each
batch added to the trace with the current timestamp, e.g., an
(epoch, time) tuple.  When scanning the trace, we can compute
lifted_integral by adding up all weights for the current `epoch`.
We compute `integral` by adding up all weights with the given `time` value.
We compute `integral(lifted_integral)` by adding up all weights in the
trace.

Thus, we save memory by deduplicating keys, at the cost of storing
times explicitly.  We save CPU by not computing and storing the three
integrals proactively, at the cost of scanning the (time, weight)
trace associated with each key on each operation involving this key.
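
For illustration only (this is not the actual trace API), given the
(time, weight) pairs stored under a single key and an `(epoch, time)`
timestamp representation, the three integrals reduce to filtered weight
sums; equality filters suffice because the trace only contains updates made
so far:

    type Time = (u32, u32); // (epoch, time within the epoch)
    type Weight = i64;

    /// lifted_integral: sum of all updates made during the current epoch.
    fn lifted_integral(times: &[(Time, Weight)], epoch: u32) -> Weight {
        times.iter().filter(|(t, _)| t.0 == epoch).map(|(_, w)| w).sum()
    }

    /// integral: sum of all updates across epochs at a given inner time.
    fn integral(times: &[(Time, Weight)], time: u32) -> Weight {
        times.iter().filter(|(t, _)| t.1 == time).map(|(_, w)| w).sum()
    }

    /// integral(lifted_integral): sum of all updates in the trace.
    fn integral_of_lifted_integral(times: &[(Time, Weight)]) -> Weight {
        times.iter().map(|(_, w)| w).sum()
    }
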

------------------

This commit adds the implementation of batches and traces, copied from
the differential dataflow repository with some changes.  First, we add
two new batch implementations specialized for unit timestamps, which
represent simple Z-sets and indexed Z-sets.

Second, we add the `recede_to` operation that pushes all timestamps `t`
that are not less than or equal to a frontier back in time.  As a result,
the trace can no longer distinguish between timestamps that map to the
same value, but it will contain fewer different timestamps, thus
reducing its memory footprint.  This trick enables the 1-bit epoch
representation described in the previous commit.  See `Trace::recede_to`
documentation for more details.
Re-enable a test that depends on Z-sets, fixing it up for the new API.
This commit implements the `Circuit::fixedpoint` API and related
infrastructure.  This method creates a child circuit that iterates
until reaching a fixed point, i.e., a state where the outputs of all
operators are guaranteed to remain the same, should the nested clock
continue ticking.

The fixed point check is implemented by checking the following
condition:

* All operators in the circuit are in such a state that, if their inputs
  remain constant (i.e., all future inputs are identical to the last
  input), then their outputs remain constant too.

This is a necessary and sufficient condition that is also easy to check
by asking each operator if it is in a stable state via the new
`Operator::fixedpoint` API.  However, the cost of checking this
condition precisely can be high for some operators.  For instance, the delay
operators `Z1` and `Z1Nested` would require storing the last two versions of
the state instead of one and comparing them at each cycle.  Such
operators instead implement imprecise conservative checks, e.g., they check
for a _specific_ fixed point, such as one where both the input and the
output of the operator are zero (or empty).  As a result, the circuit may
fail to detect other fixed points and will iterate forever.  The goal is
to evolve the design so that circuits created using the high-level API
(`Stream::xxx` methods) implement accurate fixed point checks.
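
A sketch of what such a conservative check amounts to, with the trait pared
down to the one method under discussion and a made-up delay operator (this is
not the actual `Operator` trait or `Z1` implementation):

    trait FixedpointCheck {
        /// `true` if constant future inputs are guaranteed to produce
        /// constant future outputs.
        fn fixedpoint(&self) -> bool;
    }

    /// Stand-in delay operator storing one step of state as (key, weight) pairs.
    struct ToyZ1 {
        state: Vec<(u64, i64)>,
    }

    impl FixedpointCheck for ToyZ1 {
        fn fixedpoint(&self) -> bool {
            // A precise check would need to keep the previous state as well
            // and compare the two.  Instead, recognize only the specific
            // fixed point where the stored Z-set is empty; other fixed points
            // go undetected.
            self.state.is_empty()
        }
    }
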

This commit also adds an unrelated change: the
`StrictOperator::get_final_output` method.  This method extracts the
last output of the operator before the end of the current clock epoch in
order to send it to the parent circuit.  In the past, we simply invoked
the `get_output` method, but that way the operator cannot optimize for
the case when this is the last call before `clock_end` (e.g., return an
owned value where it would otherwise have to clone).
The new operator assembles batches in a nested stream into a trace
with the `NestedTimestamp32` timestamp type (we may want to generalize it in
the future).  This will be used internally to implement the join, distinct,
and aggregate operators.
Add implementations of filter, index, and map operators based on the new
Z-set API.
Implementation of `aggregate` based on the new Z-set API (no linear
version of aggregate yet).
Implementation of `distinct` based on the new Z-set API.  See comments in
the source code for the detailed design.
The `consolidate` operator consolidates all updates in a trace into a single
batch.  This operator is typically attached to the output of a nested circuit,
which is computed as the sum of deltas across all iterations of the circuit.
Once the iteration has converged (e.g., reached a fixed point), it is a good
time to consolidate the output.
Join operator implementation based on the new Z-set API.  See comments
in the source code for the detailed design.
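
For intuition, the per-key work of a Z-set join amounts to pairing up the
(value, weight) entries of a key present in both inputs and multiplying the
weights.  The sketch below shows only this semantic core, not the cursor-based
implementation in this commit; the names and types are illustrative:

    fn join_key<V1, V2, O>(
        left: &[(V1, i64)],
        right: &[(V2, i64)],
        mut join_func: impl FnMut(&V1, &V2) -> O,
    ) -> Vec<(O, i64)> {
        let mut out = Vec::with_capacity(left.len() * right.len());
        for (v1, w1) in left {
            for (v2, w2) in right {
                // Weights multiply; a zero product cancels the tuple once the
                // result is consolidated.
                out.push((join_func(v1, v2), w1 * w2));
            }
        }
        out
    }
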
Fix up the galen benchmark to work with the new API.
Enable a disabled test in `condition.rs`.
@ryzhyk ryzhyk requested review from mihaibudiu and Kixiron May 31, 2022 00:07
@mihaibudiu left a comment
I will continue reviewing later, did 1/3 of the files so far.


// TODO: allow arbitrary `Time` types?
/// An indexed Z-set maps arbitrary keys to Z-set values.
pub trait IndexedZSet:

No methods at all?

Collaborator Author
Nope, just a specialization of Batch. What do you reckon is missing?

Contributor
In the future we can probably turn it into a trait alias but they aren't stable yet so it doesn't really matter, you may want to make a default implementation for it though

@@ -1,10 +1,3 @@
/*

why don't we need licenses?

Collaborator Author
There is a license in the top directory. I believe that's enough.

fn fixedpoint(&self) -> bool {
    self.nodes.iter().all(|node| {
        node.fixedpoint()
        /*if !res {

delete comment?
this looks like a strange definition of a fixedpoint, does this assume all nodes in a circuit are in the same "loop"?

@@ -47,6 +47,11 @@ where

fn clock_start(&mut self, _scope: Scope) {}
fn clock_end(&mut self, _scope: Scope) {}
fn fixedpoint(&self) -> bool {

isn't apply a lifted function, and thus always has a fixpoint property?

Collaborator Author
`FnMut` means that it can have state, and so may not be lifted.

Maybe we should have two different versions of Apply then.

/// Z-sets](`crate::algebra::IndexedZSet`). The aggregation function
/// `agg_func` takes a single key and the set of (value, weight)
/// tuples associated with this key and transforms them into a single
/// aggregate value. The output of the operator is a Z-set computed as

will we have aggregates that do not produce zsets too?

I really like the makezset function from the paper.

}

/*

why not just delete this?

Collaborator Author
This stuff no longer works, but we need to re-implement it at some point. I'll create an issue.

@mihaibudiu left a comment
I read 1/3 more of the files, but I have the remaining big ones left.

/// in the first clock cycle.
pub struct CsvSource<R, C, T> {
/// in the first clock cycle as a Z-set with unit weights.
pub struct CsvSource<R, T, W, C> {

I am wondering whether it isn't better to have this actually return tuples and use the makezset operator afterwards. This is one weakness of DD, that everything is a zset, but it doesn't have to be in DBSP.
If that's inefficient you can have separate methods to yield tuples and zsets.

}

self.time += 1;
C::from_tuples((), data)

isn't there a way to do this without a vector, just using an iterator?

Collaborator Author
The current Batcher API will actually reuse the heap allocation behind this vector, so this is deliberate, at least in the DD design.

Contributor
You should just be able to use

let data: Vec<_> = self.reader
    .deserialize()
    .map(|x| ((x.unwrap(), ()), W::one()))
    .collect();

Which should be the same, but a bit nicer


// We can use Builder because cursor yields ordered values. This
// is a nice property of the filter operation.
let mut builder = CO::Builder::with_capacity((), i.len());

does this cause waste if the filter throws out many elements?

Collaborator Author
It does actually, since these buffers can make it all the way to the output batch. This is probably ok, because the batch will either get freed at the end of the current clock tick or get added to the trace, where it will likely get merged with other batches soon, at which point the waste is gone.

I'll add a comment, and we'll keep an eye on this in profiling results.

@ryzhyk commented Jun 1, 2022

@mbudiu-vmw, thanks for the review! Are you done reviewing this?

@mihaibudiu

@mbudiu-vmw, thanks for the review! Are you done reviewing this?

Not even close

@Kixiron (Contributor) left a comment
Looks mostly ok aside from some mostly mundane stuff. There's a few big soundness issues I want addressed though, those are pretty concerning

Comment on lines +340 to +342
while !list1.is_empty() {
    output.push(list1.pop());
}
Contributor
Ditto on the extend thing
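
With plain `Vec`s standing in for the actual queue types, the suggested change
would look roughly like this (a sketch of the shape of the change only):

    fn move_all<T>(output: &mut Vec<T>, list1: &mut Vec<T>) {
        // Drain the source and extend the output in one call instead of
        // popping elements in a loop.
        output.extend(list1.drain(..));
    }
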

Comment on lines +344 to +353
if !head2.is_empty() {
    let mut result = self.empty();
    for _ in 0..head2.len() {
        result.push(head2.pop());
    }
    output.push(result);
}
while !list2.is_empty() {
    output.push(list2.pop());
}
Contributor
More extend, more pre-alloc

Comment on lines +361 to +368
for alloc in self.queue.iter() {
    for v in alloc.iter() {
        result += v.len();
    }
}
for v in self.stash.iter() {
    result += v.len();
}
Contributor
Can be .sum() calls on iterators
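
Roughly what the suggestion amounts to, with the fields passed in explicitly
and `Vec`s standing in for the actual allocation types (illustrative only):

    fn total_len<T>(queue: &[Vec<Vec<T>>], stash: &[Vec<T>]) -> usize {
        // Same counts as the nested loops above, expressed as iterator sums.
        let queued: usize = queue.iter().flat_map(|alloc| alloc.iter()).map(Vec::len).sum();
        let stashed: usize = stash.iter().map(Vec::len).sum();
        queued + stashed
    }
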

@ryzhyk commented Jun 3, 2022

@Kixiron , thanks for the review, I implemented most suggestions. I skipped some suggestions in modules that I stole from DD, since that code is known to work well, and we don't have a way to thoroughly test and benchmark it.

@Kixiron commented Jun 3, 2022

If anything that's indication to me that we first need to thoroughly test that code before we integrate it

@ryzhyk commented Jun 3, 2022

If anything that's indication to me that we first need to thoroughly test that code before we integrate it

We built a whole system to test it, it's called DDlog ;)

Nah, just kidding, tests would be great. Let me at least add an issue for this.

@Kixiron commented Jun 3, 2022

Btw the paths benchmark is broken

The monitor used to visualize strict operators by showing only
the output half of the input/output node pair.  Since most strict operators
send out their entire state after evaluating the output node (`get_output()`),
the node summary obtained after evaluating the output node showed
0 entries and 0 bytes, which is not very useful for profiling.  We
therefore switch to visualizing the input node instead.
@ryzhyk ryzhyk merged commit d82bc2a into vmware-archive:main Jun 6, 2022
@ryzhyk ryzhyk deleted the zset_revamp branch June 6, 2022 16:40
@mihaibudiu left a comment

I read 10 more files and left some comments.
I still have 12 more to go.


Z::R: ZRingValue,
T: TraceReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
{
// Evaluate nested incremental distinct for a single value.

It would be nice to add this derivation to the long paper.

Collaborator Author
I'll create an issue for this. I also have a new implementation of join (next PR) that works for arbitrarily nested circuits, or so I think. We should document and prove its correctness.

Also, for some reason, I cannot respond inline to this comment, so answering here:

Maybe we should have two different versions of Apply then.

I agree, but prefer to wait for a use case.

fn eval_owned(&mut self, delta: Z, delayed_integral: Z) -> Z {
self.eval_owned_and_ref(delta, &delayed_integral)
fn summary(&self, summary: &mut String) {
let size: usize = self.future_updates.iter().map(|vals| vals.len()).sum();

would be nice to write something in the long paper about "future_updates" too.

// need to worry about growing `future_updates` later on.
let mut new_len: u32 = self.time + 1;
trace.map_batches(|batch| {
for ts in batch.upper().elements().iter() {

any reason to maintain the max incrementally?

Collaborator Author
It's not really incremental. I just scan all the batches searching for the largest timestamp and make sure I have enough room in the array. Since we generally don't have an upper bound of the number of iterations, we can't allocate a big enough array at initialization time.

let cand_val = candidate.unwrap();
let k = delta_cursor.key(delta);
let w = delta_cursor.weight(delta);
match k.cmp(cand_val) {

is it worth pulling this pattern into a "merge" iterator over two collections which takes 3 FnMut arguments (one for left, one for right, one for both?)
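
A rough sketch of the kind of helper being suggested, over two key-sorted
slices; the name and the slice-based interface are assumptions, not an
existing API in this crate:

    use std::cmp::Ordering;

    // Walk two key-sorted sequences and dispatch to one of three callbacks,
    // depending on whether a key appears only on the left, only on the right,
    // or on both sides.
    fn merge_by_key<K: Ord, A, B>(
        left: &[(K, A)],
        right: &[(K, B)],
        mut on_left: impl FnMut(&K, &A),
        mut on_right: impl FnMut(&K, &B),
        mut on_both: impl FnMut(&K, &A, &B),
    ) {
        let (mut i, mut j) = (0, 0);
        while i < left.len() && j < right.len() {
            match left[i].0.cmp(&right[j].0) {
                Ordering::Less => {
                    on_left(&left[i].0, &left[i].1);
                    i += 1;
                }
                Ordering::Greater => {
                    on_right(&right[j].0, &right[j].1);
                    j += 1;
                }
                Ordering::Equal => {
                    on_both(&left[i].0, &left[i].1, &right[j].1);
                    i += 1;
                    j += 1;
                }
            }
        }
        for (k, a) in &left[i..] {
            on_left(k, a);
        }
        for (k, b) in &right[j..] {
            on_right(k, b);
        }
    }
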

if self.key_order == Ordering::Greater
|| (self.key_order == Ordering::Equal && self.val_order != Ordering::Less)
{
self.cursor2.map_times(&storage.1, |t, d| logic(t, d));

both the if bodies can be executed

Collaborator Author
yes, that's what we want here

};
}

// value methods

these comments are not useful

Ordering::Equal => {
self.cursor1.seek_val(&storage.0, val);
self.cursor2.seek_val(&storage.1, val);
self.val_order = match (

if seek returns a bool then the valid call is no longer necessary

* valid. */
}

impl<K, V, T, R, C1, C2> Cursor<K, V, T, R> for CursorPair<C1, C2>

no "new" method?

/// the indices of cursors with the minimum key and minimum value. It performs
/// no clever management of these sets otherwise.
#[derive(Debug)]
pub struct CursorList<K, V, T, R, C: Cursor<K, V, T, R>> {

So the cursor_pair is more efficient for two cursors?

ryzhyk added a commit that referenced this pull request Jun 17, 2022
@mihaibudiu left a comment

I have 5 more files to read.


/// Describes an interval of partially ordered times.
#[derive(Clone, Debug)]
pub struct Description<Time> {

I don't like this name, it suggests something for human consumption.
How about TimeBounds?

/// Returns a new description from its component parts.
pub fn new(lower: Antichain<Time>, upper: Antichain<Time>) -> Self {
assert!(!lower.elements().is_empty()); // this should always be true.
// assert!(upper.len() > 0); // this may not always be true.

align to the left with the code, not the comment?

@@ -0,0 +1,63 @@
//! Implementations of `Trace` and associated traits.

// The following is a historical comment by @frankmcsherry. It no longer describes

I would suggest pruning it to reflect only what you took.

@@ -0,0 +1,476 @@
//! Traits and datastructures representing a collection trace.

I think this could have a better name than "trace".
In fact a Trace is just a representation of a Stream<ZSet<K,V>>.
So that's what I would call it: ZSetStream, or perhaps OrdZSetStream.


pub mod ordered;
pub mod ordered_leaf;
// pub mod hashed;

should we delete these?

<O as TryFrom<usize>>::Error: Debug,
<O as TryInto<usize>>::Error: Debug,
{
/// Where all the dataz is.

is this a joke or a typo?


use deepsize::DeepSizeOf;

/// An immutable collection of update tuples, from a contiguous interval of

what does contiguous mean for lattice times?


if lower < upper {
self.layer.keys.swap(write_position, i);
// batch.layer.offs updated via `dedup` below; keeps me sane.

you mean, keeps the code simple?

// Leonid: we do not require batch bounds to grow monotonically.
//assert!(batch1.upper() == batch2.lower());

// Leonid: we do not require batch bounds to grow monotonically.

you already said this

let starting_updates = self.result.vals.vals.len();
let mut effort = 0isize;

// while both mergees are still active

typo

@mihaibudiu left a comment
Now I am done.

pub fn pop(&mut self) -> T {
    debug_assert!(self.head < self.tail);
    self.head += 1;
    unsafe { ::std::ptr::read(self.list.as_mut_ptr().offset((self.head as isize) - 1)) }

these are unsafe to avoid bounds checks?

tail,
}
}
// could leak, if self.head != self.tail.

I don't really understand this comment. perhaps it should be inside the function, to justify the assert.

}

#[inline]
pub fn empty(&mut self) -> Vec<(D, R)> {

this function has a strange name.

}
}

/// Describes the state of a layer.

I would have placed these first in the file, make top-to-bottom reading easier.

}
}

(CursorList::new(cursors, &storage), storage)

in fact it looks to me like this list is never longer than 2.

}

/// Extract the merge state, typically temporarily.
fn take(&mut self) -> Self {

this function has a strange comment and name

let variant = match (batch1, batch2) {
(Some(batch1), Some(batch2)) => {
// Leonid: we do not require batch bounds to grow monotonically.
//assert!(batch1.upper() == batch2.lower());

so this does not break anything?

@@ -0,0 +1,841 @@
//! An append-only collection of update batches.

so does DBSP also do this delayed work?

{
/// Where all the dataz is.
pub layer: OrderedLeaf<K, R>,
pub desc: Description<()>,

this is necessary to fulfill some traits?

<O as TryInto<usize>>::Error: Debug,
{
/// Where all the dataz is.
pub layer: OrderedLayer<K, OrderedLeaf<V, R>, O>,

is this logically OrderedLayer<K, OrderedZSet<V, R>, O>?
