From 8d58cc2f63f96d5c0f5596ae908c50dafa2fc1ce Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 15 Jul 2023 21:39:02 +0800 Subject: [PATCH 1/3] !test Migrate multi node testkit to Netty 4. Signed-off-by: He-Pin --- cluster/src/multi-jvm/resources/logback.xml | 13 ++ .../migrate-to-netty4.backwards.excludes | 57 ++++++ .../remote/testconductor/Conductor.scala | 48 +++--- .../remote/testconductor/DataTypes.scala | 23 ++- .../pekko/remote/testconductor/Player.scala | 81 +++++---- .../testconductor/RemoteConnection.scala | 163 +++++++++++------- .../pekko/remote/testkit/MultiNodeSpec.scala | 5 +- project/Dependencies.scala | 10 +- project/MultiNode.scala | 1 + project/SbtMultiJvm.scala | 9 +- 10 files changed, 264 insertions(+), 146 deletions(-) create mode 100644 cluster/src/multi-jvm/resources/logback.xml create mode 100644 multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes diff --git a/cluster/src/multi-jvm/resources/logback.xml b/cluster/src/multi-jvm/resources/logback.xml new file mode 100644 index 00000000000..5872be5c2df --- /dev/null +++ b/cluster/src/multi-jvm/resources/logback.xml @@ -0,0 +1,13 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + \ No newline at end of file diff --git a/multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes b/multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes new file mode 100644 index 00000000000..7b3f57d4965 --- /dev/null +++ b/multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes @@ -0,0 +1,57 @@ +# Migrate to netty 4 +ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.pekko.remote.testconductor.RemoteConnection") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.PlayerHandler") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelOpen") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelClosed") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelBound") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelUnbound") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.writeComplete") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.exceptionCaught") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelConnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelDisconnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.messageReceived") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.channel") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.this") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ConductorHandler") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.clients") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelConnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelDisconnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.messageReceived") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder.decode") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder.encode") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgEncoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgEncoder.encode") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.apply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.getAddrString") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.shutdown") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.getPipeline") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.this") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgDecoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgDecoder.decode") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller.connection") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.unapply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.unapply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.channel") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.channel") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy$default$1") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.this") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.channel") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy$default$1") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy$default$1") + +# For Scala 3 these are also needed +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected._1") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data._1") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM._1") diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala index 3bd160ae16b..edaf69c7a93 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala @@ -16,6 +16,7 @@ package org.apache.pekko.remote.testconductor import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap +import scala.annotation.nowarn import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ @@ -23,18 +24,13 @@ import scala.reflect.classTag import scala.util.control.NoStackTrace import RemoteConnection.getAddrString +import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter } +import io.netty.channel.ChannelHandler.Sharable import language.postfixOps -import org.jboss.netty.channel.{ - Channel, - ChannelHandlerContext, - ChannelStateEvent, - MessageEvent, - SimpleChannelUpstreamHandler -} import org.apache.pekko -import pekko.PekkoException import pekko.ConfigurationException +import pekko.PekkoException import pekko.actor.{ Actor, ActorRef, @@ -286,32 +282,33 @@ trait Conductor { this: TestConductorExt => * * INTERNAL API. */ +@Sharable private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: ActorRef, log: LoggingAdapter) - extends SimpleChannelUpstreamHandler { + extends ChannelInboundHandlerAdapter { implicit val createTimeout: Timeout = _createTimeout val clients = new ConcurrentHashMap[Channel, ActorRef]() - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val channel = event.getChannel + override def channelActive(ctx: ChannelHandlerContext): Unit = { + val channel = ctx.channel() log.debug("connection from {}", getAddrString(channel)) val fsm: ActorRef = Await.result((controller ? Controller.CreateServerFSM(channel)).mapTo(classTag[ActorRef]), Duration.Inf) clients.put(channel, fsm) } - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val channel = event.getChannel + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + val channel = ctx.channel() log.debug("disconnect from {}", getAddrString(channel)) val fsm = clients.get(channel) fsm ! Controller.ClientDisconnected clients.remove(channel) } - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { - val channel = event.getChannel - log.debug("message from {}: {}", getAddrString(channel), event.getMessage) - event.getMessage match { + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + val channel = ctx.channel() + log.debug("message from {}: {}", getAddrString(channel), msg) + msg match { case msg: NetworkOp => clients.get(channel) ! msg case msg => @@ -320,6 +317,11 @@ private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: Actor } } + @nowarn("msg=deprecated") + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + log.error("channel {} exception {}", ctx.channel(), cause) + ctx.close() + } } /** @@ -398,10 +400,10 @@ private[pekko] class ServerFSM(val controller: ActorRef, val channel: Channel) log.warning("client {} sent unsupported message {}", getAddrString(channel), msg) stop() case Event(ToClient(msg: UnconfirmedClientOp), _) => - channel.write(msg) + channel.writeAndFlush(msg) stay() case Event(ToClient(msg), None) => - channel.write(msg) + channel.writeAndFlush(msg) stay().using(Some(sender())) case Event(ToClient(msg), _) => log.warning("cannot send {} while waiting for previous ACK", msg) @@ -436,7 +438,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller import Controller._ val settings = TestConductor().Settings - val connection = RemoteConnection( + val connection: RemoteConnection = RemoteConnection( Server, controllerPort, settings.ServerSocketWorkerPoolSize, @@ -472,7 +474,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller override def receive = LoggingReceive { case CreateServerFSM(channel) => - val (ip, port) = channel.getRemoteAddress match { + val (ip, port) = channel.remoteAddress() match { case s: InetSocketAddress => (s.getAddress.getHostAddress, s.getPort) case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser } @@ -526,11 +528,11 @@ private[pekko] class Controller(private var initialParticipants: Int, controller barrier ! BarrierCoordinator.RemoveClient(node) } case GetNodes => sender() ! nodes.keys - case GetSockAddr => sender() ! connection.getLocalAddress + case GetSockAddr => sender() ! connection.channel.localAddress() } override def postStop(): Unit = { - RemoteConnection.shutdown(connection) + connection.shutdown() } } diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala index 9e33176d8db..2184ed419bd 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala @@ -15,12 +15,9 @@ package org.apache.pekko.remote.testconductor import scala.concurrent.duration._ +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder } import language.implicitConversions -import org.jboss.netty.channel.Channel -import org.jboss.netty.channel.ChannelHandlerContext -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder - import org.apache.pekko import pekko.actor.Address import pekko.remote.testconductor.{ TestConductorProtocol => TCP } @@ -74,7 +71,7 @@ private[pekko] case object Done extends Done { private[pekko] final case class Remove(node: RoleName) extends CommandOp -private[pekko] class MsgEncoder extends OneToOneEncoder { +private[pekko] class MsgEncoder extends MessageToMessageEncoder[AnyRef] { implicit def address2proto(addr: Address): TCP.Address = TCP.Address.newBuilder @@ -90,7 +87,11 @@ private[pekko] class MsgEncoder extends OneToOneEncoder { case Direction.Both => TCP.Direction.Both } - def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match { + override def encode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = { + out.add(encode0(msg)) + } + + private def encode0(msg: AnyRef): AnyRef = msg match { case x: NetworkOp => val w = TCP.Wrapper.newBuilder x match { @@ -136,7 +137,7 @@ private[pekko] class MsgEncoder extends OneToOneEncoder { } } -private[pekko] class MsgDecoder extends OneToOneDecoder { +private[pekko] class MsgDecoder extends MessageToMessageDecoder[AnyRef] { implicit def address2scala(addr: TCP.Address): Address = Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort) @@ -147,7 +148,11 @@ private[pekko] class MsgDecoder extends OneToOneDecoder { case TCP.Direction.Both => Direction.Both } - def decode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match { + override def decode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = { + out.add(decode0(msg)) + } + + private def decode0(msg: AnyRef): AnyRef = msg match { case w: TCP.Wrapper if w.getAllFields.size == 1 => if (w.hasHello) { val h = w.getHello diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala index c81371e56f8..6b10b319532 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala @@ -16,6 +16,7 @@ package org.apache.pekko.remote.testconductor import java.net.{ ConnectException, InetSocketAddress } import java.util.concurrent.TimeoutException +import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future } import scala.concurrent.duration._ @@ -23,16 +24,8 @@ import scala.reflect.classTag import scala.util.control.NoStackTrace import scala.util.control.NonFatal -import org.jboss.netty.channel.{ - Channel, - ChannelHandlerContext, - ChannelStateEvent, - ExceptionEvent, - MessageEvent, - SimpleChannelUpstreamHandler, - WriteCompletionEvent -} - +import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter } +import io.netty.channel.ChannelHandler.Sharable import org.apache.pekko import pekko.actor._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } @@ -202,7 +195,7 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress case Event(_: ClientOp, _) => stay().replying(Status.Failure(new IllegalStateException("not connected yet"))) case Event(Connected(channel), _) => - channel.write(Hello(name.name, TestConductor().address)) + channel.writeAndFlush(Hello(name.name, TestConductor().address)) goto(AwaitDone).using(Data(Some(channel), None)) case Event(e: ConnectionFailure, _) => log.error(e, "ConnectionFailure") @@ -229,12 +222,12 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress when(Connected) { case Event(Disconnected, _) => log.info("disconnected from TestConductor") - throw new ConnectionFailure("disconnect") + throw ConnectionFailure("disconnect") case Event(ToServer(_: Done), Data(Some(channel), _)) => - channel.write(Done) + channel.writeAndFlush(Done) stay() case Event(ToServer(msg), d @ Data(Some(channel), None)) => - channel.write(msg) + channel.writeAndFlush(msg) val token = msg match { case EnterBarrier(barrier, _) => Some(barrier -> sender()) case GetAddress(node) => Some(node.name -> sender()) @@ -326,11 +319,19 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress initialize() } +/** + * INTERNAL API. + */ +private[pekko] object PlayerHandler { + case class Reconnected(connection: RemoteConnection) +} + /** * This handler only forwards messages received from the conductor to the [[pekko.remote.testconductor.ClientFSM]]. * * INTERNAL API. */ +@Sharable private[pekko] class PlayerHandler( server: InetSocketAddress, private var reconnects: Int, @@ -339,57 +340,55 @@ private[pekko] class PlayerHandler( fsm: ActorRef, log: LoggingAdapter, scheduler: Scheduler)(implicit executor: ExecutionContext) - extends SimpleChannelUpstreamHandler { + extends ChannelInboundHandlerAdapter { import ClientFSM._ - reconnect() + var connection: RemoteConnection = reconnect() var nextAttempt: Deadline = _ - override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} open", event.getChannel) - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} closed", event.getChannel) - override def channelBound(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} bound", event.getChannel) - override def channelUnbound(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} unbound", event.getChannel) - override def writeComplete(ctx: ChannelHandlerContext, event: WriteCompletionEvent) = - log.debug("channel {} written {}", event.getChannel, event.getWrittenAmount) - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - log.debug("channel {} exception {}", event.getChannel, event.getCause) - event.getCause match { + @nowarn("msg=deprecated") + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + log.error("channel {} exception {}", ctx.channel(), cause) + cause match { case _: ConnectException if reconnects > 0 => reconnects -= 1 - scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect()) + scheduler.scheduleOnce(nextAttempt.timeLeft) { + val remoteConnection = reconnect() + remoteConnection.channel.pipeline().fireUserEventTriggered(PlayerHandler.Reconnected(remoteConnection)) + } case e => fsm ! ConnectionFailure(e.getMessage) } } - private def reconnect(): Unit = { + override def userEventTriggered(ctx: ChannelHandlerContext, evt: Any): Unit = evt match { + case PlayerHandler.Reconnected(c) => connection = c + case _ => super.userEventTriggered(ctx, evt) + } + + private def reconnect(): RemoteConnection = { nextAttempt = Deadline.now + backoff RemoteConnection(Client, server, poolSize, this) } - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val ch = event.getChannel + override def channelActive(ctx: ChannelHandlerContext): Unit = { + val ch = ctx.channel() log.debug("connected to {}", getAddrString(ch)) fsm ! Connected(ch) } - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val channel = event.getChannel + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + val channel = ctx.channel() log.debug("disconnected from {}", getAddrString(channel)) fsm ! PoisonPill - executor.execute(new Runnable { def run = RemoteConnection.shutdown(channel) }) // Must be shutdown outside of the Netty IO pool + executor.execute(() => connection.shutdown()) // Must be shutdown outside of the Netty IO pool } - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { - val channel = event.getChannel - log.debug("message from {}: {}", getAddrString(channel), event.getMessage) - event.getMessage match { + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + val channel = ctx.channel() + log.debug("message from {}: {}", getAddrString(channel), msg) + msg match { case msg: NetworkOp => fsm ! msg case msg => diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala index dd6f883308c..a97f9fcd2e1 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala @@ -14,66 +14,69 @@ package org.apache.pekko.remote.testconductor import java.net.InetSocketAddress -import java.util.concurrent.Executors - -import scala.util.control.NonFatal - -import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap } -import org.jboss.netty.buffer.ChannelBuffer -import org.jboss.netty.channel.{ - Channel, - ChannelPipeline, - ChannelPipelineFactory, - ChannelUpstreamHandler, - DefaultChannelPipeline + +import scala.annotation.nowarn + +import io.netty.bootstrap.{ Bootstrap, ServerBootstrap } +import io.netty.buffer.{ ByteBuf, ByteBufUtil } +import io.netty.channel._ +import io.netty.channel.ChannelHandler.Sharable +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } +import io.netty.handler.codec.{ + LengthFieldBasedFrameDecoder, + LengthFieldPrepender, + MessageToMessageDecoder, + MessageToMessageEncoder } -import org.jboss.netty.channel.ChannelHandlerContext -import org.jboss.netty.channel.socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory } -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.oneone.{ OneToOneDecoder, OneToOneEncoder } import org.apache.pekko import pekko.event.Logging -import pekko.protobufv3.internal.Message +import pekko.protobufv3.internal.{ Message, MessageLite, MessageLiteOrBuilder } import pekko.util.Helpers /** * INTERNAL API. */ -private[pekko] class ProtobufEncoder extends OneToOneEncoder { - override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = +private[pekko] class ProtobufEncoder extends MessageToMessageEncoder[MessageLiteOrBuilder] { + + override def encode(ctx: ChannelHandlerContext, msg: MessageLiteOrBuilder, out: java.util.List[AnyRef]): Unit = { msg match { - case m: Message => - val bytes = m.toByteArray() - ctx.getChannel.getConfig.getBufferFactory.getBuffer(bytes, 0, bytes.length) - case other => other + case messageLite: MessageLite => + val bytes = messageLite.toByteArray + out.add(ctx.alloc().buffer(bytes.length).writeBytes(bytes)) + case messageBuilder: MessageLite.Builder => + val bytes = messageBuilder.build().toByteArray + out.add(ctx.alloc().buffer(bytes.length).writeBytes(bytes)) + case _ => throw new IllegalArgumentException(s"Unsupported msg:$msg") } + } } /** * INTERNAL API. */ -private[pekko] class ProtobufDecoder(prototype: Message) extends OneToOneDecoder { - override def decode(ctx: ChannelHandlerContext, ch: Channel, obj: AnyRef): AnyRef = - obj match { - case buf: ChannelBuffer => - val len = buf.readableBytes() - val bytes = new Array[Byte](len) - buf.getBytes(buf.readerIndex, bytes, 0, len) - prototype.getParserForType.parseFrom(bytes) - case other => other - } +private[pekko] class ProtobufDecoder(prototype: Message) extends MessageToMessageDecoder[ByteBuf] { + + override def decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: java.util.List[AnyRef]): Unit = { + val bytes = ByteBufUtil.getBytes(msg) + out.add(prototype.getParserForType.parseFrom(bytes)) + } } /** * INTERNAL API. */ -private[pekko] class TestConductorPipelineFactory(handler: ChannelUpstreamHandler) extends ChannelPipelineFactory { - def getPipeline: ChannelPipeline = { - val encap = List(new LengthFieldPrepender(4), new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4)) +@Sharable +private[pekko] class TestConductorPipelineFactory( + handler: ChannelInboundHandler) extends ChannelInitializer[SocketChannel] { + + override def initChannel(ch: SocketChannel): Unit = { + val encap = List(new LengthFieldPrepender(4), new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4, false)) val proto = List(new ProtobufEncoder, new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance)) val msg = List(new MsgEncoder, new MsgDecoder) - (encap ::: proto ::: msg ::: handler :: Nil).foldLeft(new DefaultChannelPipeline) { (pipe, handler) => + (encap ::: proto ::: msg ::: handler :: Nil).foldLeft(ch.pipeline()) { (pipe, handler) => pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe } } @@ -94,44 +97,76 @@ private[pekko] case object Client extends Role */ private[pekko] case object Server extends Role +/** + * INTERNAL API. + */ +private[pekko] trait RemoteConnection { + def channel: Channel + def shutdown(): Unit +} + /** * INTERNAL API. */ private[pekko] object RemoteConnection { - def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler: ChannelUpstreamHandler): Channel = { + def apply( + role: Role, + sockaddr: InetSocketAddress, + poolSize: Int, + handler: ChannelInboundHandler): RemoteConnection = { role match { case Client => - val socketfactory = - new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize) - val bootstrap = new ClientBootstrap(socketfactory) - bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.connect(sockaddr).getChannel + val bootstrap = new Bootstrap() + val eventLoopGroup = new NioEventLoopGroup(poolSize) + val clientChannel = bootstrap + .group(eventLoopGroup) + .channel(classOf[NioSocketChannel]) + .handler(new TestConductorPipelineFactory(handler)) + .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) + .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) + .connect(sockaddr) + .channel() + new RemoteConnection { + override def channel: Channel = clientChannel + + @nowarn("msg=deprecated") + override def shutdown(): Unit = { + clientChannel.close().sync() + eventLoopGroup.shutdown() + } + } + case Server => - val socketfactory = - new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize) - val bootstrap = new ServerBootstrap(socketfactory) - bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler)) - bootstrap.setOption("reuseAddress", !Helpers.isWindows) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.bind(sockaddr) + val bootstrap = new ServerBootstrap() + val parentEventLoopGroup = new NioEventLoopGroup(poolSize) + val childEventLoopGroup = new NioEventLoopGroup(poolSize) + val serverChannel = bootstrap + .group(parentEventLoopGroup, childEventLoopGroup) + .channel(classOf[NioServerSocketChannel]) + .childHandler(new TestConductorPipelineFactory(handler)) + .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, !Helpers.isWindows) + .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 2048) + .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) + .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) + .bind(sockaddr) + .channel() + new RemoteConnection { + override def channel: Channel = serverChannel + + @nowarn("msg=deprecated") + override def shutdown(): Unit = { + serverChannel.close().sync() + parentEventLoopGroup.shutdown() + childEventLoopGroup.shutdown() + parentEventLoopGroup.terminationFuture().sync() + childEventLoopGroup.terminationFuture().sync() + } + } } } - def getAddrString(channel: Channel) = channel.getRemoteAddress match { + def getAddrString(channel: Channel): String = channel.remoteAddress() match { case i: InetSocketAddress => i.toString case _ => "[unknown]" } - - def shutdown(channel: Channel): Unit = { - try { - try channel.close() - finally - try channel.getFactory.shutdown() - finally channel.getFactory.releaseExternalResources() - } catch { - case NonFatal(_) => - // silence this one to not make tests look like they failed, it's not really critical - } - } } diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala index 6597ebf9840..95f6a3f1df9 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala @@ -14,14 +14,15 @@ package org.apache.pekko.remote.testkit import java.net.{ InetAddress, InetSocketAddress } + import scala.collection.immutable import scala.concurrent.{ Await, Awaitable } import scala.concurrent.duration._ import scala.util.control.NonFatal -import com.typesafe.config.{ Config, ConfigFactory, ConfigObject } +import com.typesafe.config.{ Config, ConfigFactory, ConfigObject } +import io.netty.channel.ChannelException import language.implicitConversions -import org.jboss.netty.channel.ChannelException import org.apache.pekko import pekko.actor._ import pekko.actor.RootActorPath diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e389f55fbdf..9da66d667b1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,6 +29,7 @@ object Dependencies { // https://github.com/real-logic/aeron/blob/1.x.y/build.gradle val agronaVersion = "1.15.1" val nettyVersion = "3.10.6.Final" + val netty4Version = "4.1.94.Final" val protobufJavaVersion = "3.16.3" val logbackVersion = "1.2.11" @@ -60,8 +61,11 @@ object Dependencies { val config = "com.typesafe" % "config" % "1.4.2" val netty = "io.netty" % "netty" % nettyVersion + val `netty-transport` = "io.netty" % "netty-transport" % netty4Version + val `netty-handler` = "io.netty" % "netty-handler" % netty4Version - val scalaReflect = ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _) + val scalaReflect: ScalaVersionDependentModuleID = + ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _) val slf4jApi = "org.slf4j" % "slf4j-api" % slf4jVersion @@ -286,9 +290,9 @@ object Dependencies { val remoteTests = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) ++ remoteDependencies - val multiNodeTestkit = l ++= Seq(netty) + val multiNodeTestkit = l ++= Seq(`netty-transport`, `netty-handler`) - val cluster = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) + val cluster = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value, TestDependencies.logback) val clusterTools = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) diff --git a/project/MultiNode.scala b/project/MultiNode.scala index 6c0d71e1ae7..987823190ca 100644 --- a/project/MultiNode.scala +++ b/project/MultiNode.scala @@ -82,6 +82,7 @@ object MultiNode extends AutoPlugin { multiJvmCreateLogger / logLevel := Level.Debug, // to see ssh establishment MultiJvm / assembly / assemblyMergeStrategy := { case n if n.endsWith("logback-test.xml") => MergeStrategy.first + case n if n.endsWith("io.netty.versions.properties") => MergeStrategy.first case n if n.toLowerCase.matches("meta-inf.*\\.default") => MergeStrategy.first case n => (MultiJvm / assembly / assemblyMergeStrategy).value.apply(n) }, diff --git a/project/SbtMultiJvm.scala b/project/SbtMultiJvm.scala index d107c823119..0de2d2473f5 100644 --- a/project/SbtMultiJvm.scala +++ b/project/SbtMultiJvm.scala @@ -167,10 +167,11 @@ object MultiJvmPlugin extends AutoPlugin { // the first class wins just like a classpath // just concatenate conflicting text files assembly / assemblyMergeStrategy := { - case n if n.endsWith(".class") => MergeStrategy.first - case n if n.endsWith(".txt") => MergeStrategy.concat - case n if n.endsWith("NOTICE") => MergeStrategy.concat - case n => (assembly / assemblyMergeStrategy).value.apply(n) + case n if n.endsWith(".class") => MergeStrategy.first + case n if n.endsWith(".txt") => MergeStrategy.concat + case n if n.endsWith("NOTICE") => MergeStrategy.concat + case n if n.endsWith("LICENSE") => MergeStrategy.concat + case n => (assembly / assemblyMergeStrategy).value.apply(n) }, assembly / assemblyJarName := { name.value + "_" + scalaVersion.value + "-" + version.value + "-multi-jvm-assembly.jar" From 66238b88372d9076987d286be18ae64e47f5dba5 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 2 Aug 2023 16:28:40 +0800 Subject: [PATCH 2/3] Add fast open and connect. Signed-off-by: He-Pin --- .../apache/pekko/remote/testconductor/RemoteConnection.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala index a97f9fcd2e1..95fac2a918f 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala @@ -124,6 +124,7 @@ private[pekko] object RemoteConnection { .handler(new TestConductorPipelineFactory(handler)) .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) + .option[java.lang.Boolean](ChannelOption.TCP_FASTOPEN_CONNECT, true) .connect(sockaddr) .channel() new RemoteConnection { @@ -146,6 +147,7 @@ private[pekko] object RemoteConnection { .childHandler(new TestConductorPipelineFactory(handler)) .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, !Helpers.isWindows) .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 2048) + .option[java.lang.Integer](ChannelOption.TCP_FASTOPEN, 2048) .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) .bind(sockaddr) From 99b4bef4a3e984417bf1b7591f5e78ef9eed3011 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 2 Aug 2023 19:03:46 +0800 Subject: [PATCH 3/3] Try native transport Signed-off-by: He-Pin --- .../scala/org/apache/pekko/util/Helpers.scala | 10 ++++++- .../testconductor/RemoteConnection.scala | 26 +++++++++++++------ project/Dependencies.scala | 6 +++-- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/Helpers.scala b/actor/src/main/scala/org/apache/pekko/util/Helpers.scala index 2d9b0ef1785..2f05c5f3900 100644 --- a/actor/src/main/scala/org/apache/pekko/util/Helpers.scala +++ b/actor/src/main/scala/org/apache/pekko/util/Helpers.scala @@ -43,7 +43,15 @@ object Helpers { def toRootLowerCase(s: String): String = s.toLowerCase(Locale.ROOT) - val isWindows: Boolean = toRootLowerCase(System.getProperty("os.name", "")).indexOf("win") >= 0 + private val OS_NAME = System.getProperty("os.name", "").toLowerCase + + val isWindows: Boolean = toRootLowerCase(OS_NAME.toLowerCase).indexOf("win") >= 0 + + val isLinux:Boolean = toRootLowerCase(OS_NAME).indexOf("linux") >= 0 + + val isMacOSX:Boolean = toRootLowerCase(OS_NAME).indexOf("mac") >= 0 && OS_NAME.indexOf("x") > 0 + + val isMacOS:Boolean = toRootLowerCase(OS_NAME).indexOf("linux") >= 0 && OS_NAME.indexOf("x") < 0 def makePattern(s: String): Pattern = Pattern.compile("^\\Q" + s.replace("?", "\\E.\\Q").replace("*", "\\E.*\\Q") + "\\E$") diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala index 95fac2a918f..f88362eab6e 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala @@ -14,13 +14,13 @@ package org.apache.pekko.remote.testconductor import java.net.InetSocketAddress - import scala.annotation.nowarn - import io.netty.bootstrap.{ Bootstrap, ServerBootstrap } import io.netty.buffer.{ ByteBuf, ByteBufUtil } import io.netty.channel._ import io.netty.channel.ChannelHandler.Sharable +import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel, EpollSocketChannel } +import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel, KQueueSocketChannel } import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } @@ -30,7 +30,6 @@ import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder } - import org.apache.pekko import pekko.event.Logging import pekko.protobufv3.internal.{ Message, MessageLite, MessageLiteOrBuilder } @@ -117,10 +116,17 @@ private[pekko] object RemoteConnection { role match { case Client => val bootstrap = new Bootstrap() - val eventLoopGroup = new NioEventLoopGroup(poolSize) + val (eventLoopGroup, channelClazz) = + if (Helpers.isLinux) { + (new EpollEventLoopGroup(poolSize), classOf[EpollSocketChannel]) + } else if (Helpers.isMacOSX || Helpers.isMacOS) { + (new KQueueEventLoopGroup(poolSize), classOf[KQueueSocketChannel]) + } else + (new NioEventLoopGroup(poolSize), classOf[NioSocketChannel]) + val clientChannel = bootstrap .group(eventLoopGroup) - .channel(classOf[NioSocketChannel]) + .channel(channelClazz) .handler(new TestConductorPipelineFactory(handler)) .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) @@ -139,11 +145,15 @@ private[pekko] object RemoteConnection { case Server => val bootstrap = new ServerBootstrap() - val parentEventLoopGroup = new NioEventLoopGroup(poolSize) - val childEventLoopGroup = new NioEventLoopGroup(poolSize) + val (parentEventLoopGroup, childEventLoopGroup, channelClazz) = + if (Helpers.isLinux) { + (new EpollEventLoopGroup(poolSize), new EpollEventLoopGroup(poolSize), classOf[EpollServerSocketChannel]) + } else if (Helpers.isMacOS || Helpers.isMacOSX) { + (new KQueueEventLoopGroup(poolSize), new KQueueEventLoopGroup(poolSize), classOf[KQueueServerSocketChannel]) + } else (new NioEventLoopGroup(poolSize), new NioEventLoopGroup(poolSize), classOf[NioServerSocketChannel]) val serverChannel = bootstrap .group(parentEventLoopGroup, childEventLoopGroup) - .channel(classOf[NioServerSocketChannel]) + .channel(channelClazz) .childHandler(new TestConductorPipelineFactory(handler)) .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, !Helpers.isWindows) .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 2048) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9da66d667b1..f4b43f183b6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,7 +29,7 @@ object Dependencies { // https://github.com/real-logic/aeron/blob/1.x.y/build.gradle val agronaVersion = "1.15.1" val nettyVersion = "3.10.6.Final" - val netty4Version = "4.1.94.Final" + val netty4Version = "4.1.96.Final" val protobufJavaVersion = "3.16.3" val logbackVersion = "1.2.11" @@ -62,6 +62,8 @@ object Dependencies { val config = "com.typesafe" % "config" % "1.4.2" val netty = "io.netty" % "netty" % nettyVersion val `netty-transport` = "io.netty" % "netty-transport" % netty4Version + val `netty-transport-epoll` = "io.netty" % "netty-transport-native-epoll" % netty4Version classifier "linux-x86_64" + val `netty-transport-kqueue` = "io.netty" % "netty-transport-native-kqueue" % netty4Version classifier "osx-x86_64" val `netty-handler` = "io.netty" % "netty-handler" % netty4Version val scalaReflect: ScalaVersionDependentModuleID = @@ -290,7 +292,7 @@ object Dependencies { val remoteTests = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) ++ remoteDependencies - val multiNodeTestkit = l ++= Seq(`netty-transport`, `netty-handler`) + val multiNodeTestkit = l ++= Seq(`netty-transport`, `netty-transport-epoll`, `netty-transport-kqueue`, `netty-handler`) val cluster = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value, TestDependencies.logback)