Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+str Add combine seq method to Source and Sink #31345

Merged
merged 1 commit into from
Sep 19, 2022

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Apr 14, 2022

refs: #23958

Partially fixed it, the combineMat up to 22 version may need some discussion

@He-Pin He-Pin force-pushed the combineSeq branch 2 times, most recently from 3c5c4bb to 76276a4 Compare April 15, 2022 06:06
@He-Pin He-Pin marked this pull request as draft April 15, 2022 06:06
@He-Pin He-Pin marked this pull request as ready for review April 15, 2022 06:25
@He-Pin He-Pin marked this pull request as draft April 15, 2022 06:25
@He-Pin He-Pin closed this Apr 15, 2022
@He-Pin He-Pin reopened this Apr 15, 2022
@He-Pin He-Pin force-pushed the combineSeq branch 2 times, most recently from d8db0b4 to 256406e Compare April 15, 2022 08:05
Copy link
Member

@jrudolph jrudolph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe those help with the compilation problem?

@He-Pin He-Pin force-pushed the combineSeq branch 2 times, most recently from 4d1709a to 338d7d1 Compare May 7, 2022 03:15
@He-Pin He-Pin marked this pull request as ready for review May 7, 2022 18:15
@He-Pin He-Pin force-pushed the combineSeq branch 6 times, most recently from 6c1c031 to 28d316b Compare May 7, 2022 19:07
@He-Pin He-Pin requested a review from jrudolph May 9, 2022 08:20
Copy link
Member

@raboof raboof left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting to look quite useful!

akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala Outdated Show resolved Hide resolved
akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala Outdated Show resolved Hide resolved
@johanandren
Copy link
Member

@He-Pin any chance you'll have time to continue this PR any time soon?

@He-Pin He-Pin force-pushed the combineSeq branch 4 times, most recently from 1678713 to ae1e646 Compare August 27, 2022 16:19
@He-Pin He-Pin marked this pull request as draft August 27, 2022 16:58
@He-Pin He-Pin force-pushed the combineSeq branch 4 times, most recently from f0fe84e to 9b2ead7 Compare September 11, 2022 15:58
@He-Pin He-Pin marked this pull request as ready for review September 11, 2022 15:59
@@ -370,6 +371,29 @@ object Sink {
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => strategy.apply(num)))
}

/**
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

@He-Pin He-Pin requested review from patriknw and removed request for jrudolph September 11, 2022 16:04
@He-Pin
Copy link
Member Author

He-Pin commented Sep 11, 2022

I have update the PR to address comments, one issue is that the old style will pass out the size with strategy, and now it's not but clear.
The only issue is they don't alias and can not write Merge(_) style code.
The Source.combineMat is using strategy: Int => Graph[UniformFanInShape[T, U], NotUsed].

@patriknw @raboof @johanandren I think this is ready, and maybe need to go with strategy: Int => ... too

* Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]].
*/
def combine[T, U, M](sources: immutable.Seq[Graph[SourceShape[T], M]])(
fanInGraph: Graph[UniformFanInShape[T, U], NotUsed]): Source[U, immutable.Seq[M]] =
Copy link
Member

@patriknw patriknw Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could still be named something with strategy. fanInStrategy?

@johanandren note that I suggested this strategy parameter to not be like the other existing combine

strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]

Do you know why the original API was like that? The Int is just the size of sources. Was it only for convenience to be able to write Merge(_)?

Should we keep it the same even if it seems weird to me?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember any rationale but I'd guess the same as you, to be able to do Merge(_) and Concat(_), kinda superfluous because you know the size, but OTOH possible to get wrong if it is not fed to the strategy but repeated by user.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@He-Pin sorry for the extra work, but could you change back to the old strategy: Int => so that we have a consistent api for these and what already exists.

strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = {
@nowarn
@deprecatedName(Symbol("strategy"))
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to the fanOutStrategy

strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
@nowarn
@deprecatedName(Symbol("strategy"))
fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to fanOutStrategy

@He-Pin
Copy link
Member Author

He-Pin commented Sep 16, 2022

@patriknw @johanandren I have updated this PR.

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@johanandren johanandren merged commit 7f9e27c into akka:main Sep 19, 2022
@johanandren johanandren added this to the 2.7.0-M2 milestone Sep 19, 2022
@patriknw patriknw modified the milestones: 2.7.0-M2, 2.7.0 Oct 19, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants