Skip to content

Commit

Permalink
Rebase and add mkdir operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Aug 21, 2019
1 parent 6fc5b58 commit 7b71fa0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 30 deletions.
5 changes: 1 addition & 4 deletions ftp/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Allow changes to impl
ProblemFilters.exclude[Problem]("akka.stream.alpakka.ftp.impl.*")


# Restructure to enable compilation with Scala 2.12.8+ PR #1779
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.alpakka.ftp.javadsl.FtpApi.*")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.ftp.javadsl.FtpApi.ftpLike")
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.alpakka.ftp.scaladsl.FtpApi.*")
Expand Down
35 changes: 25 additions & 10 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/javadsl/FtpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ package akka.stream.alpakka.ftp.javadsl
import java.util.concurrent.CompletionStage
import java.util.function._

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.ftp._
import akka.stream.alpakka.ftp.impl._
import akka.stream.javadsl.{Sink, Source}
import akka.stream.{IOResult, Materializer}
import akka.util.ByteString
import akka.{Done, NotUsed}
import net.schmizz.sshj.SSHClient
Expand Down Expand Up @@ -201,8 +200,7 @@ sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[Ft
* @param connectionSettings connection settings
* @return [[akka.stream.javadsl.Source Source]] of [[Done]]
*/
def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
ScalaSource.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done.getInstance()).asJava
def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed]

/**
* Java API for creating a directory in a given path
Expand All @@ -212,8 +210,7 @@ sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[Ft
* @param connectionSettings connection settings
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of [[akka.Done]] indicating a materialized, asynchronous request
*/
def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat)
def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done]

/**
* Java API: creates a [[akka.stream.javadsl.Sink Sink]] of [[akka.util.ByteString ByteString]] to some file path.
Expand Down Expand Up @@ -272,7 +269,7 @@ object Ftp extends FtpApi[FTPClient, FtpSettings] with FtpSourceParams {

def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] =
Source
.fromGraph(createBrowserGraph(basePath, connectionSettings, f => true, _emitTraversedDirectories = false))
.fromGraph(createBrowserGraph(basePath, connectionSettings, _ => true, _emitTraversedDirectories = false))

def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] =
Source.fromGraph(
Expand Down Expand Up @@ -319,6 +316,12 @@ object Ftp extends FtpApi[FTPClient, FtpSettings] with FtpSourceParams {
.mapMaterializedValue(_.toJava)
}

def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)

def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat)

def toPath(path: String, connectionSettings: S, append: Boolean): Sink[ByteString, CompletionStage[IOResult]] = {
import scala.compat.java8.FutureConverters._
Sink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(_.toJava)
Expand Down Expand Up @@ -354,7 +357,7 @@ object Ftps extends FtpApi[FTPSClient, FtpsSettings] with FtpsSourceParams {

def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] =
Source
.fromGraph(createBrowserGraph(basePath, connectionSettings, f => true, _emitTraversedDirectories = false))
.fromGraph(createBrowserGraph(basePath, connectionSettings, _ => true, _emitTraversedDirectories = false))

def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] =
Source.fromGraph(
Expand Down Expand Up @@ -401,6 +404,12 @@ object Ftps extends FtpApi[FTPSClient, FtpsSettings] with FtpsSourceParams {
.mapMaterializedValue(_.toJava)
}

def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)

def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat)

def toPath(path: String, connectionSettings: S, append: Boolean): Sink[ByteString, CompletionStage[IOResult]] = {
import scala.compat.java8.FutureConverters._
Sink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(_.toJava)
Expand Down Expand Up @@ -437,7 +446,7 @@ class SftpApi extends FtpApi[SSHClient, SftpSettings] with SftpSourceParams {

def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] =
Source
.fromGraph(createBrowserGraph(basePath, connectionSettings, f => true, _emitTraversedDirectories = false))
.fromGraph(createBrowserGraph(basePath, connectionSettings, _ => true, _emitTraversedDirectories = false))

def ls(basePath: String, connectionSettings: S, branchSelector: Predicate[FtpFile]): Source[FtpFile, NotUsed] =
Source.fromGraph(
Expand Down Expand Up @@ -484,6 +493,12 @@ class SftpApi extends FtpApi[SSHClient, SftpSettings] with SftpSourceParams {
.mapMaterializedValue(_.toJava)
}

def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)

def mkdirAsync(basePath: String, name: String, connectionSettings: S, mat: Materializer): CompletionStage[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head(), mat)

def toPath(path: String, connectionSettings: S, append: Boolean): Sink[ByteString, CompletionStage[IOResult]] = {
import scala.compat.java8.FutureConverters._
Sink.fromGraph(createIOSink(path, connectionSettings, append)).mapMaterializedValue(_.toJava)
Expand Down Expand Up @@ -517,6 +532,6 @@ object Sftp extends SftpApi {
*/
def create(customSshClient: SSHClient): SftpApi =
new SftpApi {
override val sshClient = customSshClient
override val sshClient: SSHClient = customSshClient
}
}
44 changes: 28 additions & 16 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/scaladsl/FtpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@

package akka.stream.alpakka.ftp.scaladsl

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.ftp.impl.{FtpSourceFactory, FtpSourceParams, FtpsSourceParams, SftpSourceParams}
import akka.stream.alpakka.ftp._
import akka.{Done, NotUsed}
import akka.stream.{IOResult, Materializer}
import akka.stream.alpakka.ftp.impl._
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings, FtpsSettings, RemoteFileSettings, SftpSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{IOResult, Materializer}
import akka.util.ByteString
import akka.{Done, NotUsed}
import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.{FTPClient, FTPSClient}

Expand Down Expand Up @@ -109,24 +105,22 @@ sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[Ft
emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed]

/**
* Scala API for creating a directory in a given path
* Scala API for creating a directory in a given path
* @param basePath path to start with
* @param name name of a directory to create
* @param connectionSettings connection settings
* @return [[akka.stream.scaladsl.Source Source]] of [[akka.Done]]
*/
def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)
def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed]

/**
* Scala API for creating a directory in a given path
* Scala API for creating a directory in a given path
* @param basePath path to start with
* @param name name of a directory to create
* @param connectionSettings connection settings
* @return [[scala.concurrent.Future Future]] of [[akka.Done]] indicating a materialized, asynchronous request
*/
def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head)
def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done]

/**
* Scala API: creates a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] from some file path.
Expand Down Expand Up @@ -223,7 +217,7 @@ object Ftp extends FtpApi[FTPClient, FtpSettings] with FtpSourceParams {
ls(basePath, defaultSettings(host, Some(username), Some(password)))

def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] =
ls(basePath, connectionSettings, f => true)
ls(basePath, connectionSettings, _ => true)

def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] =
Source.fromGraph(
Expand Down Expand Up @@ -252,6 +246,12 @@ object Ftp extends FtpApi[FTPClient, FtpSettings] with FtpSourceParams {
offset: Long): Source[ByteString, Future[IOResult]] =
Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset))

def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)

def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head)

def toPath(path: String, connectionSettings: S, append: Boolean = false): Sink[ByteString, Future[IOResult]] =
Sink.fromGraph(createIOSink(path, connectionSettings, append))

Expand All @@ -275,7 +275,7 @@ object Ftps extends FtpApi[FTPSClient, FtpsSettings] with FtpsSourceParams {
ls(basePath, defaultSettings(host, Some(username), Some(password)))

def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] =
ls(basePath, connectionSettings, f => true)
ls(basePath, connectionSettings, _ => true)

def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] =
Source.fromGraph(
Expand Down Expand Up @@ -304,6 +304,12 @@ object Ftps extends FtpApi[FTPSClient, FtpsSettings] with FtpsSourceParams {
offset: Long): Source[ByteString, Future[IOResult]] =
Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset))

def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)

def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head)

def toPath(path: String, connectionSettings: S, append: Boolean = false): Sink[ByteString, Future[IOResult]] =
Sink.fromGraph(createIOSink(path, connectionSettings, append))

Expand All @@ -326,7 +332,7 @@ class SftpApi extends FtpApi[SSHClient, SftpSettings] with SftpSourceParams {
ls(basePath, defaultSettings(host, Some(username), Some(password)))

def ls(basePath: String, connectionSettings: S): Source[FtpFile, NotUsed] =
ls(basePath, connectionSettings, f => true)
ls(basePath, connectionSettings, _ => true)

def ls(basePath: String, connectionSettings: S, branchSelector: FtpFile => Boolean): Source[FtpFile, NotUsed] =
Source.fromGraph(
Expand Down Expand Up @@ -355,6 +361,12 @@ class SftpApi extends FtpApi[SSHClient, SftpSettings] with SftpSourceParams {
offset: Long): Source[ByteString, Future[IOResult]] =
Source.fromGraph(createIOSource(path, connectionSettings, chunkSize, offset))

def mkdir(basePath: String, name: String, connectionSettings: S): Source[Done, NotUsed] =
Source.fromGraph(createMkdirGraph(basePath, name, connectionSettings)).map(_ => Done)

def mkdirAsync(basePath: String, name: String, connectionSettings: S)(implicit mat: Materializer): Future[Done] =
mkdir(basePath, name, connectionSettings).runWith(Sink.head)

def toPath(path: String, connectionSettings: S, append: Boolean = false): Sink[ByteString, Future[IOResult]] =
Sink.fromGraph(createIOSink(path, connectionSettings, append))

Expand All @@ -375,6 +387,6 @@ object Sftp extends SftpApi {
*/
def apply(customSshClient: SSHClient): SftpApi =
new SftpApi {
override val sshClient = customSshClient
override val sshClient: SSHClient = customSshClient
}
}

0 comments on commit 7b71fa0

Please sign in to comment.