Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergePrioritized graph stage #3113

Merged
merged 9 commits into from
Sep 25, 2017

Conversation

alexvaluyskiy
Copy link
Contributor

Math.Round(ones / twos).Should().Be(2);
}

private RunnableGraph<T> ThreeSourceMerge<T>(Source<T, NotUsed> source1, Source<T, NotUsed> source2,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Horusiath @Silv3rcircl3 I'm not sure that I've converted it properly. The original source looks like

  private def threeSourceMerge[T](source1: Source[T, NotUsed], source2: Source[T, NotUsed], source3: Source[T, NotUsed], priorities: Seq[Int], probe: ManualProbe[T]) = {
    RunnableGraph.fromGraph(GraphDSL.create(source1, source2, source3)((_, _, _)) { implicit b  (s1, s2, s3) 
      val merge = b.add(MergePrioritized[T](priorities))
      s1.out ~> merge.in(0)
      s2.out ~> merge.in(1)
      s3.out ~> merge.in(2)
      merge.out ~> Sink.fromSubscriber(probe)
      ClosedShape
    })
  }

builder.From(s2.Outlet).To(merge.In(1));
builder.From(s3.Outlet).To(merge.In(2));

builder.From(merge.Out).To(sink);
Copy link
Contributor Author

@alexvaluyskiy alexvaluyskiy Sep 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala code uses just Sink.FromSubscriber(probe), but it does not compile on C#.

/// Create a new <see cref="T:Akka.Streams.Dsl.MergePrioritized`1" /> with specified number of input ports.
/// </summary>
/// <param name="priorities">Priorities of the input ports</param>
public MergePrioritized(IList<int> priorities)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I change it to IEnumerable<int>? Originally it was Seq[Int]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say yes, no need for a list since no add/remove operations are needed.

private RunnableGraph<T> ThreeSourceMerge<T>(Source<T, NotUsed> source1, Source<T, NotUsed> source2,
Source<T, NotUsed> source3, List<int> priorities, TestSubscriber.ManualProbe<T> probe)
{
return RunnableGraph.FromGraph(GraphDsl.Create(source1, source2, source3, (p1, p2, p3) => default(T), (builder, s1, s2, s3) =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I should change (p1, p2, p3) => default(T) to Tuple.Create

/// <param name="ops">TBD</param>
/// <param name="sink">TBD</param>
/// <returns>TBD</returns>
public static GraphDsl.Builder<TMat> To<TIn, TOut, TMat>(this GraphDsl.ForwardOps<TOut, TMat> ops, IGraph<SinkShape<TIn>, TMat> sink)
public static GraphDsl.Builder<TMat> To<TIn, TOut, TMat, TMat2>(this GraphDsl.ForwardOps<TOut, TMat> ops, IGraph<SinkShape<TIn>, TMat2> sink)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala DSL does not care about the materializer value of the sink. We should do the same

@alexvaluyskiy alexvaluyskiy changed the title [WIP] MergePrioritized graph stage MergePrioritized graph stage Sep 24, 2017
@@ -424,6 +424,188 @@ public MergePreferred(int secondaryPorts, bool eagerClose = false)
}

/// <summary>
/// Merge several streams, taking elements as they arrive from input streams
/// (picking from prioritized once when several have elements ready).
/// A <see cref="T:Akka.Streams.Dsl.MergePrioritized`1" /> has one <see cref="P:Akka.Streams.Dsl.MergePrioritized`1.Out" /> port, one or more input port with their priorities.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new format ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A <see cref="T:Akka.Streams.Dsl.MergePrioritized`1" /> the only format I know is <see cref="GraphStage{TShape}"/>

/// </summary>
public sealed class MergePrioritized<T> : GraphStage<UniformFanInShape<T, T>>
{
public IList<int> Priorities { get; }
Copy link
Contributor

@marcpiechura marcpiechura Sep 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't expose IList, one could add / remove priorities afterwards

public MergePrioritizedLogic(MergePrioritized<T> stage) : base(stage.Shape)
{
_stage = stage;
allBuffers = new List<FixedSizeBuffer<Inlet<T>>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a Array be better since we know the size?

/// <param name="eagerComplete">If true, the merge will complete as soon as one of its inputs completes</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when the specified <paramref name="priorities"/> is less or equal zero.
/// </exception>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception should be also visible on the other constructor or maybe set eagerComplete to false as default and use single constructor


public override UniformFanInShape<T, T> Shape { get; }

public List<Inlet<T>> In { get; }
Copy link
Contributor

@marcpiechura marcpiechura Sep 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be read only list

@marcpiechura
Copy link
Contributor

API approval is missing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants