Skip to content

Commit

Permalink
Update to Ratpack 0.9.16 snapshot.
Browse files Browse the repository at this point in the history
  • Loading branch information
ldaley committed Apr 15, 2015
1 parent 2f1f6f1 commit e9e2d1f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 57 deletions.
9 changes: 5 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ scalaVersion := "2.11.4"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-experimental" % "0.10-M1",
"io.reactivex" % "rxjava-reactive-streams" % "0.3.0",
"io.ratpack" % "ratpack-rx" % "0.9.10-SNAPSHOT",
"io.ratpack" % "ratpack-test" % "0.9.10-SNAPSHOT",
"org.projectreactor" % "reactor-core" % "2.0.0.M1"
"io.reactivex" % "rxjava-reactive-streams" % "0.5.0",
"io.ratpack" % "ratpack-rx" % "0.9.16-SNAPSHOT",
"io.ratpack" % "ratpack-test" % "0.9.16-SNAPSHOT",
"org.projectreactor" % "reactor-core" % "2.0.0.M1",
"org.slf4j" % "slf4j-simple" % "1.7.12"
)
64 changes: 29 additions & 35 deletions src/main/java/com/rolandkuhn/rsinterop/JavaMain.java
Original file line number Diff line number Diff line change
@@ -1,49 +1,43 @@
package com.rolandkuhn.rsinterop;

import org.reactivestreams.Publisher;



import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.reactivestreams.Publisher;
import ratpack.http.ResponseChunks;
import ratpack.rx.RxRatpack;
import ratpack.test.embed.EmbeddedApp;import reactor.rx.Stream;
import ratpack.test.embed.EmbeddedApp;
import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;


public class JavaMain {

public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("InteropTest");
final FlowMaterializer mat = FlowMaterializer.create(system);
RxRatpack.initialize();

EmbeddedApp.fromHandler(ctx -> {
final Integer[] ints = new Integer[10];
for (int i = 0; i < ints.length; ++i) {
ints[i] = i;
}
// RxJava Observable
final Observable<Integer> intObs = Observable.from(ints);
// Reactive Streams Publisher
final Publisher<Integer> intPub = RxReactiveStreams.toPublisher(intObs);
// Akka Streams Source
final Source<String> stringSource = Source.from(intPub).map(Object::toString);
// Reactive Streams Publisher
final Publisher<String> stringPub = stringSource.runWith(Sink.<String>fanoutPublisher(1, 1), mat);
// Reactor Stream
final Stream<String> linesStream = Streams.create(stringPub).map(i -> i + "\n");
// and now render the HTTP response
ctx.render(ResponseChunks.stringChunks(linesStream));
}).test(client -> {
final String text = client.getText();
System.out.println(text);
system.shutdown();
});;
}
public static void main(String... args) throws Exception {
ActorSystem system = ActorSystem.create("InteropTest");
FlowMaterializer mat = FlowMaterializer.create(system);
RxRatpack.initialize();

EmbeddedApp.fromHandler(ctx -> {
final Integer[] ints = new Integer[10];
for (int i = 0; i < ints.length; ++i) {
ints[i] = i;
}
// RxJava Observable
final Observable<Integer> intObs = Observable.from(ints);
// Reactive Streams Publisher
final Publisher<Integer> intPub = RxReactiveStreams.toPublisher(intObs);
// Akka Streams Source
final Source<String> stringSource = Source.from(intPub).map(Object::toString);
// Reactive Streams Publisher
final Publisher<String> stringPub = stringSource.runWith(Sink.<String>fanoutPublisher(1, 1), mat);
// Reactor Stream
final Stream<String> linesStream = Streams.create(stringPub).map(i -> i + "\n");
// and now render the HTTP response
ctx.render(ResponseChunks.stringChunks(linesStream));
}).test(client ->
System.out.println(client.getText())
);
}
}
31 changes: 13 additions & 18 deletions src/main/scala/com/rolandkuhn/rsinterop/ScalaMain.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
package com.rolandkuhn.rsinterop

import ratpack.rx.RxRatpack
import ratpack.test.embed.EmbeddedApp
import ratpack.handling.Handler
import ratpack.handling.Context
import rx.Observable
import scala.collection.JavaConverters._
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source
import rx.RxReactiveStreams
import akka.stream.scaladsl.Sink
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import ratpack.func.Action
import ratpack.handling.{Context, Handler}
import ratpack.http.ResponseChunks
import java.util.function.Consumer
import ratpack.rx.RxRatpack
import ratpack.test.embed.EmbeddedApp
import ratpack.test.http.TestHttpClient
import reactor.rx.Streams
import rx.{Observable, RxReactiveStreams}

import scala.collection.JavaConverters._

object ScalaMain extends App {
val system = ActorSystem("InteropTest")
implicit val mat = FlowMaterializer()(system)

RxRatpack.initialize()

EmbeddedApp.fromHandler(new Handler {
override def handle(ctx: Context): Unit = {
// RxJava Observable
Expand All @@ -40,11 +37,9 @@ object ScalaMain extends App {
// and now render the HTTP response
ctx.render(ResponseChunks.stringChunks(linesStream))
}
}).test(new Consumer[TestHttpClient] {
override def accept(client: TestHttpClient): Unit = {
val text = client.getText()
println(text)
system.shutdown()
}).test(new Action[TestHttpClient] {
override def execute(client: TestHttpClient): Unit = {
println(client.getText())
}
})
}

0 comments on commit e9e2d1f

Please sign in to comment.