Jon Brisbin edited this page Aug 27, 2013 · 3 revisions

TcpClient

Reactor includes a powerful but easy-to-use TCP client for communicating asynchronously with TCP servers. The TcpClient and TcpServer share the same Codec support, so any codec that works for the server also works for the client. The TcpClient also supports heartbeats while a connection is idle and configurable reconnection when a connection is dropped.

Connecting to a Server

As with other Reactor components, a TcpClient isn’t created directly but is obtained by configuring a TcpClientSpec. The only implementation that ships with Reactor is based on Netty 4, but other implementations could be created.

Create a TcpClientSpec, configure it with the environment, codec, and server address, then open a connection and attach a handler to it:

Environment env = new Environment();

// create a spec using the Netty-based client
TcpClient<String, String> client = new TcpClientSpec<String, String>(NettyTcpClient.class)
    .env(env)
    .codec(StandardCodecs.LINE_FEED_CODEC)
    .connect("localhost", 8080)
    .get();

client.open().consume(new Consumer<TcpConnection<String, String>>() {
  public void accept(TcpConnection<String, String> conn) {

    conn.in().consume(new Consumer<String>() {
      public void accept(String line) {
        // handle lines of incoming data
      }
    });

    // write data to the server
    conn.send("Hello World!");

  }
});

Smart Reconnect

The zero-arg open() method returns a Promise<TcpConnection<IN, OUT>> which is fulfilled when the connection is made. A Promise is fulfilled only once, so the handler that accepts the connection is invoked only for the first connection made with this client. If connections are short-lived and the network is reliable, that should be sufficient. But if connections are long-lived (granted, a term which is open to interpretation), there is a good chance that a connection will be dropped at some point, and a handler attached to the Promise will not see the replacement connection.
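The fulfilled-once behaviour can be illustrated without Reactor at all. The following is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for the Promise; it is an analogy only, not Reactor's implementation, and the class name FulfilledOnceSketch is made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FulfilledOnceSketch {

  // Completes a future twice and returns what the attached handler saw.
  static List<String> run() {
    List<String> seen = new ArrayList<String>();
    CompletableFuture<String> promise = new CompletableFuture<String>();

    // attach the handler before fulfilment (stand-in for consuming the Promise)
    promise.thenAccept(seen::add);

    // the first completion fulfils the future and runs the handler
    promise.complete("first connection");
    // the second completion is ignored because the future is already complete
    promise.complete("connection after reconnect");

    return seen;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The handler only ever sees the first value, which is why a handler attached to the Promise cannot observe reconnects.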

Reactor’s TcpClient offers a smart reconnect option that lets you decide whether to attempt a reconnect and how long to wait before doing so. You can also change the host to which the reconnect attempt is made, which is useful if your reconnect algorithm switches among a set of hosts in a high-availability setup.

To use the reconnect functionality, you need to pass an implementation of the Reconnect interface when you open a connection:

Environment env = new Environment();

TcpClient<String, String> client = new TcpClientSpec<String, String>(NettyTcpClient.class)
    .env(env)
    .codec(StandardCodecs.LINE_FEED_CODEC)
    .connect("localhost", 8080)
    .get();

Stream<TcpConnection<String, String>> connections = client.open(new Reconnect() {
  public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress currentAddress, int attempt) {

    // retry the current host up to 3 times, waiting longer before each attempt
    switch (attempt) {
      case 1:
        return Tuple.of(currentAddress, 100L);
      case 2:
        return Tuple.of(currentAddress, 500L);
      case 3:
        return Tuple.of(currentAddress, 1000L);
      default:
        // after 3 attempts, try to connect somewhere else
        // (nextHost() is an application-defined helper, not part of Reactor)
        return Tuple.of(nextHost(currentAddress), 0L);
    }

  }
});

connections.consume(new Consumer<TcpConnection<String, String>>() {
  public void accept(TcpConnection<String, String> conn) {
    // handle each connection, including reconnects
  }
});
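The reconnect example calls a nextHost() helper that the page does not define. One possible shape for it, assuming a fixed list of candidate hosts tried in round-robin order, is sketched below. The class name HostRotation and the example host names are hypothetical, and the sketch uses only JDK classes.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;

public class HostRotation {

  private final List<InetSocketAddress> hosts;

  public HostRotation(InetSocketAddress... hosts) {
    if (hosts.length == 0) {
      throw new IllegalArgumentException("at least one host is required");
    }
    // copy the array so later changes by the caller do not affect the rotation
    this.hosts = Arrays.asList(hosts.clone());
  }

  // Returns the host that follows `current` in the list, wrapping around at the end.
  // An address that is not in the list maps to the first host.
  public InetSocketAddress nextHost(InetSocketAddress current) {
    int index = hosts.indexOf(current); // -1 when the address is unknown
    return hosts.get((index + 1) % hosts.size());
  }

  public static void main(String[] args) {
    // unresolved addresses avoid a DNS lookup in this illustration
    InetSocketAddress primary = InetSocketAddress.createUnresolved("primary.example.com", 8080);
    InetSocketAddress backup = InetSocketAddress.createUnresolved("backup.example.com", 8080);

    HostRotation rotation = new HostRotation(primary, backup);
    System.out.println(rotation.nextHost(primary).getHostString());
  }
}
```

A Reconnect implementation could hold a HostRotation instance and delegate to rotation.nextHost(currentAddress) in its default branch.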

Heartbeats

The Netty client has an idle timeout ChannelListener, which Reactor exposes through the ConsumerSpec returned from on(). The ConsumerSpec is a simple way to wire handlers to events that a component might emit throughout its lifetime. The TcpClient ConsumerSpec exposes three events: close, readIdle, and writeIdle.

To attach a handler to one of these events, call the corresponding method on the ConsumerSpec (this example uses the Stream created in the example above):

connections.consume(new Consumer<TcpConnection<String, String>>() {
  public void accept(final TcpConnection<String, String> conn) {

    // handle each connection, including reconnects
    conn.on()
      .close(new Runnable() {
        public void run() {
          // handle connection close event
        }
      })
      .readIdle(5000, new Runnable() {
        public void run() {
          // no data read for 5 seconds, ping the server
          conn.send("ping");
        }
      })
      .writeIdle(5000, new Runnable() {
        public void run() {
          // no data written for 5 seconds, ping the server
          conn.send("ping");
        }
      });

  }
});
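To make the difference between readIdle and writeIdle concrete, here is a minimal sketch of idle tracking. It assumes the read and write timers are independent, as the separate events suggest. It is not Netty's or Reactor's implementation, and the IdleTracker class is hypothetical. Timestamps are passed in explicitly so the logic is easy to follow.

```java
public class IdleTracker {

  private final long readTimeoutMs;
  private final long writeTimeoutMs;
  private long lastReadMs;
  private long lastWriteMs;

  public IdleTracker(long readTimeoutMs, long writeTimeoutMs, long nowMs) {
    this.readTimeoutMs = readTimeoutMs;
    this.writeTimeoutMs = writeTimeoutMs;
    // both timers start when the connection is opened
    this.lastReadMs = nowMs;
    this.lastWriteMs = nowMs;
  }

  // record that data arrived from the peer
  public void onRead(long nowMs) {
    lastReadMs = nowMs;
  }

  // record that data was sent to the peer
  public void onWrite(long nowMs) {
    lastWriteMs = nowMs;
  }

  // true when nothing has been read for at least the read timeout
  public boolean isReadIdle(long nowMs) {
    return nowMs - lastReadMs >= readTimeoutMs;
  }

  // true when nothing has been written for at least the write timeout
  public boolean isWriteIdle(long nowMs) {
    return nowMs - lastWriteMs >= writeTimeoutMs;
  }

  public static void main(String[] args) {
    IdleTracker tracker = new IdleTracker(5000, 5000, 0);
    tracker.onWrite(4000); // a write resets only the write timer
    System.out.println("read idle at 5s: " + tracker.isReadIdle(5000));
    System.out.println("write idle at 5s: " + tracker.isWriteIdle(5000));
  }
}
```

Under this assumption, sending a "ping" from the readIdle handler resets only the write timer; the read timer is reset when the server actually replies.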