Skip to content

Commit

Permalink
One module, blocking & non-blocking operations, misc. (#385)
Browse files Browse the repository at this point in the history
* Improve end-of-stream handling.

As discussed in issue #247.

* Update docs/essentials/index.md

Improve wording.

Co-authored-by: Maxim Schuwalow <16665913+mschuwalow@users.noreply.github.com>

* Switch to using ZManaged for core resources.

* Add resource management documentation.

* Fix extension method on Scala 3.

* Separate blocking and non-blocking APIs.

The channel APIs (except for asyncrhonous channels) are
now split into three parts:

* Blocking (BlockingOps)
* Non-blocking (NonBlockingOps)
* Core (On the channel itself)

Blocking and non-blocking APIs are the same in many cases
(GatheringByteOps and ScatteringByteOps), but there
are some differences.

The blocking API usage is performed as a block in
by using the `useBlocking` method on the channel.
This method switches to the ZIO blocking thread pool
and installs interrupt handling, which is necessary
to interrupt blocking I/O calls if the ZIO fiber is
interrupted. This imposes an overhead ever time it
is needed, this approach aims to pay the cost once
per channel, rather than once per read or write.

Non-blocking usage does not require the special
setup required for blocking, but for consistency
the API is accessed the same way, via the
`useNonBlocking` method on the channel.

* Improvements to WatchService.

* Add utility methods and example for directory watching.

* Improvements to Files.

* Use new ZStream adaptors for Java iterators and streams.
* Use the ZIO-NIO Charset wrapper
* Implement `lines` method
* Implement `copy` method

* Move Files object into core module.

* Add streaming read and write.

* Add example stream-based TCP server.

* Improve InetSocketAddress and socket binding APIs.

Use meaningful names for the InetSocketAddress
constructors.

Make binding to an automatically assigned socket
address explicit.

* Test that blocking read/write/accept can be interrupted.

* Remove non-core project.

* Rename nio-core project to nio.

* Rename zio.nio.core package to zio.nio.

* Make Selector#select interruptible.

* Fix SelectableChannel#register error type.

* Improve `SelectableChannel#register` API.

* Improve InetSocketAddress API.

* Don't hold two references to underlying Java buffer.

* Improvements to address handling.

* Add localhost constructor to InetSocketAddress.

* Improvements to async channels.

Make AsynchronousByteChannel callbacks interruptible by
closing the channel on interruption.

Looking at the JVM implementation, it seems async channel
callbacks only ever return IOException (not surprising)
so tighten the error types to IOException.

Add some socket channel write variants that were missing.

* Add stream/sink to AsynchronousFileChannel.

Co-authored-by: Lachlan O'Dea <lodea@mac.com>
  • Loading branch information
svroonland and quelgar authored Sep 6, 2021
1 parent 21661e1 commit 04361c7
Show file tree
Hide file tree
Showing 89 changed files with 2,665 additions and 2,956 deletions.
27 changes: 6 additions & 21 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ addCommandAlias("check", "; scalafmtSbtCheck; scalafmtCheckAll; compile:scalafix
addCommandAlias("coverageReport", "clean coverage test coverageReport coverageAggregate")
addCommandAlias(
"testDotty",
";zioNioCore/test;zioNio/test;examples/test"
";zioNio/test;examples/test"
)

val zioVersion = "1.0.11"

lazy val zioNioCore = project
.in(file("nio-core"))
.settings(stdSettings("zio-nio-core"))
lazy val zioNio = project
.in(file("nio"))
.settings(stdSettings("zio-nio"))
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
Expand All @@ -36,35 +36,20 @@ lazy val zioNioCore = project
)
.settings(dottySettings)

lazy val zioNio = project
.in(file("nio"))
.dependsOn(zioNioCore)
.settings(stdSettings("zio-nio"))
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
.settings(dottySettings)

lazy val docs = project
.in(file("zio-nio-docs"))
.settings(
publish / skip := true,
moduleName := "zio-nio-docs",
scalacOptions -= "-Yno-imports",
scalacOptions -= "-Xfatal-warnings",
ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(zioNioCore, zioNio),
ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(zioNio),
ScalaUnidoc / unidoc / target := (LocalRootProject / baseDirectory).value / "website" / "static" / "api",
cleanFiles += (ScalaUnidoc / unidoc / target).value,
docusaurusCreateSite := docusaurusCreateSite.dependsOn(Compile / unidoc).value,
docusaurusPublishGhpages := docusaurusPublishGhpages.dependsOn(Compile / unidoc).value
)
.dependsOn(zioNioCore, zioNio)
.dependsOn(zioNio)
.enablePlugins(MdocPlugin, DocusaurusPlugin, ScalaUnidocPlugin)

lazy val examples = project
Expand Down
90 changes: 90 additions & 0 deletions docs/essentials/blocking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
---
id: essentials_blocking
title: "Blocking I/O"
---

The default ZIO runtime assumes that threads will never block, and maintains a small fixed-size thread pool to perform all its operations. If threads become blocked, CPU utilization can be reduced as the number of available threads drops below the number of available CPU cores. If enough threads block, the entire program may halt.

Another issue with blocked threads is interruption. It is important that if the ZIO fiber is interrupted that this cancels the blocking operation and unblocks the thread.

Many NIO operations can block the calling thread when called. ZIO-NIO provides APIs to help ZIO-based code deal with this. The following describes how to use channels that offer blocking operations, which is all channels except for the asynchronous ones.

## Blocking and Non-Blocking Channel Operations

Channel APIs that may block are not exposed on the channel itself. They are accessed via the channel's `useBlocking` method. You provide this method a function that excepts a `BlockingOps` object and returns a `ZIO` effect value. The `BlockingOps` parameter will be appropriate to the type of channel and has the actual blocking I/O effects such as read and write.

The `useBlocking` method performs some setup required for safe use of blocking NIO APIs:

* Puts the channel in blocking mode
* Runs the resulting effect value on ZIO's blocking thread pool, leaving the standard pool unblocked.
* Installs interrupt handling, so the channel will be closed if the ZIO fiber is interrupted. This unblocks the blocked I/O operation. (Note that NIO does not offer a way to interrupt a blocked I/O operation on a channel that does not close the channel).

Non-blocking usage does not require this special handling, but for consistency the non-blocking operations are accessed in a similar way by calling `useNonBlocking` on the channel. For some channels there are some small differences between the blocking and non-blocking APIs. For example, `SocketChannel` only offers the `finishConnect` operation in the non-blocking case, as it is never needed in blocking mode.

```scala mdoc:silent
import zio.ZIO
import zio.nio._
import zio.nio.channels._

def readHeader(c: SocketChannel): ZIO[Blocking, IOException, (Chunk[Byte], Chunk[Byte])] =
c.useBlocking { ops =>
ops.readChunk(10) <*> ops.readChunk(25)
}
```

### Using Managed Channels

To help with the common use-case where you want to create a channel, there is versions of `useBlocking` and `useNonBlocking` that can be called directly on a managed value providing a channel.

`useNioBlocking` provides both the channel and the requested type of operations:

```scala mdoc:silent
import zio.nio._
import zio.nio.channels._

SocketChannel.open.useNioBlocking { (channel, blockingOps) =>
blockingOps.readChunk(100) <*> channel.remoteAddress
}
```

If you don't need the channel, there's `useNioBlockingOps`:

```scala mdoc:silent
import zio.nio.channels._

SocketChannel.open.useNioBlockingOps { blockingOps =>
blockingOps.readChunk(100)
}
```

To use the channel in non-blocking mode, there's corresponding `useNioNonBlocking` and `useNioNonBlockingOps` methods.

### Avoiding Asynchronous Boundaries

If you have a complex program that makes more than one call to `useBlocking`, then it may be worth running *all* of the ZIO-NIO parts using the blocking pool. This can be done by wrapping the effect value with your ZIO-NIO operations in `zio.blocking.blocking`.

If this isn't done, you can end up with the calls using `BlockingOps` running on a thread from the blocking pool, while the other parts run on a thread from the standard pool. This involves an "asynchronous boundary" whever the fiber changes the underlying thread it's running on, which imposes some overheads including a full memory barrier. By using `zio.blocking.blocking` up-front, all the code can run on the same thread from the blocking pool.

## Comparing the Channel Options

There are three main styles of channel available: blocking, non-blocking and asynchronous. Which to choose?

### Blocking Channels

Easy to use, with a straight-forward operation. The downsides are that you have to use `useBlocking`, which creates a new thread, and will create an additional thread for every forked fiber subsequently created. Essentially you have a blocked thread for every active I/O call, which limits scalability. Also, the additional interrupt handling logic imposes a small overhead.

### Non-Blocking Channels

These scale very well because you basically do as many concurrent I/O operations as you like without creating any new threads. The big downside is that they aren't of practical use without using a `Selector`, which is *very* tricky API to use correctly.

Note that while it is possible to use non-blocking channels without a `Selector`, this means you have to busy-wait on the channel for the simplest reads and writes. It's not efficient.

The other issue is that only network channels and pipes support non-blocking mode.

### Asynchronous Channels

Asynchronous channels give us what we want: we don't need a `Selector` to use them, and our thread will never block when we use them.

However, it should be noted that asynchronous file I/O is not currently possible on the JVM. `AsynchronousFileChannel` is performing blocking I/O using a pool of blocked threads, which exactly what `useBlocking` does, and shares the same drawbacks. It may be preferable to use a standard `FileChannel`, as you'll have more visibility and control over what's going on.

The asynchronous socket channels do *appear* to use non-blocking I/O, although they also have some form of internal thread pool as well. These should scale roughly as well as non-blocking channels. One downside is that there is no asynchronous datagram channel.
17 changes: 9 additions & 8 deletions docs/essentials/charsets.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ id: essentials_charsets
title: "Character Sets"
---

The `zio.nio.core.charset` package offers an API for ZIO programs to work with character sets, using the Java NIO support for character sets. Any character set supported by your JVM can be used.
The `zio.nio.charset` package offers an API for ZIO programs to work with character sets, using the Java NIO support for character sets. Any character set supported by your JVM can be used.

## `Charset`

Expand Down Expand Up @@ -31,9 +31,9 @@ JVMs typically support many more charsets than these; use `Charset.availableChar
### Example

```scala mdoc:silent
import zio.nio.core.charset._
import zio.nio.charset._
import zio.nio.file.Files
import zio.nio.core.file.Path
import zio.nio.file.Path

val s = "Hello, world!"
for {
Expand All @@ -49,19 +49,20 @@ Using streams instead of buffers or chunks is great for bigger jobs. ZIO Streams
Stream-based encoding and decoding are provided by the `transducer` method of the `CharsetEncoder` and `CharsetDecoder` classes respectively.

```scala mdoc:silent
import zio.nio.core.charset._
import zio.nio.core.channels.FileChannel
import zio.nio.core.file.Path
import zio.nio.charset._
import zio.nio.channels.FileChannel
import zio.nio.channels._
import zio.nio.file.Path
import zio.stream.ZStream
import zio.blocking.Blocking
import zio.console
import zio.ZIO

// dump a file encoded in ISO8859 to the console

FileChannel.open(Path("iso8859.txt")).use { fileChan =>
FileChannel.open(Path("iso8859.txt")).useNioBlockingOps { fileOps =>
val inStream: ZStream[Blocking, Exception, Byte] = ZStream.repeatEffectChunkOption {
fileChan.readChunk(1000).asSomeError.flatMap { chunk =>
fileOps.readChunk(1000).asSomeError.flatMap { chunk =>
if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk)
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/essentials/files.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Required imports for presented snippets:
```scala mdoc:silent
import zio._
import zio.nio.channels._
import zio.nio.core.file._
import zio.nio.file._
import zio.console._
```

Expand Down
33 changes: 8 additions & 25 deletions docs/essentials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,19 @@ id: essentials_index
title: "Overview"
---

ZIO-NIO is a ZIO wrapper on Java NIO. It comes in two flavours:

- `zio.nio.core` - a small and unopionanted ZIO interface to NIO that just wraps NIO API in ZIO effects,
- `zio.nio` - an opinionated interface with deeper ZIO integration that provides more type and resource safety.

A very simple example of these differences would be the signature of `apply` method on `AsynchronousSocketChannel`:
```scala
//zio.nio.core
def apply(): IO[Exception, AsynchronousSocketChannel]
```
vs
```scala
//zio.nio
def apply(): Managed[Exception, AsynchronousSocketChannel]
```
ZIO-NIO is a ZIO wrapper on Java NIO, an opinionated interface with deep ZIO integration that provides type and resource safety.

## Installation

`ZIO-NIO` is available via maven repo. Add this to your dependencies in `sbt`:

```scala
libraryDependencies += "dev.zio" %% "zio-nio-core" % "1.0.0-RC10"
```
or
```scala
libraryDependencies += "dev.zio" %% "zio-nio" % "1.0.0-RC10"
```

## Main abstractions
## Main Abstractions

- **[Using Blocking APIs](blocking.md)** — How to deal with NIO APIs that block the calling thread
- **[File Channel](files.md)** — For processing files that are available locally. For every operation a new fiber is started to perform the operation.
- **[Socket Channel](sockets.md)** — Provides an API for remote communication with `InetSocket`s.
- **[Resource Management](resources.md)** - Avoiding resource leaks
Expand All @@ -45,15 +28,15 @@ When reading from channels, the end of the stream may be reached at any time. Th
```scala mdoc:silent
import zio._
import zio.blocking.Blocking
import zio.nio.core._
import zio.nio.core.channels._
import zio.nio.core.file.Path
import zio.nio._
import zio.nio.channels._
import zio.nio.file.Path
import java.io.IOException

val read100: ZIO[Blocking, Option[IOException], Chunk[Byte]] =
FileChannel.open(Path("foo.txt"))
.asSomeError
.use(_.readChunk(100).eofCheck)
.useNioBlockingOps(_.readChunk(100))
.eofCheck
```

End-of-stream will be signalled with `None`. Any errors will be wrapped in `Some`.
Expand Down
8 changes: 4 additions & 4 deletions docs/essentials/resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ The most straight-forward way to use a managed resource is with the `use` method
```scala mdoc:silent
import zio._
import zio.blocking.Blocking
import zio.nio.core.channels._
import zio.nio.core.file.Path
import zio.nio.channels._
import zio.nio.file.Path
import java.io.IOException

def useChannel(f: FileChannel): ZIO[Blocking, IOException, Unit] = ???
Expand Down Expand Up @@ -43,15 +43,15 @@ ZManaged.scope.use { scope =>
}

// use channel, perhaps with a Selector
channel.flatMap(_.readChunk(10))
channel.flatMap(_.useNonBlocking(_.readChunk(10)))

}
// the scope has now been released, as have all the resources attached to it
```

Note that `scope` returns both the resource and an "early release" effect. This allows you to release the resource before the scope exits, if you know it is no longer needed. This allows efficient use of the resource while still having the safety net of the scope to ensure the release happens even if there are failures, defects or interruptions.

The `zio.nio.core.channels.SelectorSpec` test demonstrates the use of scoping to ensure nothing leaks if an error occurs.
The `zio.nio.channels.SelectorSpec` test demonstrates the use of scoping to ensure nothing leaks if an error occurs.

### Using `close` for Early Release

Expand Down
16 changes: 8 additions & 8 deletions docs/essentials/sockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import zio._
import zio.clock._
import zio.console._
import zio.nio.channels._
import zio.nio.core._
import zio.nio._
```

## Creating sockets

Creating a server socket:

```scala mdoc:silent
val server = AsynchronousServerSocketChannel()
val server = AsynchronousServerSocketChannel.open
.mapM { socket =>
for {
sockAddr <- InetSocketAddress.hostNameResolved("127.0.0.1", 1337)
_ <- socket.bindTo(sockAddr)
address <- InetSocketAddress.hostName("127.0.0.1", 1337)
_ <- socket.bindTo(address)
_ <- socket.accept.preallocate.flatMap(_.use(channel => doWork(channel).catchAll(ex => putStrLn(ex.getMessage))).fork).forever.fork
} yield ()
}.useForever
Expand All @@ -44,10 +44,11 @@ def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwabl
Creating a client socket:

```scala mdoc:silent
val clientM: Managed[Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel()
val clientM: Managed[Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel.open
.mapM { client =>
for {
address <- InetSocketAddress.localHost(2552)
host <- InetAddress.localHost
address <- InetSocketAddress.inetAddress(host, 2552)
_ <- client.connect(address)
} yield client
}
Expand All @@ -58,8 +59,7 @@ Reading and writing to a socket:
```scala mdoc:silent
for {
serverFiber <- server.fork
clientFiber <- clientM.use(_.writeChunk(Chunk.fromArray(Array(1, 2, 3).map(_.toByte)))).fork
_ <- clientFiber.join
_ <- clientM.use(_.writeChunk(Chunk.fromArray(Array(1, 2, 3).map(_.toByte))))
_ <- serverFiber.join
} yield ()
```
2 changes: 2 additions & 0 deletions docs/usecases/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ So now where to use it? Here you can find some examples to dive into:

- [Stream-based socket server](https://github.com/zio/zio-nio/blob/master/examples/src/main/scala/StreamsBasedServer.scala)
- [Stream-based character decoding](https://github.com/zio/zio-nio/blob/master/examples/src/main/scala/TextFileDump.scala)
- [Stream-based directory watcher](https://github.com/zio/zio-nio/blob/master/examples/src/main/scala/StreamDirWatch.scala)
- [Transducer-based socket server](https://github.com/zio/zio-nio/blob/master/examples/src/main/scala/ToUppercaseAsAService.scala)
Loading

0 comments on commit 04361c7

Please sign in to comment.