diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala index e04d24b17a..ad0a01607f 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSinkStage.scala @@ -3,8 +3,8 @@ */ package akka.stream.alpakka.amqp -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler } -import akka.stream.{ ActorAttributes, Attributes, Inlet, SinkShape } +import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler} +import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape} import akka.util.ByteString import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client._ diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala index 0fa84ad69c..ddcb9d5bbb 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.amqp.javadsl import akka.NotUsed -import akka.stream.alpakka.amqp.{ AmqpSinkSettings, AmqpSinkStage, OutgoingMessage } +import akka.stream.alpakka.amqp.{AmqpSinkSettings, AmqpSinkStage, OutgoingMessage} import akka.stream.javadsl.Sink import akka.util.ByteString diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala index 682f7b617a..1e089f07e7 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.amqp.javadsl import akka.NotUsed -import akka.stream.alpakka.amqp.{ AmqpSourceSettings, AmqpSourceStage, IncomingMessage } +import akka.stream.alpakka.amqp.{AmqpSourceSettings, AmqpSourceStage, IncomingMessage} import akka.stream.javadsl.Source object AmqpSource { diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala index 8641588386..28f544b6e3 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.amqp.scaladsl import akka.NotUsed -import akka.stream.alpakka.amqp.{ AmqpSinkSettings, AmqpSinkStage, OutgoingMessage } +import akka.stream.alpakka.amqp.{AmqpSinkSettings, AmqpSinkStage, OutgoingMessage} import akka.stream.scaladsl.Sink import akka.util.ByteString diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala index d4341c9002..638d1d78d6 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.amqp.scaladsl import akka.NotUsed -import akka.stream.alpakka.amqp.{ AmqpSourceSettings, AmqpSourceStage, IncomingMessage } +import akka.stream.alpakka.amqp.{AmqpSourceSettings, AmqpSourceStage, IncomingMessage} import akka.stream.scaladsl.Source diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala index ee17f517f6..cccb0db612 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.amqp import akka.actor.ActorSystem import akka.stream.ActorMaterializer import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} abstract class AmqpSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala index 0826043c46..8464a31678 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala @@ -6,8 +6,8 @@ package akka.stream.alpakka.amqp.scaladsl import akka.Done import akka.stream._ import akka.stream.alpakka.amqp._ -import akka.stream.scaladsl.{ GraphDSL, Merge, Sink, Source } -import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source} +import akka.stream.testkit.{TestPublisher, TestSubscriber} import akka.util.ByteString import scala.concurrent.Promise diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala index 555a3ca7eb..ad66097094 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/CassandraSourceStage.scala @@ -4,11 +4,11 @@ package akka.stream.alpakka.cassandra import akka.stream._ -import akka.stream.stage.{ AsyncCallback, GraphStage, GraphStageLogic, OutHandler } -import com.datastax.driver.core.{ ResultSet, Row, Session, Statement } +import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler} +import com.datastax.driver.core.{ResultSet, Row, Session, Statement} import scala.concurrent.Future -import scala.util.{ Failure, Success, Try } +import scala.util.{Failure, Success, Try} import akka.stream.alpakka.cassandra.GuavaFutures._ diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/GuavaFutures.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/GuavaFutures.scala index 1387c14e9b..54809259a8 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/GuavaFutures.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/GuavaFutures.scala @@ -3,9 +3,9 @@ */ package akka.stream.alpakka.cassandra -import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture } +import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{Future, Promise} private[cassandra] object GuavaFutures { implicit final class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal { diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSink.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSink.scala index a7dff729a6..9e8fe30303 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSink.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSink.scala @@ -8,8 +8,8 @@ import java.util.function.BiFunction import akka.Done import akka.stream.javadsl.Sink -import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session } -import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSink => ScalaCSink } +import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session} +import akka.stream.alpakka.cassandra.scaladsl.{CassandraSink => ScalaCSink} import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSource.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSource.scala index 17533479bf..ded199b081 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSource.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraSource.scala @@ -8,7 +8,7 @@ import java.util.concurrent.CompletableFuture import akka.NotUsed import akka.stream.alpakka.cassandra.CassandraSourceStage import akka.stream.javadsl.Source -import com.datastax.driver.core.{ Row, Session, Statement } +import com.datastax.driver.core.{Row, Session, Statement} import scala.concurrent.Future diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala index 3066112291..6b31ebbe07 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala @@ -4,10 +4,10 @@ package akka.stream.alpakka.cassandra.scaladsl import akka.Done -import akka.stream.scaladsl.{ Flow, Keep, Sink } -import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session } +import akka.stream.scaladsl.{Flow, Keep, Sink} +import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session} -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ExecutionContext, Future} import akka.stream.alpakka.cassandra.GuavaFutures._ diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSource.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSource.scala index 5595ac9f47..8d5a14d5fd 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSource.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSource.scala @@ -3,9 +3,9 @@ */ package akka.stream.alpakka.cassandra.scaladsl -import akka.{ Done, NotUsed } +import akka.{Done, NotUsed} import akka.stream.alpakka.cassandra.CassandraSourceStage -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.scaladsl.{Sink, Source} import com.datastax.driver.core._ import scala.concurrent.Future diff --git a/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala b/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala index d94dce0e08..b739d514b8 100644 --- a/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala +++ b/cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala @@ -5,8 +5,8 @@ package akka.stream.alpakka.cassandra.scaladsl import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Sink, Source } -import com.datastax.driver.core.{ Cluster, PreparedStatement, SimpleStatement } +import akka.stream.scaladsl.{Sink, Source} +import com.datastax.driver.core.{Cluster, PreparedStatement, SimpleStatement} import org.scalatest._ import org.scalatest.concurrent.ScalaFutures diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala index cf62aa6a2a..e116ee2990 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/FileTailSource.scala @@ -3,7 +3,7 @@ */ package akka.stream.alpakka.file.scaladsl -import java.nio.charset.{ Charset, StandardCharsets } +import java.nio.charset.{Charset, StandardCharsets} import java.nio.file.Path import akka.NotUsed diff --git a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala index 6b6e32bf55..0afd3f736b 100644 --- a/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala +++ b/file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala @@ -9,7 +9,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.alpakka.file.scaladsl import akka.stream.scaladsl.Source -import akka.stream.{ ActorMaterializer, Materializer } +import akka.stream.{ActorMaterializer, Materializer} import scala.concurrent.duration._ diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala index 78263462d3..5bdba442ab 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpBrowserGraphStage.scala @@ -4,8 +4,8 @@ package akka.stream.alpakka.ftp package impl -import akka.stream.stage.{ GraphStage, OutHandler } -import akka.stream.{ Attributes, Outlet, SourceShape } +import akka.stream.stage.{GraphStage, OutHandler} +import akka.stream.{Attributes, Outlet, SourceShape} import akka.stream.impl.Stages.DefaultAttributes.IODispatcher private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] extends GraphStage[SourceShape[FtpFile]] { diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala index 170747bfe7..c1b2b86794 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpGraphStageLogic.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.ftp package impl import akka.stream.stage.GraphStageLogic -import akka.stream.{ Outlet, Shape } +import akka.stream.{Outlet, Shape} import scala.util.control.NonFatal private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSettings]( diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala index 26a49a60fa..ebebd4bc1e 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala @@ -4,12 +4,12 @@ package akka.stream.alpakka.ftp package impl -import akka.stream.stage.{ GraphStageWithMaterializedValue, OutHandler } -import akka.stream.{ Attributes, IOResult, Outlet, SourceShape } +import akka.stream.stage.{GraphStageWithMaterializedValue, OutHandler} +import akka.stream.{Attributes, IOResult, Outlet, SourceShape} import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.util.ByteString import akka.util.ByteString.ByteString1C -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{Future, Promise} import scala.util.control.NonFatal import java.io.InputStream import java.nio.file.Path diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpOperations.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpOperations.scala index db6f7aba64..f9f9dbf616 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpOperations.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpOperations.scala @@ -4,10 +4,10 @@ package akka.stream.alpakka.ftp package impl -import org.apache.commons.net.ftp.{ FTP, FTPClient } +import org.apache.commons.net.ftp.{FTP, FTPClient} import scala.collection.immutable import scala.util.Try -import java.io.{ IOException, InputStream } +import java.io.{IOException, InputStream} import java.nio.file.Paths private[ftp] trait FtpOperations { _: FtpLike[FTPClient, FtpFileSettings] => diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala index 9c2773814d..30bf13e13f 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala @@ -3,8 +3,8 @@ */ package akka.stream.alpakka.ftp.impl -import akka.stream.alpakka.ftp.FtpCredentials.{ AnonFtpCredentials, NonAnonFtpCredentials } -import akka.stream.alpakka.ftp.{ FtpFileSettings, RemoteFileSettings } +import akka.stream.alpakka.ftp.FtpCredentials.{AnonFtpCredentials, NonAnonFtpCredentials} +import akka.stream.alpakka.ftp.{FtpFileSettings, RemoteFileSettings} import akka.stream.alpakka.ftp.RemoteFileSettings._ import com.jcraft.jsch.JSch import org.apache.commons.net.ftp.FTPClient diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/SftpOperations.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/SftpOperations.scala index 57635e737a..de52014d63 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/SftpOperations.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/SftpOperations.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.ftp package impl import akka.stream.alpakka.ftp.RemoteFileSettings.SftpSettings -import com.jcraft.jsch.{ ChannelSftp, JSch } +import com.jcraft.jsch.{ChannelSftp, JSch} import scala.collection.immutable import scala.util.Try import java.io.InputStream diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala index 6512ea6251..68e9f3434d 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala @@ -5,11 +5,11 @@ package akka.stream.alpakka.ftp.javadsl import akka.NotUsed import akka.stream.alpakka.ftp.impl._ -import akka.stream.alpakka.ftp.{ FtpFile, RemoteFileSettings } -import akka.stream.alpakka.ftp.impl.{ FtpLike, FtpSourceFactory } +import akka.stream.alpakka.ftp.{FtpFile, RemoteFileSettings} +import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory} import akka.stream.IOResult import akka.stream.javadsl.Source -import akka.stream.scaladsl.{ Source => ScalaSource } +import akka.stream.scaladsl.{Source => ScalaSource} import akka.util.ByteString import com.jcraft.jsch.JSch import org.apache.commons.net.ftp.FTPClient diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala index ae1ee6736c..afb146a3bc 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala @@ -6,8 +6,8 @@ package akka.stream.alpakka.ftp.scaladsl import akka.NotUsed import akka.stream.alpakka.ftp.impl._ import akka.stream.IOResult -import akka.stream.alpakka.ftp.{ FtpFile, RemoteFileSettings } -import akka.stream.alpakka.ftp.impl.{ FtpLike, FtpSourceFactory } +import akka.stream.alpakka.ftp.{FtpFile, RemoteFileSettings} +import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory} import akka.stream.scaladsl.Source import akka.util.ByteString import com.jcraft.jsch.JSch diff --git a/ftp/src/test/scala/akka/stream/alpakka/ftp/BaseSpec.scala b/ftp/src/test/scala/akka/stream/alpakka/ftp/BaseSpec.scala index 9c5bdf916d..8b613be752 100644 --- a/ftp/src/test/scala/akka/stream/alpakka/ftp/BaseSpec.scala +++ b/ftp/src/test/scala/akka/stream/alpakka/ftp/BaseSpec.scala @@ -8,8 +8,8 @@ import akka.stream.IOResult import akka.stream.scaladsl.Source import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{ BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpecLike } -import scala.concurrent.{ Await, Future } +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpecLike} +import scala.concurrent.{Await, Future} import scala.concurrent.duration.DurationInt trait BaseSpec diff --git a/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpSourceSpec.scala b/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpSourceSpec.scala index b835dad695..7a95509638 100644 --- a/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpSourceSpec.scala +++ b/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpSourceSpec.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.ftp import akka.stream.IOResult -import akka.stream.scaladsl.{ Keep, Sink } +import akka.stream.scaladsl.{Keep, Sink} import akka.stream.testkit.scaladsl.TestSink final class FtpSourceSpec extends BaseFtpSpec with CommonFtpSourceSpec diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala index 8a1d03658d..97a4510f53 100644 --- a/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala +++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/internal/HBaseCapabilities.scala @@ -6,13 +6,13 @@ package akka.stream.alpakka.hbase.internal import java.io.Closeable import akka.stream.stage.StageLogging -import org.apache.hadoop.hbase.{ HColumnDescriptor, HTableDescriptor, TableName } -import org.apache.hadoop.hbase.client.{ Connection, ConnectionFactory, Table } +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table} import org.apache.hadoop.conf.Configuration import scala.concurrent.duration.DurationInt -import scala.concurrent.{ Await, Future } -import scala.util.{ Failure, Success, Try } +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext.Implicits.global diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/javadsl/HTableStage.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/javadsl/HTableStage.scala index 2e1b4bbb65..95fae0e358 100644 --- a/hbase/src/main/scala/akka/stream/alpakka/hbase/javadsl/HTableStage.scala +++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/javadsl/HTableStage.scala @@ -5,8 +5,8 @@ package akka.stream.alpakka.hbase.javadsl import akka.stream.alpakka.hbase.HTableSettings import akka.stream.alpakka.hbase.internal.HBaseFlowStage -import akka.stream.scaladsl.{ Flow, Keep, Sink } -import akka.{ Done, NotUsed } +import akka.stream.scaladsl.{Flow, Keep, Sink} +import akka.{Done, NotUsed} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client.Put diff --git a/hbase/src/main/scala/akka/stream/alpakka/hbase/scaladsl/HTableStage.scala b/hbase/src/main/scala/akka/stream/alpakka/hbase/scaladsl/HTableStage.scala index 0cc0166381..6210332a0f 100644 --- a/hbase/src/main/scala/akka/stream/alpakka/hbase/scaladsl/HTableStage.scala +++ b/hbase/src/main/scala/akka/stream/alpakka/hbase/scaladsl/HTableStage.scala @@ -5,8 +5,8 @@ package akka.stream.alpakka.hbase.scaladsl import akka.stream.alpakka.hbase.HTableSettings import akka.stream.alpakka.hbase.internal.HBaseFlowStage -import akka.stream.scaladsl.{ Flow, Keep, Sink } -import akka.{ Done, NotUsed } +import akka.stream.scaladsl.{Flow, Keep, Sink} +import akka.{Done, NotUsed} import scala.concurrent.Future diff --git a/hbase/src/test/scala/akka/stream/alpakka/hbase/scaladsl/HBaseStageSpec.scala b/hbase/src/test/scala/akka/stream/alpakka/hbase/scaladsl/HBaseStageSpec.scala index 379bfa1c2a..35c50e6885 100644 --- a/hbase/src/test/scala/akka/stream/alpakka/hbase/scaladsl/HBaseStageSpec.scala +++ b/hbase/src/test/scala/akka/stream/alpakka/hbase/scaladsl/HBaseStageSpec.scala @@ -6,18 +6,18 @@ package akka.stream.alpakka.hbase.scaladsl import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.alpakka.hbase.HTableSettings -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.scaladsl.{Sink, Source} import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{ HBaseConfiguration, TableName } -import org.scalatest.{ Matchers, WordSpec } +import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} +import org.scalatest.{Matchers, WordSpec} import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration import scala.language.implicitConversions -import scala.util.{ Failure, Success } +import scala.util.{Failure, Success} class HBaseStageSpec extends WordSpec with Matchers { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsConnector.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsConnector.scala index dcc85ea836..5cfa4942e6 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsConnector.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsConnector.scala @@ -4,13 +4,13 @@ package akka.stream.alpakka.jms import javax.jms -import javax.jms.{ ExceptionListener, JMSException } +import javax.jms.{ExceptionListener, JMSException} import akka.stream.ActorAttributes.Dispatcher import akka.stream.ActorMaterializer import akka.stream.stage.GraphStageLogic -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ExecutionContext, Future} /** * Internal API diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala index 8b68786d0c..135c480658 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSinkStage.scala @@ -3,10 +3,10 @@ */ package akka.stream.alpakka.jms -import javax.jms.{ MessageProducer, TextMessage } +import javax.jms.{MessageProducer, TextMessage} -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler } -import akka.stream.{ ActorAttributes, Attributes, Inlet, SinkShape } +import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler} +import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape} final class JmsSinkStage(settings: JmsSettings) extends GraphStage[SinkShape[String]] { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala index b3b95a1e97..3c7cc0083a 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsSourceStage.scala @@ -6,11 +6,11 @@ package akka.stream.alpakka.jms import java.util.concurrent.Semaphore import javax.jms._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging } -import akka.stream.{ ActorAttributes, Attributes, Outlet, SourceShape } +import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging} +import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape} import scala.collection.mutable -import scala.util.{ Failure, Success } +import scala.util.{Failure, Success} final class JmsSourceStage(settings: JmsSourceSettings) extends GraphStage[SourceShape[Message]] { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSink.scala b/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSink.scala index 59a1ce71d2..6eb985d781 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSink.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSink.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.jms.javadsl import akka.NotUsed -import akka.stream.alpakka.jms.{ JmsSinkSettings, JmsSinkStage } +import akka.stream.alpakka.jms.{JmsSinkSettings, JmsSinkStage} object JmsSink { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSource.scala b/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSource.scala index 9ddd428330..ad5bc2c5fc 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSource.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/javadsl/JmsSource.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.jms.javadsl import javax.jms.Message import akka.NotUsed -import akka.stream.alpakka.jms.{ JmsSourceSettings, JmsSourceStage } +import akka.stream.alpakka.jms.{JmsSourceSettings, JmsSourceStage} object JmsSource { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSink.scala b/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSink.scala index a1a2ea7e12..0ae4325dbc 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSink.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSink.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.jms.scaladsl import akka.NotUsed -import akka.stream.alpakka.jms.{ JmsSinkSettings, JmsSinkStage } +import akka.stream.alpakka.jms.{JmsSinkSettings, JmsSinkStage} import akka.stream.scaladsl.Sink object JmsSink { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSource.scala b/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSource.scala index 2eaf5bc8bf..538c5ab25b 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSource.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/scaladsl/JmsSource.scala @@ -3,10 +3,10 @@ */ package akka.stream.alpakka.jms.scaladsl -import javax.jms.{ Message, TextMessage } +import javax.jms.{Message, TextMessage} import akka.NotUsed -import akka.stream.alpakka.jms.{ JmsSourceSettings, JmsSourceStage } +import akka.stream.alpakka.jms.{JmsSourceSettings, JmsSourceStage} import akka.stream.scaladsl.Source object JmsSource { diff --git a/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala b/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala index 80b9039e55..d9b0d310cf 100644 --- a/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala +++ b/jms/src/test/scala/akka/stream/alpakka/jms/scaladsl/JmsConnectorsSpec.scala @@ -7,8 +7,8 @@ import javax.jms.JMSException import akka.NotUsed import akka.stream.ThrottleMode -import akka.stream.alpakka.jms.{ JmsSinkSettings, JmsSourceSettings, JmsSpec } -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.alpakka.jms.{JmsSinkSettings, JmsSourceSettings, JmsSpec} +import akka.stream.scaladsl.{Sink, Source} import org.apache.activemq.ActiveMQConnectionFactory import scala.concurrent.duration._ diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala index 0c76e64e88..aa90f27a0f 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.mqtt import akka.stream.stage._ import akka.util.ByteString -import org.eclipse.paho.client.mqttv3.{ MqttMessage => PahoMqttMessage, _ } +import org.eclipse.paho.client.mqttv3.{MqttMessage => PahoMqttMessage, _} import scala.language.implicitConversions import scala.util._ diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala index 95b28974a1..742f8b05ef 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttProducerStage.scala @@ -5,9 +5,9 @@ package akka.stream.alpakka.mqtt import akka.stream._ import akka.stream.stage._ -import org.eclipse.paho.client.mqttv3.{ IMqttAsyncClient, IMqttToken, MqttMessage => PahoMqttMessage } +import org.eclipse.paho.client.mqttv3.{IMqttAsyncClient, IMqttToken, MqttMessage => PahoMqttMessage} -import scala.util.{ Failure, Success, Try } +import scala.util.{Failure, Success, Try} final class MqttProducerStage(cs: MqttConnectionSettings, qos: MqttQoS) extends GraphStage[FlowShape[MqttMessage, MqttMessage]] { diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala index 47786714ae..bb6bd7b2a7 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/MqttSourceStage.scala @@ -8,7 +8,7 @@ import java.util.concurrent.Semaphore import akka.Done import akka.stream._ import akka.stream.stage._ -import org.eclipse.paho.client.mqttv3.{ IMqttAsyncClient, IMqttToken } +import org.eclipse.paho.client.mqttv3.{IMqttAsyncClient, IMqttToken} import scala.collection.mutable import scala.concurrent._ diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSink.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSink.scala index fd818b9feb..067e70fc0d 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSink.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSink.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.mqtt.javadsl import java.util.concurrent.CompletionStage import akka.Done -import akka.stream.alpakka.mqtt.{ MqttConnectionSettings, MqttMessage, MqttQoS } +import akka.stream.alpakka.mqtt.{MqttConnectionSettings, MqttMessage, MqttQoS} object MqttSink { diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala index 856388f684..5a8c07756e 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.mqtt.javadsl import java.util.concurrent.CompletionStage import akka.Done -import akka.stream.alpakka.mqtt.{ MqttMessage, MqttSourceSettings } +import akka.stream.alpakka.mqtt.{MqttMessage, MqttSourceSettings} object MqttSource { diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSink.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSink.scala index c6872dbbb9..9a30b0aebf 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSink.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSink.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.mqtt.scaladsl import akka.Done import akka.stream.alpakka.mqtt._ -import akka.stream.scaladsl.{ Flow, Keep, Sink } +import akka.stream.scaladsl.{Flow, Keep, Sink} import scala.concurrent.Future diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala index 631de04edc..03d22f68db 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.mqtt.scaladsl import akka.Done -import akka.stream.alpakka.mqtt.{ MqttMessage, MqttSourceSettings, MqttSourceStage } +import akka.stream.alpakka.mqtt.{MqttMessage, MqttSourceSettings, MqttSourceStage} import akka.stream.scaladsl.Source import scala.concurrent.Future diff --git a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala index 764a24c63c..b71f6dc672 100644 --- a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala +++ b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSinkSpec.scala @@ -7,7 +7,7 @@ import akka.Done import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.alpakka.mqtt._ -import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.scaladsl.{Keep, Sink, Source} import akka.testkit.TestKit import akka.util.ByteString import org.eclipse.paho.client.mqttv3.MqttSecurityException diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala index af51350b2b..5398f75677 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala @@ -3,7 +3,7 @@ */ package akka.stream.alpakka.s3 -import scala.xml.{ Elem, NodeSeq, XML } +import scala.xml.{Elem, NodeSeq, XML} class S3Exception(val code: String, val message: String, val requestID: String, val hostId: String) extends RuntimeException(message) { diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/auth/CanonicalRequest.scala b/s3/src/main/scala/akka/stream/alpakka/s3/auth/CanonicalRequest.scala index 52f7e4c89c..136449ac06 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/auth/CanonicalRequest.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/auth/CanonicalRequest.scala @@ -5,8 +5,8 @@ package akka.stream.alpakka.s3.auth import java.net.URLEncoder -import akka.http.scaladsl.model.Uri.{ Path, Query } -import akka.http.scaladsl.model.{ HttpHeader, HttpRequest } +import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model.{HttpHeader, HttpRequest} // Documentation: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html private[alpakka] case class CanonicalRequest(method: String, diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala b/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala index da716250bb..df4148e89a 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/auth/Signer.scala @@ -5,10 +5,10 @@ package akka.stream.alpakka.s3.auth import java.security.MessageDigest import java.time.format.DateTimeFormatter -import java.time.{ ZoneOffset, ZonedDateTime } +import java.time.{ZoneOffset, ZonedDateTime} import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.model.{ HttpHeader, HttpRequest } +import akka.http.scaladsl.model.{HttpHeader, HttpRequest} import akka.stream.Materializer import scala.concurrent.Future @@ -34,21 +34,21 @@ private[alpakka] object Signer { } } - def sessionHeader(creds: AWSCredentials): Option[HttpHeader] = creds match { + private[this] def sessionHeader(creds: AWSCredentials): Option[HttpHeader] = creds match { case _: BasicCredentials => None case AWSSessionCredentials(_, _, sessionToken) => Some(RawHeader("X-Amz-Security-Token", sessionToken)) } - def authorizationHeader(algorithm: String, - key: SigningKey, - requestDate: ZonedDateTime, - canonicalRequest: CanonicalRequest): HttpHeader = + private[this] def authorizationHeader(algorithm: String, + key: SigningKey, + requestDate: ZonedDateTime, + canonicalRequest: CanonicalRequest): HttpHeader = RawHeader("Authorization", authorizationString(algorithm, key, requestDate, canonicalRequest)) - def authorizationString(algorithm: String, - key: SigningKey, - requestDate: ZonedDateTime, - canonicalRequest: CanonicalRequest): String = { + private[this] def authorizationString(algorithm: String, + key: SigningKey, + requestDate: ZonedDateTime, + canonicalRequest: CanonicalRequest): String = { val sign = key.hexEncodedSignature(stringToSign(algorithm, key, requestDate, canonicalRequest).getBytes()) s"$algorithm Credential=${key.credentialString}, SignedHeaders=${canonicalRequest.signedHeaders}, Signature=$sign" } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/auth/package.scala b/s3/src/main/scala/akka/stream/alpakka/s3/auth/package.scala index b7b5133fa8..a3ed60aec1 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/auth/package.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/auth/package.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.s3 import java.security.MessageDigest import javax.xml.bind.DatatypeConverter -import akka.stream.scaladsl.{ Flow, Keep, Sink } +import akka.stream.scaladsl.{Flow, Keep, Sink} import akka.util.ByteString import scala.concurrent.Future diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala index 442bb02c9a..f672cf9c1b 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala @@ -3,7 +3,7 @@ */ package akka.stream.alpakka.s3.impl -import java.io.{ File, FileOutputStream, RandomAccessFile } +import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.channels.FileChannel import java.nio.file.Files import java.util.concurrent.atomic.AtomicInteger diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala index 6fd0e192db..a9961136be 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala @@ -6,28 +6,47 @@ package akka.stream.alpakka.s3.impl import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model.Uri.Query -import akka.http.scaladsl.model.{ RequestEntity, _ } -import akka.http.scaladsl.model.headers.{ Host, RawHeader } +import akka.http.scaladsl.model.headers.{Host, RawHeader} +import akka.http.scaladsl.model.{RequestEntity, _} import akka.stream.alpakka.s3.S3Settings import akka.stream.alpakka.s3.acl.CannedAcl import akka.stream.scaladsl.Source import akka.util.ByteString -import scala.concurrent.{ ExecutionContext, Future } +import scala.collection.immutable +import scala.collection.immutable.Seq +import scala.concurrent.{ExecutionContext, Future} + +case class MetaHeaders(headers: Map[String, String]) private[alpakka] object HttpRequests { def getDownloadRequest(s3Location: S3Location, region: String)(implicit conf: S3Settings): HttpRequest = s3Request(s3Location, region: String) - def initiateMultipartUploadRequest(s3Location: S3Location, contentType: ContentType, cannedAcl: CannedAcl, region: String)( - implicit conf: S3Settings): HttpRequest = + def initiateMultipartUploadRequest(s3Location: S3Location, + contentType: ContentType, + cannedAcl: CannedAcl, + region: String, + metaHeaders: MetaHeaders)(implicit conf: S3Settings): HttpRequest = { + + def buildHeaders(metaHeaders: MetaHeaders, cannedAcl: CannedAcl): immutable.Seq[HttpHeader] = { + val metaHttpHeaders = metaHeaders.headers.map { header => + RawHeader(s"x-amz-meta-${header._1}", header._2) + }(collection.breakOut): Seq[HttpHeader] + metaHttpHeaders :+ RawHeader("x-amz-acl", cannedAcl.value) + } + s3Request(s3Location, region, HttpMethods.POST, _.withQuery(Query("uploads"))) - .withDefaultHeaders(RawHeader("x-amz-acl", cannedAcl.value)) + .withDefaultHeaders(buildHeaders(metaHeaders, cannedAcl)) .withEntity(HttpEntity.empty(contentType)) + } - def uploadPartRequest(upload: MultipartUpload, partNumber: Int, payload: Source[ByteString, _], payloadSize: Int, region: String)( - implicit conf: S3Settings): HttpRequest = + def uploadPartRequest(upload: MultipartUpload, + partNumber: Int, + payload: Source[ByteString, _], + payloadSize: Int, + region: String)(implicit conf: S3Settings): HttpRequest = s3Request( upload.s3Location, region, @@ -39,11 +58,15 @@ private[alpakka] object HttpRequests { implicit ec: ExecutionContext, conf: S3Settings): Future[HttpRequest] = { + //Do not let the start PartNumber,ETag and the end PartNumber,ETag be on different lines + // They tend to get split when this file is formatted by IntelliJ unless http://stackoverflow.com/a/19492318/1216965 + // @formatter:off val payload = { parts.map { case (partNumber, etag) => { partNumber }{ etag } } } + // @formatter:on for { entity <- Marshal(payload).to[RequestEntity] } yield { @@ -75,6 +98,8 @@ private[alpakka] object HttpRequests { } } - HttpRequest(method).withHeaders(Host(requestHost(s3Location, region))).withUri(uriFn(requestUri(s3Location, region))) + HttpRequest(method) + .withHeaders(Host(requestHost(s3Location, region))) + .withUri(uriFn(requestUri(s3Location, region))) } } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala index 8a9d874d93..88af11691c 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala @@ -4,8 +4,8 @@ package akka.stream.alpakka.s3.impl import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport -import akka.http.scaladsl.model.{ ContentTypes, HttpCharsets, MediaTypes } -import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller } +import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes} +import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} import scala.xml.NodeSeq diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala index 131663f4c8..dfef3aca6c 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala @@ -3,9 +3,9 @@ */ package akka.stream.alpakka.s3.impl -import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} import akka.stream.scaladsl.Source -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} import akka.util.ByteString /** diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala index 40cc6ec406..903b6e6826 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala @@ -10,17 +10,17 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller } +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} import akka.stream.Materializer import akka.stream.alpakka.s3.acl.CannedAcl -import akka.stream.alpakka.s3.auth.{ AWSCredentials, CredentialScope, Signer, SigningKey } -import akka.stream.alpakka.s3.{ DiskBufferType, MemoryBufferType, S3Exception, S3Settings } -import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } +import akka.stream.alpakka.s3.auth.{AWSCredentials, CredentialScope, Signer, SigningKey} +import akka.stream.alpakka.s3.{DiskBufferType, MemoryBufferType, S3Exception, S3Settings} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.util.ByteString import scala.collection.immutable.Seq -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.{ Failure, Success } +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} final case class S3Location(bucket: String, key: String) @@ -55,12 +55,14 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin import Marshalling._ implicit val conf = settings - val MinChunkSize = 5242880 + val MinChunkSize = 5242880 //in bytes val signingKey = SigningKey(credentials, CredentialScope(LocalDate.now(), region, "s3")) def download(s3Location: S3Location): Source[ByteString, NotUsed] = { import mat.executionContext - Source.fromFuture(signAndGet(HttpRequests.getDownloadRequest(s3Location, region)).map(_.dataBytes)).flatMapConcat(identity) + Source + .fromFuture(signAndGet(HttpRequests.getDownloadRequest(s3Location, region)).map(_.dataBytes)) + .flatMapConcat(identity) } /** @@ -72,25 +74,28 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin */ def multipartUpload(s3Location: S3Location, contentType: ContentType = ContentTypes.`application/octet-stream`, + metaHeaders: MetaHeaders, cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize, chunkingParallelism: Int = 4): Sink[ByteString, Future[CompleteMultipartUploadResult]] = - chunkAndRequest(s3Location, contentType, cannedAcl, chunkSize)(chunkingParallelism) - .toMat(completionSink(s3Location))(Keep.right) + chunkAndRequest(s3Location, contentType, metaHeaders, cannedAcl, chunkSize)(chunkingParallelism).toMat( + completionSink(s3Location))(Keep.right) private def initiateMultipartUpload(s3Location: S3Location, contentType: ContentType, - cannedAcl: CannedAcl): Future[MultipartUpload] = { + cannedAcl: CannedAcl, + metaHeaders: MetaHeaders): Future[MultipartUpload] = { import mat.executionContext - val req = HttpRequests.initiateMultipartUploadRequest(s3Location, contentType, cannedAcl, region) + val req = HttpRequests.initiateMultipartUploadRequest(s3Location, contentType, cannedAcl, region, metaHeaders) val response = for { signedReq <- Signer.signedRequest(req, signingKey) response <- Http().singleRequest(signedReq) } yield response response.flatMap { - case HttpResponse(status, _, entity, _) if status.isSuccess() => Unmarshal(entity).to[MultipartUpload] + case HttpResponse(status, _, entity, _) if status.isSuccess() => + Unmarshal(entity).to[MultipartUpload] case HttpResponse(status, _, entity, _) => Unmarshal(entity).to[String].flatMap { case err => @@ -113,26 +118,18 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin */ private def initiateUpload(s3Location: S3Location, contentType: ContentType, - cannedAcl: CannedAcl): Source[(MultipartUpload, Int), NotUsed] = + cannedAcl: CannedAcl, + metaHeaders: MetaHeaders): Source[(MultipartUpload, Int), NotUsed] = Source .single(s3Location) - .mapAsync(1)(initiateMultipartUpload(_, contentType, cannedAcl)) + .mapAsync(1)(initiateMultipartUpload(_, contentType, cannedAcl, metaHeaders)) .mapConcat { case r => Stream.continually(r) } .zip(Source.fromIterator(() => Iterator.from(1))) - /** - * Transforms a flow of ByteStrings into a flow of HTTPRequests to upload to S3. - * - * @param s3Location - * @param contentType - * @param cannedAcl - * @param chunkSize - * @param parallelism - * @return - */ private def createRequests( s3Location: S3Location, contentType: ContentType, + metaHeaders: MetaHeaders, cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize, parallelism: Int = 4): Flow[ByteString, (HttpRequest, (MultipartUpload, Int)), NotUsed] = { @@ -140,17 +137,20 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin assert(chunkSize >= MinChunkSize, "Chunk size must be at least 5242880B. See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html") - // First step of the multi part upload process - initiate! - // The response is then used for subsequent requests - val requestInfo: Source[(MultipartUpload, Int), NotUsed] = initiateUpload(s3Location, contentType, cannedAcl) + // First step of the multi part upload process is made. + // The response is then used to construct the subsequent individual upload part requests + val requestInfo: Source[(MultipartUpload, Int), NotUsed] = + initiateUpload(s3Location, contentType, cannedAcl, metaHeaders) SplitAfterSize(chunkSize)(Flow.apply[ByteString]) - .via(getChunkBuffer(chunkSize)) + .via(getChunkBuffer(chunkSize)) //creates the chunks .concatSubstreams .zipWith(requestInfo) { - case (payload, (uploadInfo, chunkIndex)) => - (HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, payload.data, payload.size, region), - (uploadInfo, chunkIndex)) + case (chunkedPayload, (uploadInfo, chunkIndex)) => + //each of the payload requests are created + val partRequest = + HttpRequests.uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload.data, chunkedPayload.size, region) + (partRequest, (uploadInfo, chunkIndex)) } .mapAsync(parallelism) { case (req, info) => Signer.signedRequest(req, signingKey).zip(Future.successful(info)) } } @@ -168,13 +168,16 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, region: Strin private def chunkAndRequest( s3Location: S3Location, contentType: ContentType, + metaHeaders: MetaHeaders, cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize)(parallelism: Int = 4): Flow[ByteString, UploadPartResponse, NotUsed] = { - // The entire multipart upload happens in this request! - val requestFlow = createRequests(s3Location, contentType, cannedAcl, chunkSize, parallelism) + // Multipart upload requests (except for the completion api) are created here. + // The initial upload request gets executed within this function as well. + // The individual upload part requests are created. + val requestFlow = createRequests(s3Location, contentType, metaHeaders, cannedAcl, chunkSize, parallelism) - // The multipart upload flow is constructed by + // The individual upload part requests are processed here requestFlow.via(Http().superPool[(MultipartUpload, Int)]()).map { case (Success(r), (upload, index)) => r.entity.dataBytes.runWith(Sink.ignore) diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala index 287855605e..c3b7602dc5 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala @@ -3,20 +3,20 @@ */ package akka.stream.alpakka.s3.javadsl +import java.util.concurrent.CompletionStage + +import akka.NotUsed import akka.actor.ActorSystem import akka.http.impl.model.JavaUri -import akka.http.javadsl.model.{ ContentType, Uri } -import akka.http.scaladsl.model.{ ContentType => ScalaContentType, ContentTypes } +import akka.http.javadsl.model.{ContentType, Uri} +import akka.http.scaladsl.model.{ContentTypes, HttpHeader, ContentType => ScalaContentType} import akka.stream.Materializer -import akka.stream.alpakka.s3.impl.CompleteMultipartUploadResult import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.impl.S3Stream -import akka.stream.alpakka.s3.impl.S3Location -import akka.stream.javadsl.Source +import akka.stream.alpakka.s3.impl.{CompleteMultipartUploadResult, MetaHeaders, S3Location, S3Stream} +import akka.stream.javadsl.{Sink, Source} import akka.util.ByteString -import akka.NotUsed -import akka.stream.javadsl.Sink -import java.util.concurrent.CompletionStage + +import scala.collection.immutable import scala.compat.java8.FutureConverters._ final case class MultipartUploadResult(location: Uri, bucket: String, key: String, etag: String) @@ -34,12 +34,13 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS def multipartUpload(bucket: String, key: String, - contentType: ContentType): Sink[ByteString, CompletionStage[MultipartUploadResult]] = + contentType: ContentType, + metaHeaders: MetaHeaders): Sink[ByteString, CompletionStage[MultipartUploadResult]] = impl - .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType]) + .multipartUpload(S3Location(bucket, key), contentType.asInstanceOf[ScalaContentType], metaHeaders) .mapMaterializedValue(_.map(MultipartUploadResult.create)(system.dispatcher).toJava) .asJava def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] = - multipartUpload(bucket, key, ContentTypes.`application/octet-stream`) + multipartUpload(bucket, key, ContentTypes.`application/octet-stream`, MetaHeaders(Map())) } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala index c10229d488..8f0278b417 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala @@ -5,15 +5,16 @@ package akka.stream.alpakka.s3.scaladsl import akka.NotUsed import akka.actor.ActorSystem -import akka.http.scaladsl.model.{ ContentType, ContentTypes, Uri } +import akka.http.scaladsl.model.{ContentType, ContentTypes, HttpHeader, Uri} import akka.stream.Materializer import akka.stream.alpakka.s3.S3Settings import akka.stream.alpakka.s3.acl.CannedAcl import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.impl.{ CompleteMultipartUploadResult, S3Location, S3Stream } -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.alpakka.s3.impl.{CompleteMultipartUploadResult, MetaHeaders, S3Location, S3Stream} +import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString +import scala.collection.immutable import scala.concurrent.Future final case class MultipartUploadResult(location: Uri, bucket: String, key: String, etag: String) @@ -43,10 +44,11 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste def multipartUpload(bucket: String, key: String, contentType: ContentType = ContentTypes.`application/octet-stream`, + metaHeaders: MetaHeaders = MetaHeaders(Map()), cannedAcl: CannedAcl = CannedAcl.Private, chunkSize: Int = MinChunkSize, chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] = impl - .multipartUpload(S3Location(bucket, key), contentType, cannedAcl, chunkSize, chunkingParallelism) + .multipartUpload(S3Location(bucket, key), contentType, metaHeaders, cannedAcl, chunkSize, chunkingParallelism) .mapMaterializedValue(_.map(MultipartUploadResult.apply)(system.dispatcher)) } diff --git a/s3/src/test/resources/application.conf b/s3/src/test/resources/application.conf index 5ca4cea43e..54236b3143 100644 --- a/s3/src/test/resources/application.conf +++ b/s3/src/test/resources/application.conf @@ -3,6 +3,9 @@ akka { } akka.stream.alpakka.s3 { + + debug-logging = true + aws = { access-key-id = "" secret-access-key = "" diff --git a/s3/src/test/resources/logback.xml b/s3/src/test/resources/logback.xml new file mode 100644 index 0000000000..d4ea5cb281 --- /dev/null +++ b/s3/src/test/resources/logback.xml @@ -0,0 +1,38 @@ + + + + + + + + + UTF-8 + + + %highlight(%date{HH:mm:ss.SSS} %-5level %-50.50([%logger{50}])) - %msg%n + + + + + + + + + + + + \ No newline at end of file diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala index 7c8712f1a9..dbfeed8a0a 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/auth/SigningKeySpec.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.s3.auth import java.time.LocalDate -import org.scalatest.{ FlatSpec, Matchers } +import org.scalatest.{FlatSpec, Matchers} class SigningKeySpec extends FlatSpec with Matchers { behavior of "A Signing Key" diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/auth/StreamUtilsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/auth/StreamUtilsSpec.scala index c9120c6134..32f5116559 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/auth/StreamUtilsSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/auth/StreamUtilsSpec.scala @@ -4,13 +4,13 @@ package akka.stream.alpakka.s3.auth import java.nio.charset.StandardCharsets._ -import java.nio.file.{ Files, Path } +import java.nio.file.{Files, Path} import java.security.DigestInputStream import java.security.MessageDigest import scala.concurrent.Future -import org.scalatest.{ BeforeAndAfterAll, FlatSpecLike, Matchers } +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.Millis import org.scalatest.time.Seconds @@ -23,7 +23,7 @@ import akka.stream.scaladsl.Source import akka.stream.scaladsl.StreamConverters import akka.testkit.TestKit import akka.util.ByteString -import com.google.common.jimfs.{ Configuration, Jimfs } +import com.google.common.jimfs.{Configuration, Jimfs} class StreamUtilsSpec(_system: ActorSystem) extends TestKit(_system) diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala index 4c13c64295..c694849737 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala @@ -7,13 +7,13 @@ import java.nio.BufferOverflowException import java.nio.file.Files import akka.actor.ActorSystem -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} +import akka.stream.scaladsl.{Sink, Source} import akka.testkit.TestKit import akka.util.ByteString -import org.scalatest.time.{ Millis, Seconds, Span } -import org.scalatest.{ BeforeAndAfterAll, FlatSpecLike, Matchers } -import org.scalatest.concurrent.{ Eventually, ScalaFutures } +import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} +import org.scalatest.concurrent.{Eventually, ScalaFutures} import scala.concurrent.duration._ class DiskBufferSpec(_system: ActorSystem) diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala index 7576f14d24..91bd956cc5 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala @@ -5,10 +5,13 @@ package akka.stream.alpakka.s3.impl import akka.actor.ActorSystem import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.model.{ HttpEntity, MediaTypes } +import akka.http.scaladsl.model.{HttpEntity, MediaTypes} import akka.stream.alpakka.s3.S3Settings import akka.stream.alpakka.s3.acl.CannedAcl -import org.scalatest.{ FlatSpec, Matchers } +import org.scalatest.Inspectors.{forAll => iforAll} +import org.scalatest.{FlatSpec, Matchers} + +import scala.collection.immutable.Seq class HttpRequestsSpec extends FlatSpec with Matchers { it should "initiate multipart upload" in { @@ -17,10 +20,17 @@ class HttpRequestsSpec extends FlatSpec with Matchers { val location = S3Location("bucket", "image-1024@2x") val contentType = MediaTypes.`image/jpeg` val acl = CannedAcl.PublicRead - val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "us-east-1") + val metaHeaders: Map[String, String] = Map("location" -> "San Francisco", "orientation" -> "portrait") + + val req = + HttpRequests.initiateMultipartUploadRequest(location, contentType, acl, "us-east-1", MetaHeaders(metaHeaders)) req.entity shouldEqual HttpEntity.empty(contentType) req.headers should contain(RawHeader("x-amz-acl", acl.value)) req.uri.authority.host.toString shouldEqual "bucket.s3-us-east-1.amazonaws.com" + + metaHeaders.map { m => + req.headers should contain(RawHeader(s"x-amz-meta-${m._1}", m._2)) + } } } diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala index 8991c7e677..6ec6779ff8 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala @@ -4,12 +4,12 @@ package akka.stream.alpakka.s3.impl import akka.actor.ActorSystem -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} +import akka.stream.scaladsl.{Sink, Source} import akka.testkit.TestKit import akka.util.ByteString -import org.scalatest.time.{ Millis, Seconds, Span } -import org.scalatest.{ BeforeAndAfterAll, FlatSpecLike, Matchers } +import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} import org.scalatest.concurrent.ScalaFutures class MemoryBufferSpec(_system: ActorSystem) diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala index eac6fd8fdd..ebe0610249 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3NoMock.scala @@ -4,9 +4,7 @@ package akka.stream.alpakka.s3.impl import akka.actor.ActorSystem -import akka.http.scaladsl.model.ContentTypes import akka.stream.ActorMaterializer -import akka.stream.alpakka.s3.auth.AWSCredentials import akka.stream.alpakka.s3.scaladsl.S3Client import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString @@ -23,15 +21,18 @@ class S3NoMock extends FlatSpecLike with BeforeAndAfterAll with Matchers with Sc val bucket = "test-bucket" val objectKey = "test" - val objectValue = "Some content." + + val objectValue = "Some String" + val metaHeaders: Map[String, String] = Map("location" -> "Africa", "datatype" -> "image") it should "upload with real credentials" ignore { val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: Nil) + //val source: Source[ByteString, Any] = FileIO.fromPath(Paths.get("/tmp/IMG_0470.JPG")) - val result = source.runWith(S3Client().multipartUpload(bucket, objectKey, contentType = ContentTypes.`text/plain(UTF-8)`)) + val result = source.runWith(S3Client().multipartUpload(bucket, objectKey, metaHeaders = MetaHeaders(metaHeaders))) - val multipartUploadResult = Await.ready(result, 5.seconds).futureValue + val multipartUploadResult = Await.ready(result, 90.seconds).futureValue multipartUploadResult.bucket shouldBe bucket multipartUploadResult.key shouldBe objectKey } diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala index 26b1b2ae41..bb309f3eda 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/S3SinkSpec.scala @@ -6,8 +6,8 @@ package akka.stream.alpakka.s3.impl import akka.stream.ActorMaterializer import akka.stream.alpakka.s3.S3Exception import akka.stream.alpakka.s3.auth.AWSCredentials -import akka.stream.alpakka.s3.scaladsl.{ MultipartUploadResult, S3Client } -import akka.stream.scaladsl.{ Keep, Sink, Source } +import akka.stream.alpakka.s3.scaladsl.{MultipartUploadResult, S3Client} +import akka.stream.scaladsl.{Keep, Sink, Source} import akka.util.ByteString import com.github.tomakehurst.wiremock.client.WireMock._ diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala index f43e5bed02..c383553aea 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/SplitAfterSizeSpec.scala @@ -15,7 +15,7 @@ import akka.stream.scaladsl.Source import akka.stream.scaladsl.Flow import akka.util.ByteString import akka.stream.scaladsl.Sink -import org.scalatest.time.{ Millis, Seconds, Span } +import org.scalatest.time.{Millis, Seconds, Span} import scala.concurrent.duration._ class SplitAfterSizeSpec(_system: ActorSystem) diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala index 6601f9ff75..e4f051c1ff 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/WireMockBase.scala @@ -12,7 +12,7 @@ import com.github.tomakehurst.wiremock.client.WireMock._ import com.github.tomakehurst.wiremock.core.WireMockConfiguration._ import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{ BeforeAndAfterAll, FlatSpecLike, Matchers } +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} import scala.concurrent.duration._ diff --git a/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/javadsl/RecordIOFraming.scala b/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/javadsl/RecordIOFraming.scala index be61c3c587..946a87b64c 100755 --- a/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/javadsl/RecordIOFraming.scala +++ b/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/javadsl/RecordIOFraming.scala @@ -1,28 +1,28 @@ -/* - * Copyright (C) 2017 Lightbend Inc. - */ -package akka.stream.alpakka.recordio.javadsl - -import akka.NotUsed -import akka.stream.javadsl.Flow -import akka.util.ByteString - -// Provides a JSON framing flow that can separate records from an incoming RecordIO-formatted [[ByteString]] stream. -object RecordIOFraming { - - /** - * Returns a flow that parses an incoming RecordIO stream and emits the identified records. - * - * The incoming stream is expected to be a concatenation of records of the format: - * - * [record length]\n[record data] - * - * The parser ignores whitespace before or after each record. It is agnostic to the record data contents. - * - * The flow will emit each record's data as a byte string. - * - * @param maxRecordLength The maximum record length allowed. If a record is indicated to be longer, this Flow will fail the stream. - */ - def scanner(maxRecordLength: Int = Int.MaxValue): Flow[ByteString, ByteString, NotUsed] = - akka.stream.alpakka.recordio.scaladsl.RecordIOFraming.scanner(maxRecordLength).asJava -} +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.alpakka.recordio.javadsl + +import akka.NotUsed +import akka.stream.javadsl.Flow +import akka.util.ByteString + +// Provides a JSON framing flow that can separate records from an incoming RecordIO-formatted [[ByteString]] stream. +object RecordIOFraming { + + /** + * Returns a flow that parses an incoming RecordIO stream and emits the identified records. + * + * The incoming stream is expected to be a concatenation of records of the format: + * + * [record length]\n[record data] + * + * The parser ignores whitespace before or after each record. It is agnostic to the record data contents. + * + * The flow will emit each record's data as a byte string. + * + * @param maxRecordLength The maximum record length allowed. If a record is indicated to be longer, this Flow will fail the stream. + */ + def scanner(maxRecordLength: Int = Int.MaxValue): Flow[ByteString, ByteString, NotUsed] = + akka.stream.alpakka.recordio.scaladsl.RecordIOFraming.scanner(maxRecordLength).asJava +} diff --git a/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala b/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala index 06dad9417b..e1e9310608 100644 --- a/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala +++ b/simple-codecs/src/main/scala/akka/stream/alpakka/recordio/scaladsl/RecordIOFraming.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017 Lightbend Inc. + * Copyright (C) 2016 Lightbend Inc. */ package akka.stream.alpakka.recordio.scaladsl @@ -18,18 +18,18 @@ import scala.util.{Failure, Success, Try} object RecordIOFraming { /** - * Returns a flow that parses an incoming RecordIO stream and emits the identified records. - * - * The incoming stream is expected to be a concatenation of records of the format: - * - * [record length]\n[record data] - * - * The parser ignores whitespace before or after each record. It is agnostic to the record data contents. - * - * The flow will emit each record's data as a byte string. - * - * @param maxRecordLength The maximum record length allowed. If a record is indicated to be longer, this Flow will fail the stream. - */ + * Returns a flow that parses an incoming RecordIO stream and emits the identified records. + * + * The incoming stream is expected to be a concatenation of records of the format: + * + * [record length]\n[record data] + * + * The parser ignores whitespace before or after each record. It is agnostic to the record data contents. + * + * The flow will emit each record's data as a byte string. + * + * @param maxRecordLength The maximum record length allowed. If a record is indicated to be longer, this Flow will fail the stream. + */ def scanner(maxRecordLength: Int = Int.MaxValue): Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].via(new RecordIOFramingStage(maxRecordLength)).named("recordIOFraming") @@ -98,8 +98,7 @@ object RecordIOFraming { trimWhitespace() buffer.indexOf(LineFeed) match { case -1 if buffer.size > maxRecordPrefixLength => - failStage( - new FramingException(s"Record size prefix is longer than $maxRecordPrefixLength bytes.")) + failStage(new FramingException(s"Record size prefix is longer than $maxRecordPrefixLength bytes.")) case -1 if isClosed(in) && buffer.isEmpty => completeStage() case -1 => @@ -114,9 +113,7 @@ object RecordIOFraming { new FramingException( s"Record of size $length bytes exceeds maximum of $maxRecordLength bytes.")) case Success(length) if length < 0 => - failStage( - new FramingException( - s"Record size prefix $length is negative.")) + failStage(new FramingException(s"Record size prefix $length is negative.")) case Success(length) => currentRecordLength = Some(length) doParse() diff --git a/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala b/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala index 28e11a8d0e..01cac158f3 100644 --- a/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala +++ b/simple-codecs/src/test/scala/akka/stream/alpakka/recordio/RecordIOFramingSpec.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017 Lightbend Inc. + * Copyright (C) 2016 Lightbend Inc. */ package akka.stream.alpakka.recordio diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala index c09baa0487..9254e247a2 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSinkStage.scala @@ -5,12 +5,12 @@ package akka.stream.alpakka.sqs import akka.Done import akka.stream._ -import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, StageLogging } +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, StageLogging} import com.amazonaws.handlers.AsyncHandler import com.amazonaws.services.sqs.AmazonSQSAsync -import com.amazonaws.services.sqs.model.{ SendMessageRequest, SendMessageResult } +import com.amazonaws.services.sqs.model.{SendMessageRequest, SendMessageResult} -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{Future, Promise} object SqsSinkSettings { val Defaults = SqsSinkSettings(maxInFlight = 10) diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala index d7009124ce..9bbde74f1e 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsSourceStage.scala @@ -5,11 +5,11 @@ package akka.stream.alpakka.sqs import java.util -import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } -import akka.stream.{ Attributes, Outlet, SourceShape } +import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} +import akka.stream.{Attributes, Outlet, SourceShape} import com.amazonaws.handlers.AsyncHandler import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import com.amazonaws.services.sqs.model.{ Message, ReceiveMessageRequest, ReceiveMessageResult } +import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest, ReceiveMessageResult} import scala.collection.JavaConverters._ diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSink.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSink.scala index 016e65e432..b5b6a4ff1e 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSink.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSink.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.sqs.javadsl import akka.Done -import akka.stream.alpakka.sqs.{ SqsSinkSettings, SqsSinkStage, SqsSourceStage } +import akka.stream.alpakka.sqs.{SqsSinkSettings, SqsSinkStage, SqsSourceStage} import akka.stream.javadsl.Sink import com.amazonaws.services.sqs.AmazonSQSAsync diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSource.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSource.scala index 4fef359b8c..3af90110cd 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSource.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/javadsl/SqsSource.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.sqs.javadsl import akka.NotUsed -import akka.stream.alpakka.sqs.{ SqsSourceSettings, SqsSourceStage } +import akka.stream.alpakka.sqs.{SqsSourceSettings, SqsSourceStage} import akka.stream.javadsl.Source import com.amazonaws.services.sqs.AmazonSQSAsyncClient import com.amazonaws.services.sqs.model.Message diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala index 6e23695863..f7701d9a36 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSink.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.sqs.scaladsl import akka.Done -import akka.stream.alpakka.sqs.{ SqsSinkSettings, SqsSinkStage } +import akka.stream.alpakka.sqs.{SqsSinkSettings, SqsSinkStage} import akka.stream.scaladsl.Sink import com.amazonaws.services.sqs.AmazonSQSAsync diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSource.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSource.scala index 2f879f5dab..d7a556e841 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSource.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSource.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.sqs.scaladsl import akka.NotUsed -import akka.stream.alpakka.sqs.{ SqsSourceSettings, SqsSourceStage } +import akka.stream.alpakka.sqs.{SqsSourceSettings, SqsSourceStage} import akka.stream.scaladsl.Source import com.amazonaws.services.sqs.AmazonSQSAsyncClient import com.amazonaws.services.sqs.model.Message diff --git a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/DefaultTestContext.scala b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/DefaultTestContext.scala index 090fad9331..0363724a87 100644 --- a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/DefaultTestContext.scala +++ b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/DefaultTestContext.scala @@ -7,7 +7,7 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import org.scalatest.{ BeforeAndAfterAll, Suite, Tag } +import org.scalatest.{BeforeAndAfterAll, Suite, Tag} import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSettingsSpec.scala b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSettingsSpec.scala index 6a1955e586..b93c2c793e 100644 --- a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSettingsSpec.scala +++ b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSettingsSpec.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.sqs.scaladsl import akka.stream.alpakka.sqs.SqsSinkSettings -import org.scalatest.{ FlatSpec, Matchers } +import org.scalatest.{FlatSpec, Matchers} class SqsSinkSettingsSpec extends FlatSpec with Matchers { diff --git a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSpec.scala b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSpec.scala index 7840a79825..61f1e22bc7 100644 --- a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSpec.scala +++ b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSinkSpec.scala @@ -3,20 +3,20 @@ */ package akka.stream.alpakka.sqs.scaladsl -import java.util.concurrent.{ CompletableFuture, Future } +import java.util.concurrent.{CompletableFuture, Future} import akka.Done import akka.stream.scaladsl.Keep import akka.stream.testkit.scaladsl.TestSource import com.amazonaws.handlers.AsyncHandler import com.amazonaws.services.sqs.AmazonSQSAsync -import com.amazonaws.services.sqs.model.{ SendMessageRequest, SendMessageResult } +import com.amazonaws.services.sqs.model.{SendMessageRequest, SendMessageResult} import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.mockito.MockitoSugar.mock -import org.scalatest.{ FlatSpec, Matchers } +import org.scalatest.{FlatSpec, Matchers} import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSettingsSpec.scala b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSettingsSpec.scala index 6f033d9ed8..a4322ea23c 100644 --- a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSettingsSpec.scala +++ b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSettingsSpec.scala @@ -4,7 +4,7 @@ package akka.stream.alpakka.sqs.scaladsl import akka.stream.alpakka.sqs.SqsSourceSettings -import org.scalatest.{ FlatSpec, Matchers } +import org.scalatest.{FlatSpec, Matchers} class SqsSourceSettingsSpec extends FlatSpec with Matchers { diff --git a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSpec.scala b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSpec.scala index 023b0f6533..ad6469aeaf 100644 --- a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSpec.scala +++ b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSourceSpec.scala @@ -7,7 +7,7 @@ import akka.stream.alpakka.sqs.SqsSourceSettings import akka.stream.scaladsl.Sink import com.amazonaws.services.sqs.model.QueueDoesNotExistException import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{ AsyncWordSpec, Matchers } +import org.scalatest.{AsyncWordSpec, Matchers} class SqsSourceSpec extends AsyncWordSpec with ScalaFutures with Matchers with DefaultTestContext { diff --git a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala index f29f2ffff9..8154214be0 100644 --- a/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala +++ b/sqs/src/test/scala/akka/stream/alpakka/sqs/scaladsl/SqsSpec.scala @@ -7,7 +7,7 @@ import akka.Done import akka.stream.scaladsl.Source import akka.stream.testkit.scaladsl.TestSink import com.amazonaws.services.sqs.model.Message -import org.scalatest.{ FlatSpec, Matchers } +import org.scalatest.{FlatSpec, Matchers} import scala.concurrent.Await import scala.concurrent.duration._