Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Added another MergeSequence test
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Jun 18, 2020
1 parent d30117d commit 324882c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.cloudstate.proxy.streams

import akka.stream.{Attributes, Inlet, Outlet, UniformFanInShape}
import akka.NotUsed
import akka.stream.{Attributes, Graph, Inlet, Outlet, UniformFanInShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable
Expand All @@ -10,9 +11,11 @@ object MergeSequence {
private case class Pushed[T](in: Inlet[T], sequence: Long, elem: T)

private implicit def ordering[T]: Ordering[Pushed[T]] = Ordering.by[Pushed[T], Long](_.sequence).reverse

def apply[T](inputPorts: Int = 2)(extractSequence: T => Long): Graph[UniformFanInShape[T, T], NotUsed] =
}

class MergeSequence[T](inputPorts: Int)(extractSequence: T => Long) extends GraphStage[UniformFanInShape[T, T]] {
final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long) extends GraphStage[UniformFanInShape[T, T]] {
private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeSequence.in" + i))
private val out: Outlet[T] = Outlet("MergeSequence.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.cloudstate.proxy.streams

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.testkit.TestKit
import org.scalatest.{AsyncWordSpecLike, Matchers}
Expand All @@ -11,8 +12,6 @@ import scala.concurrent.Future

class MergeSequenceSpec extends TestKit(ActorSystem("MergeSequenceSpec")) with AsyncWordSpecLike with Matchers {

private implicit val mat = ActorMaterializer()

"MergeSequence" should {
"merge interleaved streams" in {
merge(
Expand Down Expand Up @@ -65,19 +64,34 @@ class MergeSequenceSpec extends TestKit(ActorSystem("MergeSequenceSpec")) with A
)
}

"propagate errors" in recoverToSucceededIf[SpecificError] {
mergeSources(
Source(List(0L, 3L)),
Source(List(1L, 2L)).flatMapConcat {
case 1L => Source.single(1L)
case 2L => Source.failed(SpecificError())
}
)
}

}

private def merge(seqs: immutable.Seq[Long]*): Future[immutable.Seq[Long]] =
mergeSources(seqs.map(Source(_)): _*)

private def mergeSources(sources: Source[Long, NotUsed]*): Future[immutable.Seq[Long]] =
Source
.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val merge = builder.add(new MergeSequence[Long](seqs.size)(identity))
seqs.foreach { seq =>
Source(seq) ~> merge
val merge = builder.add(new MergeSequence[Long](sources.size)(identity))
sources.foreach { source =>
source ~> merge
}

SourceShape(merge.out)
})
.runWith(Sink.seq)

private case class SpecificError() extends Exception

}

0 comments on commit 324882c

Please sign in to comment.