From 75dd11ea2b4c4f0510ea1d0f6bdc35df2e1b600a Mon Sep 17 00:00:00 2001 From: sebastian-alfers Date: Tue, 12 Sep 2023 00:36:37 +0200 Subject: [PATCH 1/3] feat: Cross build for Scala 3 for the aws modules --- .../docs/scaladsl/EventBridgePublisherSpec.scala | 2 +- .../test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala | 7 +++---- build.sbt | 12 ++++++------ .../akka/stream/alpakka/kinesis/ShardIterator.scala | 3 +++ .../alpakka/kinesis/KinesisSchedulerSourceSpec.scala | 10 +++++----- .../kinesisfirehose/KinesisFirehoseFlowSpec.scala | 4 ++-- project/Dependencies.scala | 2 +- .../test/scala/docs/scaladsl/SnsPublisherSpec.scala | 2 +- sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala | 2 +- 9 files changed, 23 insertions(+), 21 deletions(-) diff --git a/aws-event-bridge/src/test/scala/docs/scaladsl/EventBridgePublisherSpec.scala b/aws-event-bridge/src/test/scala/docs/scaladsl/EventBridgePublisherSpec.scala index d03f8075d3..61688bf155 100644 --- a/aws-event-bridge/src/test/scala/docs/scaladsl/EventBridgePublisherSpec.scala +++ b/aws-event-bridge/src/test/scala/docs/scaladsl/EventBridgePublisherSpec.scala @@ -18,7 +18,7 @@ import scala.concurrent.duration._ class EventBridgePublisherSpec extends AnyFlatSpec with Matchers with ScalaFutures with IntegrationTestContext { - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 15.seconds, interval = 100.millis) "EventBridge Publisher sink" should "send PutEventsEntry message" in { diff --git a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala index 5f9bac1443..6f648ad4b0 100644 --- a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala +++ b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala @@ -5,7 +5,6 @@ package docs.scaladsl import java.util.concurrent.CompletableFuture - import akka.actor.ActorSystem import akka.stream.alpakka.awslambda.scaladsl.AwsLambdaFlow import akka.stream.alpakka.testkit.scaladsl.LogCapturing @@ -25,7 +24,7 @@ import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.lambda.LambdaAsyncClient import software.amazon.awssdk.services.lambda.model.{InvokeRequest, InvokeResponse} -import scala.concurrent.Await +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor} import scala.concurrent.duration._ class AwsLambdaFlowSpec @@ -37,9 +36,9 @@ class AwsLambdaFlowSpec with Matchers with LogCapturing { - implicit val ec = system.dispatcher + implicit val ec: ExecutionContext = system.dispatcher - implicit val awsLambdaClient = mock(classOf[LambdaAsyncClient]) + implicit val awsLambdaClient: LambdaAsyncClient = mock(classOf[LambdaAsyncClient]) override protected def afterEach(): Unit = { reset(awsLambdaClient) diff --git a/build.sbt b/build.sbt index 0168a5ca74..efee688f38 100644 --- a/build.sbt +++ b/build.sbt @@ -127,7 +127,7 @@ lazy val avroparquetTests = .dependsOn(avroparquet) .disablePlugins(MimaPlugin) -lazy val awslambda = alpakkaProject("awslambda", "aws.lambda", Dependencies.AwsLambda) +lazy val awslambda = alpakkaProject("awslambda", "aws.lambda", Dependencies.AwsLambda).settings(Scala3.settings) lazy val azureStorageQueue = alpakkaProject( "azure-storage-queue", @@ -148,7 +148,7 @@ lazy val csvBench = internalProject("csv-bench") .dependsOn(csv) .enablePlugins(JmhPlugin) -lazy val dynamodb = alpakkaProject("dynamodb", "aws.dynamodb", Dependencies.DynamoDB) +lazy val dynamodb = alpakkaProject("dynamodb", "aws.dynamodb", Dependencies.DynamoDB).settings(Scala3.settings) lazy val elasticsearch = alpakkaProject( "elasticsearch", @@ -273,7 +273,7 @@ lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, Scala3.settings) lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming) -lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis) +lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis).settings(Scala3.settings) lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu) @@ -320,13 +320,13 @@ lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs") lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick) lazy val eventbridge = - alpakkaProject("aws-event-bridge", "aws.eventbridge", Dependencies.Eventbridge) + alpakkaProject("aws-event-bridge", "aws.eventbridge", Dependencies.Eventbridge).settings(Scala3.settings) -lazy val sns = alpakkaProject("sns", "aws.sns", Dependencies.Sns) +lazy val sns = alpakkaProject("sns", "aws.sns", Dependencies.Sns).settings(Scala3.settings) lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr) -lazy val sqs = alpakkaProject("sqs", "aws.sqs", Dependencies.Sqs) +lazy val sqs = alpakkaProject("sqs", "aws.sqs", Dependencies.Sqs).settings(Scala3.settings) lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse) diff --git a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardIterator.scala b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardIterator.scala index ad72395bb0..9ac372379d 100644 --- a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardIterator.scala +++ b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardIterator.scala @@ -31,6 +31,9 @@ object ShardIterator { override final val shardIteratorType: ShardIteratorType = ShardIteratorType.TRIM_HORIZON } + object AtTimestamp { + def apply(value: Instant) = new AtTimestamp(value) + } case class AtTimestamp private (value: Instant) extends ShardIterator { override final val timestamp: Option[Instant] = Some(value) diff --git a/kinesis/src/test/scala/akka/stream/alpakka/kinesis/KinesisSchedulerSourceSpec.scala b/kinesis/src/test/scala/akka/stream/alpakka/kinesis/KinesisSchedulerSourceSpec.scala index 4fe0726e31..856ab4fb22 100644 --- a/kinesis/src/test/scala/akka/stream/alpakka/kinesis/KinesisSchedulerSourceSpec.scala +++ b/kinesis/src/test/scala/akka/stream/alpakka/kinesis/KinesisSchedulerSourceSpec.scala @@ -267,7 +267,7 @@ class KinesisSchedulerSourceSpec var recordProcessor: ShardRecordProcessor = _ var otherRecordProcessor: ShardRecordProcessor = _ - private val schedulerBuilder = { x: ShardRecordProcessorFactory => + private val schedulerBuilder = { (x: ShardRecordProcessorFactory) => recordProcessor = x.shardRecordProcessor() otherRecordProcessor = x.shardRecordProcessor() semaphore.release() @@ -348,7 +348,7 @@ class KinesisSchedulerSourceSpec ) { override def shutdownReason: Option[ShutdownReason] = None - override def forceCheckpoint(): Unit = checkpointer(record) + override def forceCheckpoint(): Unit = checkpointer(this.record) } ) latestRecord = record @@ -390,7 +390,7 @@ class KinesisSchedulerSourceSpec ) { override def shutdownReason: Option[ShutdownReason] = None - override def forceCheckpoint(): Unit = checkpointerShard1(record) + override def forceCheckpoint(): Unit = checkpointerShard1(this.record) } ) latestRecordShard1 = record @@ -410,7 +410,7 @@ class KinesisSchedulerSourceSpec ) { override def shutdownReason: Option[ShutdownReason] = None - override def forceCheckpoint(): Unit = checkpointerShard2(record) + override def forceCheckpoint(): Unit = checkpointerShard2(this.record) } ) latestRecordShard2 = record @@ -444,7 +444,7 @@ class KinesisSchedulerSourceSpec ) ) { override def shutdownReason: Option[ShutdownReason] = None - override def forceCheckpoint(): Unit = checkpointer(record) + override def forceCheckpoint(): Unit = checkpointer(this.record) } sourceProbe.sendNext(committableRecord) diff --git a/kinesis/src/test/scala/akka/stream/alpakka/kinesisfirehose/KinesisFirehoseFlowSpec.scala b/kinesis/src/test/scala/akka/stream/alpakka/kinesisfirehose/KinesisFirehoseFlowSpec.scala index efc88a8ece..b7e3865ef6 100644 --- a/kinesis/src/test/scala/akka/stream/alpakka/kinesisfirehose/KinesisFirehoseFlowSpec.scala +++ b/kinesis/src/test/scala/akka/stream/alpakka/kinesisfirehose/KinesisFirehoseFlowSpec.scala @@ -77,7 +77,7 @@ class KinesisFirehoseFlowSpec extends AnyWordSpec with Matchers with KinesisFire .run() } - trait WithPutRecordsSuccess { self: KinesisFirehoseFlowProbe => + trait WithPutRecordsSuccess { self: KinesisFirehoseFlowProbe with Settings => when(amazonKinesisFirehoseAsync.putRecordBatch(any[PutRecordBatchRequest])).thenAnswer(new Answer[AnyRef] { override def answer(invocation: InvocationOnMock) = { val request = invocation @@ -92,7 +92,7 @@ class KinesisFirehoseFlowSpec extends AnyWordSpec with Matchers with KinesisFire }) } - trait WithPutRecordsFailure { self: KinesisFirehoseFlowProbe => + trait WithPutRecordsFailure { self: KinesisFirehoseFlowProbe with Settings => when(amazonKinesisFirehoseAsync.putRecordBatch(any[PutRecordBatchRequest])).thenAnswer(new Answer[AnyRef] { override def answer(invocation: InvocationOnMock) = { val future = new CompletableFuture() diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ee6a2852b6..c2f5e5e19d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { val InfluxDBJavaVersion = "2.15" val AwsSdk2Version = "2.17.113" - val AwsSpiAkkaHttpVersion = "0.0.11" + val AwsSpiAkkaHttpVersion = "1.0.1" // Sync with plugins.sbt val AkkaGrpcBinaryVersion = "2.3" // sync ignore prefix in scripts/link-validator.conf#L30 diff --git a/sns/src/test/scala/docs/scaladsl/SnsPublisherSpec.scala b/sns/src/test/scala/docs/scaladsl/SnsPublisherSpec.scala index 6a1ec2a197..a92879659e 100644 --- a/sns/src/test/scala/docs/scaladsl/SnsPublisherSpec.scala +++ b/sns/src/test/scala/docs/scaladsl/SnsPublisherSpec.scala @@ -24,7 +24,7 @@ class SnsPublisherSpec with IntegrationTestContext with LogCapturing { - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 15.seconds, interval = 100.millis) "SNS Publisher sink" should "send string message" in { diff --git a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala index ae340b84cc..33ef9a82d7 100644 --- a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala +++ b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala @@ -247,7 +247,7 @@ class SqsSourceSpec extends AnyFlatSpec with ScalaFutures with Matchers with Def val customClient: SdkAsyncHttpClient = AkkaHttpClient.builder().withActorSystem(system).build() //#init-custom-client - implicit val customSqsClient = SqsAsyncClient + implicit val customSqsClient: SqsAsyncClient = SqsAsyncClient .builder() .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) //#init-custom-client From 36f8697e497ff100cf41ead319e0798c6524691e Mon Sep 17 00:00:00 2001 From: sebastian-alfers Date: Tue, 12 Sep 2023 06:19:34 +0200 Subject: [PATCH 2/3] remove unused import --- awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala index 6f648ad4b0..76e3ba38d0 100644 --- a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala +++ b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala @@ -24,7 +24,7 @@ import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.lambda.LambdaAsyncClient import software.amazon.awssdk.services.lambda.model.{InvokeRequest, InvokeResponse} -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration._ class AwsLambdaFlowSpec From 601be2e361152e6c11795ebe79ac20909905a175 Mon Sep 17 00:00:00 2001 From: Sebastian Alfers Date: Tue, 12 Sep 2023 08:04:21 +0200 Subject: [PATCH 3/3] fix mima for kinesis --- kinesis/src/main/mima-filters/6.0.3.backwards.excludes | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 kinesis/src/main/mima-filters/6.0.3.backwards.excludes diff --git a/kinesis/src/main/mima-filters/6.0.3.backwards.excludes b/kinesis/src/main/mima-filters/6.0.3.backwards.excludes new file mode 100644 index 0000000000..fd27683d41 --- /dev/null +++ b/kinesis/src/main/mima-filters/6.0.3.backwards.excludes @@ -0,0 +1,2 @@ +# 6.0.3 scala3 comp - https://github.com/akka/alpakka/pull/3011 +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.alpakka.kinesis.ShardIterator$AtTimestamp$") \ No newline at end of file