Java NIO utilities for usage with Monix
Monix-nio combines the power of Monix with the underlying Java NIO libraries. For the moment, the following support has been added:
- Asynchronous read/write to a file (combined with UTF-8 encoding/decoding if necessary)
- Asynchronous read/write over TCP
- File system watcher
Benchmark Mode Cnt Score Error Units
ReadWriteFileBenchmark.read50MiB ss 40 191.350 ± 27.611 ms/op
ReadWriteFileBenchmark.read50MiBJavaNio ss 40 59.288 ± 5.068 ms/op
ReadWriteFileBenchmark.read50MiBWith100KiBChunks ss 40 44.501 ± 3.135 ms/op
ReadWriteFileBenchmark.read50MiBWith1KiBChunks ss 40 1066.369 ± 43.585 ms/op
ReadWriteFileBenchmark.read50MiBWith1MiBChunks ss 40 48.777 ± 3.725 ms/op
ReadWriteFileBenchmark.write50MiB ss 40 259.731 ± 10.388 ms/op
ReadWriteFileBenchmark.write50MiBJavaNio ss 40 304.585 ± 15.283 ms/op
ReadWriteFileBenchmark.write50MiBWith100KiBChunks ss 40 258.719 ± 20.431 ms/op
ReadWriteFileBenchmark.write50MiBWith1KiBChunks ss 40 2414.812 ± 23.128 ms/op
ReadWriteFileBenchmark.write50MiBWith1MiBChunks ss 40 255.028 ± 9.261 ms/op
- lower is better
WARNING: New releases can break backwards compatibility!
libraryDependencies += "io.monix" %% "monix-nio" % "0.1.0"
import monix.nio.text.UTF8Codec._
import monix.nio.file._
// Scheduler on which the asynchronous file read is executed.
implicit val ctx = monix.execution.Scheduler.Implicits.global

val from = java.nio.file.Paths.get("/myFile.txt")

// Read the file (second argument: 30, presumably the chunk size in bytes)
// and decode the bytes as UTF-8. If you need the raw Array[Byte] chunks,
// just skip the decoding step.
val decoded = readAsync(from, 30).pipeThrough(utf8Decode)

// Print every decoded element as it is emitted.
decoded.foreach(text => Console.print(text))
import monix.reactive.Observable
import monix.nio.file._
// Scheduler on which the asynchronous file write is executed.
implicit val ctx = monix.execution.Scheduler.Implicits.global

val to = java.nio.file.Paths.get("/out.txt")

// The payload, split into groups of (at most) 3 bytes.
val bytes = "Hello world!".getBytes.grouped(3)

// Consumer that writes everything it receives to `to`.
val fileConsumer = writeAsync(to)

// Stream the byte groups into the file consumer and start the run.
Observable.fromIterator(bytes).consumeWith(fileConsumer).runAsync
import monix.eval.Callback
import monix.nio.text.UTF8Codec._
import monix.nio.file._
// `runAsync` below needs an implicit Scheduler; every other example defines
// one, and without it this snippet does not compile on its own.
implicit val ctx = monix.execution.Scheduler.Implicits.global

val from = java.nio.file.Paths.get("from.txt")
val to = java.nio.file.Paths.get("to.txt")

// Consumer that writes everything it receives to `to`.
val consumer = writeAsync(to)

// Reports the outcome of the copy: the Long result on success (printed as
// the number of copied bytes), or the error on failure.
val callback = new Callback[Long] {
  override def onSuccess(value: Long): Unit = println(s"Copied $value bytes.")
  override def onError(ex: Throwable): Unit = println(ex)
}

// Pipeline: read -> UTF-8 decode -> inspect each string -> UTF-8 encode -> write.
readAsync(from, 3)
  .pipeThrough(utf8Decode)
  .map { str =>
    Console.println(str) // do something with it
    str
  }
  .pipeThrough(utf8Encode)
  .consumeWith(consumer)
  .runAsync(callback)
import java.nio.file.{ Paths, WatchEvent }
import monix.nio.file._
// Scheduler on which the file-system watcher is executed.
implicit val ctx = monix.execution.Scheduler.Implicits.global

// Directory to watch.
val path = Paths.get("/tmp")

/** Prints the kind of a watch event together with the affected path.
  *
  * `WatchEvent.context()` may be `null` (e.g. for `OVERFLOW` events); in that
  * case the watched directory itself is printed instead of failing with a
  * NullPointerException.
  *
  * @param event the file-system event to report
  */
def printEvent(event: WatchEvent[_]): Unit = {
  val fullPath =
    Option[Any](event.context()).fold(path)(entry => path.resolve(entry.toString))
  println(s"${event.kind().name()} - $fullPath")
}

// Each emitted element is a batch of events; print every event of every batch.
watchAsync(path)
  .foreach(events => events.foreach(printEvent))
$ echo 'monix-tcp' | nc -l -k 9000
import monix.reactive.Consumer
import monix.nio.tcp._
// Scheduler on which the TCP read is executed.
implicit val ctx = monix.execution.Scheduler.Implicits.global

// Invoked once the stream has been consumed or has failed.
val callback = new monix.eval.Callback[Unit] {
  override def onError(ex: Throwable): Unit = println(ex)
  override def onSuccess(value: Unit): Unit = println("Completed")
}

// Prints every received chunk of bytes to standard output as text.
val printChunks =
  Consumer.foreach[Array[Byte]](chunk => Console.out.print(new String(chunk)))

readAsync("localhost", 9000).consumeWith(printChunks).runAsync(callback)
$ nc -l -k 9000
import monix.reactive.Observable
import monix.nio.tcp._
// Scheduler on which the TCP write is executed.
implicit val ctx = monix.execution.Scheduler.Implicits.global

val chunkSize = 2

// Consumer that sends everything it receives to localhost:9000.
val tcpConsumer = writeAsync("localhost", 9000)

// Reports the Long result on success (printed as the number of sent bytes),
// or the error on failure.
val callback = new monix.eval.Callback[Long] {
  override def onError(ex: Throwable): Unit = println(ex)
  override def onSuccess(value: Long): Unit = println(s"Sent $value bytes.")
}

// The message, split into groups of (at most) `chunkSize` bytes.
val chunks = "Hello world!".getBytes.grouped(chunkSize)

Observable.fromIterator(chunks).consumeWith(tcpConsumer).runAsync(callback)
import monix.reactive.Observable
import monix.eval. { Callback, Task }
import monix.execution.Ack.Continue
import monix.nio.tcp._
// Scheduler used to run both the server and the client task.
implicit val ctx = monix.execution.Scheduler.Implicits.global
// Echo server: listens on port 9001 (InetAddress.getByName(null) resolves to
// the loopback address), accepts one connection, feeds everything read from
// that connection back into its writer, then closes the connection and the
// server. The task yields the result of the writer consumer.
val serverProgramT = for {
server <- asyncServer(java.net.InetAddress.getByName(null).getHostName, 9001)
socket <- server.accept()
conn <- Task.now(readWriteAsync(socket))
reader <- conn.tcpObservable
writer <- conn.tcpConsumer
// When the reader terminates, stop writing on the connection as well.
echoedLen <- reader.doOnTerminateEval(_ => conn.stopWriting()).consumeWith(writer)
_ <- conn.close()
_ <- server.close()
} yield {
echoedLen
}
// Client connection to the echo server.
// NOTE(review): the third argument (256 * 1024) is presumably a buffer size — confirm.
val client = readWriteAsync("localhost", 9001, 256 * 1024)
// Echo client: sends "Hello world!", stops writing, then subscribes to the
// reader, printing every received chunk; the client is closed when the reader
// terminates.
val clientProgramT = for {
writer <- client.tcpConsumer
_ <- Observable.fromIterable(Array("Hello world!".getBytes())).consumeWith(writer)
_ <- client.stopWriting()
reader <- client.tcpObservable
_ <- Task.now(reader
.doOnTerminateEval(_ => client.close())
.subscribe(
bytes => {
println(new String(bytes))
Continue
},
err => println(err),
() => println("Echo received.")
))
} yield {}
// Start the server task and report its Long result (printed as the number of
// echoed bytes) or its error.
serverProgramT.runAsync(new Callback[Long] {
override def onSuccess(value: Long): Unit = println(s"Echoed $value bytes.")
override def onError(ex: Throwable): Unit = println(ex)
})
// Start the client task.
clientProgramT.runAsync
import monix.reactive.Observable
import monix.eval.Callback
import monix.nio.tcp._
// Scheduler used to run both the reading and the writing task.
implicit val ctx = monix.execution.Scheduler.Implicits.global
// Client connection to httpbin.org on port 80.
// NOTE(review): the third argument (256 * 1024) is presumably a buffer size — confirm.
val asyncTcpClient = readWriteAsync("httpbin.org", 80, 256 * 1024)
// Hand-written HTTP/1.1 GET request, including the terminating blank line.
val request =
"GET /get?tcp=monix HTTP/1.1\r\nHost: httpbin.org\r\nConnection: keep-alive\r\n\r\n"
// Reports whether setting up the reader subscription succeeded.
val callbackR = new Callback[Unit] {
override def onSuccess(value: Unit): Unit = println("OK")
override def onError(ex: Throwable): Unit = println(ex)
}
// Reading side: subscribe to the incoming data, print the first received
// chunk as UTF-8 text and then stop (Ack.Stop). The client is closed when the
// reader terminates. The subscription result is discarded (the block yields Unit).
asyncTcpClient
.tcpObservable
.map { reader =>
reader
.doOnTerminateEval(_ => asyncTcpClient.close()) // clean
.subscribe(
(bytes: Array[Byte]) => {
println(new String(bytes, "UTF-8"))
monix.execution.Ack.Stop
},
err => println(err),
() => println("Completed")
)
()
}
.runAsync(callbackR)
// Reports the Long result of the write (printed as the number of sent bytes)
// or its error.
val callbackW = new Callback[Long] {
override def onSuccess(value: Long): Unit = println(s"Sent $value bytes")
override def onError(ex: Throwable): Unit = println(ex)
}
// Writing side: encode the request as UTF-8, split it into groups of at most
// 256 * 1024 bytes and send them through the TCP consumer.
asyncTcpClient
.tcpConsumer
.flatMap { writer =>
val data = request.getBytes("UTF-8").grouped(256 * 1024).toArray
Observable
.fromIterable(data)
.consumeWith(writer)
}
.runAsync(callbackW)
The current maintainers (people who can help you) are:
- Sorin Chiprian (@creyer)
- Alexandru Nedelcu (@alexandru)
- Radu Gancea (@radusw)
The Monix project welcomes contributions from anybody wishing to participate. All code or documentation that is provided must be licensed under the same license as Monix (Apache 2.0, see LICENSE.txt).
People are expected to follow the Typelevel Code of Conduct when discussing Monix on the GitHub page, Gitter channel, or other venues.
Feel free to open an issue if you notice a bug, have an idea for a feature, or have a question about the code. Pull requests are also gladly accepted. For more information, check out the contributor guide.
All code in this repository is licensed under the Apache License, Version 2.0. See LICENSE.txt.