Skip to content

Latest commit

 

History

History
597 lines (470 loc) · 23.9 KB

javaspektrum-rsocket.adoc

File metadata and controls

597 lines (470 loc) · 23.9 KB

RSocket.io - Sockets on Stereoids

google image search: pouring coffee

pouring coffee with smoke cup coffee beans burlap sack black background 126277 480

Sockets sind ein integraler Bestandteil der Kommunikation von Prozessen, sowohl lokal auf einem Rechner aber insbesondere zwischen Maschinen über Netzwerkprotokolle.

Mit Rsocket.io ist seit 2015 ein Framework verfügbar dass die Socketprogrammierung mit dem Paradigma des "reaktiven Programmierung" (HungerXX) verknüpft.

Es bietet ein "point-to-point" Applikations-Binärprotokoll auf der OSI-Schicht 5/6 für verschiedene Transportmechanismen von direktem TCP bis zu Websockets und Aeron mit besonderem Fokus auf nicht-blockierende Operationen. Dabei wird sowohl Kommunikation zwischen Services als auch zwischen Client und Services unterstützt, was für moderne Anwendungsarchitekturen besonders interessant ist.

Folgende Kommunkationsmodelle werden unterstützt:

  • Request → Response
    Future<Payload> response = socketClient.requestResponse(requestPayload)

  • Request → Stream
    Publisher<Payload> response = socketClient.requestStream(requestPayload);

  • Fire and Forget
    Future<Void> completed = socketClient.fireAndForget(message);

  • Channel (Bidirektionale Streams)
    Publisher<Payload> output = socketClient.requestChannel(Publisher<Payload> input)

Dabei werden Nachrichtenpakete asynchron über eine einzige Verbindung (Connection) ausgetauscht. Länger laufende Interaktionen (Sessions) können wiederaufgenommen werden, was besonders für mobile Anwendungen, deren Konnektivität nicht immer stabil ist, interessant ist.

Die bekannten Mechanismen und APIs des reaktiven Programmiermodells, wie Observable, Publisher, Subscriber abstrahieren die darunterliegende Implementierung und bieten Eigenschaften wie Komposition, Flow-Control, Backpressure, Fehlerbehandlung usw. an.

Im Dokument zur Motivation [RSocketMotivation] werden die Ideen und Gründe für die Implementierung im Detail diskutiert.

Ein paar interessante Aspekte daraus sind:

  • Netzwerkkommunikation ist asynchron, RSocket übernimmt diesen Aspekt in die Anwendungsschicht

  • Lose-gekoppelte, nachrichtenbasierte System sind skalierbarer und robuster

  • Latenz und Resourceneffizienz sind Hauptgründe

  • Binärprotokoll zur effizienteren Resourcennutzung

  • Kapazitätsplanung vom Client mittels request(n) kann Puffergrößen bestimmen

  • Kapazitätsmanagement zwischen Servern mittels Leases kann smart routing ermöglichen

  • Request-Response ist Sonderfall von Stream mit Größe 1

  • Fehler werden als Nachrichten verschickt

  • Minimierung von Handshakes verbessert Latenz und vereinfacht Protokoll

  • Multiplexing über eine einzelne Netzwerkverbindung ist fairer gegenüber anderen Anwendungen

  • Frühzeitiger Abbruch aller Arten von Verbindungen ermöglicht zeitnahe Resourcenfreigabe

Traditionell ist Socketprogrammierung in Java eine "synchrone" Angelegenheit, bei der ein Server auf einem Port lauscht und bei Zustandekommen einer Verbindung mit einem Client die Verarbeitung des Datenaustauschs an einen Thread delegiert. (siehe Listing 1)

Dabei ist während Schreibens auf den Socket der Thread blockiert sobald der Netzwerkpuffer voll ist und die Bytes die Reise übers Netzwerk antreten müssen. Beim Lesen wird blockiert sobald der Netzwerkpuffer leer ist und auf neue Daten gewartet werden muss.

Da dabei ein ganzer Thread (halbe CPU) in Anspruch genommen wird, ist die ganze Angelegenheit nicht sehr skalierbar.

Seit Java 1.4 gibts es zwar NIO (non-blocking-IO) aber das ist bei der Socket-Programmierung nur bedingt angekommen. Es wird zwar z.B. in Netty für "pipelined processing" genutzt, wo ähnlich wie in Node.js die Verarbeitung in einem "Event-Loop" gehandhabt wird.

Im darunterliegenden Socket-Code, kann ein Socket als nicht-blockierend [SocketsNonBlocking] markiert werden, dann sind typische Operationen wie read, write und accept nicht blockierend, sondern geben die Kontrolle zurück an den Aufrufer.

Stattdessen werden in einer Liste von Sockets mittels des select Systemaufrufs diejenigen Sockets markiert, die bereit sind Daten zu lesen oder zu schreiben. Modernere Systemimplementierungen nehmen dafür epoll, ein eventbasiertes Notifikationssystem für nichtblockierende Sockets.

Beispiel für "traditionelle" Socket Programmierung in Java
import java.net.*;
import java.io.*;
volatile boolean isStopped = false;
var serverSocket = new ServerSocket(8001);
while(!isStopped) {
    var clientSocket = serverSocket.accept();
    new Thread(() -> {
       try (var input  = clientSocket.getInputStream();
            var output = clientSocket.getOutputStream()) {
         var buffer = new byte[4];
         input.read(buffer);
         if ("stop".equalsIgnoreCase(new String(buffer,"UTF-8")))
            isStopped = true;
         else {
            var message = String.format("Time: %d%n",System.currentTimeMillis());
            output.write(message.getBytes());
         }
       } catch(Exception e) {
           System.err.println(e.getMessage());
       }
    }).start();
};
serverSocket.close();

Erste Schritte mit RSocket

Es gibt ein hilfreiches Kommandozeilentool namens rsc von Toshiaki Maki [RSC] das man zum Testen von RSocket Servern nutzen kann.

Nach dem Download der JAR-Datei kann es direkt gestartet werden.

Wir benutzten hier einen [DemoServer] https://rsocket-demo.herokuapp.com der für verschiedene Interaktonsmodelle einen Stream von Twitter Daten bereitstellt.

curl -L -o rsc.jar https://github.com/making/rsc/releases/download/0.6.1/rsc-0.6.1.jar

# alle Optionen anzeigen
java -jar rsc.jar

# erste 10 Antworten als Stream vom Demoserver konsumieren
java -jar rsc.jar --stream --take 10 "wss://rsocket-demo.herokuapp.com/rsocket"

Man kann mittels --request, --channel, --fnf zwischen den Interaktionsmodellen wählen und auch andere Attribute des Streams, wie Verzögerung, Limits kontrollieren. Authentifizierung und das Senden von Daten werden auch unterstützt.

RSocket in Java

Um RSocket in Java zu nutzen, benötigt man zwei Bibliotheken von Maven Central io.rsocket:rsocket-core und io.rsocket:rsocket-transport-netty für den asynchronen Transportkanal über Netty.

RSocket in Java benutzt die Project-Reactor Bibliothek von VMWare für reaktive Implementierung, daher werden Mono und Flux auch direkt in der API verwendet.

Dabei stellt ein Flux<T> einen reaktive Quelle von Nachrichten T dar, und bietet viele Operationen (map, filter, …​) darauf an. Ein Mono<T> beinhaltet dagegen maximal ein Element.

Damit können dann Client und Server-Anwendungen implementiert werden. Unser Beispiel-Client ist das Äquivalent des rsc Aufrufes.

RSocket-Java Client
// io.rsocket:rsocket-core:1.0.2
// io.rsocket:rsocket-transport-netty:1.0.2

var demoServer = URI.create("wss://rsocket-demo.herokuapp.com/rsocket");
// Adresse für die Transportschicht
var ws = WebsocketClientTransport.create(demoServer);
// Cient verbinden, hier blockierend, normalerweise asynchron
var client = RSocketConnector.connectWith(ws).block();

var payload = DefaultPayload.create("peace");
// Stream anfordern
Flux<Payload> s = client.requestStream(payload);

// Wir brauchen eine Barriere, die solange blockiert, bis
// der Stream fertig ist.
var latch = new CountDownLatch(1);

// 10 Elemente empfangen und als UTF-8 Strings ausgeben
s.take(10)
   // Barriere öffnen, sobald der Stream "fertig" ist
   .doOnComplete(latch::countDown)
   // Ohne subscription fließen keine Daten.
   .subscribe(p -> System.out.println(p.getDataUtf8()));

// Ohne Barriere wäre das Programm vor dem Stream zu Ende.
latch.await();

client.dispose();

Server

In den folgenden Beispielen wird zusätzlich das [ReactorTest] Modul (io.projectreactor:reactor-test:3.3.9.RELEASE), ebenfalls aus Project-Reactor benutzt.

Es stellt einen Step-Verifier zur Verfügung, der explizite, blockierende Calls vermeidet und so gar nicht erst in Versuchung führt, doch wieder blockierenden Code zu Schreiben.

Der Server ist ein generischer RSocket Server, dem über einen SocketAcceptor eine Implementierung von RSocket mitgebeben wird, einem Interface, dass die verschiedenen Kommunikationsarten unterstützt. Standardmässig sind diese über default NO-OP-Methoden vorimplementiert. Der RSocketServer wird dann an ein oder mehrere Transportmechanismen und Adressen gebunden und steht asynchron bereit.

RSocketServer.create(SocketAcceptor.with(new RSocket(){}))
      .bind(TcpServerTransport.create("localhost", 7000))
      .block();

Für einen echten Server, der auch etwas tut, müssen wir das Interface natürlich implementieren. Zuerst einmal für ein simples Echo Request-Response.

Request Response

Die Payload Klasse kapselt binäre Daten in RSocket über ByteBuffer, byte-Arrays oder Netty’s ByteBuf, für Strings gibt es Hilfsmethoden in DefaultPayLoad. Damit können diese ggf. bereitgestellt werden ohne, dass Kopien angelegt werden müssen.

Request Response Server
public class Server {
	public static void main(String...a) {
		var handler = new RSocket() {
			// Mono statt Flux -> einzelne Antwort (Response)
			@Override
			public Mono<Payload> requestResponse(Payload payload) {
				try {
					// Hilfsmethode um String aus Binärdaten zu erzeugen
					var text = payload.getDataUtf8();
               var response = text.substring(1).toUpperCase();
					// Und als Echo zurückgeben
					return Mono.just(DefaultPayload.create(response));
				} catch (Exception x) {
					// Fehler werden auch als Nachricht zurückgeschickt
					return Mono.error(x);
				}
			}
		};

		RSocketServer.create(SocketAcceptor.with(handler))
			.bind(TcpServerTransport.create("localhost", 7000))
			.onClose().block(); // Starte den Server
	}
}

Unser Client ist auch nicht kompliziert, er konstruiert einen RSocket, über die dann beliebig viele Nachrichten fliessen können.

Request-Response Client
var socket = RSocketConnector.create()
   .connect(TcpClientTransport.create("localhost", 7000))
   .block();

var text = "Hello RSocket!";

socket.requestResponse(DefaultPayload.create(text))
   .map(Payload::getDataUtf8)
   .log()
   .as(StepVerifier::create)
   .expectNextCount(1)
   .verifyComplete();

socket.requestResponse(DefaultPayload.create(""))
   .doOnError(System.err::println)
   .as(StepVerifier::create)
   .expectError()
   .verify();

socket.dispose();

Wenn wir unserer Server mit einem leeren String aufrufen, führt das zu einem Fehler, der im Client mittels onError behandelt wird.

[reactor-tcp-epoll-2] INFO reactor.Mono.Map.1 - onNext(ELLO RSOCKET!)
[reactor-tcp-epoll-2] INFO reactor.Mono.Map.1 - onComplete()

[reactor-tcp-epoll-2] ERROR reactor.Mono.Map.2 - onError(ApplicationErrorException (0x201): begin 1, end 0, length 0)
[reactor-tcp-epoll-2] ERROR reactor.Mono.Map.2 -
ApplicationErrorException (0x201): begin 1, end 0, length 0
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
	at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:245)
	at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:195)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)

Request Stream

Request-Stream ist der Interaktionsmodus, der im reaktiven Umfeld bevorzugt wird. Dann wird kein Mono sondern ein Flux<Payload> zurückgeliefert.

In unserem Fall zerlegen wir den String in seine Bestandteile und geben diese als sortierten Stream zurück. Das folgende Listing zeigt nur noch den Handler, nicht mehr den orchestrierenden Code.

var handler = new RSocket() {
	@Override
	public Flux<Payload> requestStream(Payload payload) {
		return Flux.fromStream(
			payload.getDataUtf8().chars().sorted()
				.mapToObj(Character::toString))
			.map(DefaultPayload::create);
	}
};

Der Client sieht fast genauso aus wie vorher, wir transformieren nur den empfangenen Flux wieder zurück in einen String.

socket
	.requestStream(DefaultPayload.create(text))
	.map(Payload::getDataUtf8)
	.log()
	.collectList()
	.map(chars -> String.join("", chars))
	.as(StepVerifier::create)
	.expectNext(" !HRSceeklloot")
	.verifyComplete();

Das Ergebnis von Hello RSocket! ist dann: ` !HRSceeklloot`, passend zu Halloween.

Fire and Forget

Informative Nachrichten auf deren Ergebnis man nicht warten will, und die auch mal verloren gehen können, wie Status- oder Metrikinformationen können sehr effizient gehandhabt werden, da der komplette Rückkanal eingespart wird.

Fire and Forget Server
var handler = new RSocket() {

	SubmissionPublisher<Payload> eventPublisher = new SubmissionPublisher<>();

	@Override
	public Mono<Void> fireAndForget(Payload payload) {
		System.err.printf("Received fire-and-forget %d%n",payload.getData().getInt());
		// weiterleiten, z.B. zu Event Benachrichtigungen
		eventPublisher.submit(payload);
		return Mono.empty();
	}
};

Im Client ist ähnlich wie bisher die entsprechende Methode des RSocketClient interfaces aufzurufen.

Fire and Forget Client
var socket = RSocketConnector.create()
   .connect(TcpClientTransport.create("localhost", 7000))
   .block();

var random = ThreadLocalRandom.current();
var data = IntStream.generate(random::nextInt).boxed();

// Buffer für ein Integer
ByteBuffer buffer = ByteBuffer.allocate(4);
Flux.fromStream(data)
    // Alle 50 millisekunden werden zufällige Daten geschickt
   .delayElements(Duration.ofMillis(50))
   .take(25)
   .log()
   .map(num -> buffer.clear().putInt(num).rewind())
   .map(buf -> DefaultPayload.create(buf))
   .flatMap(socket::fireAndForget)
   .as(StepVerifier::create)
   // Wir können nichts erwarten, da `fireAndForget` leere Monos zurück gibt.
   .verifyComplete();

socket.dispose();

Bidirektionaler Channel

Der bidirektionale Kanal ist eine wichtige Neuerung in RSocket, die es erlaubt auf nur einer Connection die asynchrone Kommunikation zwischen Client und Server zu multiplexen.

Dazu wird eine requestChannel Methode in unserem Server-Interface implementiert, die entsprechende streaming Interfaces als Parameter Publisher<Payload> und Rückgabetyp Flux<Payload> haben.

Im Parameter werden die Nachrichten vom Client zur Verfügung gestellt, die Rückkanäle werden pro Client verwaltet und für die Dauer der Sitzung im Server gehalten.

Channel-Server
ChannelController channelController = new ChannelController();

@Override
public Flux<Payload> requestChannel(Publisher<Payload> client) {
   Flux.from(client)
            .subscribe(channelController::processPayload);
   return Flux.from(channelController);
}

static class ChannelController implements Publisher<Payload> {
   List<Subscriber<? super Payload>> clients = new ArrayList<>();

   @Override
   public void subscribe(Subscriber<? super Payload> subscriber) {
         clients.add(subscriber);
   }

   public void processPayload(Payload payload) {
         System.out.println("received payload = " + payload.getDataUtf8());
         clients.forEach(s -> s.onNext(payload));
   }
}

Der Client sieht ähnlich aus, nur dass er im requestChannel einen Publisher übergeben bekommt, der den Kanal zum Server repräsentiert und an den Daten geschickt werden können, sobald die Anmeldung erfolgt ist.

Der Rückgabewert Flux<Payload> der Methode stellt den Rückkanal dar, über den der Client die Nachrichten vom Server bekommt, die dann mit einem Subscriber in onSubscribe, onNext, onError, onComplete Methoden verarbeitet werden können.

var name = "Client " + Instant.now();
System.out.println("I am client "+name);
var socket = RSocketConnector
         .connectWith(TcpClientTransport.create("localhost", 7000))
         .block();

var subscription = socket.requestChannel(new Publisher<Payload>() {
   @Override
   public void subscribe(Subscriber<? super Payload> subscriber) {
         var rnd = ThreadLocalRandom.current();
         var stream = IntStream.generate(() -> rnd.nextInt(10)).boxed();
         Flux.fromStream(stream)
               .take(10)
               .delayElements(Duration.ofMillis(1000))
               .log()
               .map(i -> DefaultPayload.create(String.format("%s: %d", name, i)))
               .subscribe(subscriber);

   }
})
// Empfangene Daten
.subscribe(p -> System.out.println("[" + name + "] received " + p.getDataUtf8()));

System.out.println("Hit return to stop client");
System.in.read();
subscription.dispose();
socket.dispose();

Spring und RSocket

Da RSocket auf Reactor basiert und VMWare/Pivotal auch stark in der RSocket Implementierung involviert waren, wundert es nicht, dass das Spring Framework mit Spring Boot, eine RSocket Implementierung von Hause aus mitbringt.

Man kann sich bei https://start.spring.io die Demo-Anwendung mit RSocket als Dependency zusammenklicken und herunterladen. Das fügt spring-boot-starter-rsocket hinzu, der die RSocket Bibliotheken und Spring Integration bereitstellt.

Wir benötigen noch eine Konfiguration in application.properties

application.properties
spring.rsocket.server.port=7000
spring.main.lazy-initialization=true

Für den Server reicht ein Controller, der über die MessageMapping Annotation eine RSocket Route rsocket-test implementiert. Somit können innerhalb eines Servers mehrere Routen existieren. Da sowohl Parameter als auch Rückgabewert einfache Message Typen sind, ist dies ein Request-Response Endpunkt, als Serialisierung wird JSON genutzt.

RSocket Spring Controller
// Message Java Bean
class Message {
   private String text;
   public void setText(String text) { this.text = text; }
   public String getText() { return text; }
}

@Controller
public class RSocketController {
    private final Log log = LogFactory.getLog(RSocketController.class);

    @MessageMapping("rsocket-test")
    Message requestResponse(Message request) {
        log.info("Received request-response request: " + request.getText());
        var response = new Message();
        response.setText(request.getText().toUpperCase());
        return response;
    }
}

Der Server wird als Spring-Boot Anwendung ausgeführt und kann mittels rsc oder anderen RSocket Clients getestet werden.

./mvnw spring-boot:run

java -jar rsc.jar -d'{"text":"Hello Spring"}' --route rsocket-test  tcp://localhost:7001
{"text":"HELLO SPRING"}

RSocket in anderen Frameworks

Für andere populäre Frameworks wie Quarkus und Micronaut ist RSocket auf der Feature-Request Liste und leider noch nicht verfügbar.

Um entfernte Methodenaufrufe (RPC) über RSocket abzuwickeln gibt es ein dediziertes Modul [RSocket-RPC], das diese Funktionalität mit einer Protobuf Serialisierung implementiert.

Eine umfangreiche Liste anderer Bibliotheken und Integrationen ist unter [RSocket Ressourcen] verfügbar.

Vielen Dank an Michael Simons für das Feedback und die Hilfe mit den Code Beispielen.

Referenzen