Skip to content

Commit

Permalink
chore: bump akka to 2.10.0-M1, align upstream changes (#3270)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers authored Sep 27, 2024
1 parent 71421f0 commit c2df5d3
Show file tree
Hide file tree
Showing 228 changed files with 1,028 additions and 1,035 deletions.
4 changes: 2 additions & 2 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ updates.ignore = [
]

updates.pin = [
{ groupId = "com.fasterxml.jackson.core", version = "2.15." }
{ groupId = "com.fasterxml.jackson.datatype", version = "2.15." }
{ groupId = "com.fasterxml.jackson.core", version = "2.17." }
{ groupId = "com.fasterxml.jackson.datatype", version = "2.17." }
// v10 switches to Play 3
{ groupId = "com.github.jwt-scala", version = "9.4." }
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import javax.net.ssl.{SSLContext, TrustManager}

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/**
* Only for internal implementations
Expand Down Expand Up @@ -121,7 +121,7 @@ final class AmqpDetailsConnectionProvider private (
copy(connectionName = Option(name))

override def get: Connection = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val factory = new ConnectionFactory
credentials.foreach { credentials =>
factory.setUsername(credentials.username)
Expand Down Expand Up @@ -331,7 +331,6 @@ final class AmqpConnectionFactoryConnectionProvider private (val factory: Connec
copy(hostAndPorts = hostAndPorts.asScala.map(_.toScala).toIndexedSeq)

override def get: Connection = {
import scala.collection.JavaConverters._
factory.newConnection(hostAndPortList.map(hp => new Address(hp._1, hp._2)).asJava)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package akka.stream.alpakka.amqp

import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

/**
* Internal API
Expand Down Expand Up @@ -221,8 +221,9 @@ final class AmqpWriteSettings private (
/**
* Java API
*/
def withConfirmationTimeout(confirmationTimeout: java.time.Duration): AmqpWriteSettings =
copy(confirmationTimeout = confirmationTimeout.asScala)
def withConfirmationTimeout(confirmationTimeout: java.time.Duration): AmqpWriteSettings = {
copy(confirmationTimeout = confirmationTimeout.toScala)
}

private def copy(connectionProvider: AmqpConnectionProvider = connectionProvider,
exchange: Option[String] = exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private trait AmqpConnectorLogic { this: GraphStageLogic =>
connection.addShutdownListener(shutdownListener)
channel.addShutdownListener(shutdownListener)

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

settings.declarations.foreach {
case d: QueueDeclaration =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
private var unackedMessages = 0

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
channel.basicQos(bufferSize)
val consumerCallback = getAsyncCallback(handleDelivery)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.japi.Pair
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.Keep

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpFlow {

Expand All @@ -29,7 +29,7 @@ object AmqpFlow {
def create(
settings: AmqpWriteSettings
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpFlow` that accepts `WriteMessage` elements and emits `WriteResult`.
Expand All @@ -54,7 +54,7 @@ object AmqpFlow {
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow
.withConfirm(settings = settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -80,7 +80,7 @@ object AmqpFlow {
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow
.withConfirmUnordered(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -103,6 +103,6 @@ object AmqpFlow {
.withConfirmAndPassThroughUnordered[T](settings = settings)
)(Keep.right)
.map { case (writeResult, passThrough) => Pair(writeResult, passThrough) }
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.CompletionStage
import akka.Done
import akka.stream.alpakka.amqp._

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpFlowWithContext {

Expand All @@ -23,7 +23,7 @@ object AmqpFlowWithContext {
): akka.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlowWithContext
.apply(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -40,6 +40,6 @@ object AmqpFlowWithContext {
): akka.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlowWithContext
.withConfirm(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.stream.alpakka.amqp._
import akka.stream.javadsl.Flow
import akka.util.ByteString

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpRpcFlow {

Expand All @@ -27,7 +27,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.simple(settings, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -39,7 +39,7 @@ object AmqpRpcFlow {
bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -52,7 +52,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -73,7 +73,7 @@ object AmqpRpcFlow {
): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.committableFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.map(cm => new CommittableReadResult(cm))
.asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.Done
import akka.stream.alpakka.amqp._
import akka.util.ByteString

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpSink {

Expand All @@ -21,7 +21,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def create(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpSink` that accepts `ByteString` elements.
Expand All @@ -30,7 +30,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def createSimple(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
Expand All @@ -43,6 +43,6 @@ object AmqpSink {
def createReplyTo(
settings: AmqpReplyToSinkSettings
): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.asJava).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import akka.Done
import akka.stream.alpakka.amqp.ReadResult
import akka.stream.alpakka.amqp.scaladsl

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

final class CommittableReadResult(cm: scaladsl.CommittableReadResult) {
val message: ReadResult = cm.message

def ack(): CompletionStage[Done] = ack(false)
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).toJava
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).asJava

def nack(): CompletionStage[Done] = nack(false, true)
def nack(multiple: Boolean, requeue: Boolean): CompletionStage[Done] =
cm.nack(multiple, requeue).toJava
cm.nack(multiple, requeue).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package akka.stream.alpakka.amqp.scaladsl

import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.{Flow, Keep}
import akka.util.ByteString

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

object AmqpRpcFlow {
Expand Down Expand Up @@ -39,7 +39,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] =
committableFlow(settings, bufferSize, repliesPerMessage)
.mapAsync(1) { cm =>
cm.ack().map(_ => cm.message)(ExecutionContexts.parasitic)
cm.ack().map(_ => cm.message)(ExecutionContext.parasitic)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
package akka.stream.alpakka.amqp.scaladsl

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp.impl
import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult}
import akka.stream.scaladsl.Source

import scala.concurrent.ExecutionContext

object AmqpSource {
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

/**
* Scala API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import akka.util.ByteString;
import com.rabbitmq.client.AuthenticationFailureException;
import org.junit.*;
import scala.collection.JavaConverters;
import scala.concurrent.duration.Duration;
import scala.jdk.javaapi.CollectionConverters;

import java.net.ConnectException;
import java.util.Arrays;
Expand Down Expand Up @@ -161,10 +161,7 @@ public void publishAndConsumeRpcWithoutAutoAck() throws Exception {
.to(amqpSink)
.run(system);

List<ReadResult> probeResult =
JavaConverters.seqAsJavaListConverter(
result.second().toStrict(Duration.create(3, TimeUnit.SECONDS)))
.asJava();
java.util.Collection<ReadResult> probeResult = CollectionConverters.asJavaCollection(result.second().toStrict(Duration.create(3, TimeUnit.SECONDS)));
assertEquals(
probeResult.stream().map(s -> s.bytes().utf8String()).collect(Collectors.toList()), input);
sourceToSink.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.util.ByteString;
import scala.collection.JavaConverters;

import scala.jdk.javaapi.CollectionConverters;

/** Needs a local running AMQP server on the default port with no password. */
public class AmqpFlowTest {
Expand Down Expand Up @@ -86,7 +87,7 @@ private void shouldEmitConfirmationForPublishedMessages(

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}

@Test
Expand Down Expand Up @@ -120,7 +121,7 @@ private void shouldPropagateContext(

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}

@Test
Expand All @@ -143,6 +144,6 @@ public void shouldPropagatePassThrough() {

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}
}
3 changes: 1 addition & 2 deletions amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.amqp

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
Expand All @@ -17,7 +16,7 @@ import scala.concurrent.ExecutionContext
abstract class AmqpSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with LogCapturing {

implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)
implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

override protected def afterAll(): Unit =
system.terminate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.stream.alpakka.amqp.scaladsl
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp.{
AmqpCachedConnectionProvider,
AmqpConnectionFactoryConnectionProvider,
Expand Down Expand Up @@ -40,7 +39,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
with LogCapturing {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

val shutdownsAdded = new AtomicInteger()
val shutdownsRemoved = new AtomicInteger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient
import software.amazon.awssdk.services.eventbridge.model._

import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

/**
* Scala API
Expand Down Expand Up @@ -55,7 +55,7 @@ object EventBridgePublisher {
settings: EventBridgePublishSettings
)(implicit eventBridgeClient: EventBridgeAsyncClient): Flow[PutEventsRequest, PutEventsResponse, NotUsed] =
Flow[PutEventsRequest]
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).toScala)
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).asScala)

/**
* Creates a [[akka.stream.scaladsl.Flow Flow]] to publish messages to an EventBridge.
Expand Down
Loading

0 comments on commit c2df5d3

Please sign in to comment.