Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bump akka to 2.10.0-M1, align upstream changes #3270

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ updates.ignore = [
]

updates.pin = [
{ groupId = "com.fasterxml.jackson.core", version = "2.15." }
{ groupId = "com.fasterxml.jackson.datatype", version = "2.15." }
{ groupId = "com.fasterxml.jackson.core", version = "2.17." }
{ groupId = "com.fasterxml.jackson.datatype", version = "2.17." }
// v10 switches to Play 3
{ groupId = "com.github.jwt-scala", version = "9.4." }
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import javax.net.ssl.{SSLContext, TrustManager}

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/**
* Only for internal implementations
Expand Down Expand Up @@ -121,7 +121,7 @@ final class AmqpDetailsConnectionProvider private (
copy(connectionName = Option(name))

override def get: Connection = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val factory = new ConnectionFactory
credentials.foreach { credentials =>
factory.setUsername(credentials.username)
Expand Down Expand Up @@ -331,7 +331,6 @@ final class AmqpConnectionFactoryConnectionProvider private (val factory: Connec
copy(hostAndPorts = hostAndPorts.asScala.map(_.toScala).toIndexedSeq)

override def get: Connection = {
import scala.collection.JavaConverters._
factory.newConnection(hostAndPortList.map(hp => new Address(hp._1, hp._2)).asJava)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package akka.stream.alpakka.amqp

import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

/**
* Internal API
Expand Down Expand Up @@ -221,8 +221,9 @@ final class AmqpWriteSettings private (
/**
* Java API
*/
def withConfirmationTimeout(confirmationTimeout: java.time.Duration): AmqpWriteSettings =
copy(confirmationTimeout = confirmationTimeout.asScala)
def withConfirmationTimeout(confirmationTimeout: java.time.Duration): AmqpWriteSettings = {
copy(confirmationTimeout = confirmationTimeout.toScala)
}

private def copy(connectionProvider: AmqpConnectionProvider = connectionProvider,
exchange: Option[String] = exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private trait AmqpConnectorLogic { this: GraphStageLogic =>
connection.addShutdownListener(shutdownListener)
channel.addShutdownListener(shutdownListener)

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

settings.declarations.foreach {
case d: QueueDeclaration =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
private var unackedMessages = 0

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
channel.basicQos(bufferSize)
val consumerCallback = getAsyncCallback(handleDelivery)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.japi.Pair
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.Keep

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpFlow {

Expand All @@ -29,7 +29,7 @@ object AmqpFlow {
def create(
settings: AmqpWriteSettings
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpFlow` that accepts `WriteMessage` elements and emits `WriteResult`.
Expand All @@ -54,7 +54,7 @@ object AmqpFlow {
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow
.withConfirm(settings = settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -80,7 +80,7 @@ object AmqpFlow {
): akka.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlow
.withConfirmUnordered(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -103,6 +103,6 @@ object AmqpFlow {
.withConfirmAndPassThroughUnordered[T](settings = settings)
)(Keep.right)
.map { case (writeResult, passThrough) => Pair(writeResult, passThrough) }
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.CompletionStage
import akka.Done
import akka.stream.alpakka.amqp._

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpFlowWithContext {

Expand All @@ -23,7 +23,7 @@ object AmqpFlowWithContext {
): akka.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlowWithContext
.apply(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -40,6 +40,6 @@ object AmqpFlowWithContext {
): akka.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpFlowWithContext
.withConfirm(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.stream.alpakka.amqp._
import akka.stream.javadsl.Flow
import akka.util.ByteString

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpRpcFlow {

Expand All @@ -27,7 +27,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.simple(settings, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -39,7 +39,7 @@ object AmqpRpcFlow {
bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -52,7 +52,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -73,7 +73,7 @@ object AmqpRpcFlow {
): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.committableFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.map(cm => new CommittableReadResult(cm))
.asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.Done
import akka.stream.alpakka.amqp._
import akka.util.ByteString

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

object AmqpSink {

Expand All @@ -21,7 +21,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def create(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpSink` that accepts `ByteString` elements.
Expand All @@ -30,7 +30,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def createSimple(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
Expand All @@ -43,6 +43,6 @@ object AmqpSink {
def createReplyTo(
settings: AmqpReplyToSinkSettings
): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.toJava).asJava
akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.asJava).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import akka.Done
import akka.stream.alpakka.amqp.ReadResult
import akka.stream.alpakka.amqp.scaladsl

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

final class CommittableReadResult(cm: scaladsl.CommittableReadResult) {
val message: ReadResult = cm.message

def ack(): CompletionStage[Done] = ack(false)
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).toJava
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).asJava

def nack(): CompletionStage[Done] = nack(false, true)
def nack(multiple: Boolean, requeue: Boolean): CompletionStage[Done] =
cm.nack(multiple, requeue).toJava
cm.nack(multiple, requeue).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package akka.stream.alpakka.amqp.scaladsl

import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.{Flow, Keep}
import akka.util.ByteString

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

object AmqpRpcFlow {
Expand Down Expand Up @@ -39,7 +39,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] =
committableFlow(settings, bufferSize, repliesPerMessage)
.mapAsync(1) { cm =>
cm.ack().map(_ => cm.message)(ExecutionContexts.parasitic)
cm.ack().map(_ => cm.message)(ExecutionContext.parasitic)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
package akka.stream.alpakka.amqp.scaladsl

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp.impl
import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult}
import akka.stream.scaladsl.Source

import scala.concurrent.ExecutionContext

object AmqpSource {
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

/**
* Scala API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import akka.util.ByteString;
import com.rabbitmq.client.AuthenticationFailureException;
import org.junit.*;
import scala.collection.JavaConverters;
import scala.concurrent.duration.Duration;
import scala.jdk.javaapi.CollectionConverters;

import java.net.ConnectException;
import java.util.Arrays;
Expand Down Expand Up @@ -161,10 +161,7 @@ public void publishAndConsumeRpcWithoutAutoAck() throws Exception {
.to(amqpSink)
.run(system);

List<ReadResult> probeResult =
JavaConverters.seqAsJavaListConverter(
result.second().toStrict(Duration.create(3, TimeUnit.SECONDS)))
.asJava();
java.util.Collection<ReadResult> probeResult = CollectionConverters.asJavaCollection(result.second().toStrict(Duration.create(3, TimeUnit.SECONDS)));
assertEquals(
probeResult.stream().map(s -> s.bytes().utf8String()).collect(Collectors.toList()), input);
sourceToSink.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.util.ByteString;
import scala.collection.JavaConverters;

import scala.jdk.javaapi.CollectionConverters;

/** Needs a local running AMQP server on the default port with no password. */
public class AmqpFlowTest {
Expand Down Expand Up @@ -86,7 +87,7 @@ private void shouldEmitConfirmationForPublishedMessages(

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}

@Test
Expand Down Expand Up @@ -120,7 +121,7 @@ private void shouldPropagateContext(

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}

@Test
Expand All @@ -143,6 +144,6 @@ public void shouldPropagatePassThrough() {

result
.request(input.size())
.expectNextN(JavaConverters.asScalaBufferConverter(expectedOutput).asScala().toList());
.expectNextN(CollectionConverters.asScala(expectedOutput).toList());
}
}
3 changes: 1 addition & 2 deletions amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.amqp

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
Expand All @@ -17,7 +16,7 @@ import scala.concurrent.ExecutionContext
abstract class AmqpSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with LogCapturing {

implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)
implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

override protected def afterAll(): Unit =
system.terminate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.stream.alpakka.amqp.scaladsl
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp.{
AmqpCachedConnectionProvider,
AmqpConnectionFactoryConnectionProvider,
Expand Down Expand Up @@ -40,7 +39,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
with LogCapturing {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContext.parasitic

val shutdownsAdded = new AtomicInteger()
val shutdownsRemoved = new AtomicInteger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient
import software.amazon.awssdk.services.eventbridge.model._

import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

/**
* Scala API
Expand Down Expand Up @@ -55,7 +55,7 @@ object EventBridgePublisher {
settings: EventBridgePublishSettings
)(implicit eventBridgeClient: EventBridgeAsyncClient): Flow[PutEventsRequest, PutEventsResponse, NotUsed] =
Flow[PutEventsRequest]
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).toScala)
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).asScala)

/**
* Creates a [[akka.stream.scaladsl.Flow Flow]] to publish messages to an EventBridge.
Expand Down
Loading
Loading