Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Combination Operators Zip, CombineLatest, WithLatestFrom #330

Open
wants to merge 8 commits into
base: branch-23.07
Choose a base branch
from

Conversation

mdemoret-nv
Copy link
Contributor

Description

This adds a few new operators for combining multiple streams into one. For example, the code below illustrates using the Zip operator

auto source1 = std::make_shared<node::TestSource<int>>();
auto source2 = std::make_shared<node::TestSource<float>>();

auto zip = std::make_shared<node::Zip<int, float>>();

auto sink = std::make_shared<node::TestSink<std::tuple<int, float>>>();

mrc::make_edge(*source1, *zip->get_sink<0>());
mrc::make_edge(*source2, *zip->get_sink<1>());
mrc::make_edge(*zip, *sink);

source1->run();
source2->run();

sink->run();

EXPECT_EQ(sink->get_values(),
            (std::vector<std::tuple<int, float>>{
                std::tuple<int, float>{0, 0},
                std::tuple<int, float>{1, 1},
                std::tuple<int, float>{2, 2},
            }));

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@mdemoret-nv mdemoret-nv added non-breaking Non-breaking change feature request New feature or request labels Jun 3, 2023
@mdemoret-nv mdemoret-nv requested review from a team as code owners June 3, 2023 00:12
Copy link
Contributor

@cwharris cwharris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved, but I'm not familiar enough with most of the MRC aspects yet to give a thorough review.

@@ -824,6 +1012,182 @@ TEST_F(TestEdges, CombineLatest)
source2->run();

sink->run();

EXPECT_EQ(sink->get_values(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't account for all 5 pairs/values that could be yielded in a real scenario where the elements are interleaved, it only accounts for the scenario where source1 is run entirely before source2, meaning we don't get anything except the 2's for source1. Is it worth testing more thoroughly than this?

source1->run();

// Should throw when pushing last value
EXPECT_THROW(source2->run(), exceptions::MrcRuntimeError);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is this going to cause us to throw when we try to shutdown the pipeline? Is that something we want?


sink->run();

EXPECT_EQ(sink->get_values(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: This differs from the previous test where we throw an exception. Why is that? Is it because we treat sink 0 and sink 1 differently in terms of the number of elements they are allowed to accept?

Comment on lines +1112 to +1136
// Push 2 from each
source2->push(2);
source1->push(2);
source3->push(2);

// Push 2 from each
source2->push(2);
source1->push(2);
source3->push(2);

// Push the rest
source3->run();
source1->run();
source2->run();

sink->run();

EXPECT_EQ(sink->get_values(),
(std::vector<std::tuple<int, float, std::string>>{
std::tuple<int, float, std::string>{0, 1, "a"},
std::tuple<int, float, std::string>{1, 1, "a"},
std::tuple<int, float, std::string>{2, 3, "b"},
std::tuple<int, float, std::string>{3, 3, "b"},
std::tuple<int, float, std::string>{4, 3, "e"},
}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remark: the only thing I'm fuzzy on is whether 0, 1, a should be in the output, since 0 _ _ was emitted before we had both _ 1 _ and _ _ a.
question: are we suppose to remember and emit for all values of the source observable, or just the ones that occur after we've received at least one from the others?

template <size_t N>
channel::Status set_upstream_value(NthTypeOf<N, TypesT...> value)
{
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
auto lock = std::unique_lock(m_mutex);


void edge_complete()
{
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
auto lock = std::unique_lock(m_mutex);

@mdemoret-nv mdemoret-nv requested a review from a team as a code owner May 1, 2024 00:02
Copy link

codecov bot commented May 1, 2024

Codecov Report

Attention: Patch coverage is 86.56716% with 18 lines in your changes are missing coverage. Please review.

❗ No coverage uploaded for pull request base (branch-23.07@5f62b46). Click here to learn what that means.
Report is 7 commits behind head on branch-23.07.

❗ Current head 7cbfe8e differs from pull request most recent head 5c52d5c. Consider uploading reports for the commit 5c52d5c to get more accurate results

Additional details and impacted files

Impacted file tree graph

@@               Coverage Diff               @@
##             branch-23.07     #330   +/-   ##
===============================================
  Coverage                ?   73.26%           
===============================================
  Files                   ?      386           
  Lines                   ?    13533           
  Branches                ?     1024           
===============================================
  Hits                    ?     9915           
  Misses                  ?     3618           
  Partials                ?        0           
Flag Coverage Δ
cpp 69.25% <86.56%> (?)
py 42.06% <100.00%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
cpp/mrc/include/mrc/edge/edge_channel.hpp 85.00% <100.00%> (ø)
.../mrc/include/mrc/node/operators/combine_latest.hpp 100.00% <100.00%> (ø)
cpp/mrc/include/mrc/node/sink_channel_owner.hpp 100.00% <100.00%> (ø)
cpp/mrc/include/mrc/node/source_channel_owner.hpp 90.90% <100.00%> (ø)
cpp/mrc/include/mrc/utils/tuple_utils.hpp 100.00% <100.00%> (ø)
cpp/mrc/include/mrc/node/operators/zip.hpp 98.21% <98.21%> (ø)
...rc/include/mrc/node/operators/with_latest_from.hpp 95.65% <95.65%> (ø)
cpp/mrc/include/mrc/channel/status.hpp 0.00% <0.00%> (ø)

Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5f62b46...5c52d5c. Read the comment docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request non-breaking Non-breaking change
Projects
Status: Review - Approved
Development

Successfully merging this pull request may close these issues.

3 participants