Skip to content

Commit

Permalink
Merge f02bc50 into b83b5db
Browse files Browse the repository at this point in the history
  • Loading branch information
jponge authored Jan 3, 2022
2 parents b83b5db + f02bc50 commit 267f168
Show file tree
Hide file tree
Showing 72 changed files with 2,972 additions and 118 deletions.
1 change: 1 addition & 0 deletions documentation/src/main/jekyll/_data/categories.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- emission-threads
- emit-subscription
- logging
- context-passing

- name: Integration
guides:
Expand Down
9 changes: 8 additions & 1 deletion documentation/src/main/jekyll/_data/guides.yml
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,11 @@ logging:
labels:
- beginner
related:
- guide: infrastructure
- guide: infrastructure

context-passing:
title: Context passing
text: Learn how to pass an implicit context between operators
labels:
- intermediate
- advanced
87 changes: 87 additions & 0 deletions documentation/src/main/jekyll/guides/context-passing.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
:page-layout: guides
:page-guide-id: context-passing
:page-show-toc: true
:page-liquid:
:include_dir: ../../../../src/test/java/guides

== Context passing

Mutiny reactive pipelines let data flow from publishers to subscribers.

In the vast majority of cases a publisher shall have _all_ required data, and operators shall perform processing based on item values.
For instance a network request shall be made with all request data known in advance, and response processing shall only depend on the response payload.

That being said there are cases were this is not sufficient, and some data has to be carried along with items.
For instance one intermediary operator in a pipeline may have to make another networked request from which we need to extract some correlation identifier which will be used by another operator down the pipeline.
In such cases one will be tempted to forward tuples consisting of some item value plus some "extra" data.

For such cases Mutiny offers a _subscriber-provided context_, so all operators involved in a subscription can share some form of _implicit data_.

=== What's in a context?

A context is a simple key / value, in-memory storage.
Data can be queried, added and deleted from a context, as shown in the following snippet:

[source,java,indent=0]
----
include::{include_dir}/ContextPassingTest.java[tags=contextManipulation]
----

`Context` objects are thread-safe, and can be created from sequences of key / value pairs (as shown above), from a Java `Map`, or they can be created empty.

Note that an empty-created context defers its internal storage allocation until the first call to `put`.
You can see `Context` as a glorified `ConcurrentHashMap` delegate, although this is an implementation detail and Mutiny might explore various internal storage strategies in the future.

[TIP]
====
Contexts shall be primarily used to share transient data used for networked I/O processing such as correlation identifiers, tokens, etc.
They should not be used as general-purpose data structures that are frequently updated and that hold large amounts of data.
====

=== How to access a context?

Given a `Uni` or a `Multi`, a context can be accessed using the `withContext` operator, as in:

[source,java,indent=0]
----
include::{include_dir}/ContextPassingTest.java[tags=contextSampleUsage]
----

This operator builds a sub-pipeline using 2 parameters: the current `Uni` or `Multi` and the context.

[IMPORTANT]
====
The function passed to `withContext` is called at subscription time.
This means that the context has not had a chance to be updated by upstream operators yet, so be careful with what you do in the body of that function.
====

There is another way to access the context by using the `attachContext` method:

[source,java,indent=0]
----
include::{include_dir}/ContextPassingTest.java[tags=contextAttachedSampleUsage]
----

This method materializes the context in the regular pipeline items using the wrapper `ItemWithContext` class.
The `get` method provides the item while the `context` method provides the context.

=== How to access a context at the pipeline source?

The `Uni` and `Multi` _builder_ methods like `Multi.createFrom()` provide publishers, not operators, so they don't have the `withContext` method.

The first option is to use the `Uni.createFrom().context(...)` or `Multi.createFrom().context(...)` general purpose method to materialize the context:

[source,java,indent=0]
----
include::{include_dir}/ContextPassingTest.java[tags=builderUsage]
----

The `context` method takes a function that accepts a `Context` and returns a pipeline.
This is very similar to the `deferred` builder.

If you use an `emitter` builder then for both `Uni` and `Multi` cases the emitter object offers a `context` method to access the context:

[source,java,indent=0]
----
include::{include_dir}/ContextPassingTest.java[tags=emitterUsage]
----
99 changes: 99 additions & 0 deletions documentation/src/test/java/guides/ContextPassingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package guides;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class ContextPassingTest {

void contextManipulation() {

// tag::contextManipulation[]
// Create a context using key / value pairs
Context context = Context.of(
"X-CUSTOMER-ID", "1234",
"X-SPAN-ID", "foo-bar-baz"
);

// Get an entry
System.out.println(
context.<String>get("X-SPAN-ID"));

// Get an entry, use a supplier for a default value if the key is not present
System.out.println(
context.getOrElse("X-SPAN-ID", () -> "<no id>"));

// Add an entry
context.put("foo", "bar");

// Remove an entry
context.delete("foo");
// end::contextManipulation[]
}

@Test
void sampleUsage() {
Multi<Integer> pipeline = Multi.createFrom().range(1, 10);
String customerId = "1234";

// tag::contextSampleUsage[]
Context context = Context.of("X-CUSTOMER-ID", customerId);

pipeline.withContext((multi, ctx) -> multi.onItem().transformToUniAndMerge(item -> makeRequest(item, ctx.get("X-CUSTOMER-ID"))))
.subscribe().with(context, item -> handleResponse(item), err -> handleFailure(err));
// end::contextSampleUsage[]
}

@Test
void sampleUsageAttachedContext() {
Multi<Integer> pipeline = Multi.createFrom().range(1, 10);
String customerId = "1234";

// tag::contextAttachedSampleUsage[]
Context context = Context.of("X-CUSTOMER-ID", customerId);

pipeline.attachContext()
.onItem().transformToUniAndMerge(item -> makeRequest(item.get(), item.context().get("X-CUSTOMER-ID")))
.subscribe().with(context, item -> handleResponse(item), err -> handleFailure(err));
// end::contextAttachedSampleUsage[]
}

@Test
void builderUsage() {
Context context = Context.of("X-SPAN-ID", "1234");

// tag::builderUsage[]
Uni.createFrom().context(ctx -> makeRequest("db1", ctx.get("X-SPAN-ID")))
.subscribe().with(context, item -> handleResponse(item), err -> handleFailure(err));
// end::builderUsage[]
}

@Test
void emitterUsage() {
Context context = Context.of("X-SPAN-ID", "1234");

// tag::emitterUsage[]
Multi.createFrom().emitter(emitter -> {
String customerId = emitter.context().get("X-SPAN-ID");
for (int i = 0; i < 10; i++) {
emitter.emit("@" + i + " [" + customerId + "]");
}
emitter.complete();
});
// end::emitterUsage[]
}

private void handleFailure(Throwable err) {
Assertions.fail(err);
}

private void handleResponse(String item) {
Assertions.assertTrue(item.endsWith("::1234"));
}

private Uni<String> makeRequest(Object item, Object o) {
return Uni.createFrom().item(item + "::" + o);
}
}
45 changes: 44 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,50 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.groups.UniAwait<T>::<init>(io.smallrye.mutiny.Uni<T>)",
"new": "method void io.smallrye.mutiny.groups.UniAwait<T>::<init>(io.smallrye.mutiny.Uni<T>, io.smallrye.mutiny.Context)",
"justification": "Private API impacted by the new Mutiny context support"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.groups.UniAwaitOptional<T>::<init>(io.smallrye.mutiny.Uni<T>)",
"new": "method void io.smallrye.mutiny.groups.UniAwaitOptional<T>::<init>(io.smallrye.mutiny.Uni<T>, io.smallrye.mutiny.Context)",
"justification": "Private API impacted by the new Mutiny context support"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.helpers.BlockingIterable<T>::<init>(org.reactivestreams.Publisher<? extends T>, int, java.util.function.Supplier<java.util.Queue<T>>)",
"new": "method void io.smallrye.mutiny.helpers.BlockingIterable<T>::<init>(io.smallrye.mutiny.Multi<? extends T>, int, java.util.function.Supplier<java.util.Queue<T>>, java.util.function.Supplier<io.smallrye.mutiny.Context>)",
"justification": "Private API impacted by the new Mutiny context support"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.helpers.UniCallbackSubscriber<T>::<init>(java.util.function.Consumer<? super T>, java.util.function.Consumer<? super java.lang.Throwable>)",
"new": "method void io.smallrye.mutiny.helpers.UniCallbackSubscriber<T>::<init>(java.util.function.Consumer<? super T>, java.util.function.Consumer<? super java.lang.Throwable>, io.smallrye.mutiny.Context)",
"justification": "Private API impacted by the new Mutiny context support"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.subscription.Subscribers.CallbackBasedSubscriber<T>::<init>(java.util.function.Consumer<? super T>, java.util.function.Consumer<? super java.lang.Throwable>, java.lang.Runnable, java.util.function.Consumer<? super org.reactivestreams.Subscription>)",
"new": "method void io.smallrye.mutiny.subscription.Subscribers.CallbackBasedSubscriber<T>::<init>(io.smallrye.mutiny.Context, java.util.function.Consumer<? super T>, java.util.function.Consumer<? super java.lang.Throwable>, java.lang.Runnable, java.util.function.Consumer<? super org.reactivestreams.Subscription>)",
"justification": "Private API impacted by the new Mutiny context support"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method <T> io.smallrye.mutiny.subscription.CancellableSubscriber<T> io.smallrye.mutiny.subscription.Subscribers::from(java.util.function.Consumer<? super T>, java.util.function.Consumer<? super java.lang.Throwable>, java.lang.Runnable, java.util.function.Consumer<? super org.reactivestreams.Subscription>)",
"new": "method <T> io.smallrye.mutiny.subscription.CancellableSubscriber<T> io.smallrye.mutiny.subscription.Subscribers::from(io.smallrye.mutiny.Context, java.util.function.Consumer<? super T>, java.util.function.Consumer<? super java.lang.Throwable>, java.lang.Runnable, java.util.function.Consumer<? super org.reactivestreams.Subscription>)",
"justification": "Private API impacted by the new Mutiny context support"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
Loading

0 comments on commit 267f168

Please sign in to comment.