diff --git a/.travis.yml b/.travis.yml index cd77bdb8fa..d470b3665f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,8 +24,8 @@ jobs: - env: CMD="++2.11.12 Test/compile" name: "Compile all code with Scala 2.11" if: type != cron - - env: CMD="++2.12.7 Test/compile" - name: "Compile all code with Scala 2.12 and fatal warnings enabled. Run locally with: env CI=true sbt ++2.12.7 Test/compile" + - env: CMD="++2.12.9 Test/compile" + name: "Compile all code with Scala 2.12 and fatal warnings enabled. Run locally with: env CI=true sbt ++2.12.9 Test/compile" - env: CMD="++2.13.0 Test/compile" name: "Compile all code with Scala 2.13" - env: CMD="unidoc" @@ -144,8 +144,8 @@ jobs: - stage: publish env: CMD="++2.11.12 publish" name: "Publish artifacts for Scala 2.11.12" - - env: CMD="++2.12.7 publish" - name: "Publish artifacts for Scala 2.12.7" + - env: CMD="++2.12.9 publish" + name: "Publish artifacts for Scala 2.12.9" - env: CMD="++2.13.0 publish" name: "Publish artifacts for Scala 2.13.0" - script: openssl aes-256-cbc -K $encrypted_bbf1dc4f2a07_key -iv $encrypted_bbf1dc4f2a07_iv -in .travis/travis_alpakka_rsa.enc -out .travis/id_rsa -d && eval "$(ssh-agent -s)" && chmod 600 .travis/id_rsa && ssh-add .travis/id_rsa && sbt -jvm-opts .jvmopts-travis docs/publishRsync diff --git a/build.sbt b/build.sbt index 30862c4b41..30a986e48c 100644 --- a/build.sbt +++ b/build.sbt @@ -127,8 +127,7 @@ lazy val ftp = alpakkaProject( parallelExecution in Test := false, fork in Test := true, // To avoid potential blocking in machines with low entropy (default is `/dev/random`) - javaOptions in Test += "-Djava.security.egd=file:/dev/./urandom", - crossScalaVersions -= Dependencies.Scala213 // https://github.com/akka/alpakka/issues/1532 + javaOptions in Test += "-Djava.security.egd=file:/dev/./urandom" ) lazy val geode = diff --git a/ftp/src/main/mima-filters/1.1.x.backwards.excludes b/ftp/src/main/mima-filters/1.1.x.backwards.excludes index b59761ced4..e0d0cc099d 100644 --- a/ftp/src/main/mima-filters/1.1.x.backwards.excludes +++ b/ftp/src/main/mima-filters/1.1.x.backwards.excludes @@ -1,2 +1,5 @@ -# Allow changes to impl -ProblemFilters.exclude[Problem]("akka.stream.alpakka.ftp.impl.*") \ No newline at end of file +# Restructure to enable compilation with Scala 2.12.8+ PR #1779 +ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.alpakka.ftp.javadsl.FtpApi.*") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.ftp.javadsl.FtpApi.ftpLike") +ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.alpakka.ftp.scaladsl.FtpApi.*") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.ftp.scaladsl.FtpApi.ftpLike") diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/CommonFtpOperations.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/CommonFtpOperations.scala index a2f6db47d4..5df4a62ae8 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/CommonFtpOperations.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/CommonFtpOperations.scala @@ -48,17 +48,17 @@ private[ftp] trait CommonFtpOperations { private def getPosixFilePermissions(file: FTPFile) = Map( - PosixFilePermission.OWNER_READ → file.hasPermission(FTPFile.USER_ACCESS, FTPFile.READ_PERMISSION), - PosixFilePermission.OWNER_WRITE → file.hasPermission(FTPFile.USER_ACCESS, FTPFile.WRITE_PERMISSION), - PosixFilePermission.OWNER_EXECUTE → file.hasPermission(FTPFile.USER_ACCESS, FTPFile.EXECUTE_PERMISSION), - PosixFilePermission.GROUP_READ → file.hasPermission(FTPFile.GROUP_ACCESS, FTPFile.READ_PERMISSION), - PosixFilePermission.GROUP_WRITE → file.hasPermission(FTPFile.GROUP_ACCESS, FTPFile.WRITE_PERMISSION), - PosixFilePermission.GROUP_EXECUTE → file.hasPermission(FTPFile.GROUP_ACCESS, FTPFile.EXECUTE_PERMISSION), - PosixFilePermission.OTHERS_READ → file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.READ_PERMISSION), - PosixFilePermission.OTHERS_WRITE → file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.WRITE_PERMISSION), - PosixFilePermission.OTHERS_EXECUTE → file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) + PosixFilePermission.OWNER_READ -> file.hasPermission(FTPFile.USER_ACCESS, FTPFile.READ_PERMISSION), + PosixFilePermission.OWNER_WRITE -> file.hasPermission(FTPFile.USER_ACCESS, FTPFile.WRITE_PERMISSION), + PosixFilePermission.OWNER_EXECUTE -> file.hasPermission(FTPFile.USER_ACCESS, FTPFile.EXECUTE_PERMISSION), + PosixFilePermission.GROUP_READ -> file.hasPermission(FTPFile.GROUP_ACCESS, FTPFile.READ_PERMISSION), + PosixFilePermission.GROUP_WRITE -> file.hasPermission(FTPFile.GROUP_ACCESS, FTPFile.WRITE_PERMISSION), + PosixFilePermission.GROUP_EXECUTE -> file.hasPermission(FTPFile.GROUP_ACCESS, FTPFile.EXECUTE_PERMISSION), + PosixFilePermission.OTHERS_READ -> file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.READ_PERMISSION), + PosixFilePermission.OTHERS_WRITE -> file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.WRITE_PERMISSION), + PosixFilePermission.OTHERS_EXECUTE -> file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) ).collect { - case (perm, true) ⇒ perm + case (perm, true) => perm }.toSet def listFiles(handler: Handler): immutable.Seq[FtpFile] = listFiles("", handler) diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala index 310251bc0b..d13efd8281 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala @@ -178,7 +178,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings] write(grab(in)) pull(in) } catch { - case NonFatal(e) ⇒ + case NonFatal(e) => failed = true matFailure(e) failStage(e) diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala index 9170bb8000..80bd4a1a6b 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpSourceFactory.scala @@ -16,9 +16,7 @@ import org.apache.commons.net.ftp.{FTPClient, FTPSClient} * INTERNAL API */ @InternalApi -private[ftp] trait FtpSourceFactory[FtpClient] { self => - - type S <: RemoteFileSettings +private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self => protected[this] final val DefaultChunkSize = 8192 @@ -140,7 +138,7 @@ private[ftp] trait FtpSourceFactory[FtpClient] { self => * INTERNAL API */ @InternalApi -private[ftp] trait FtpSource extends FtpSourceFactory[FTPClient] { +private[ftp] trait FtpSource extends FtpSourceFactory[FTPClient, FtpSettings] { protected final val FtpBrowserSourceName = "FtpBrowserSource" protected final val FtpIOSourceName = "FtpIOSource" protected final val FtpDirectorySource = "FtpDirectorySource" @@ -157,7 +155,7 @@ private[ftp] trait FtpSource extends FtpSourceFactory[FTPClient] { * INTERNAL API */ @InternalApi -private[ftp] trait FtpsSource extends FtpSourceFactory[FTPSClient] { +private[ftp] trait FtpsSource extends FtpSourceFactory[FTPSClient, FtpsSettings] { protected final val FtpsBrowserSourceName = "FtpsBrowserSource" protected final val FtpsIOSourceName = "FtpsIOSource" protected final val FtpsDirectorySource = "FtpsDirectorySource" @@ -174,7 +172,7 @@ private[ftp] trait FtpsSource extends FtpSourceFactory[FTPSClient] { * INTERNAL API */ @InternalApi -private[ftp] trait SftpSource extends FtpSourceFactory[SSHClient] { +private[ftp] trait SftpSource extends FtpSourceFactory[SSHClient, SftpSettings] { protected final val sFtpBrowserSourceName = "sFtpBrowserSource" protected final val sFtpIOSourceName = "sFtpIOSource" protected final val sFtpDirectorySource = "sFtpDirectorySource" @@ -195,8 +193,8 @@ private[ftp] trait SftpSource extends FtpSourceFactory[SSHClient] { private[ftp] trait FtpDefaultSettings { protected def defaultSettings( hostname: String, - username: Option[String], - password: Option[String] + username: Option[String] = None, + password: Option[String] = None ): FtpSettings = FtpSettings( InetAddress.getByName(hostname) @@ -215,8 +213,8 @@ private[ftp] trait FtpDefaultSettings { private[ftp] trait FtpsDefaultSettings { protected def defaultSettings( hostname: String, - username: Option[String], - password: Option[String] + username: Option[String] = None, + password: Option[String] = None ): FtpsSettings = FtpsSettings( InetAddress.getByName(hostname) @@ -235,8 +233,8 @@ private[ftp] trait FtpsDefaultSettings { private[ftp] trait SftpDefaultSettings { protected def defaultSettings( hostname: String, - username: Option[String], - password: Option[String] + username: Option[String] = None, + password: Option[String] = None ): SftpSettings = SftpSettings( InetAddress.getByName(hostname) diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala index 84a8ac909a..482d6eb02f 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala @@ -7,10 +7,10 @@ package akka.stream.alpakka.ftp.javadsl import java.util.concurrent.CompletionStage import java.util.function._ -import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory, _} -import akka.stream.alpakka.ftp.{FtpFile, RemoteFileSettings} +import akka.annotation.DoNotInherit +import akka.stream.alpakka.ftp._ +import akka.stream.alpakka.ftp.impl._ import akka.stream.javadsl.{Sink, Source} -import akka.stream.scaladsl.{Sink => ScalaSink, Source => ScalaSource} import akka.stream.{IOResult, Materializer} import akka.util.ByteString import akka.{Done, NotUsed} @@ -19,12 +19,8 @@ import org.apache.commons.net.ftp.{FTPClient, FTPSClient} import scala.compat.java8.FunctionConverters._ -sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => - - /** - * The refined [[RemoteFileSettings]] type. - */ - type S <: RemoteFileSettings +@DoNotInherit +sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[FtpClient, S] => /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory. @@ -33,8 +29,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param host FTP, FTPs or SFTP host * @return A [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s */ - def ls(host: String): Source[FtpFile, NotUsed] = - ls(host, basePath = "") + def ls(host: String): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from a base path. @@ -47,8 +42,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def ls( host: String, basePath: String - ): Source[FtpFile, NotUsed] = - ls(basePath, defaultSettings(host)) + ): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory. @@ -62,8 +56,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => host: String, username: String, password: String - ): Source[FtpFile, NotUsed] = - ls("", defaultSettings(host, Some(username), Some(password))) + ): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from a base path. @@ -79,8 +72,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => username: String, password: String, basePath: String - ): Source[FtpFile, NotUsed] = - ls(basePath, defaultSettings(host, Some(username), Some(password))) + ): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from a base path. @@ -92,12 +84,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def ls( basePath: String, connectionSettings: S - ): Source[FtpFile, NotUsed] = - ScalaSource - .fromGraph( - createBrowserGraph(basePath, connectionSettings, f => true, _emitTraversedDirectories = false) - ) - .asJava + ): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from a base path. @@ -114,15 +101,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * * @return A [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s */ - def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] = - Source.fromGraph( - createBrowserGraph( - basePath, - connectionSettings, - asScalaFromPredicate(branchSelector), - _emitTraversedDirectories = false - ) - ) + def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[FtpFile]]s from a base path. @@ -143,10 +122,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile], - emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = - Source.fromGraph( - createBrowserGraph(basePath, connectionSettings, asScalaFromPredicate(branchSelector), emitTraversedDirectories) - ) + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -158,8 +134,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def fromPath( host: String, path: String - ): Source[ByteString, CompletionStage[IOResult]] = - fromPath(path, defaultSettings(host)) + ): Source[ByteString, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -175,8 +150,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => username: String, password: String, path: String - ): Source[ByteString, CompletionStage[IOResult]] = - fromPath(path, defaultSettings(host, Some(username), Some(password))) + ): Source[ByteString, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -188,8 +162,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def fromPath( path: String, connectionSettings: S - ): Source[ByteString, CompletionStage[IOResult]] = - fromPath(path, connectionSettings, DefaultChunkSize) + ): Source[ByteString, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -203,8 +176,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => path: String, connectionSettings: S, chunkSize: Int = DefaultChunkSize - ): Source[ByteString, CompletionStage[IOResult]] = - fromPath(path, connectionSettings, chunkSize, 0L) + ): Source[ByteString, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -220,13 +192,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => connectionSettings: S, chunkSize: Int, offset: Long - ): Source[ByteString, CompletionStage[IOResult]] = { - import scala.compat.java8.FutureConverters._ - ScalaSource - .fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) - .mapMaterializedValue(_.toJava) - .asJava - } + ): Source[ByteString, CompletionStage[IOResult]] /** * Java API for creating a directory in a given path @@ -236,8 +202,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return [[akka.stream.javadsl.Source Source]] of [[Done]] */ - def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = - ScalaSource.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done.getInstance()).asJava + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] /** * Java API for creating a directory in a given path @@ -247,8 +212,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return [[java.util.concurrent.CompletionStage CompletionStage]] of [[akka.Done]] indicating a materialized, asynchronous request */ - def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] = - mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat) + def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] /** * Java API: creates a [[akka.stream.javadsl.Sink Sink]] of [[akka.util.ByteString ByteString]] to some file path. @@ -262,10 +226,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => path: String, connectionSettings: S, append: Boolean - ): Sink[ByteString, CompletionStage[IOResult]] = { - import scala.compat.java8.FutureConverters._ - ScalaSink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(_.toJava).asJava - } + ): Sink[ByteString, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Sink Sink]] of [[akka.util.ByteString ByteString]] to some file path. @@ -278,8 +239,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def toPath( path: String, connectionSettings: S - ): Sink[ByteString, CompletionStage[IOResult]] = - toPath(path, connectionSettings, append = false) + ): Sink[ByteString, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Sink Sink]] of a [[FtpFile]] that moves a file to some file path. @@ -288,15 +248,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return A [[akka.stream.javadsl.Sink Sink]] of [[FtpFile]] that materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[IOResult]] */ - def move(destinationPath: Function[FtpFile, String], - connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { - import scala.compat.java8.FunctionConverters._ - import scala.compat.java8.FutureConverters._ - ScalaSink - .fromGraph(createMoveSink(destinationPath.asScala, connectionSettings)) - .mapMaterializedValue(_.toJava) - .asJava - } + def move(destinationPath: Function[FtpFile, String], connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] /** * Java API: creates a [[akka.stream.javadsl.Sink Sink]] of a [[FtpFile]] that removes a file. @@ -304,16 +256,279 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return A [[akka.stream.javadsl.Sink Sink]] of [[FtpFile]] that materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[IOResult]] */ + def remove(connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] + + protected[javadsl] def func[T, R](f: T => R): akka.japi.function.Function[T, R] = + new akka.japi.function.Function[T, R] { + override def apply(param: T): R = f(param) + } +} + +object Ftp extends FtpApi[FTPClient, FtpSettings] with FtpSourceParams { + def ls(host: String): Source[FtpFile, NotUsed] = ls(host, basePath = "") + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = ls(basePath, defaultSettings(host)) + + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = + ls("", defaultSettings(host, Some(username), Some(password))) + + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = + ls(basePath, defaultSettings(host, Some(username), Some(password))) + + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = + Source + .fromGraph(createBrowserGraph(basePath, connectionSettings, _ => true, _emitTraversedDirectories = false)) + + def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph( + basePath, + connectionSettings, + asScalaFromPredicate(branchSelector), + _emitTraversedDirectories = false + ) + ) + + def ls(basePath: String, + connectionSettings: S, + branchSelector: Predicate[FtpFile], + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph(basePath, connectionSettings, asScalaFromPredicate(branchSelector), emitTraversedDirectories) + ) + + def fromPath(host: String, path: String): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, defaultSettings(host)) + + def fromPath(host: String, + username: String, + password: String, + path: String): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, defaultSettings(host, Some(username), Some(password))) + + def fromPath(path: String, connectionSettings: S): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, connectionSettings, DefaultChunkSize) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int = DefaultChunkSize): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, connectionSettings, chunkSize, 0L) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int, + offset: Long): Source[ByteString, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Source + .fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + .mapMaterializedValue(func(_.toJava)) + } + + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = + Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(func(_ => Done)) + + def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] = + mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat) + + def toPath(path: String, connectionSettings: S, append: Boolean): Sink[ByteString, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Sink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(func(_.toJava)) + } + + def toPath(path: String, connectionSettings: S): Sink[ByteString, CompletionStage[IOResult]] = + toPath(path, connectionSettings, append = false) + + def move(destinationPath: Function[FtpFile, String], + connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { + import scala.compat.java8.FunctionConverters._ + import scala.compat.java8.FutureConverters._ + Sink + .fromGraph(createMoveSink(destinationPath.asScala, connectionSettings)) + .mapMaterializedValue[CompletionStage[IOResult]](func(_.toJava)) + } + + def remove(connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Sink.fromGraph(createRemoveSink(connectionSettings)).mapMaterializedValue(func(_.toJava)) + } + +} +object Ftps extends FtpApi[FTPSClient, FtpsSettings] with FtpsSourceParams { + def ls(host: String): Source[FtpFile, NotUsed] = ls(host, basePath = "") + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = ls(basePath, defaultSettings(host)) + + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = + ls("", defaultSettings(host, Some(username), Some(password))) + + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = + ls(basePath, defaultSettings(host, Some(username), Some(password))) + + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = + Source + .fromGraph(createBrowserGraph(basePath, connectionSettings, _ => true, _emitTraversedDirectories = false)) + + def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph( + basePath, + connectionSettings, + asScalaFromPredicate(branchSelector), + _emitTraversedDirectories = false + ) + ) + + def ls(basePath: String, + connectionSettings: S, + branchSelector: Predicate[FtpFile], + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph(basePath, connectionSettings, asScalaFromPredicate(branchSelector), emitTraversedDirectories) + ) + + def fromPath(host: String, path: String): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, defaultSettings(host)) + + def fromPath(host: String, + username: String, + password: String, + path: String): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, defaultSettings(host, Some(username), Some(password))) + + def fromPath(path: String, connectionSettings: S): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, connectionSettings, DefaultChunkSize) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int = DefaultChunkSize): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, connectionSettings, chunkSize, 0L) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int, + offset: Long): Source[ByteString, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Source + .fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + .mapMaterializedValue(func(_.toJava)) + } + + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = + Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(func(_ => Done)) + + def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] = + mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat) + + def toPath(path: String, connectionSettings: S, append: Boolean): Sink[ByteString, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Sink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(func(_.toJava)) + } + + def toPath(path: String, connectionSettings: S): Sink[ByteString, CompletionStage[IOResult]] = + toPath(path, connectionSettings, append = false) + + def move(destinationPath: Function[FtpFile, String], + connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { + import scala.compat.java8.FunctionConverters._ + import scala.compat.java8.FutureConverters._ + Sink + .fromGraph(createMoveSink(destinationPath.asScala, connectionSettings)) + .mapMaterializedValue(func(_.toJava)) + } + + def remove(connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Sink.fromGraph(createRemoveSink(connectionSettings)).mapMaterializedValue(func(_.toJava)) + } + +} + +class SftpApi extends FtpApi[SSHClient, SftpSettings] with SftpSourceParams { + def ls(host: String): Source[FtpFile, NotUsed] = ls(host, basePath = "") + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = ls(basePath, defaultSettings(host)) + + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = + ls("", defaultSettings(host, Some(username), Some(password))) + + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = + ls(basePath, defaultSettings(host, Some(username), Some(password))) + + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = + Source + .fromGraph(createBrowserGraph(basePath, connectionSettings, _ => true, _emitTraversedDirectories = false)) + + def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph( + basePath, + connectionSettings, + asScalaFromPredicate(branchSelector), + _emitTraversedDirectories = false + ) + ) + + def ls(basePath: String, + connectionSettings: S, + branchSelector: Predicate[FtpFile], + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph(basePath, connectionSettings, asScalaFromPredicate(branchSelector), emitTraversedDirectories) + ) + + def fromPath(host: String, path: String): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, defaultSettings(host)) + + def fromPath(host: String, + username: String, + password: String, + path: String): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, defaultSettings(host, Some(username), Some(password))) + + def fromPath(path: String, connectionSettings: S): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, connectionSettings, DefaultChunkSize) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int = DefaultChunkSize): Source[ByteString, CompletionStage[IOResult]] = + fromPath(path, connectionSettings, chunkSize, 0L) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int, + offset: Long): Source[ByteString, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Source + .fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + .mapMaterializedValue(func(_.toJava)) + } + + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = + Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(func(_ => Done)) + + def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] = + mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat) + + def toPath(path: String, connectionSettings: S, append: Boolean): Sink[ByteString, CompletionStage[IOResult]] = { + import scala.compat.java8.FutureConverters._ + Sink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(func(_.toJava)) + } + + def toPath(path: String, connectionSettings: S): Sink[ByteString, CompletionStage[IOResult]] = + toPath(path, connectionSettings, append = false) + + def move(destinationPath: Function[FtpFile, String], + connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { + import scala.compat.java8.FunctionConverters._ + import scala.compat.java8.FutureConverters._ + Sink + .fromGraph(createMoveSink(destinationPath.asScala, connectionSettings)) + .mapMaterializedValue(func(_.toJava)) + } + def remove(connectionSettings: S): Sink[FtpFile, CompletionStage[IOResult]] = { import scala.compat.java8.FutureConverters._ - ScalaSink.fromGraph(createRemoveSink(connectionSettings)).mapMaterializedValue(_.toJava).asJava + Sink.fromGraph(createRemoveSink(connectionSettings)).mapMaterializedValue(func(_.toJava)) } - protected[this] implicit def ftpLike: FtpLike[FtpClient, S] } -class SftpApi extends FtpApi[SSHClient] with SftpSourceParams -object Ftp extends FtpApi[FTPClient] with FtpSourceParams -object Ftps extends FtpApi[FTPSClient] with FtpsSourceParams object Sftp extends SftpApi { /** @@ -324,6 +539,6 @@ object Sftp extends SftpApi { */ def create(customSshClient: SSHClient): SftpApi = new SftpApi { - override val sshClient = customSshClient + override val sshClient: SSHClient = customSshClient } } diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala index 83c7e485f7..33ae981ce8 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.ftp import java.net.InetAddress import java.nio.file.attribute.PosixFilePermission -import akka.annotation.InternalApi +import akka.annotation.{DoNotInherit, InternalApi} import org.apache.commons.net.ftp.{FTPClient, FTPSClient} /** @@ -35,6 +35,7 @@ final case class FtpFile( /** * Common remote file settings. */ +@DoNotInherit sealed abstract class RemoteFileSettings { def host: InetAddress def port: Int @@ -44,6 +45,7 @@ sealed abstract class RemoteFileSettings { /** * Common settings for FTP and FTPs. */ +@DoNotInherit sealed abstract class FtpFileSettings extends RemoteFileSettings { def binary: Boolean // BINARY or ASCII (default) def passiveMode: Boolean diff --git a/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala b/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala index 65f874e3f6..c91d174a1c 100644 --- a/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala +++ b/ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala @@ -4,23 +4,20 @@ package akka.stream.alpakka.ftp.scaladsl -import akka.{Done, NotUsed} -import akka.stream.{IOResult, Materializer} -import akka.stream.alpakka.ftp.impl._ -import akka.stream.alpakka.ftp.{FtpFile, RemoteFileSettings} +import akka.annotation.DoNotInherit +import akka.stream.alpakka.ftp.impl.{FtpSourceFactory, FtpSourceParams, FtpsSourceParams, SftpSourceParams} +import akka.stream.alpakka.ftp._ import akka.stream.scaladsl.{Sink, Source} +import akka.stream.{IOResult, Materializer} import akka.util.ByteString +import akka.{Done, NotUsed} import net.schmizz.sshj.SSHClient import org.apache.commons.net.ftp.{FTPClient, FTPSClient} import scala.concurrent.Future -sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => - - /** - * The refined [[RemoteFileSettings]] type. - */ - type S <: RemoteFileSettings +@DoNotInherit +sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[FtpClient, S] => /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory. @@ -29,8 +26,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param host FTP, FTPs or SFTP host * @return A [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s */ - def ls(host: String): Source[FtpFile, NotUsed] = - ls(host, basePath = "") + def ls(host: String): Source[FtpFile, NotUsed] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from a base path. @@ -40,8 +36,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param basePath Base path from which traverse the remote file server * @return A [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s */ - def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = - ls(basePath, defaultSettings(host)) + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory. @@ -51,8 +46,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param password password * @return A [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s */ - def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = - ls("", defaultSettings(host, Some(username), Some(password))) + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from a base path. @@ -63,8 +57,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param basePath Base path from which traverse the remote file server * @return A [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s */ - def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = - ls(basePath, defaultSettings(host, Some(username), Some(password))) + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from a base path. @@ -73,8 +66,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return A [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s */ - def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = - ls(basePath, connectionSettings, f => true) + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from a base path. @@ -91,10 +83,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * * @return A [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s */ - def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] = - Source.fromGraph( - createBrowserGraph(basePath, connectionSettings, branchSelector, _emitTraversedDirectories = false) - ) + def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[FtpFile]]s from a base path. @@ -115,28 +104,25 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean, - emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = - Source.fromGraph(createBrowserGraph(basePath, connectionSettings, branchSelector, emitTraversedDirectories)) + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] /** - * Scala API for creating a directory in a given path + * Scala API for creating a directory in a given path * @param basePath path to start with * @param name name of a directory to create * @param connectionSettings connection settings * @return [[akka.stream.scaladsl.Source Source]] of [[akka.Done]] */ - def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = - Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done) + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] /** - * Scala API for creating a directory in a given path + * Scala API for creating a directory in a given path * @param basePath path to start with * @param name name of a directory to create * @param connectionSettings connection settings * @return [[scala.concurrent.Future Future]] of [[akka.Done]] indicating a materialized, asynchronous request */ - def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] = - mkdir(basePath, name, connectionSettings).runWith(Sink.head) + def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -145,8 +131,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param path the file path * @return A [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] that materializes to a [[scala.concurrent.Future Future]] of [[IOResult]] */ - def fromPath(host: String, path: String): Source[ByteString, Future[IOResult]] = - fromPath(path, defaultSettings(host)) + def fromPath(host: String, path: String): Source[ByteString, Future[IOResult]] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -157,8 +142,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param path the file path * @return A [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] that materializes to a [[scala.concurrent.Future Future]] of [[IOResult]] */ - def fromPath(host: String, username: String, password: String, path: String): Source[ByteString, Future[IOResult]] = - fromPath(path, defaultSettings(host, Some(username), Some(password))) + def fromPath(host: String, username: String, password: String, path: String): Source[ByteString, Future[IOResult]] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -172,8 +156,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => path: String, connectionSettings: S, chunkSize: Int = DefaultChunkSize - ): Source[ByteString, Future[IOResult]] = - fromPath(path, connectionSettings, chunkSize, 0L) + ): Source[ByteString, Future[IOResult]] /** * Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path. @@ -189,8 +172,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => connectionSettings: S, chunkSize: Int, offset: Long - ): Source[ByteString, Future[IOResult]] = - Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + ): Source[ByteString, Future[IOResult]] /** * Scala API: creates a [[akka.stream.scaladsl.Sink Sink]] of [[akka.util.ByteString ByteString]] to some file path. @@ -204,8 +186,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => path: String, connectionSettings: S, append: Boolean = false - ): Sink[ByteString, Future[IOResult]] = - Sink.fromGraph(createIOSink(path, connectionSettings, append)) + ): Sink[ByteString, Future[IOResult]] /** * Scala API: creates a [[akka.stream.scaladsl.Sink Sink]] of a [[FtpFile]] that moves a file to some file path. @@ -214,8 +195,7 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return A [[akka.stream.scaladsl.Sink Sink]] of [[FtpFile]] that materializes to a [[scala.concurrent.Future Future]] of [[IOResult]] */ - def move(destinationPath: FtpFile => String, connectionSettings: S): Sink[FtpFile, Future[IOResult]] = - Sink.fromGraph(createMoveSink(destinationPath, connectionSettings)) + def move(destinationPath: FtpFile => String, connectionSettings: S): Sink[FtpFile, Future[IOResult]] /** * Scala API: creates a [[akka.stream.scaladsl.Sink Sink]] of a [[FtpFile]] that removes a file. @@ -223,16 +203,182 @@ sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] => * @param connectionSettings connection settings * @return A [[akka.stream.scaladsl.Sink Sink]] of [[FtpFile]] that materializes to a [[scala.concurrent.Future Future]] of [[IOResult]] */ + def remove(connectionSettings: S): Sink[FtpFile, Future[IOResult]] +} + +object Ftp extends FtpApi[FTPClient, FtpSettings] with FtpSourceParams { + + def ls(host: String): Source[FtpFile, NotUsed] = ls(host, basePath = "") + + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = ls(basePath, defaultSettings(host)) + + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = + ls("", defaultSettings(host, Some(username), Some(password))) + + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = + ls(basePath, defaultSettings(host, Some(username), Some(password))) + + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = + ls(basePath, connectionSettings, _ => true) + + def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph(basePath, connectionSettings, branchSelector, _emitTraversedDirectories = false) + ) + + def ls(basePath: String, + connectionSettings: S, + branchSelector: FtpFile => Boolean, + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph(createBrowserGraph(basePath, connectionSettings, branchSelector, emitTraversedDirectories)) + + def fromPath(host: String, path: String): Source[ByteString, Future[IOResult]] = fromPath(path, defaultSettings(host)) + + def fromPath(host: String, username: String, password: String, path: String): Source[ByteString, Future[IOResult]] = + fromPath(path, defaultSettings(host, Some(username), Some(password))) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[IOResult]] = + fromPath(path, connectionSettings, chunkSize, 0L) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int, + offset: Long): Source[ByteString, Future[IOResult]] = + Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = + Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done) + + def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] = + mkdir(basePath, name, connectionSettings).runWith(Sink.head) + + def toPath(path: String, connectionSettings: S, append: Boolean = false): Sink[ByteString, Future[IOResult]] = + Sink.fromGraph(createIOSink(path, connectionSettings, append)) + + def move(destinationPath: FtpFile => String, connectionSettings: S): Sink[FtpFile, Future[IOResult]] = + Sink.fromGraph(createMoveSink(destinationPath, connectionSettings)) + def remove(connectionSettings: S): Sink[FtpFile, Future[IOResult]] = Sink.fromGraph(createRemoveSink(connectionSettings)) - protected[this] implicit def ftpLike: FtpLike[FtpClient, S] } -class SftpApi extends FtpApi[SSHClient] with SftpSourceParams -object Ftp extends FtpApi[FTPClient] with FtpSourceParams -object Ftps extends FtpApi[FTPSClient] with FtpsSourceParams +object Ftps extends FtpApi[FTPSClient, FtpsSettings] with FtpsSourceParams { + def ls(host: String): Source[FtpFile, NotUsed] = ls(host, basePath = "") + + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = ls(basePath, defaultSettings(host)) + + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = + ls("", defaultSettings(host, Some(username), Some(password))) + + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = + ls(basePath, defaultSettings(host, Some(username), Some(password))) + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = + ls(basePath, connectionSettings, _ => true) + + def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph(basePath, connectionSettings, branchSelector, _emitTraversedDirectories = false) + ) + + def ls(basePath: String, + connectionSettings: S, + branchSelector: FtpFile => Boolean, + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph(createBrowserGraph(basePath, connectionSettings, branchSelector, emitTraversedDirectories)) + + def fromPath(host: String, path: String): Source[ByteString, Future[IOResult]] = fromPath(path, defaultSettings(host)) + + def fromPath(host: String, username: String, password: String, path: String): Source[ByteString, Future[IOResult]] = + fromPath(path, defaultSettings(host, Some(username), Some(password))) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[IOResult]] = + fromPath(path, connectionSettings, chunkSize, 0L) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int, + offset: Long): Source[ByteString, Future[IOResult]] = + Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = + Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done) + + def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] = + mkdir(basePath, name, connectionSettings).runWith(Sink.head) + + def toPath(path: String, connectionSettings: S, append: Boolean = false): Sink[ByteString, Future[IOResult]] = + Sink.fromGraph(createIOSink(path, connectionSettings, append)) + + def move(destinationPath: FtpFile => String, connectionSettings: S): Sink[FtpFile, Future[IOResult]] = + Sink.fromGraph(createMoveSink(destinationPath, connectionSettings)) + + def remove(connectionSettings: S): Sink[FtpFile, Future[IOResult]] = + Sink.fromGraph(createRemoveSink(connectionSettings)) +} + +class SftpApi extends FtpApi[SSHClient, SftpSettings] with SftpSourceParams { + def ls(host: String): Source[FtpFile, NotUsed] = ls(host, basePath = "") + + def ls(host: String, basePath: String): Source[FtpFile, NotUsed] = ls(basePath, defaultSettings(host)) + + def ls(host: String, username: String, password: String): Source[FtpFile, NotUsed] = + ls("", defaultSettings(host, Some(username), Some(password))) + + def ls(host: String, username: String, password: String, basePath: String): Source[FtpFile, NotUsed] = + ls(basePath, defaultSettings(host, Some(username), Some(password))) + + def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] = + ls(basePath, connectionSettings, _ => true) + + def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph( + createBrowserGraph(basePath, connectionSettings, branchSelector, _emitTraversedDirectories = false) + ) + + def ls(basePath: String, + connectionSettings: S, + branchSelector: FtpFile => Boolean, + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + Source.fromGraph(createBrowserGraph(basePath, connectionSettings, branchSelector, emitTraversedDirectories)) + + def fromPath(host: String, path: String): Source[ByteString, Future[IOResult]] = fromPath(path, defaultSettings(host)) + + def fromPath(host: String, username: String, password: String, path: String): Source[ByteString, Future[IOResult]] = + fromPath(path, defaultSettings(host, Some(username), Some(password))) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[IOResult]] = + fromPath(path, connectionSettings, chunkSize, 0L) + + def fromPath(path: String, + connectionSettings: S, + chunkSize: Int, + offset: Long): Source[ByteString, Future[IOResult]] = + Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset)) + + def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] = + Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done) + + def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] = + mkdir(basePath, name, connectionSettings).runWith(Sink.head) + + def toPath(path: String, connectionSettings: S, append: Boolean = false): Sink[ByteString, Future[IOResult]] = + Sink.fromGraph(createIOSink(path, connectionSettings, append)) + + def move(destinationPath: FtpFile => String, connectionSettings: S): Sink[FtpFile, Future[IOResult]] = + Sink.fromGraph(createMoveSink(destinationPath, connectionSettings)) + + def remove(connectionSettings: S): Sink[FtpFile, Future[IOResult]] = + Sink.fromGraph(createRemoveSink(connectionSettings)) + +} object Sftp extends SftpApi { /** @@ -243,6 +389,6 @@ object Sftp extends SftpApi { */ def apply(customSshClient: SSHClient): SftpApi = new SftpApi { - override val sshClient = customSshClient + override val sshClient: SSHClient = customSshClient } } diff --git a/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala b/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala index 08b151547f..c6b813f542 100644 --- a/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala +++ b/ftp/src/test/scala/akka/stream/alpakka/ftp/CommonFtpStageSpec.scala @@ -143,7 +143,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually { files should have size 1 inside(files.head) { - case FtpFile(actualFileName, actualPath, isDirectory, size, lastModified, perms) ⇒ + case FtpFile(actualFileName, actualPath, isDirectory, size, lastModified, perms) => actualFileName shouldBe fileName // actualPath shouldBe s"/$basePath$fileName" isDirectory shouldBe false @@ -227,7 +227,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually { "no file is already present at the target location" should { "create a new file from the provided stream of bytes regardless of the append mode" in assertAllStagesStopped { val fileName = "sample_io_" + Instant.now().getNano - List(true, false).foreach { mode ⇒ + List(true, false).foreach { mode => val result = Source.single(ByteString(getDefaultContent)).runWith(storeToPath(s"/$fileName", mode)).futureValue @@ -290,7 +290,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually { val result = Source[Byte](fileContents.toList) .grouped(8192) - .map(s ⇒ ByteString.apply(s.toArray)) + .map(s => ByteString.apply(s.toArray)) .runWith(storeToPath(s"/$fileName", append = false)) .futureValue @@ -306,7 +306,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually { "fail and report the exception in the result status if upstream fails" in assertAllStagesStopped { val fileName = "sample_io_upstream_" + Instant.now().getNano - val brokenSource = Source(10.to(0, -1)).map(x ⇒ ByteString(10 / x)) + val brokenSource = Source(10.to(0, -1)).map(x => ByteString(10 / x)) val result = brokenSource.runWith(storeToPath(s"/$fileName", append = false)).futureValue diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala index 13fa2183f2..77255765bb 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala @@ -286,9 +286,26 @@ final class MqttConnectionSettings private (val broker: String, ) override def toString = - s"""MqttConnectionSettings(broker=$broker,clientId=$clientId,persistence=$persistence,auth(username)=${auth.map( - _._1 - )},socketFactory=$socketFactory,cleanSession=$cleanSession,will=$will,automaticReconnect=$automaticReconnect,keepAliveInterval=$keepAliveInterval,connectionTimeout=$connectionTimeout,disconnectQuiesceTimeout=$disconnectQuiesceTimeout,disconnectTimeout=$disconnectTimeout,maxInFlight=$maxInFlight,mqttVersion=$mqttVersion,serverUris=$serverUris,sslHostnameVerifier=$sslHostnameVerifier,sslProperties=$sslProperties,offlinePersistenceSettings=$offlinePersistenceSettings)""" + "MqttConnectionSettings(" + + s"broker=$broker," + + s"clientId=$clientId," + + s"persistence=$persistence," + + s"auth(username)=${auth.map(_._1)}," + + s"socketFactory=$socketFactory," + + s"cleanSession=$cleanSession," + + s"will=$will," + + s"automaticReconnect=$automaticReconnect," + + s"keepAliveInterval=$keepAliveInterval," + + s"connectionTimeout=$connectionTimeout," + + s"disconnectQuiesceTimeout=$disconnectQuiesceTimeout," + + s"disconnectTimeout=$disconnectTimeout," + + s"maxInFlight=$maxInFlight," + + s"mqttVersion=$mqttVersion," + + s"serverUris=$serverUris," + + s"sslHostnameVerifier=$sslHostnameVerifier," + + s"sslProperties=$sslProperties," + + s"offlinePersistenceSettings=$offlinePersistenceSettings" + + ")" } /** diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c7b38946b7..e846ddd600 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,7 +6,7 @@ object Dependencies { val Nightly = sys.env.get("TRAVIS_EVENT_TYPE").contains("cron") val Scala211 = "2.11.12" - val Scala212 = "2.12.7" + val Scala212 = "2.12.9" val Scala213 = "2.13.0" val ScalaVersions = Seq(Scala212, Scala211, Scala213).filterNot(_ == Scala211 && Nightly)