diff --git a/docs/modules/ROOT/pages/blocking-safe-by-default.adoc b/docs/modules/ROOT/pages/blocking-safe-by-default.adoc
index c3935c0762..119fd0588d 100644
--- a/docs/modules/ROOT/pages/blocking-safe-by-default.adoc
+++ b/docs/modules/ROOT/pages/blocking-safe-by-default.adoc
@@ -52,31 +52,31 @@ __offload__ invocation of user code out of the event loop to a different thread.
Although the outcome is simple that no user code should run on the event loop, implementations of this offloading are
often either naive hence sub-optimal or fairly complex. ServiceTalk internally does the heavy lifting to make sure that
-it does not offload more than what is required. In other words, it reduces offloading when it can deduce that no user
+it does not offload more than what is required. In other words, it reduces offloading when it can determine that no user
code can interact with the event loop on a certain path.
+[#execution-strategy]
=== Execution Strategy
-An link:{source-root}/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ExecutionStrategy.java[Execution Strategy]
-has two primary purposes:
-
-. Define which interaction paths require offloading.
-. Optionally specify an
-link:{source-root}/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Executor.java[Executor]
-to use for offloading. In absence of a specified `Executor`, ServiceTalk will use a default `Executor`.
-
-At a general transport layer (no application level protocol), only two offload paths are available:
+The primary purpose of an link:{source-root}/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ExecutionStrategy.java[Execution Strategy]
+is to define which interaction paths of a particular transport or protocol layer require offloading. For
+example, at the xref:{page-version}@servicetalk-http-api::blocking-safe-by-default.adoc[HTTP] transport layer, four
+offload paths are available:
. Sending data to the transport.
. Receiving data from the transport.
+. Handling transport events.
+. Closing the transport.
-However, different protocols, eg: xref:{page-version}@servicetalk-http-api::blocking-safe-by-default.adoc[HTTP] may
-provide more sophisticated offloading paths that can be controlled by a strategy.
+A given application, filter or component may indicate that it requires offloading for none, some or all of these
+interactions. Protocols other than HTTP will have their own APIs that would otherwise execute user code on an
+`EventLoop` thread, and they define their own `ExecutionStrategy` to control which of those interaction paths are
+subject to offloading.
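The per-path opt-in described above can be sketched in plain Java. `OffloadPath` and `Strategy` below are illustrative stand-ins, not ServiceTalk's actual `ExecutionStrategy` API, and the union merge assumes the simple rule that a path is offloaded if any component on the execution path requires it:

```java
import java.util.EnumSet;

public class StrategySketch {
    // The four HTTP offload paths above, modeled as flags (illustrative only).
    enum OffloadPath { SEND, RECEIVE, EVENT, CLOSE }

    // A toy "execution strategy": the set of paths whose user code must leave the event loop.
    static final class Strategy {
        final EnumSet<OffloadPath> offloaded;
        Strategy(EnumSet<OffloadPath> offloaded) { this.offloaded = offloaded; }

        boolean shouldOffload(OffloadPath path) { return offloaded.contains(path); }

        // A path is offloaded if ANY component (application, filter, ...) requires it.
        Strategy merge(Strategy other) {
            EnumSet<OffloadPath> union = EnumSet.noneOf(OffloadPath.class);
            union.addAll(offloaded);
            union.addAll(other.offloaded);
            return new Strategy(union);
        }
    }

    // Hypothetical chain: a filter that blocks on receive, a handler that opted out entirely.
    static Strategy effective() {
        Strategy filter = new Strategy(EnumSet.of(OffloadPath.RECEIVE));
        Strategy handler = new Strategy(EnumSet.noneOf(OffloadPath.class));
        return filter.merge(handler);
    }

    public static void main(String[] args) {
        Strategy s = effective();
        System.out.println("offload RECEIVE? " + s.shouldOffload(OffloadPath.RECEIVE));
        System.out.println("offload SEND?    " + s.shouldOffload(OffloadPath.SEND));
    }
}
```

The union merge is why a single component requesting offloading is enough to keep a path offloaded for the whole chain.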
[#influencing-offloading-decisions]
=== Influencing offloading decisions
-ServiceTalk does the heavy lifting of determining the optimal offloading strategy. This optimal
+ServiceTalk will determine the optimal offloading strategy for handling requests and responses. This optimal
strategy is determined based on different inputs as outlined below:
. __xref:{page-version}@servicetalk::programming-paradigms.adoc[Programming Paradigms].__: A certain programming model
@@ -103,5 +103,6 @@ the strategy for a client/server, but it comes with additional responsibility fr
=== ServiceTalk developers
-Internals of ServiceTalk still discourages blocking code and hence should be avoided while contributing to ServiceTalk.
-Unless there is a valid reason to do it, ServiceTalk internals are always non-blocking.
+The internal implementation of ServiceTalk discourages blocking code, so introducing blocking should be
+avoided while contributing to ServiceTalk. Unless there is a valid reason to do otherwise, ServiceTalk internals are
+always non-blocking.
diff --git a/docs/modules/ROOT/pages/performance.adoc b/docs/modules/ROOT/pages/performance.adoc
index b5012e6e41..4c3461d157 100644
--- a/docs/modules/ROOT/pages/performance.adoc
+++ b/docs/modules/ROOT/pages/performance.adoc
@@ -115,22 +115,16 @@ Because ServiceTalk is asynchronous and non-blocking at the core it needs to def
that potentially blocks IO EventLoop threads. Read the chapter on
xref:{page-version}@servicetalk::blocking-safe-by-default.adoc[blocking code safe by default] for more details.
-`ServiceTalk` has
-xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc#executor-affinity[Executor Affinity] which
-ensures that the same `Executor` is used for each `Subscriber` chain on asynchronous operator boundaries. By default
-when using the streaming APIs this requires wrapping on the asynchronous operator boundaries and may result in
-additional thread hops between ``Executor``s (and even if ``Executor`` happens to be the same). Depending upon the use
-case, performance costs maybe relatively high but we have the following compensatory strategies in place:
-
-* ServiceTalk team is investigating ways to reduce the cost of offloading for streaming APIs
-* choosing the appropriate xref:{page-version}@servicetalk::performance.adoc#offloading-and-flushing[programming model]
-for your use case allows us to be more optimal and reduce offloading
-* you can opt-out of (some or all) offloading via
+Offloading is expected to add some overhead which can be mitigated as follows:
+
+* Choosing the appropriate xref:{page-version}@servicetalk::performance.adoc#offloading-and-flushing[programming model]
+for your use case allows ServiceTalk to optimize and reduce offloading
+* Opting-out of some or all offloading through use of
xref:{page-version}@servicetalk::performance.adoc#ExecutionStrategy[ExecutionStrategy]
== Tuning options and recommendations
-Below sections will offer suggestions that may improve performance depending on your use-case.
+The following sections will offer suggestions that may improve performance depending on your use-case.
[#offloading-and-flushing]
=== Programming model (offloading & flushing)
@@ -242,7 +236,7 @@ HttpServers.forPort(8080)
_`FlushStrategies` and related APIs are advanced, internal, and subject to change._
[#ExecutionStrategy]
-==== ExecutionStrategy (offloading)
+==== Offloading and `ExecutionStrategy`
link:{source-root}/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ExecutionStrategy.java[ExecutionStrategy]
is the core abstraction ServiceTalk uses to drive offloading delivering signals and data from the IO EventLoop threads.
For HTTP there is
diff --git a/docs/modules/ROOT/pages/programming-paradigms.adoc b/docs/modules/ROOT/pages/programming-paradigms.adoc
index ec161dc966..1de502e562 100644
--- a/docs/modules/ROOT/pages/programming-paradigms.adoc
+++ b/docs/modules/ROOT/pages/programming-paradigms.adoc
@@ -16,6 +16,7 @@ complexity of asynchronous control flow in these cases. This can dramatically lo
compared with most non-blocking I/O frameworks and avoid "application re-write" if scaling/data size characteristics
change over time.
+[#blocking-vs-synchronous]
== Blocking vs Synchronous
ServiceTalk APIs may use the term "blocking" in areas where the APIs may be identified as "synchronous". "blocking" in
this context is meant to declare that the API "may block" the calling thread. This is done because there is no general
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/blocking-scenarios.png b/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/blocking-scenarios.png
deleted file mode 100644
index 077b163117..0000000000
Binary files a/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/blocking-scenarios.png and /dev/null differ
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/blocking-scenarios.svg b/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/blocking-scenarios.svg
new file mode 100644
index 0000000000..86d951cb62
--- /dev/null
+++ b/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/blocking-scenarios.svg
@@ -0,0 +1,237 @@
+
+
+
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/executor-thread-selection.png b/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/executor-thread-selection.png
deleted file mode 100644
index 83cb2d17d6..0000000000
Binary files a/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/executor-thread-selection.png and /dev/null differ
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/offloading.svg b/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/offloading.svg
new file mode 100644
index 0000000000..7ff158b9a8
--- /dev/null
+++ b/servicetalk-concurrent-api/docs/modules/ROOT/assets/images/offloading.svg
@@ -0,0 +1,217 @@
+
+
+
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/pages/_partials/nav-versioned.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/pages/_partials/nav-versioned.adoc
index 32ecbc3769..caebe0a62e 100644
--- a/servicetalk-concurrent-api/docs/modules/ROOT/pages/_partials/nav-versioned.adoc
+++ b/servicetalk-concurrent-api/docs/modules/ROOT/pages/_partials/nav-versioned.adoc
@@ -2,3 +2,4 @@
* xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc[Blocking safe by default]
** xref:{page-version}@servicetalk-concurrent-api::blocking-implementation.adoc[Implementation details]
* xref:{page-version}@servicetalk-concurrent-api::async-context.adoc[Async Context]
+* xref:{page-version}@servicetalk-concurrent-api::pitfalls.adoc[Concurrency Pitfalls]
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-implementation.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-implementation.adoc
index 1561766dca..21147ea186 100644
--- a/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-implementation.adoc
+++ b/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-implementation.adoc
@@ -8,9 +8,10 @@ endif::[]
= Blocking safe by default (Implementation Details)
-As described xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc[here], ServiceTalk by default
-allows users to write blocking code when interacting with ServiceTalk. This document describes the details of the
-implementation and is addressed to audiences who intend to know the internals of how this is achieved.
+As described in the section
+xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc[Blocking Safe By Default],
+ServiceTalk, by default, allows users to write blocking code when interacting with ServiceTalk. This document describes
+the implementation details and is addressed to readers who want to understand how this is achieved internally.
NOTE: It is not required to read this document if you just want to use ServiceTalk.
@@ -19,78 +20,59 @@ NOTE: It is not required to read this document if you just want to use ServiceTa
Everything inside ServiceTalk is somehow connected to one of the three asynchronous sources, viz., `Publisher`, `Single`
and `Completable`. Since these sources are the building blocks for program control flow if they provide safety
guarantees for blocking code execution these guarantees apply outside the scope of preventing blocking code from
-executing on event loop thread. This approach is designed to make the task of ensuring we don't block the event loop
-threads less error prone, and also allows for certain optimizations around thread context propagation and re-use.
+executing on an EventLoop thread. This approach is designed to make the task of ensuring we don't block the EventLoop
+threads less error-prone, and also allows for certain optimizations around thread context propagation and re-use.
== Threads and asynchronous sources
-An asynchronous source has to decide at two points about which thread will be used:
+An asynchronous source has two important decisions to make about thread usage:
-1. Thread which is used to do the actual work related to a source. eg: for an HTTP client, the work is to send an HTTP
-request and read the HTTP response.
-2. Thread which is used to interact with the `Subscriber` corresponding to its `Subscription`s.
+1. Which thread or executor will be used to do the actual task related to a source, e.g. for an HTTP client, the task
+is to send an HTTP request and read the HTTP response.
+2. Which thread or executor will be used to interact with the `Subscriber` corresponding to its `Subscription`s.
Part 1. above is not governed by the
link:https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification[ReactiveStreams specification]
-and hence sources are free to use any thread. ServiceTalk typically will use Netty's `EventLoop` to do the actual work.
+and hence sources are free to use any thread. ServiceTalk typically will use Netty's `EventLoop` to perform the actual
+task.
+
Part 2. defines all the interactions using the ReactiveStreams specifications, i.e. all methods in `Publisher`,
-`Subscriber` and `Subscription`.
-
-ServiceTalk concurrency APIs defines which thread will be used by any asynchronous source for Part 2.
-
-== Executor and asynchronous sources
-
-ServiceTalk enforces that each asynchronous source **MUST** be associated with an `Executor` which is used to interact
-with its `Subscriber`. The below diagram illustrates the interaction between an asynchronous source, its `Subscriber`,
-its operators, and the `Executor`.
-
-image::executor-thread-selection.png[Executor inference]
-
-The above interaction provides all operators and the source an opportunity to piggy-back on the same `Executor` for all
-interactions with the `Subscriber`. Any operator in a chain has two options while accepting a `Subscriber`:
-
-- Wrap the `Subscriber` such that all `Subscriber` and `Subscription` method calls are offloaded to a chosen
-`Executor`. This mode is for operators that process signals from the original `Publisher` asynchronously.
-- Do not wrap the `Subscriber` and forward all `Subscriber` and `Subscription` methods directly on the calling thread to
-avoid the overhead of wrapping each and every `Subscriber` and `Subscription` in a chain. This mode is for operators
-that process signals from the original `Publisher` synchronously.
-
-Taking the same example from xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc[here]
-
-[source, java]
-----
- client.request() # <1>
- .map(resp -> {
- return resp.toString(); # <2>
- })
- .flatMap(stringResp -> { # <3>
- return client2.request(stringResp);
- })
- .filter(stringResp -> {
- stringResp.equals("Hello World"); # <4>
- });
-----
-<1> A hypothetical client which provides a `request()` method that returns a `Single`.
-<2> Converting the response to a `String`.
-<3> Call another `client2` that provides a new `Single` which is returned from `flatMap`.
-<4> Only allow "Hello World" messages to be emitted.
-
-In the above example the operators `map` and `filter` will not wrap `Subscriber` and `Subscription` since they do not do
-any asynchronous work. However, `flatmap` will wrap `Subscriber` and `Subscription` to offload the calls to the chosen
-`Executor`.
-
-== Why should we wrap `Subscriber` and `Subscription`?
-
-There are two places we would wrap `Subscriber` and `Subscription`:
-
-1. Original asynchronous sources.
-2. Asynchronous operators. eg: `flatMap`
-
-Since every asynchronous source is associated with an `Executor`, it is required to use the `Executor` for interacting
-with `Subscriber` and `Subscription`.
-
-If we do not wrap for asynchronous operators then in the above example, operator `filter` (4) will be invoked in the
-`Executor` defined by `client2` inside the `flatmap`. This may lead to inadvertent and not initially obvious
-blocking of an event loop thread. Consider a scenario where `client2` executes user code on an event loop thread, but
-the original `client` executes user code on an application thread (which allows blocking). In this scenario it may look
-like blocking code is OK at point (4) above, but that will actually result in blocking `client2`'s event loop thread.
+`Subscriber` and `Subscription`. The ReactiveStreams specification requires that signals are not delivered
+link:https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.3[concurrently],
+but doesn't have any restrictions about which threads are used. This means the same thread may be used for all signal
+deliveries for a given `Subscriber`, but it is also valid to use any thread (as long as no concurrency is introduced).
+ServiceTalk concurrency APIs are used to define which executor will be used for an asynchronous source for Part 2,
+which is typically an application `Executor`.
+
+== Offloading and asynchronous sources
+
+ServiceTalk uses the `link:{source-root}/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Executor.java[Executor]`
+abstraction to specify the source of threads to be used for the delivery of signals from an asynchronous source. The
+default signal offloading, if any, used by an asynchronous source is determined by the source. For example, the HTTP
+sources, in addition to allowing for specification of an offloading executor, provide direct control of the
+offloading via
+`xref:{page-version}@servicetalk::blocking-safe-by-default.adoc#execution-strategy[ExecutionStrategy]`
+and may also be influenced by the
+xref:{page-version}@servicetalk::blocking-safe-by-default.adoc#influencing-offloading-decisions[computed execution strategy].
+
+Applications with asynchronous, blocking, or computationally expensive tasks can also offload those tasks to a specific
+`Executor`. The `subscribeOn(Executor)` and `publishOn(Executor)` operators will offload execution from the
+default signal delivery thread to a thread from the provided `Executor`. The diagram below illustrates the interaction
+between an asynchronous source, its `Subscriber`, its operators, and the `Executor`.
+
+During execution of the `Subscriber` methods, that is the result publication and termination signals, the `Executor`
+active at the source is inherited by all operators unless there is a reason to switch to a different `Executor`. The
+switch to another executor, offloading, happens either through explicit use of the offloading operators or, unless
+configured not to offload, because ServiceTalk offloads from the Netty `EventLoop` thread to allow user code to block.
+
+image::offloading.svg[Offloading]
+
+During `subscribe()` the execution will offload at the `subscribeOn()` operator and transition execution from the
+subscribing thread to an `Executor` thread. The subscribing thread will be able to continue while the subscribe
+operation asynchronously continues on an `Executor` thread.
+
+The diagram shows a typical case: when a result is available at the source, it will begin publication on the receiving
+Netty EventLoop thread. Assuming that the default ServiceTalk offloading has been disabled, offloading will only
+happen at the `publishOn()` operator during `Subscriber` signals and will transition execution from the EventLoop
+thread to an `Executor` thread. Once the `Subscriber` signal is offloaded the EventLoop thread is available again for
+executing other I/O tasks while the response is asynchronously processed on the `Executor` thread.
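The `publishOn()`-style hand-off described above can be sketched with plain JDK executors. The class and thread names below are illustrative, and a single-threaded executor stands in for both the Netty `EventLoop` and the offloading `Executor`; this is not ServiceTalk code:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PublishOnSketch {
    // Signals produced on an "event loop" thread are handed to a single-threaded
    // executor, which preserves delivery order while freeing the producer.
    static List<String> run() throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService publishOn = Executors.newSingleThreadExecutor(r -> new Thread(r, "publish-on"));
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            eventLoop.submit(() -> {
                for (int i = 1; i <= 3; i++) {
                    int item = i;
                    // Offload each onNext-like signal instead of running user code here.
                    publishOn.execute(() -> received.add(item + "@" + Thread.currentThread().getName()));
                }
                publishOn.execute(done::countDown); // terminal signal after all items
            });
            done.await(5, TimeUnit.SECONDS);
            return received;
        } finally {
            eventLoop.shutdown();
            publishOn.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Because the offload target is single-threaded, signal order is preserved across the boundary, mirroring the ReactiveStreams requirement that signals are never delivered concurrently.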
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc
index d5e08125d5..4b4706415e 100644
--- a/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc
+++ b/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc
@@ -20,18 +20,18 @@ called an execution chain.
=== Data and control flow in an execution chain
The image below details the flow of data (`Subscriber` methods) and control (`Publisher` and `Subscription` methods)
-messages. Invocations in either direction may be executed on an event loop thread and hence needs to be protected. In
+messages. Invocations in either direction may be executed on an EventLoop thread and hence need to be protected. In
case new asynchronous sources are generated/received inside operators, they follow the same model and hence are removed
for brevity.
-image::blocking-scenarios.png[Data and control flow in an execution chain]
+image::blocking-scenarios.svg[Data and control flow in an execution chain]
As shown in the above picture, there are inherently two directions (data and control) of information flow for an
execution chain and these
link:https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#glossary[signals] can be triggered
in parallel.
-**By default, in ServiceTalk, signals are not executed on an event loop thread, but instead executed using an
+**By default, in ServiceTalk, signals are not executed on an EventLoop thread, but instead executed using an
link:{source-root}/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Executor.java[Executor]
provided by the application in the order they are received.**
@@ -42,121 +42,327 @@ The implication of the above approach is the following:
**Users can execute blocking code inside an execution chain, provided they are not waiting for another data or control
message in the same execution chain.**
-Look at <> section for examples elaborating this implication.
+== Task Offloading
+ServiceTalk uses Netty for network I/O and Netty uses a fixed number of EventLoop threads for executing I/O operations.
+The number of EventLoop threads correlates to the number of CPU cores and *not* to the number of requests. This is
+because threads typically spend much of their time idle, waiting for I/O to complete. Sharing threads helps minimize
+resource consumption and improves scalability. However, sharing threads between requests means the control flow
+for a single request can impact all other requests that share the same thread. If the control flow occupies the current
+thread for a long period ("blocks the thread", e.g. waiting on external I/O) this may negatively impact latency and
+throughput. One approach to ensure I/O thread availability is to carefully limit the scope of work done by I/O threads
+and, whenever practical, delegate all other necessary tasks that are not related to I/O to some other thread. Moving
+tasks from I/O threads to other threads is called “offloading” and is a core technique used by ServiceTalk.
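The offloading idea above can be sketched with plain JDK executors; the thread names and `OffloadSketch` class are illustrative, with a single-threaded executor standing in for an EventLoop:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadSketch {
    // Hands a task off the "event loop" and returns the name of the thread that ran it.
    static String runOffloaded() throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService offloader = Executors.newSingleThreadExecutor(r -> new Thread(r, "offload-1"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            eventLoop.submit(() ->
                    // On the event loop: never block here; hand the task to another thread.
                    // The event loop thread is then immediately free for more I/O work.
                    offloader.submit(() -> ranOn.complete(Thread.currentThread().getName())));
            return ranOn.get(5, TimeUnit.SECONDS);
        } finally {
            eventLoop.shutdown();
            offloader.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("offloaded task ran on: " + runOffloaded());
    }
}
```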
+
+Offloading is used for two purposes within ServiceTalk: firstly, to provide the
+execution needed for the handling of asynchronous events, aka
+link:https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#glossary[Signals],
+and secondly, when handling those events, to protect scarce resources from being monopolized by blocking,
+untrusted or expensive application code. Asynchronous execution requires offloading because the initiating calling
+thread is not available. Protective offloading, where offloading is used to avoid resource monopolization, is a
+practical consideration, but just as necessary for reliable and predictable operation.
+
+ServiceTalk will, by default, execute most application code on threads other than the Netty I/O threads.
+For most invocations of application code, if the application developer knows that their code cannot block and always
+executes quickly in near constant time they can request that ServiceTalk not offload their code. This will improve
+application performance by reducing latency and overhead. Requests to not offload will be honored by ServiceTalk if all
+the other components in the same execution path have also opted out of offloading. See
+xref:{page-version}@servicetalk::blocking-safe-by-default.adoc#execution-strategy[Execution Strategy]
+for more information on how components may specify their offloading requirements. As a last resort, tasks may also be
+queued to be performed as threads are available.
+
+ServiceTalk is designed to be fully asynchronous except where the API provides explicit blocking behavior xref:{page-version}@servicetalk::programming-paradigms.adoc[as a convenience].
+
+ServiceTalk uses a task-based approach for offloading, using `Executor` in the standard "fire-and-forget" way to run the
+offloaded tasks. Often the `Executor` has a pool of threads, possibly unbounded, and tasks are run using whatever thread
+is available. In particular, different threads may be used for each task executed and code running in tasks cannot
+depend upon a consistent thread being used for invoking program logic. This approach is generally the most scalable
+because it makes the best utilization of threads. If it is necessary to share state between tasks then
+`link:{source-root}/servicetalk-context-api/src/main/java/io/servicetalk/context/api/ContextMap.java[ContextMap]`s
+can be used.
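The fire-and-forget point above can be sketched with a plain JDK thread pool; the `ConcurrentHashMap` here is only an analogy for ServiceTalk's `ContextMap`, and the class and key names are illustrative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TaskOffloadSketch {
    // Fire-and-forget tasks may each run on a different pool thread, so state shared
    // between tasks lives in an explicit context map rather than in thread-locals.
    static String runPipeline() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<String, Object> context = new ConcurrentHashMap<>();
        try {
            CountDownLatch first = new CountDownLatch(1);
            pool.execute(() -> {                 // task 1: produce a value
                context.put("requestId", "req-42");
                first.countDown();
            });
            first.await(5, TimeUnit.SECONDS);
            return pool.submit(() ->             // task 2: possibly a different thread
                    "handled " + context.get("requestId")).get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runPipeline());
    }
}
```

Because neither task can assume it runs on the same thread as the other, thread-local state would be unreliable; the shared map carries the state across the thread boundary.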
+
+If the default ServiceTalk offloading has been disabled then user code may still wish to offload execution to a
+different `Executor` in specific cases. ServiceTalk provides reactive operators for explicitly offloading both
+`subscribe` and the `Subscription` methods via the `subscribeOn(executor)` operator, as well as the `Subscriber`
+methods via the `publishOn(executor)` operator. These operators will unconditionally offload to the specified
+`Executor`. Additional conditional offloading operators, `subscribeOn(executor, predicate)` and
+`publishOn(executor, predicate)`, are also available.
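The predicate-gated variants can be sketched as a plain Java helper; `deliver` below is an illustrative stand-in for the idea behind `publishOn(executor, predicate)`, not the ServiceTalk operator:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class ConditionalOffloadSketch {
    // Hand the signal to the executor only when the predicate says so; otherwise
    // deliver it on the calling thread. Returns the name of the delivering thread.
    static <T> String deliver(T signal, Predicate<T> shouldOffload, ExecutorService executor)
            throws Exception {
        if (shouldOffload.test(signal)) {
            return executor.submit(() -> Thread.currentThread().getName()).get(5, TimeUnit.SECONDS);
        }
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService offloader = Executors.newSingleThreadExecutor(r -> new Thread(r, "offload-1"));
        try {
            // Hypothetical policy: only offload "large" payloads that merit a thread hop.
            System.out.println("large payload on: " + deliver(1024, size -> size > 100, offloader));
            System.out.println("small payload on: " + deliver(5, size -> size > 100, offloader));
        } finally {
            offloader.shutdown();
        }
    }
}
```

Conditional offloading avoids the cost of a thread hop for signals that are known to be cheap to handle in place.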
+
+=== `publishOn()` Example
+
+Using the `publishOn(executor)` operator allows processing of signals related to the source content on a different
+thread than the one generating the content. It is the most common form of offloading used as it relates to the data
+path, specifically the `Subscriber` methods.
-[#executor-affinity]
-=== Executor Affinity
+[source, java]
+----
+Collection<Integer> result = Publisher.range(1, 10) <2> <5>
+ .map(element -> element) // non-offloaded NO-OP
+ .publishOn(publishExecutor) <4>
+ .map(element -> element) // offloaded NO-OP
+ .toFuture() <3>
+ .get(); <1> <6>
+----
+
+<1> `toFuture()` begins by calling `subscribe(Subscriber)`. Executing on the calling thread, execution flows up
+the operator chain towards the source; `map` -> `publishOn` -> `map` -> `Publisher.range(1, 10)`.
+
+<2> Still executing on the calling thread, `Range` will call `Subscriber.onSubscribe(Subscription)` on the
+`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `publishOn` (offloads on to `publishExecutor`
+thread) -> `map` -> `toFuture`.
+
+<3> The `onSubscribe(Subscription)` method of `toFuture()`'s `Subscriber` will call
+`Subscription.request(Long.MAX_VALUE)`. Execution flows up the operator chain towards the source;
+`map` -> `publishOn` -> `map` -> `Range`. `Range` will publish synchronously via `onNext(element)` nine items, the
+integers "`1`" through "`9`".
+
+<4> Each `onNext` flows down the operator chain, `Range` -> `map` -> `publishOn` (offloads on to `publishExecutor`
+thread) -> `map` -> `toFuture` where the element is collected for the result. For each offloaded item a thread of
+`publishExecutor` will be used for executing the second `map` operator and final collect operation.
-In ServiceTalk different protocols will provide request processing by means of an asynchronous source, eg: `Single`.
-In such cases, this processing will be expressed as a chain of operators on the asynchronous source. Configuring limits
-on resources such as a thread pool can be challenging if **each** operator in the chain may run on different threads.
-This would mean applying limits to such a thread pool would be sensitive to program's execution flow and may change over
-time. In order to make thread pool configuration easier, ServiceTalk provides executor affinity for all asynchronous
-sources.
+<5> After all items, `Range` sends the terminal `onComplete()` signal synchronously which flows down the operator chain,
+`Range` -> `map` -> `publishOn` (offloads on to `publishExecutor` thread) -> `map` -> `toFuture` and will complete the
+`Future` with the integer collection result.
-==== Affinity for an asynchronous source
+<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed.
-Let us assume that following expresses request processing:
+=== `subscribeOn()` Example
+
+Using the `subscribeOn(executor)` operator allows processing of the subscription and demand on a specific thread. It is
+necessary when the thread doing the `subscribe` or interacting with the
+`Subscription` is sensitive to blocking, for example, an EventLoop thread.
[source, java]
----
- client.request() # <1>
- .map(resp -> {
- return doSomeBlockingWorkAndConvertToString(resp); # <2>
- })
- .filter(stringResp -> {
- doMoreBlockingWorkToFilterString(stringResp); # <3>
- });
+Collection<Integer> result = Publisher.range(1, 10) <2> <4>
+ .map(element -> element) // NO-OP
+ .subscribeOn(subscribeExecutor)
+ .toFuture() <1> <3> <5>
+ .get(); <6>
----
-<1> A hypothetical client which provides a `request()` method that returns a `Single`.
-<2> Some blocking work done inside `map()` operator by this function, which provides a String as a response.
-<3> Some more blocking work inside `filter()` operator.
-In the above example user provided functions inside both `map()` and `filter()` will be invoked using the specified
-`Executor`.
+<1> `toFuture()` begins by calling `subscribe(Subscriber)`. This flows up the operator chain toward the source;
+`subscribeOn` (offload onto `subscribeExecutor` thread) -> `map` -> `Range`.
+
+<2> Still on a thread from `subscribeExecutor`, `Range` will call `Subscriber.onSubscribe(Subscription)` on the
+`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `subscribeOn` -> `toFuture`.
+
+<3> Still on the `subscribeExecutor` thread, `toFuture()`'s `onSubscribe(Subscription)` calls
+`Subscription.request(Long.MAX_VALUE)`. This flows up the operator chain, `subscribeOn` (offloads again onto another
+`subscribeExecutor` thread) -> `map` -> `Range`.
+
+<4> Still on a thread from the second offload to `subscribeExecutor`, `Range` will publish synchronously via
+`onNext(element)` nine items, the integers "`1`" through "`9`". Each `onNext` flows back down the operator chain,
+`Range` -> `map` -> `subscribeOn` -> `toFuture` where the element is collected for the result.
-==== Affinity across asynchronous sources
+<5> Still on a thread from the second offload to `subscribeExecutor`, after all items, `Range` will call `onComplete`.
+When the `toFuture()` `Subscriber` receives the `onComplete()` signal it will complete the `Future` with the integer
+collection result.
-`Executor` affinity across asynchronous sources is not guaranteed **even if they are part of the same processing chain**.
+<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed.
-Let us assume that following expresses request processing:
+=== publishOn()/subscribeOn() Detailed Example
+
+These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP
+`map` implementations to reveal the active thread during their execution. To show the active thread at the other
+points described in the callouts the expanded example also adds `whenOnSubscribe`, `whenRequest`, `liftSync` and
+`whenFinally` operations in the operator chain. The output of the example shows the thread used for executing each of
+the operators, while the specialized operators provide examples of how you might use them to debug your own programs.
[source, java]
----
- client.request() # <1>
- .map(resp -> {
- return doSomeBlockingWorkAndConvertToString(resp); # <2>
- })
- .flatMap(stringResp -> {
- return client2.request(stringResp) # <3>
- .map(resp -> {
- doSomeBlockingWorkAgain(); # <4>
- });
- })
- .filter(stringResp -> {
- doMoreBlockingWorkToFilterString(stringResp); # <5>
- });
+Collection<Integer> result = Publisher.range(1, 3)
+ .map(element -> {
+ System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
+ return element;
+ })
+ .whenOnSubscribe(subscription -> {
+ System.out.println("\nonSubscribe starts on " + Thread.currentThread());
+ })
+ .publishOn(publishExecutor)
+ .map(element -> {
+ System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
+ return element;
+ })
+ .whenRequest(request -> {
+ System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
+ })
+ .liftSync(subscriber -> {
+ System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
+ return subscriber;
+ })
+ .subscribeOn(subscribeExecutor)
+ .liftSync(subscriber -> {
+ System.out.println("\nSubscribe begins on " + Thread.currentThread());
+ return subscriber;
+ })
+ .whenOnSubscribe(subscription -> {
+ System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
+ })
+ .whenRequest(request -> {
+ System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
+ })
+ .whenFinally(new TerminalSignalConsumer() {
+ @Override
+ public void onComplete() {
+ System.out.println("\ncomplete on " + Thread.currentThread());
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread());
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println("\ncancel on " + Thread.currentThread());
+ }
+ })
+ .toFuture()
+ .get();
----
-<1> A hypothetical client which provides a `request()` method that returns a `Single`.
-<2> Some blocking work done inside `map()` operator by this function, which provides a String as a response.
-<3> Call another `client2` that provides a new `Single` which is returned from `flatmap`.
-<4> Do some blocking work inside `map()` operator for the nested `Single` returned by Step (3).
-<5> Some more blocking work inside `filter()` operator.
-In the above example (2), (3) and (5) will run on one `Executor` whereas (4) will run on a different `Executor`.
+If the default ServiceTalk offloading has been disabled, user code may still wish to offload execution to a
+different `Executor` in specific cases. ServiceTalk provides reactive operators for explicitly offloading the
+`subscribe` call and the `Subscription` methods, via the `subscribeOn(executor)` operator, as well as the `Subscriber`
+methods, via the `publishOn(executor)` operator. These operators will unconditionally offload to the specified
+`Executor`. Conditional variants, `subscribeOn(executor, predicate)` and `publishOn(executor, predicate)`, are also
+available.
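The hand-off these operators arrange can be illustrated with the JDK's own Flow API (a plain-Java analogy, not ServiceTalk code): `SubmissionPublisher` likewise invokes all `Subscriber` methods on a supplied executor, much like `publishOn(executor)`. The class and thread names below are invented for the sketch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OffloadAnalog {
    // Returns the thread name observed for each delivered item.
    static List<String> run() throws InterruptedException {
        ExecutorService publishExecutor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "publish-executor"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // SubmissionPublisher invokes all Subscriber methods on the supplied
        // executor, analogous to what publishOn(executor) arranges.
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(publishExecutor, 16)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) {
                    seen.add(Thread.currentThread().getName() + ":" + item);
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i); // emitted from the calling thread
            }
        } // close() delivers onComplete on the executor
        done.await();
        publishExecutor.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // every Subscriber signal ran on "publish-executor"
    }
}
```

Even though the items are submitted from the calling thread, every `onNext` runs on the `publish-executor` thread, which is exactly the boundary the ServiceTalk operators establish.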
-== Pitfalls
+=== `publishOn()` Example
-As defined in <>, ServiceTalk sequences events in data and control path of processing as if they were
-done on the same thread. Since, data and control events may happen in parallel, there is always a chance for user code
-to deadlock if they are executed in sequence.
+Using the `publishOn(executor)` operator allows processing of signals related to the source content on a different
+thread than the one generating the content. It is the most common form of offloading used, as it relates to the data
+path, specifically the `Subscriber` methods.
[source, java]
----
- CountDownLatch latch = new CountDownLatch(1); # <1>
- Publisher.from(1, 2, 3, 4)
- .afterOnNext(integer -> {
- latch.countDown(); # <2>
- })
- .beforeRequest(requestN -> {
- latch.await(); # <3>
- });
+Collection<Integer> result = Publisher.range(1, 10) <2> <5>
+ .map(element -> element) // non-offloaded NO-OP
+ .publishOn(publishExecutor) <4>
+ .map(element -> element) // offloaded NO-OP
+ .toFuture() <3>
+ .get(); <1> <6>
----
-<1> Hypothetical synchronization point. In real life it may be due to the code waiting for an event to happen externally.
-<2> Trigger the external event (hypothetical synchronization point of `CountDownLatch`) **after** receiving the item.
-<3> Wait for the external event to happen (hypothetical synchronization point of `CountDownLatch`) **before** sending
-`requestN` to the `Publisher`.
-As per ReactiveStreams link:https://github.com/reactive-streams/reactive-streams-jvm#1.1[rule 1.1], request for items
-**MUST** happen before the items are delivered. In the above code, we are waiting for an item to be emitted before
-sending a request to the `Publisher`. This results in a deadlock as an item can not be emitted by the source without a
-request being received and user code making sure that the request is not sent before an item is emitted.
+<1> `toFuture()` begins by calling `subscribe(Subscriber)`. Executing on the calling thread, execution flows up
+the operator chain towards the source: `map` -> `publishOn` -> `map` -> `Publisher.range(1, 10)`.
+
+<2> Still executing on the calling thread, `Range` will call `Subscriber.onSubscribe(Subscription)` on the
+`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `publishOn` (offloads onto a
+`publishExecutor` thread) -> `map` -> `toFuture`.
+
+<3> The `onSubscribe(Subscription)` method of `toFuture()`'s `Subscriber` will call
+`Subscription.request(Long.MAX_VALUE)`. Execution flows up the operator chain towards the source:
+`map` -> `publishOn` -> `map` -> `Range`. `Range` will then synchronously publish nine items, the
+integers "`1`" through "`9`", via `onNext(element)`.
-In order to avoid such scenarios, it is handy to follow certain best practices while writing blocking code in operators:
+<4> Each `onNext` flows down the operator chain, `Range` -> `map` -> `publishOn` (offloads onto a `publishExecutor`
+thread) -> `map` -> `toFuture`, where the element is collected for the result. For each offloaded item, a thread of
+`publishExecutor` will be used to execute the second `map` operator and the final collect operation.
-- Avoid co-ordination between two operators on the same source.
-- If such co-ordination is required, try limiting such coordination in either data or control path but not
-inter-dependent on each other.
-- If co-ordination is required between data and control path, be aware of ReactiveStreams semantics and how the two
-paths interact with each other.
+<5> After all items, `Range` sends the terminal `onComplete()` signal synchronously, which flows down the operator
+chain, `Range` -> `map` -> `publishOn` (offloads onto a `publishExecutor` thread) -> `map` -> `toFuture`, and completes
+the `Future` with the integer collection result.
-If these rules are followed the above example can be modified to:
+<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed.
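A minimal sketch of the boundary described in these callouts, using only `java.util.concurrent` (the names are hypothetical and this is not how ServiceTalk implements `publishOn`): items arriving on the producer's thread are re-dispatched onto the offload executor before the downstream stage runs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PublishOnBoundary {
    // Hypothetical illustration of the publishOn hand-off: each item handed
    // across the boundary is re-dispatched onto the supplied executor before
    // the downstream consumer sees it.
    static <T> Consumer<T> offloadTo(ExecutorService executor, Consumer<T> downstream) {
        return item -> executor.execute(() -> downstream.accept(item));
    }

    static List<String> run() throws InterruptedException {
        ExecutorService publishExecutor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "publish-executor"));
        List<String> seen = new CopyOnWriteArrayList<>();
        Consumer<Integer> downstream =
                i -> seen.add(Thread.currentThread().getName() + ":" + i);
        Consumer<Integer> boundary = offloadTo(publishExecutor, downstream);
        for (int i = 1; i <= 3; i++) {
            boundary.accept(i); // "onNext" issued from the calling thread
        }
        publishExecutor.shutdown();
        publishExecutor.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Because the executor is single-threaded and FIFO, the downstream stage observes the items in order, but always on the offload thread rather than the producer's thread.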
+
+=== `subscribeOn()` Example
+
+Using the `subscribeOn(executor)` operator allows processing of the subscription and demand on a specific thread. It
+generally requires an understanding of the behavior of the source; using a different source may change the need for
+offloading. `subscribeOn(executor)` is used less frequently than `publishOn(executor)` but is useful when it is
+necessary to offload the control path: the `subscribe` call and the `Subscription` methods.
[source, java]
----
- CountDownLatch latch = new CountDownLatch(1);
- Publisher.from(1, 2, 3, 4)
- .afterOnNext(integer -> {
- latch.countDown();
- })
- .afterRequest(requestN -> { # <1>
- latch.await();
- });
+Collection<Integer> result = Publisher.range(1, 10) <2> <4>
+ .map(element -> element) // NO-OP
+ .subscribeOn(subscribeExecutor)
+ .toFuture() <1> <3> <5>
+ .get(); <6>
----
-<1> Use `afterRequest` which happens **after** `requestN` is delivered to the source.
-In this modified example, since we now use `afterRequest`, instead of `beforeRequest`, we do not block `requestN` to
-go to the source and this code is safe.
+<1> `toFuture()` begins by calling `subscribe(Subscriber)`. Executing on the calling thread, this flows up the
+operator chain toward the source: `subscribeOn` (offloads onto a `subscribeExecutor` thread) -> `map` -> `Range`.
+
+<2> Still on a thread from `subscribeExecutor`, `Range` will call `Subscriber.onSubscribe(Subscription)` on the
+`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `subscribeOn` -> `toFuture`.
+
+<3> Still on the `subscribeExecutor` thread, `toFuture()`'s `onSubscribe(Subscription)` calls
+`Subscription.request(Long.MAX_VALUE)`. This flows up the operator chain, `subscribeOn` (offloads again onto another
+`subscribeExecutor` thread) -> `map` -> `Range`.
+
+<4> Still on a thread from the second offload to `subscribeExecutor`, `Range` will publish synchronously via
+`onNext(element)` nine items, the integers "`1`" through "`9`". Each `onNext` flows back down the operator chain,
+`Range` -> `map` -> `subscribeOn` -> `toFuture`, where the element is collected for the result.
+
+<5> Still on a thread from the second offload to `subscribeExecutor`, after all items `Range` will call `onComplete`.
+When the `toFuture()` `Subscriber` receives the `onComplete()` signal it will complete the `Future` with the integer
+collection result.
+
+<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed.
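Under the assumption that the source is synchronous, the net effect of this example can be mimicked with a plain `CompletableFuture` (an analogy only, with invented names, not ServiceTalk code): the entire emit-and-collect sequence runs on the offload executor while the caller simply blocks on the `Future`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubscribeOnAnalog {
    // Analogous to subscribeOn(executor) ... toFuture().get(): the whole
    // subscribe-and-emit sequence of a synchronous source runs on the
    // offload executor; the caller only waits for the result.
    static String run() throws Exception {
        ExecutorService subscribeExecutor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "subscribe-executor"));
        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
            int sum = 0;
            for (int i = 1; i < 10; i++) { // stand-in for Range emitting 1..9
                sum += i;
            }
            return Thread.currentThread().getName() + ":" + sum;
        }, subscribeExecutor);
        String value = result.get(); // caller blocks, like Future.get()
        subscribeExecutor.shutdown();
        return value;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // the work ran on "subscribe-executor"
    }
}
```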
+
+=== publishOn()/subscribeOn() Detailed Example
+
+These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP
+`map` implementations to reveal the active thread during their execution. To show the active thread at the other
+points described in the callouts, the expanded example also adds `whenOnSubscribe`, `whenRequest`, `liftSync` and
+`whenFinally` operations to the operator chain. The output of the example shows the thread used for executing each of
+the operators, while the specialized operators provide examples of how you might use them to debug your own programs.
+
+[source, java]
+----
+Collection<Integer> result = Publisher.range(1, 3)
+ .map(element -> {
+ System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
+ return element;
+ })
+ .whenOnSubscribe(subscription -> {
+ System.out.println("\nonSubscribe starts on " + Thread.currentThread());
+ })
+ .publishOn(publishExecutor)
+ .map(element -> {
+ System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
+ return element;
+ })
+ .whenRequest(request -> {
+ System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
+ })
+ .liftSync(subscriber -> {
+ System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
+ return subscriber;
+ })
+ .subscribeOn(subscribeExecutor)
+ .liftSync(subscriber -> {
+ System.out.println("\nSubscribe begins on " + Thread.currentThread());
+ return subscriber;
+ })
+ .whenOnSubscribe(subscription -> {
+ System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
+ })
+ .whenRequest(request -> {
+ System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
+ })
+ .whenFinally(new TerminalSignalConsumer() {
+ @Override
+ public void onComplete() {
+ System.out.println("\ncomplete on " + Thread.currentThread());
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread());
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println("\ncancel on " + Thread.currentThread());
+ }
+ })
+ .toFuture()
+ .get();
+----
== Implementation
diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/pages/pitfalls.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/pages/pitfalls.adoc
new file mode 100644
index 0000000000..3f68cb2a10
--- /dev/null
+++ b/servicetalk-concurrent-api/docs/modules/ROOT/pages/pitfalls.adoc
@@ -0,0 +1,60 @@
+// Configure {source-root} values based on how this document is rendered: on GitHub or not
+ifdef::env-github[]
+:source-root:
+endif::[]
+ifndef::env-github[]
+ifndef::source-root[:source-root: https://github.com/apple/servicetalk/blob/{page-origin-refname}]
+endif::[]
+
+= Concurrency Pitfalls
+
+ServiceTalk sequences events on the data and control paths of processing as if they were done on the same thread. Since
+data and control events may happen in parallel, there is always a chance for user code to deadlock if it assumes a
+particular sequence between the two.
+
+[source, java]
+----
+    CountDownLatch latch = new CountDownLatch(1); // <1>
+    Publisher.from(1, 2, 3, 4)
+        .afterOnNext(integer -> {
+            latch.countDown(); // <2>
+        })
+        .beforeRequest(requestN -> {
+            latch.await(); // <3>
+        });
+----
+<1> Hypothetical synchronization point. In real life it may be due to the code waiting for an event to happen externally.
+<2> Trigger the external event (hypothetical synchronization point of `CountDownLatch`) **after** receiving the item.
+<3> Wait for the external event to happen (hypothetical synchronization point of `CountDownLatch`) **before** sending
+`requestN` to the `Publisher`.
+
+As per ReactiveStreams link:https://github.com/reactive-streams/reactive-streams-jvm#1.1[rule 1.1], a request for items
+**MUST** happen before the items are delivered. In the above code, we wait for an item to be emitted before sending a
+request to the `Publisher`. This results in a deadlock: the source cannot emit an item without receiving a request,
+while the user code ensures that the request is not sent before an item is emitted.
+
+In order to avoid such scenarios, it is handy to follow certain best practices while writing blocking code in operators:
+
+- Avoid coordination between two operators on the same source.
+- If such coordination is required, try to limit it to either the data path or the control path, rather than making the
+two inter-dependent.
+- If coordination is required between the data and control paths, be aware of ReactiveStreams semantics and how the two
+paths interact with each other.
+
+If these rules are followed the above example can be modified to:
+
+[source, java]
+----
+    CountDownLatch latch = new CountDownLatch(1);
+    Publisher.from(1, 2, 3, 4)
+        .afterOnNext(integer -> {
+            latch.countDown();
+        })
+        .afterRequest(requestN -> { // <1>
+            latch.await();
+        });
+----
+<1> Use `afterRequest` which happens **after** `requestN` is delivered to the source.
+
+In this modified example, since we now use `afterRequest` instead of `beforeRequest`, we do not block `requestN` from
+reaching the source, and this code is safe.
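The safe ordering can also be exercised with a toy, single-threaded analog in plain Java (a sketch with invented names, not the ServiceTalk operators): because we only await after the request has synchronously delivered the items, the latch is already open when `await()` runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SafeOrdering {
    static List<Integer> run() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        List<Integer> received = new ArrayList<>();
        // Toy synchronous source: "requesting" delivers all items immediately
        // on the calling thread, mirroring Publisher.from(1, 2, 3, 4).
        Runnable request = () -> {
            for (int i = 1; i <= 4; i++) {
                received.add(i);
                latch.countDown(); // afterOnNext: signal once an item arrives
            }
        };
        request.run(); // the request reaches the source and items are emitted
        latch.await(); // afterRequest: safe, the latch is already open
        // Awaiting *before* request.run() instead would deadlock: the items
        // that open the latch are only produced by running the request.
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```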
diff --git a/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml b/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
index 354fcebc8a..679ac02eaa 100644
--- a/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
+++ b/servicetalk-concurrent-api/gradle/checkstyle/suppressions.xml
@@ -19,6 +19,8 @@
"https://checkstyle.org/dtds/suppressions_1_2.dtd">
+