-
Notifications
You must be signed in to change notification settings - Fork 8
Invokable Promise
Reakt supports invokable promises so service methods could return a Promise.
employeeService.lookupEmployee("123")
.then((employee)-> {...}).catchError(...).invoke();
Let's walk through an example.
Let's say we are developing a service discovery service with this interface.
interface ServiceDiscovery {
Promise<URI> lookupService(URI uri);
default void shutdown() {}
default void start() {}
}
Note that ServiceDiscovery.lookupService
is just an example like EmployeeService.lookupEmployee
was just
an example.
Notice the lookupService
returns a Promise
(Promise<URI> lookupService(URI uri)
).
To call this, we can use an invokable promise. The service side will return an invokable promise,
and the client of the service will use that Promise
to register its then
, thenExpect
, thenMap
,
and/or catchError
handlers.
Here is the service side of the invokable promise example.
class ServiceDiscoveryImpl implements ServiceDiscovery {
@Override
public Promise<URI> lookupService(URI uri) {
return invokablePromise(promise -> {
if (uri == null) {
promise.reject("URI was null");
} else {
...
// DO SOME ASYNC OPERATION WHEN IT RETURNS CALL RESOLVE.
promise.resolve(successResult);
}
});
}
}
Notice to create the invokablePromise
we use Promises.invokablePromise
which takes a Promise
Consumer
(static <T> Promise<T> invokablePromise(Consumer<Promise<T>> promiseConsumer)
).
The lookupService
example returns a Promise and it does not execute its body until the invoke
(Promise.invoke
) on the client side of the equation is called.
Let's look at the client side.
serviceDiscovery.lookupService(empURI)
.then(this::handleSuccess)
.catchError(this::handleError)
.invoke();
The syntax this::handleSuccess
is a method reference which can be used as Java 8 lambda expressions.
We use method references to make the example shorter.
Here is a complete example with two versions of our example service.
package io.advantageous.reakt.promise;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static io.advantageous.reakt.promise.Promises.*;
import static org.junit.Assert.*;
public class InvokablePromise {
final URI successResult = URI.create("http://localhost:8080/employeeService/");
ServiceDiscovery serviceDiscovery;
ServiceDiscovery asyncServiceDiscovery;
URI empURI;
CountDownLatch latch;
AtomicReference<URI> returnValue;
AtomicReference<Throwable> errorRef;
@Before
public void before() {
latch = new CountDownLatch(1);
returnValue = new AtomicReference<>();
errorRef = new AtomicReference<>();
serviceDiscovery = new ServiceDiscoveryImpl();
asyncServiceDiscovery = new ServiceDiscoveryAsyncImpl();
asyncServiceDiscovery.start();
empURI = URI.create("marathon://default/employeeService?env=staging");
}
@After
public void after() {
asyncServiceDiscovery.shutdown();
}
public void await() {
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
@Test
public void testServiceWithReturnPromiseSuccess() {
serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
.catchError(this::handleError).invoke();
await();
assertNotNull("We have a return", returnValue.get());
assertNull("There were no errors", errorRef.get());
assertEquals("The result is the expected result", successResult, returnValue.get());
}
@Test
public void testServiceWithReturnPromiseFail() {
serviceDiscovery.lookupService(null).then(this::handleSuccess)
.catchError(this::handleError).invoke();
await();
assertNull("We do not have a return", returnValue.get());
assertNotNull("There were errors", errorRef.get());
}
@Test
public void testAsyncServiceWithReturnPromiseSuccess() {
asyncServiceDiscovery.lookupService(empURI).then(this::handleSuccess)
.catchError(this::handleError).invoke();
await();
assertNotNull("We have a return from async", returnValue.get());
assertNull("There were no errors form async", errorRef.get());
assertEquals("The result is the expected result form async", successResult, returnValue.get());
}
@Test
public void testAsyncServiceWithReturnPromiseFail() {
asyncServiceDiscovery.lookupService(null).then(this::handleSuccess)
.catchError(this::handleError).invoke();
await();
assertNull("We do not have a return from async", returnValue.get());
assertNotNull("There were errors from async", errorRef.get());
}
@Test (expected = IllegalStateException.class)
public void testServiceWithReturnPromiseSuccessInvokeTwice() {
final Promise<URI> promise = serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
.catchError(this::handleError);
promise.invoke();
promise.invoke();
}
@Test
public void testIsInvokable() {
final Promise<URI> promise = serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
.catchError(this::handleError);
assertTrue("Is this an invokable promise", promise.isInvokable());
}
private void handleError(Throwable error) {
errorRef.set(error);
latch.countDown();
}
private void handleSuccess(URI uri) {
returnValue.set(uri);
latch.countDown();
}
interface ServiceDiscovery {
Promise<URI> lookupService(URI uri);
default void shutdown() {
}
default void start() {
}
}
class ServiceDiscoveryImpl implements ServiceDiscovery {
@Override
public Promise<URI> lookupService(URI uri) {
return invokablePromise(promise -> {
if (uri == null) {
promise.reject("URI was null");
} else {
promise.resolve(successResult);
}
});
}
}
class ServiceDiscoveryAsyncImpl implements ServiceDiscovery {
final ExecutorService executorService;
final Queue<Runnable> runnables;
final AtomicBoolean stop;
public ServiceDiscoveryAsyncImpl() {
executorService = Executors.newSingleThreadExecutor();
runnables = new LinkedTransferQueue<>();
stop = new AtomicBoolean();
}
@Override
public Promise<URI> lookupService(URI uri) {
return invokablePromise(promise -> {
runnables.offer(() -> {
if (uri == null) {
promise.reject("URI was null");
} else {
promise.resolve(URI.create("http://localhost:8080/employeeService/"));
}
});
});
}
@Override
public void shutdown() {
stop.set(true);
executorService.shutdown();
}
@Override
public void start() {
executorService.submit((Runnable) () -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
if (stop.get())break;
Runnable runnable = runnables.poll();
while (runnable != null) {
runnable.run();
runnable = runnables.poll();
}
}
});
}
}
}
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Reactor, Stream, Results
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Elekt Consul Leadership election
- Elekt Leadership election
- Reactive Microservices
What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Callback, and async Results
Reactor, Stream and Stream Result
Expected & Circuit Breaker
Scala Akka and Reakt