Skip to content

Commit

Permalink
Cleanup2 (#20)
Browse files Browse the repository at this point in the history
* cleanup api, better usage of blocker

* update documentation

* upgrade fs2:2.2.2

* apply formatting
  • Loading branch information
regis-leray authored Feb 12, 2020
1 parent ea7342a commit 37f7f83
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 129 deletions.
51 changes: 28 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Setup
```
//support scala 2.12 / 2.13
libraryDependencies += "com.github.regis-leray" %% "fs2-ftp" % "0.5.1"
libraryDependencies += "com.github.regis-leray" %% "fs2-ftp" % "0.6.0"
```

How to use it ?
Expand Down Expand Up @@ -48,45 +48,50 @@ connect(settings).use(
)
```

Required BlockingIO
---
Required ContextShit[IO]
----------------------

```scala
trait FtpClient[+A] {
def stat(path: String)(implicit ec: ExecutionContext): IO[Option[FtpResource]]

def readFile(path: String, chunkSize: Int = 2048)(
implicit ec: ExecutionContext,
cs: ContextShift[IO]
): fs2.Stream[IO, Byte]
def rm(path: String)(implicit ec: ExecutionContext): IO[Unit]
def rmdir(path: String)(implicit ec: ExecutionContext): IO[Unit]
def mkdir(path: String)(implicit ec: ExecutionContext): IO[Unit]
def ls(path: String)(implicit ec: ExecutionContext): Stream[IO, FtpResource]
def lsDescendant(path: String)(implicit ec: ExecutionContext): Stream[IO, FtpResource]
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit ec: ExecutionContext, cs: ContextShift[IO]): IO[Unit]
def execute[T](f: A => T)(implicit ec: ExecutionContext): IO[T]
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]]
def readFile(path: String, chunkSize: Int = 2048)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte]
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
def ls(path: String)(implicit cs: ContextShift[IO]): fs2.Stream[IO, FtpResource]
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): fs2.Stream[IO, FtpResource]
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit cs: ContextShift[IO]): IO[Unit]
def execute[T](f: A => T)(implicit cs: ContextShift[IO]): IO[T]
}
```

All function required an implicit Execution Context.
All function required an implicit ContextShit[IO].

Since all (s)ftp command are IO bound task , it will be executed on specific blocking executionContext
More information here https://typelevel.org/cats-effect/datatypes/contextshift.html

Since all (s)ftp command are IO bound task , it is required to provide an unbounded size Thread Pool (ExecutionContext)

More explanation: https://typelevel.org/cats-effect/concurrency/basics.html#choosing-thread-pool

Here how to provide an ContextShit

Here how to create an unbounded Execution Context
* you can use the default one provided by `IOApp`
```scala
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
object MyApp extends cats.effect.IOApp {
//by default an implicit ContextShit is available as an implicit variable
}
```

Or create your own ContextShit
```scala
import cats.effect.IO
import cats.effect.ContextShift

implicit val blockingIO = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
implicit val cs: ContextShit[IO] = IO.contextShift(blockingIO)
```




Support any commands ?
---

Expand Down
107 changes: 48 additions & 59 deletions src/main/scala/ray/fs2/ftp/Ftp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,45 @@ package ray.fs2.ftp

import java.io.{ FileNotFoundException, InputStream }

import cats.effect.{ Blocker, ConcurrentEffect, ContextShift, IO, Resource }
import cats.effect.{ Blocker, ContextShift, IO, Resource }
import cats.syntax.monadError._
import fs2.Stream
import org.apache.commons.net.ftp.{ FTP, FTPSClient, FTPClient => JFTPClient }
import ray.fs2.ftp.FtpSettings.UnsecureFtpSettings

import scala.concurrent.ExecutionContext
final private class Ftp(unsafeClient: JFTPClient, blocker: Blocker) extends FtpClient[JFTPClient] {

final private class Ftp(unsafeClient: JFTPClient) extends FtpClient[JFTPClient] {

def stat(path: String)(implicit ec: ExecutionContext): IO[Option[FtpResource]] =
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]] =
execute(client => Option(client.mlistFile(path)).map(FtpResource(_)))

def readFile(
path: String,
chunkSize: Int = 2048
)(implicit ec: ExecutionContext, cs: ContextShift[IO]): fs2.Stream[IO, Byte] = {
def readFile(path: String, chunkSize: Int = 2048)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte] = {
val is = execute(client => Option(client.retrieveFileStream(path)))
.flatMap(_.fold(IO.raiseError[InputStream](new FileNotFoundException(s"file doesnt exist $path")))(IO.pure))

fs2.io.readInputStream(
is,
chunkSize,
Blocker.liftExecutionContext(ec)
)
fs2.io.readInputStream(is, chunkSize, blocker)
}

def rm(path: String)(implicit ec: ExecutionContext): IO[Unit] =
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
execute(_.deleteFile(path))
.ensure(InvalidPathError(s"Path is invalid. Cannot delete file : $path"))(identity)
.map(_ => ())

def rmdir(path: String)(implicit ec: ExecutionContext): IO[Unit] =
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
execute(_.removeDirectory(path))
.ensure(InvalidPathError(s"Path is invalid. Cannot remove directory : $path"))(identity)
.map(_ => ())

def mkdir(path: String)(implicit ec: ExecutionContext): IO[Unit] =
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
execute(_.makeDirectory(path))
.ensure(InvalidPathError(s"Path is invalid. Cannot create directory : $path"))(identity)
.map(_ => ())

def ls(path: String)(implicit ec: ExecutionContext): Stream[IO, FtpResource] =
def ls(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource] =
fs2.Stream
.evalSeq(execute(_.listFiles(path).toList))
.map(FtpResource(_, Some(path)))

def lsDescendant(path: String)(implicit ec: ExecutionContext): Stream[IO, FtpResource] =
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource] =
fs2.Stream
.evalSeq(execute(_.listFiles(path).toList))
.flatMap { f =>
Expand All @@ -60,12 +51,7 @@ final private class Ftp(unsafeClient: JFTPClient) extends FtpClient[JFTPClient]
Stream(FtpResource(f, Some(path)))
}

def upload(
path: String,
source: fs2.Stream[IO, Byte]
)(implicit ec: ExecutionContext, cs: ContextShift[IO]): IO[Unit] = {
//TODO can we remove it ???
implicit val F = ConcurrentEffect[IO]
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit cs: ContextShift[IO]): IO[Unit] =
source
.through(fs2.io.toInputStream[IO])
.evalMap(is =>
Expand All @@ -74,46 +60,49 @@ final private class Ftp(unsafeClient: JFTPClient) extends FtpClient[JFTPClient]
)
.compile
.drain
}

def execute[T](f: JFTPClient => T)(implicit ec: ExecutionContext): IO[T] =
Blocker[IO].use(
_.blockOn(IO.delay(f(unsafeClient)))(IO.contextShift(ec))
)
def execute[T](f: JFTPClient => T)(implicit cs: ContextShift[IO]): IO[T] =
blocker.delay[IO, T](f(unsafeClient))
}

object Ftp {

def connect(settings: UnsecureFtpSettings)(implicit ec: ExecutionContext): Resource[IO, FtpClient[JFTPClient]] =
Resource.make[IO, FtpClient[JFTPClient]] {
IO.delay {
val ftpClient = if (settings.secure) new FTPSClient() else new JFTPClient()
settings.proxy.foreach(ftpClient.setProxy)
ftpClient.connect(settings.host, settings.port)

val success = ftpClient.login(settings.credentials.username, settings.credentials.password)

if (settings.binary) {
ftpClient.setFileType(FTP.BINARY_FILE_TYPE)
def connect(settings: UnsecureFtpSettings)(implicit cs: ContextShift[IO]): Resource[IO, FtpClient[JFTPClient]] =
for {
blocker <- Blocker[IO]

r <- Resource.make[IO, FtpClient[JFTPClient]] {
IO.delay {
val ftpClient = if (settings.secure) new FTPSClient() else new JFTPClient()
settings.proxy.foreach(ftpClient.setProxy)
ftpClient.connect(settings.host, settings.port)

val success = ftpClient.login(settings.credentials.username, settings.credentials.password)

if (settings.binary) {
ftpClient.setFileType(FTP.BINARY_FILE_TYPE)
}

if (settings.passiveMode) {
ftpClient.enterLocalPassiveMode()
}

success -> new Ftp(ftpClient, blocker)
}
.ensure(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}"))(_._1)
.map(_._2)
} { client =>
for {
connected <- client.execute(_.isConnected)
_ <- if (!connected) IO.pure(())
else
client
.execute(_.logout)
.attempt
.flatMap(_ => client.execute(_.disconnect))
} yield ()
}

if (settings.passiveMode) {
ftpClient.enterLocalPassiveMode()
}
} yield r

success -> new Ftp(ftpClient)
}
.ensure(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}"))(_._1)
.map(_._2)
} { client =>
for {
connected <- client.execute(_.isConnected)
_ <- if (!connected) IO.pure(())
else
client
.execute(_.logout)
.attempt
.flatMap(_ => client.execute(_.disconnect))
} yield ()
}
}
26 changes: 10 additions & 16 deletions src/main/scala/ray/fs2/ftp/FtpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@ import cats.effect.{ ContextShift, IO, Resource }
import fs2.Stream
import ray.fs2.ftp.FtpSettings.{ SecureFtpSettings, UnsecureFtpSettings }

import scala.concurrent.ExecutionContext

trait FtpClient[+A] {
def stat(path: String)(implicit ec: ExecutionContext): IO[Option[FtpResource]]

def readFile(path: String, chunkSize: Int = 2048)(
implicit ec: ExecutionContext,
cs: ContextShift[IO]
): fs2.Stream[IO, Byte]
def rm(path: String)(implicit ec: ExecutionContext): IO[Unit]
def rmdir(path: String)(implicit ec: ExecutionContext): IO[Unit]
def mkdir(path: String)(implicit ec: ExecutionContext): IO[Unit]
def ls(path: String)(implicit ec: ExecutionContext): Stream[IO, FtpResource]
def lsDescendant(path: String)(implicit ec: ExecutionContext): Stream[IO, FtpResource]
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit ec: ExecutionContext, cs: ContextShift[IO]): IO[Unit]
def execute[T](f: A => T)(implicit ec: ExecutionContext): IO[T]
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]]
def readFile(path: String, chunkSize: Int = 2048)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte]
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
def ls(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource]
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource]
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit cs: ContextShift[IO]): IO[Unit]
def execute[T](f: A => T)(implicit cs: ContextShift[IO]): IO[T]
}

object FtpClient {

def connect[A](settings: FtpSettings[A])(implicit ec: ExecutionContext): Resource[IO, FtpClient[A]] = settings match {
def connect[A](settings: FtpSettings[A])(implicit cs: ContextShift[IO]): Resource[IO, FtpClient[A]] = settings match {
case s: UnsecureFtpSettings => Ftp.connect(s)
case s: SecureFtpSettings => SFtp.connect(s)
}
Expand Down
Loading

0 comments on commit 37f7f83

Please sign in to comment.