- Reactive Microservices and DevOps Pipelines on Kubernetes
- Overview
- 1. OpenShift and Kubernetes Overview
- 1.1. Containers
- 1.2. Master
- 1.3. Nodes
- 1.4. FluentD
- 1.5. Services
- 1.6. Pods
- 1.7. OpenShift Extensions To Kubernetes
- 1.7.1. Container Registry (OpenShift Only)
- 1.7.2. Web Console
- 1.7.3. Logging (EFK)
- 1.7.4. Software Defined Networking
- 1.7.5. Multitenancy
- 1.7.6. Container Native Storage
- 1.7.7. Routes
- 1.7.8. Templates
- 1.7.9. Source-2-Image & Build Configs
- 1.7.10. Projects
- 1.7.11. Security
- 1.7.12. Metrics & Monitoring (Prometheus/Hawkular/Heapster)
- 2. Let’s Collaborate And Be Social!
- 3. Exercise 1: Load A Complete DevOps Environment In Kubernetes
- 4. Exercise 2: Creating A New Vert.x Project
- 5. Create A Jenkins Pipeline
- 6. Basic Vert.x Concepts
- 7. Implement Kubernetes Config
- 8. Configure Vert.x Clustering With Infinispan/JGroups
- 9. Implement A New Service Proxy
- 10. Implement REST API
- 11. Quality Analysis And Project Health Using SonarQube
- 12. Security Analysis Using OWASP Zed Attack Proxy
- 13. Vulnerability Analysis Using OWASP Dependency Check
- References
- Glossary
- Appendix A: Appendix
- 1. OpenShift and Kubernetes Overview
(c) 2018 Red Hat, Inc. - All Rights Reserved
This document was written by Deven Phillips, Jeremy Davis, and Ram Maddali.
Microservices and Containers have changed the entire landscape of software development in the last few years. We are now able to decompose our development work into smaller and more digestible components which are easier to understand and easier to split amongst developer teams for better parallel workflow.
Just as Linux has pretty much won the hearts and minds of most developers for the operating system of the cloud, Kubernetes has become the de facto standard for orchestrating containers at scale.
In this workshop, you will learn how to leverage Containers and Kubernetes (wrapped by OpenShift) to build a productive DevOps workflow that lets you build, test, deploy, and validate microservices quickly and reliably.
Some of the tools we will leverage in this workshop are listed below:
- Vert.x - A toolkit for developing reactive applications on the JVM
- Kubernetes (In the form of OpenShift Container Platform 3.9)
- Infinispan - A distributed in-memory key/value data store
- JGroups - In conjunction with Infinispan, a cluster discovery system for Vert.x
- Jenkins (For continuous integration and continuous delivery)
- SonarQube (For code quality analysis)
- Sonatype Nexus (For an artifact repository)
- OWASP Zed Attack Proxy (For security analysis of web applications)
- Java - A modern and high-performance programming language
- Spock Framework - A testing and specification framework for Java and Groovy applications
- Node.js - A JavaScript runtime well suited to quickly developing applications
- Spring Boot - An alternative to writing applications using JavaEE
- HoverFly - A service virtualization tool which simplifies testing Microservices
Kubernetes (and by extension OpenShift) is a system, based on microservices, for orchestrating containerized applications across one or more hosts. Just as the kernel schedules work in an operating system, the orchestrator schedules work in a cluster. Continuing the analogy: where kernels arbitrate access to resources like networking and disk, Kubernetes (a.k.a. K8s) provides access to software-defined networks and storage.
Beyond the OS analogy though, K8s (and even more so OpenShift) can be a platform which abstracts some of the tedium of managing applications away for developers and administrators. The real power of K8s is in allowing for centralized and automated management of things like network controls, encryption, auditing, monitoring, logging, etc… Moving these functions to the platform simplifies the writing, understanding, and debugging of software.
Containers are a method of operating system virtualization that allow you to run an application and its dependencies in resource-isolated processes. Containers allow you to easily package an application’s code, configurations, and dependencies into easy to use building blocks that deliver environmental consistency, operational efficiency, developer productivity, and version control. Containers can help ensure that applications deploy quickly, reliably, and consistently regardless of deployment environment. Containers also give you more granular control over resources giving your infrastructure improved efficiency. [1]
The master is responsible for exposing the application program interface (API), scheduling the deployments and managing the overall cluster. Each node runs a container runtime, such as Docker or rkt, along with an agent that communicates with the master. The node also runs additional components for logging, monitoring, service discovery and optional add-ons. Nodes are the workhorses of a Kubernetes cluster. They expose compute, networking and storage resources to applications. Nodes can be virtual machines (VMs) running in a cloud or bare metal servers running within the data center. [2]
A node provides the runtime environments for containers. Each node in a Kubernetes cluster has the required services to be managed by the master. Nodes also have the required services to run pods, including the Docker service, a kubelet, and a service proxy. [3]
Each node has a kubelet that updates the node as specified by a container manifest, which is a YAML file that describes a pod. The kubelet uses a set of manifests to ensure that its containers are started and that they continue to run.
Each node also runs a simple network proxy that reflects the services defined in the API on that node. This allows the node to do simple TCP and UDP stream forwarding across a set of back ends.
The FluentD agent allows for streaming of container output into the logging backend of your choice (EFK in OpenShift).
A Kubernetes service serves as an internal load balancer. It identifies a set of replicated pods in order to proxy the connections it receives to them. Backing pods can be added to or removed from a service arbitrarily while the service remains consistently available, enabling anything that depends on the service to refer to it at a consistent address. The default service clusterIP addresses are from the OpenShift Container Platform internal network and they are used to permit pods to access each other.
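As a concrete sketch, a minimal Service definition might look like the following. The names here (the service name, `app` label, and ports) are hypothetical, not taken from the workshop materials:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: insult-service        # hypothetical service name
spec:
  selector:
    app: insult-service       # matches the label on the backing pods
  ports:
    - port: 8080              # clusterIP port other pods connect to
      targetPort: 8080        # container port on the backing pods
```

Any pod whose labels match the selector is added to the service's endpoint list automatically, which is what allows backing pods to come and go while the service address stays stable.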
A pod is one or more containers deployed together on one host, and is the smallest compute unit that can be defined, deployed, and managed. Pods are the rough equivalent of a machine instance (physical or virtual) to a container. Each pod is allocated its own internal IP address, therefore owning its entire port space, and containers within pods can share their local storage and networking.
OpenShift Container Platform provides an integrated container registry called OpenShift Container Registry (OCR) that adds the ability to automatically provision new image repositories on demand. This provides users with a built-in location for their application builds to push the resulting images.
The OpenShift Container Platform web console is a user interface accessible from a web browser. Developers can use the web console to visualize, browse, and manage the contents of projects.
OpenShift deploys a complete EFK (ElasticSearch, FluentD, Kibana) stack which is used to aggregate logs and metrics from each Pod. Output from containers can be kept according to the configured preferences.
OpenShift Container Platform uses a software-defined networking (SDN) approach to provide a unified cluster network that enables communication between pods across the OpenShift Container Platform cluster. This pod network is established and maintained by the OpenShift SDN, which configures an overlay network using Open vSwitch (OVS).
The ovs-multitenant plug-in provides project-level isolation for pods and services. Each project receives a unique Virtual Network ID (VNID) that identifies traffic from pods assigned to the project. Pods from different projects cannot send packets to or receive packets from pods and services of a different project.
Container-Native Storage for OpenShift Container Platform is built around three key technologies:
- OpenShift provides the platform as a service (PaaS) infrastructure based on Kubernetes container management. Basic OpenShift architecture is built around multiple master systems where each system contains a set of nodes.
- Red Hat Gluster Storage provides the containerized distributed storage based on the Red Hat Gluster Storage 3.1.3 container. Each Red Hat Gluster Storage volume is composed of a collection of bricks, where each brick is the combination of a node and an export directory.
- Heketi provides Red Hat Gluster Storage volume life-cycle management. It creates Red Hat Gluster Storage volumes dynamically and supports multiple Red Hat Gluster Storage clusters.
An OpenShift Container Platform route exposes a service at a host name, like www.example.com, so that external clients can reach it by name.
A template describes a set of objects that can be parameterized and processed to produce a list of objects for creation by OpenShift Container Platform. The objects to create can include anything that users have permission to create within a project, for example services, build configurations, and deployment configurations. A template may also define a set of labels to apply to every object defined in the template.
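To make this concrete, a minimal template with a single parameter might look like the sketch below. The template name, parameter, and the object it creates are invented for illustration:

```yaml
apiVersion: v1
kind: Template
metadata:
  name: example-template      # hypothetical template name
parameters:
  - name: APP_NAME            # substituted wherever ${APP_NAME} appears
    required: true
labels:
  template: example-template  # applied to every object created from this template
objects:
  - apiVersion: v1
    kind: Service
    metadata:
      name: ${APP_NAME}
    spec:
      ports:
        - port: 8080
```

Processing the template with a value for `APP_NAME` yields a concrete list of objects that OpenShift then creates in the project.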
Source-2-Image (S2I) is a concept in OpenShift which allows the Master node in the cluster to accept a Git URL, clone the repository into a builder pod, inspect the contents to detect the type of project, then use the detected project type to automatically build that project into a container. For example, a Java project with a Maven POM file will be built using mvn package, and the resulting JAR will be configured as the entrypoint for that container.
Build Configs in OpenShift allow us to go several steps further than S2I for more complex requirements. A build config
can customize how an S2I build is executed by specifying a strategy
and additional parameters like the source
container or secrets which might be needed for the build.
Finally, build configs which use the pipeline
strategy can be automatically wired into a Jenkins deployment running
inside of the OpenShift cluster. The project will then be built via a Jenkinsfile (usually in the root of the project).
The Jenkinsfile and pipeline strategy provide us nearly limitless control over how our builds are managed.
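As an illustration of the pipeline strategy, a minimal declarative Jenkinsfile might look like the sketch below. The stage names, agent label, and the Maven/oc invocations are assumptions for the example, not the workshop's actual pipeline:

```groovy
pipeline {
    agent { label 'maven' }    // assumes a 'maven' Jenkins agent pod template exists
    stages {
        stage('Build') {
            steps { sh 'mvn -B package' }
        }
        stage('Test') {
            steps { sh 'mvn -B verify' }
        }
        stage('Deploy') {
            steps {
                // hypothetical: feed the build output to an OpenShift binary build
                sh 'oc start-build insult-service --from-dir=. --follow'
            }
        }
    }
}
```

Because the Jenkinsfile lives in the repository, the pipeline definition is versioned alongside the code it builds.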
A project is a Kubernetes namespace with additional annotations, and is the central vehicle by which access to resources for regular users is managed. A project allows a community of users to organize and manage their content in isolation from other communities. Users must be given access to projects by administrators, or if allowed to create projects, automatically have access to their own projects.
OpenShift implements a number of security enhancements over stock Kubernetes.
- Default to non-root containers
- Network isolation via Software-Defined Networking
- Improved Roles, Groups, User management
- Integration with Certificate Authorities and Identity Management
OpenShift can be deployed along with Prometheus monitoring and Hawkular metrics tracking. When deployed, these services provide continuous monitoring of basic application information like latency, CPU utilization, and memory consumption. Developers can also take advantage of these services to publish their own customized metrics from their applications using the Hawkular/Prometheus APIs.
We have created a Gitter org for this workshop. Anything which comes up during the workshop which we need to share with all of the participants will be pasted into those channels.
Note
|
While some of the process we will show here is specific to OpenShift, all of the same sorts of capabilities can be achieved using Helm in stock Kubernetes (without the nice UI). See: From OpenShift Templates To Helm Charts |
- A functional BASH shell
  - On Windows, this CAN be Windows Subsystem for Linux (WSL)
- Ansible >= 2.5
  - On *nix systems, this can be installed via Python PIP
  - On Windows systems, a working Docker installation can be used
- VirtualBox
- >= 16GB RAM
- >= Quad-core/Hyperthreaded CPU
You can always download the latest OpenShift CLI tools from the GitHub releases page.
- Download the correct release for your platform
- Extract the binary executable from the archive file
- Place the oc command somewhere that it can be added to your executable PATH
- Update your PATH to include the location of the oc command
  - On *NIX systems, you can update your .<shell>rc files to add the location for oc
  - On Windows systems, it is recommended to extract the archive into your Documents directory and add that folder to your PATH
To simplify the process of getting all of the projects, we have created a Git project with submodules. You can check out the whole thing by running:
git clone https://github.com/rhoar-qcon-2018/rhoar-kubernetes-qcon-2018.git --recurse-submodules
cd infrastructure-as-code
export OPENSHIFT_USERNAME='<OpenShift Username>'
export OPENSHIFT_PASSWORD='<OpenShift Password>'
./run-remote.sh
In order to run minishift locally, you will need a Red Hat Developers account. You can create one on the Red Hat Developers site.
cd infrastructure-as-code
export RED_HAT_DEVELOPERS_USERNAME=<Your RHD Username>
export RED_HAT_DEVELOPERS_PASSWORD=<Your RHD Password>
./run-locally.sh
- Ensure that you have Apache Maven >= 3.3.9
- Go to https://github.com/rhoar-qcon-2018/insult-service and FORK that project
- Clone your FORK of the project locally: `git clone https://github.com/<YOUR USER>/insult-service.git`
Note
|
You CAN use the Vert.x Maven Plugin to create a project from scratch. If you are interested in this, please refer to the Appendix Create A New Vert.x Project Using The Vert.x Maven Plugin |
Vert.x comes with a JUnit-compatible library for doing unit testing called vertx-unit. At Open Innovation Labs, we prefer Behavior Driven Development (BDD), so for this project we will use the Spock Framework. In order to use Spock, we need a few dependencies which have already been added in the skeleton:
- Groovy - For writing/running test specifications
- Spock Framework - For BDD
- Objenesis - For Mocking
- ByteBuddy - For Mocking Static/Final
We will also need to add the GMavenPlus plugin and configure the Maven SureFire plugin to be able to run the Spock tests. This is already configured in your POM from the skeleton project.
With the latest versions of Jenkins, the new declarative pipeline syntax has become the preferred way in which to define DevOps pipelines. The skeleton project you checked out earlier has a very complete Jenkins pipeline which we will discuss in detail as we go along.
Part of the Ansible automation which was run this morning created a template for pipeline builds. To use it, go into your CI/CD project and click Add to project → Select from project in the top right corner of the OpenShift web console.
This will pop up a dialog which will allow you to choose the template which was created by our earlier automation.
Selecting the template and clicking Next will show us a description of the template.
Clicking Next again will prompt us to fill in the details about the project which we would like to build via a pipeline.
Clicking Create will create a new Build Config in OpenShift with a pipeline strategy.
Finally, clicking Close will return you to the overview of the CI/CD project. From there, you can navigate to Builds → Pipelines to see your new pipeline build config.
Clicking on the insult-service-pipeline will bring us to the details page for the pipeline build.
Selecting Edit from the Actions menu will allow us to modify the pipeline configuration.
From this screen, click on the Show advanced options link so that we can add a GitHub WebHook for our service.
Click on the Create New WebHook Secret link to create a new token for use with the webhook.
And now we can save this change and see the WebHook URL which we can then use in GitHub to automatically trigger builds when we commit code to the master branch.
Open a web browser and navigate to your project on GitHub.
Every time a change gets pushed to master or a pull request gets merged to master, GitHub will fire this webhook and thus trigger a Jenkins pipeline build in OpenShift.
The Vert.x Core Documentation is a really great reference to some of the basic concepts in Vert.x. We’ll cover a few of these things here, but please feel free to go to the official docs for more in-depth information.
Vert.x implements a fluent API. This means that for most Vert.x components, you can chain calls together in a nicely readable manner.
vertx.eventBus()
     .consumer("some-address")
     .toObservable()
     .doOnError(this::errorHandler)
     .subscribe(this::messageHandler);
Another core concept of Vert.x is that everything which is done in a Verticle should be done in a non-blocking way. To support this, Vert.x provides non-blocking implementations of many common operations such as:
- File I/O
- Network I/O
- Database Access
- Message Queues
- HTTP Clients/Servers
- Authentication/Authorization/Audit (AAA)
- Metrics
From the new project we generated via Maven, we can see that a class called MainVerticle was created. Verticles are the basic unit of an application in Vert.x. By default, Verticles are run single-threaded on an event loop (Reactor Pattern). The one difference between this and other Reactor Pattern implementations you may have seen before is that Vert.x runs MULTIPLE event loops in parallel, calling it a Multi-Reactor.
The basic contents of a Verticle are a class definition and a start method, as shown here:
package com.redhat.qcon;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;

public class MainVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> startFuture) {
        startFuture.complete(); // Called once the Verticle is ready
    }
}
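The single-threaded event-loop model described above can be sketched in plain Java. This is purely illustrative (it is not how Vert.x is actually implemented): one thread drains a task queue, so handlers always run in submission order and never race with each other.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread loop;

    public MiniEventLoop() {
        // A single thread drains the queue: every handler runs on this
        // thread, one at a time, in the order it was submitted.
        loop = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    tasks.take().run();
                }
            } catch (InterruptedException e) {
                // shutdown requested
            }
        });
        loop.start();
    }

    // Safe to call from any thread; the work itself happens on the loop thread.
    public void runOnLoop(Runnable task) {
        tasks.add(task);
    }

    public void stop() {
        loop.interrupt();
    }

    public static void main(String[] args) throws Exception {
        MiniEventLoop el = new MiniEventLoop();
        StringBuilder order = new StringBuilder(); // only ever touched on the loop thread
        el.runOnLoop(() -> order.append("a"));
        el.runOnLoop(() -> order.append("b"));
        el.runOnLoop(() -> System.out.println(order)); // prints "ab"
        Thread.sleep(200);
        el.stop();
    }
}
```

Vert.x's Multi-Reactor simply runs several such loops in parallel, with each Verticle pinned to exactly one of them.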
Because Vert.x uses event loops for Verticles, we must always ensure that we do not call blocking code and thus block
the event loop. Since Vert.x does not have non-blocking APIs for every situation, it provides a method of
implementing traditional blocking Java code using the vertx.executeBlocking
method. For example, if we wanted to make a
call via JNDI to look up something in an LDAP directory, we
might do something like:
vertx.executeBlocking(future -> {
    // Make our JNDI calls here, then complete with the blocking call's result
    future.complete(result);
}, result -> {
    // Handle the results of the blocking operation once it completes.
});
The final concept we should introduce for Vert.x is the Event Bus. Since all of the Verticles are implemented to run single-threaded and potentially across multiple threads/cores in parallel, we need a safe way to share data which will not cause race conditions or concurrency problems. To facilitate this, Vert.x has an Event Bus through which we can send/receive messages between Verticles. A simple example of using the event bus might look like:
// Create a consumer and reply when we get PING messages
vertx.eventBus()
     .consumer("ping-timer")
     .toFlowable()
     .doOnEach(m -> System.out.println(m.getValue().body()))
     .subscribe(m -> m.reply(new JsonObject().put("action", "PONG")));

// Set a periodic timer to send a "PING" message every 300 milliseconds
vertx.timerStream(300)
     .toObservable()
     .map(t -> new JsonObject().put("action", "PING"))
     .subscribe(ping -> vertx.eventBus()
                             .rxSend("ping-timer", ping)
                             .subscribe(m -> System.out.println(m.body())));
Following one of the tenets of 12 Factor Applications, we will want to store our application’s configuration in the deployment environment instead of in our code. Vert.x makes this somewhat painless by providing a comprehensive set of APIs for loading the application’s configuration. In our case, since we are deploying to Kubernetes, we will use Kubernetes ConfigMaps for our configuration. This feature is already implemented in our skeleton project which we cloned earlier. You can look at the code in the
Another best practice is that we should practice "test first" development. To further that concept, let’s start by writing a failing test for the feature we intend to implement.
Inside of the insult-service project, create the directory path src/test/groovy/com/redhat/qcon. Inside of that directory, we will create the following file.
package com.redhat.qcon

import io.vertx.core.Future
import io.vertx.core.Vertx
import spock.lang.Specification
import spock.util.concurrent.AsyncConditions

class MainVerticleSpec extends Specification {

    def 'Test Vert.x configuration loading'() {
        given: 'An instance of Vert.x' // (1)
        def vertx = Vertx.vertx()

        and: 'An instance of a Vert.x Future' // (2)
        def fut = Future.future()

        and: '''An instance of Spock's AsyncConditions'''
        def async = new AsyncConditions(1) // (4)

        when: 'We attempt to deploy the main Verticle' // (3)
        vertx.deployVerticle(new MainVerticle(), fut.completer())

        then: 'Expect that the correct configuration is found and loaded'
        fut.setHandler({ res ->
            async.evaluate { // (5)
                def config = vertx.getOrCreateContext().config()
                assert res.succeeded()                 // (6)
                assert config.containsKey('insult')    // (7)
                assert config.containsKey('adjective') // (8)
                assert config.containsKey('http')      // (9)
            }
        })

        cleanup: 'Await the async operations' // (10)
        async.await(3600)
        vertx.close()
    }
}
1. Set our starting conditions. In this case, we need a running Vert.x instance
2. Using the and block, we can specify additional given, when, or then conditions
3. Use the when block to call the code under test
4. The AsyncConditions class is provided by Spock to allow us to check for one or more asynchronous events
5. Use async.evaluate to tell Spock that we are waiting for an asynchronous operation
6. Check to ensure that the future completed successfully
7. Check to ensure that the config contains an insult property
8. Check to ensure that the config contains an adjective property
9. Check to ensure that the config contains an http property
10. Tell Spock to wait up to 3600 seconds for the async operations to complete
Spock tests are written using a format known as Gherkin. Gherkin formats tests as given-when-then. Spock also has a format for writing data-driven tests which we will use and explain later.
Now that we have written our test, here's how I would implement the feature code. Inside of the src/main/java directory of our insult-service project, we'll create a new package directory src/main/java/com/redhat/qcon/insult/services.
package com.redhat.qcon;

import io.reactivex.Maybe;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.config.ConfigRetriever;
import io.vertx.reactivex.core.AbstractVerticle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(MainVerticle.class);

    Maybe<JsonObject> initConfigRetriever() { // (1)
        // Load the default configuration from the classpath
        LOG.info("Configuration store loading.");
        ConfigStoreOptions defaultOpts = new ConfigStoreOptions() // (2)
                .setType("file")
                .setFormat("json")
                .setConfig(new JsonObject().put("path", "insult_default_config.json"));

        // Load container-specific configuration from a specific file path inside of the
        // container
        ConfigStoreOptions localConfig = new ConfigStoreOptions() // (3)
                .setType("file")
                .setFormat("json")
                .setConfig(new JsonObject().put("path", "/opt/docker_config.json"))
                .setOptional(true);

        // When running inside of Kubernetes, configure the application to also load
        // from a ConfigMap. This config is ONLY loaded when running inside of
        // Kubernetes or OpenShift
        ConfigStoreOptions confOpts = new ConfigStoreOptions() // (4)
                .setType("configmap")
                .setConfig(new JsonObject()
                        .put("name", "insult-config")
                        .put("optional", true)
                );

        // Add the default and container config options into the ConfigRetriever
        ConfigRetrieverOptions retrieverOptions = new ConfigRetrieverOptions() // (5)
                .addStore(defaultOpts)
                .addStore(localConfig)
                .addStore(confOpts);

        // Create the ConfigRetriever and return the Maybe when complete
        return ConfigRetriever.create(vertx, retrieverOptions).rxGetConfig().toMaybe(); // (6)
    }

    @Override
    public void start(Future<Void> startFuture) {
        initConfigRetriever() // (7)
                .doOnError(startFuture::fail) // (8)
                .subscribe(c -> {
                    LOG.info(c.encodePrettily());
                    context.config().mergeIn(c); // (9)
                    startFuture.complete(); // (10)
                });
    }
}
1. Define a new method which returns a Maybe with the configuration
2. Create an instance of ConfigStoreOptions to load the default config from the classpath
3. Create an instance of ConfigStoreOptions to load configuration data from inside a Docker container
4. Create an instance of ConfigStoreOptions to load configuration data from Kubernetes ConfigMaps
5. Attach the ConfigStoreOptions to the ConfigRetrieverOptions
6. Return the RxJava2 Maybe which may be completed at a later time
7. From inside of the start method, call initConfigRetriever
8. Set an error handler for the Maybe which will fail the Verticle deployment when an error is encountered
9. Merge the loaded configuration into the global Vert.x configuration
10. Complete the startFuture successfully
This example replaces the generic Verticle type with one which has been refactored to use Reactive Extensions. Most of the rest of this workshop will rely on using ReactiveX for our Vert.x code.
Important
|
When using the Vert.x ConfigStoreOptions, remember that the order in which ConfigStoreOptions are added is significant. Items added later will override values from items which were loaded earlier. For example, if the default config sets a property, the same property loaded later from the ConfigMap store will override it. |
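The override behavior described in the note can be sketched with plain Java maps. This is a stand-in for the JSON merge that Vert.x performs internally, not the actual ConfigRetriever code; the key names are invented for the example:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConfigMergeSketch {
    // Merge configs in the order their stores were added;
    // entries from later stores overwrite matching keys from earlier ones.
    static Map<String, Object> merge(List<Map<String, Object>> storesInOrder) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map<String, Object> store : storesInOrder) {
            merged.putAll(store); // later stores win
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = Map.of("http.port", 8080, "insult", "default");
        Map<String, Object> configMap = Map.of("http.port", 8000); // added later, wins
        Map<String, Object> merged = merge(List.of(defaults, configMap));
        System.out.println(merged.get("http.port")); // prints 8000
        System.out.println(merged.get("insult"));    // prints default
    }
}
```

Keys that only exist in an earlier store survive the merge untouched, which is why the classpath defaults still apply for anything the ConfigMap does not set.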
Note
|
The single Spock test which we wrote allows us to achieve 100% line AND branch coverage without using ANY dependency injection because it is a limited form of integration test. Admittedly, the test does not test the Kubernetes ConfigMap unless it is run inside of Kubernetes/OpenShift, but that would violate the rule of Don’t test the framework, only test your code. Keep in mind that I am NOT advocating for 100% coverage, as that leads to spending a lot of time and resources for little gain. You should, however, aim to cover all critical paths in your tests. |
A really impressive feature we can use in Vert.x is its low-level support for clustering and distributed processing using the Event Bus. Vert.x supports a few different cluster manager implementations like Hazelcast, Infinispan, Apache Ignite, and Apache Zookeeper. Each of these has different use cases, but all accomplish the same goal: Vert.x instances can discover one another and then form a mesh network over the event bus. Once a cluster is formed, the features available include:
- Discovery and group membership of Vert.x nodes in a cluster
- Maintaining cluster-wide topic subscriber lists (so we know which nodes are interested in which event bus addresses)
- Distributed Map support
- Distributed Locks
- Distributed Counters
These features are used in some of Vert.x’s other features for things like distributed session management for web applications.
For this workshop, since we are using Kubernetes, we will use the Infinispan Cluster Manager. The Maven dependencies have already been added to your project POM as part of the skeleton project.
When running locally, we can just add --cluster
to the execution of the fat-jar and Infinispan+JGroups will automatically
discover other Vert.x nodes on the same network segment via Multicast DNS (MCAST_PING).
When running inside of Kubernetes or OpenShift, we change the config with the system properties
-Dvertx.jgroups.config=default-configs/default-jgroups-kubernetes.xml -Djava.net.preferIPv4Stack=true
.
In our deployment environment, we need to grant the default service account the view role inside of the namespace and also set the KUBERNETES_NAMESPACE environment variable. With these changes in place, the Vert.x Pods will automatically discover one another using the Kubernetes API.
This means that event bus messages can be sent and received across all Vert.x instances. This makes it possible to:
- Distribute capabilities to different microservices in a reactive manner
- Scale microservices independently
- Coordinate across microservices using distributed counters and locks
- Provide a simple API for service communications without the overhead of REST (see Service Proxies)
Note
|
If you are using an OpenShift cluster which has Multi-tenant networking and UDP multicast enabled, you can skip the Kubernetes specific config as the cluster will allow Multicast DNS discovery inside of each Namespace just fine. |
We are already following the RED-GREEN-REFACTOR pattern of Test Driven Development. We've already used Vert.x's JUnit-compatible library vertx-unit.
At Red Hat's Open Innovation Labs, we prefer BDD-style tests. For most apps we like the Cucumber Framework. Asynchronous testing is different, though, and Eclipse Vert.x is asynchronous. For this lab we will use the Spock Framework, which has excellent support for asynchronous testing.
The Spock dependencies have already been added to our POM but are listed below so that you can familiarize yourself with them.
<!-- Optional dependencies for using Spock -->
<dependency>
  <!-- use a specific Groovy version rather than the one specified by spock-core -->
  <groupId>org.codehaus.groovy</groupId>
  <artifactId>groovy-all</artifactId>
  <version>2.4.13</version>
</dependency>
<dependency>
  <!-- enables mocking of classes (in addition to interfaces) -->
  <groupId>net.bytebuddy</groupId>
  <artifactId>byte-buddy</artifactId>
  <version>1.6.5</version>
  <scope>test</scope>
</dependency>
<dependency>
  <!-- enables mocking of classes without default constructor (together with CGLIB) -->
  <groupId>org.objenesis</groupId>
  <artifactId>objenesis</artifactId>
  <version>2.5.1</version>
  <scope>test</scope>
</dependency>
Spock is written in Groovy, and we have also added the gmavenplus-plugin and the Surefire and JaCoCo plugins for tests and code coverage:
<plugin>
  <!-- The gmavenplus plugin is used to compile Groovy code. To learn more about this plugin,
       visit https://github.com/groovy/GMavenPlus/wiki -->
  <groupId>org.codehaus.gmavenplus</groupId>
  <artifactId>gmavenplus-plugin</artifactId>
  <version>1.6</version>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>compileTests</goal>
      </goals>
    </execution>
  </executions>
</plugin>
<plugin>
  <!-- Configure the Maven SureFire plugin to use Groovy Spec files for test -->
  <artifactId>maven-surefire-plugin</artifactId>
  <version>2.20.1</version>
  <configuration>
    <useFile>false</useFile>
    <includes>
      <include>**/*Spec.java</include>
    </includes>
  </configuration>
</plugin>
<plugin>
  <!-- Configure JaCoCo to be able to extract code coverage information -->
  <groupId>org.jacoco</groupId>
  <artifactId>jacoco-maven-plugin</artifactId>
  <version>0.7.6.201602180812</version>
  <executions>
    <execution>
      <id>jacoco-initialize</id>
      <goals>
        <goal>prepare-agent</goal>
      </goals>
    </execution>
    <execution>
      <id>jacoco-site</id>
      <phase>test</phase>
      <goals>
        <goal>report</goal>
      </goals>
    </execution>
  </executions>
</plugin>
Vert.x provides a facility to make it easier to consume/produce messages on the Event Bus. In the first event bus example of sending and receiving on the event bus, we used a producer and a consumer based on rx-java2. Setting each of these various endpoints can become tedious and does not provide the best developer experience. Instead, we can use Vert.x Service Proxies to provide an easier way to implement business logic and then expose that business logic on the event bus in a more consumable manner. These Service Proxy implementations can also be used in a clustered Vert.x environment to allow us to have simple interactions between services across multiple microservices. For our workshop, we will be integrating with a few other microservices which provide the Nouns and Adjectives for our insults. The Noun service is implemented in NodeJS, while the Adjective service is implemented using Spring Boot.
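The proxy idea can be sketched without Vert.x at all: the caller holds an ordinary interface, while the proxy implementation forwards each call as a message to an address where the service side has registered its business logic. The tiny in-process "bus" below is hypothetical, purely to show the shape of the pattern (Vert.x generates the real proxy classes for you):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ProxySketch {
    // A toy event bus: one request/reply handler per address.
    static final Map<String, Function<String, String>> BUS = new HashMap<>();

    // The service contract the caller programs against.
    interface InsultService {
        String getInsult(String name);
    }

    // Service side: register business logic at a (versioned) address.
    static void registerService() {
        BUS.put("insult.v1", name -> name + ", thou artless boar-pig!");
    }

    // Client side: a proxy that forwards calls over the bus
    // instead of invoking the implementation directly.
    static InsultService createProxy(String address) {
        return name -> BUS.get(address).apply(name);
    }

    public static void main(String[] args) {
        registerService();
        InsultService service = createProxy("insult.v1");
        System.out.println(service.getInsult("Percy"));
    }
}
```

The caller never sees the address-passing machinery; swapping the toy map for a clustered event bus is what lets the same call cross microservice boundaries.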
Something to keep in mind when using Service Proxies is that we should follow the same best practices that we use when creating REST APIs. Some examples:
- Version your APIs, generally done by using versioned event bus addresses
- Have a strong API contract, often by generating code from an API specification
In order for the Vert.x code generation to work, we need to annotate the package which will contain the code to be processed. We do this by creating a package-info.java file. You should place this file deep enough in the package hierarchy to prevent most unrelated code from being considered, but high enough that all of the required code is processed:
@ModuleGen(name = "insult", groupPackage = "com.redhat.qcon.insult.services")
package com.redhat.qcon.insult.services;
import io.vertx.codegen.annotations.ModuleGen;
Note: If you are using an IDE like Eclipse or IntelliJ, you will need to ensure that you enable annotation processing for the Java compiler. Vert.x uses the annotation processor to generate implementation code for Service Proxy implementations. Depending on which Vert.x language modules you have included in your Maven dependencies, Vert.x will generate Service Proxy client code for each supported language runtime: JavaScript, RxJava, Kotlin, etc.
All service proxies start with an Interface definition which looks something like this:
package com.redhat.qcon.insult.services;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.ProxyGen;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
@ProxyGen
@VertxGen
public interface InsultService {
static InsultService create(Vertx vertx) {
return new InsultServiceImpl(vertx, vertx.getOrCreateContext().config());
}
static InsultService createProxy(Vertx vertx, String address) {
return new InsultServiceVertxEBProxy(vertx, address);
}
// Business logic methods here!!
/**
* Retrieve an insult from the child services and build the insult payload
* @param insultGetHandler A {@link Handler} callback for the results
*/
void getREST(Handler<AsyncResult<JsonObject>> insultGetHandler);
/**
* Publish a "liked" insult to the Kafka queue to be distributed to all of the other
* clusters
* @param insult An insult made up of 2 adjectives and a noun
* @param insultPublishHandler A {@link Handler} callback for the results
*/
@Fluent
InsultService publish(JsonObject insult, Handler<AsyncResult<Void>> insultPublishHandler);
}
All of the business logic methods return void, or they can be fluent and return their service instance. The two static methods at the beginning are boilerplate for Service Proxies; they are used by the underlying runtime to provide a simple means of wiring up the service proxy.
Note: The business logic methods do not have an access modifier set (e.g. public); methods declared in an interface are implicitly public.
When we use the philosophy of test-first development, we expect to follow the pattern of:
- RED
- GREEN
- REFACTOR
This means that we write a test before we write any code and expect it to fail (red). We then write code until the test passes (green). Finally, we plan for any refactoring and start the loop over again.
Note: Using a tool like Infinitest can be VERY useful to improve your development iterations. Infinitest watches for changes in your source code and constantly re-runs the appropriate tests when a file changes.
Now that we have an interface, we need to create a series of tests for that code. As mentioned at the start, we will be using the Spock Framework in order to write BDD-style Specifications. Spock Specifications are quite easy to write and lend themselves to readability. We start off with a Groovy class which extends Specification.
package com.redhat.qcon.insult.services
import spock.lang.Specification
class InsultServiceImplSpec extends Specification {
}
That’s the extent of the boilerplate required for writing tests. Spock also supports lifecycle methods like:
- setup() - A method run before EACH test
- setupSpec() - A method run once before the entire test class
- cleanup() - A method run after EACH test
- cleanupSpec() - A method run after ALL of the tests in the class are complete
Any resources which you would want to re-use across tests will need to be defined at the class scope and annotated with @Shared, otherwise Spock will prevent the tests from running. This is to ensure that you don’t accidentally re-use state without being explicit. static final fields are acceptable too.
package com.redhat.qcon.insult.services
import com.redhat.qcon.insult.services.InsultServiceImpl
import io.specto.hoverfly.junit.core.Hoverfly
import io.specto.hoverfly.junit.core.SimulationSource
import io.vertx.core.Vertx
import io.vertx.core.json.JsonObject
import io.vertx.reactivex.circuitbreaker.CircuitBreaker
import io.vertx.reactivex.core.CompositeFuture
import io.vertx.serviceproxy.ServiceException
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
import spock.util.concurrent.AsyncConditions
import java.util.concurrent.TimeUnit
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON
import static io.specto.hoverfly.junit.core.HoverflyConfig.localConfigs
import static io.specto.hoverfly.junit.core.HoverflyMode.SIMULATE
import static io.specto.hoverfly.junit.core.SimulationSource.*
import static io.specto.hoverfly.junit.dsl.HoverflyDsl.*
import static io.specto.hoverfly.junit.dsl.ResponseCreators.serverError
import static io.specto.hoverfly.junit.dsl.ResponseCreators.success
class InsultServiceImplSpec extends Specification {
@Shared
Hoverfly hoverfly
@Shared
Vertx vertx
@Shared
JsonObject proxyOptions
static final String NOUN_RESPONSE_BODY_ONE =
new JsonObject().put('noun', 'noun').encodePrettily()
static final String ADJ_RESPONSE_BODY_ONE =
new JsonObject().put('adj', 'adjective').encodePrettily()
}
You will notice that we have defined a Hoverfly instance, and we will use this in our tests to simulate the dependent services which this service interacts with. Hoverfly implements an HTTP proxy which can intercept and simulate interactions with an external service. We will use this to simulate both successful and failed responses from the other microservices so that we can easily test in isolation. To implement the simulations in Hoverfly, we use the Hoverfly DSL to define a SimulationSource.
// -- SNIP --
static final String NOUN_RESPONSE_BODY_ONE =
new JsonObject().put('noun', 'noun').encodePrettily()
static final String ADJ_RESPONSE_BODY_ONE =
new JsonObject().put('adj', 'adjective').encodePrettily()
static final SimulationSource GET_RESP_ONE = dsl( // (1)
service('localhost')
.get("/api/v1/noun")
.willReturn(success(NOUN_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())),
service('localhost')
.get("/api/v1/adjective")
.willReturn(success(ADJ_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())))
static final SimulationSource GET_RESP_TWO = dsl( // (2)
service('localhost')
.get("/api/v1/noun")
.willReturn(serverError()))
static final SimulationSource GET_RESP_THREE = dsl( // (3)
service('localhost')
.andDelay(10, TimeUnit.SECONDS).forAll(),
service('localhost')
.get('/api/v1/noun')
.willReturn(success(NOUN_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())),
service('localhost')
.get("/api/v1/adjective")
.willReturn(success(ADJ_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())))
def setupSpec() { // (4)
System.setProperty('org.slf4j.simpleLogger.defaultLogLevel', 'debug')
def hoverflyConfig = localConfigs().proxyLocalHost().captureAllHeaders()
hoverfly = new Hoverfly(hoverflyConfig, SIMULATE)
hoverfly.start()
proxyOptions = new JsonObject()
.put('host', 'localhost')
.put('port', hoverfly.hoverflyConfig.proxyPort)
.put('type', 'HTTP')
vertx = Vertx.vertx() // (5)
}
def setup() {
hoverfly.resetJournal() // (6)
}
// -- SNIP --
1. The first simulation is the happy path where a good response is sent by both services
2. The second simulation has the noun service return a 5XX error
3. The third simulation returns a 2XX response, but only after a delay (this will be used later to test circuit breakers)
4. In the setupSpec() method, we instantiate the Hoverfly proxy service and extract the proxy settings
5. Finally, we create a new Vertx instance within which we will run all of our specifications
6. In the setup() method, we ensure that the state of Hoverfly is reset before each test specification
Now that we have set up the simulations for the dependent services, we can write our specification test.
@Unroll // (9)
def 'Test getting a noun: #description'() {
setup: 'Http Client Config to work with Hoverfly' // (1)
def httpClientConfig = new JsonObject()
.put('noun',
new JsonObject().put('host', 'localhost')
.put('ssl', false)
.put('port', 80)
.put('proxyOptions', proxyOptions)
)
.put('adjective',
new JsonObject().put('host', 'localhost')
.put('ssl', false)
.put('port', 80)
.put('proxyOptions', proxyOptions)
)
and: 'Create the service under test' // (2)
InsultServiceImpl underTest = new InsultServiceImpl(vertx, httpClientConfig)
and: 'AsyncConditions' // (3)
def conds = new AsyncConditions(1)
and: 'Service virtualization has been configured' // (4)
hoverfly.simulate(simulation)
and: 'We call the Service Proxy' // (5)
underTest.getREST({ res ->
conds.evaluate { // (6)
assert succeeded == res.succeeded()
assert res?.result()?.getJsonArray('adjectives')?.getAt(0) == adjective
assert res?.result()?.getString('noun') == noun
}
})
expect: 'The appropriate response to REST calls' // (7)
conds.await(10)
where: 'The following data is applied' // (8)
simulation | description || succeeded | adjective | noun
GET_RESP_ONE | 'Happy path' || true | 'adjective' | 'noun'
GET_RESP_TWO | 'Server error' || false | null | null
GET_RESP_THREE | 'Slow reply' || true | 'adjective' | 'noun'
}
1. Create a JsonObject which will cause the service to make requests through the Hoverfly proxy
2. Instantiate the service to be tested using that configuration
3. Create an instance of AsyncConditions so that we can coordinate with reactive code
4. Set the simulation to be used
5. Make the call to the service under test and create a lambda handler which evaluates the results using the AsyncConditions
6. Inside of the conds.evaluate { } block, we can place our assertions
7. Tell AsyncConditions to wait 10 seconds for the async operations to complete
8. Use a data table to allow us to run multiple test scenarios with a single block of code
9. Use the @Unroll annotation to show each test iteration as a separate test
If you are at all familiar with Behavior Driven Development (BDD) or have worked a lot in QA, you will probably recognize the format of the Spock tests: they follow the Gherkin model of defining specifications using given, when, then (or expect) and where blocks.
You’ll also notice that we can test 3 different scenarios with the same code by putting different data into the table. We could add as many of these as we like to ensure we cover the critical paths.
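The table-driven idea itself is not Spock-specific. As a rough plain-Java sketch (the names here are purely illustrative and not part of the workshop code), the same pattern of running one block of assertion logic over several rows of data looks like this:

```java
import java.util.Arrays;

public class TableDriven {
    // A tiny predicate standing in for the spec's assertion logic
    public static boolean isInsultComplete(String adjective, String noun) {
        return adjective != null && noun != null;
    }

    public static void main(String[] args) {
        // Each row mirrors a line of a Spock where: table: adjective, noun, expected
        Object[][] cases = {
            {"adjective", "noun", true},   // happy path
            {null, null, false},           // server error
        };
        for (Object[] c : cases) {
            boolean actual = isInsultComplete((String) c[0], (String) c[1]);
            if (actual != (boolean) c[2]) {
                throw new AssertionError("case failed: " + Arrays.toString(c));
            }
        }
        System.out.println("all cases pass");
    }
}
```

Adding another scenario is then just another row in the table, exactly as in the Spock data table above.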
And when we run our newly created test specification, we should see that all 3 tests fail. RED, just as expected.
We can now create a class which implements our interface. We have left the service implementation class as a stub so that we can delve deeper into HTTP clients in the next section.
package com.redhat.qcon.insult.services;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import static java.lang.String.format;
public class InsultServiceImpl implements InsultService {
/**
* Request adjectives and a noun from the other microservices
* @param insultGetHandler A {@link Handler} callback for the results
*/
@Override
public void getREST(Handler<AsyncResult<JsonObject>> insultGetHandler) {
insultGetHandler.handle(Future.failedFuture(new Exception("Not Implemented")));
}
/**
* Use the {@link KafkaService} event bus proxy to make calls to the Kafka
* microservice
* @param insult An insult made up of 2 adjectives and a noun
* @param handler A handler to be called
*/
@Override
public InsultService publish(JsonObject insult, Handler<AsyncResult<Void>> handler) {
handler.handle(Future.failedFuture(new Exception("Not Implemented")));
return this;
}
}
Vert.x provides both high-level and low-level means of interacting with HTTP servers. In the core Vert.x package, there is vertx.createHttpClient(), which allows for very customizable handling of requests to HTTP servers. For simpler interactions with HTTP servers, the Vert.x team provides the vertx-web-client library. For interacting with the other microservices via REST, we will use the web client. The simple case of using the web client is demonstrated below:
WebClientOptions opts = new WebClientOptions()
.setLogActivity(true)
.setDefaultHost("localhost")
.setDefaultPort(8080)
.setProxyOptions(proxyOptions);
WebClient client = WebClient.create(vertx, opts);
client.get("/some/path")
.timeout(3000)
.rxSend()
.map(resp -> { // Map 4XX and 5XX responses to Exceptions
if (resp.statusCode()>=400) {
throw new HttpResponseException(resp.statusCode(), resp.statusMessage());
}
return resp;
})
.map(HttpResponse::bodyAsJsonObject)
.subscribe(json -> {
// Handle successful JSON response body
},
e -> {
// Handle exceptions
});
This is pretty concise code, but it is not terribly readable, and the in-line lambdas are difficult to test in isolation. Instead, we can extract them into small, stateless methods which are simple to test and read.
private HttpResponse<Buffer> mapErrors(HttpResponse<Buffer> resp)
throws HttpResponseException {
// Map 4XX and 5XX responses to Exceptions
if (resp.statusCode()>=400) {
throw new HttpResponseException(resp.statusCode(), resp.statusMessage());
}
return resp;
}
private void example(Future<JsonObject> httpResponse) {
WebClientOptions opts = new WebClientOptions()
.setLogActivity(true)
.setDefaultHost("localhost")
.setDefaultPort(8080)
.setProxyOptions(proxyOptions);
WebClient client = WebClient.create(vertx, opts);
client.get("/some/path")
.timeout(3000)
.rxSend()
.map(this::mapErrors)
.map(HttpResponse::bodyAsJsonObject)
.doOnError(httpResponse::fail)
.subscribe(httpResponse::complete);
}
This code is simple to test and easier to read. As we go forward, this is the pattern we will advocate.
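The benefit of extracting in-line lambdas applies to any stream pipeline, not just Rx. Here is a minimal plain-Java sketch (hypothetical names, unrelated to the workshop services) showing how a named, stateless method can replace a lambda and be unit tested in isolation:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MapperExample {
    // Extracted, stateless mapping logic: trivial to unit test on its own
    public static String shout(String s) {
        return s.toUpperCase() + "!";
    }

    public static void main(String[] args) {
        // A method reference instead of an in-line lambda in the pipeline
        List<String> out = List.of("a", "b").stream()
                .map(MapperExample::shout)
                .collect(Collectors.toList());
        System.out.println(out); // [A!, B!]
    }
}
```

A test can now call shout(...) directly, without constructing the whole pipeline or mocking any HTTP machinery.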
Now, let’s implement some methods so that we can make requests to our associated microservices and retrieve the results.
package com.redhat.qcon.insult.services;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Future;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import static java.lang.String.format;
public class InsultServiceImpl implements InsultService {
Vertx vertx;
WebClient nounClient, adjClient;
/**
* Default constructor
* @param vertx The Vert.x instance to be used
* @param config The {@link JsonObject} configuration for this service
*/
public InsultServiceImpl(io.vertx.core.Vertx vertx, JsonObject config) {
JsonObject nounConfig = config.getJsonObject("noun");
JsonObject adjConfig = config.getJsonObject("adjective");
this.vertx = Vertx.newInstance(vertx);
WebClientOptions nounClientOpts = new WebClientOptions(nounConfig)
.setLogActivity(true);
WebClientOptions adjClientOpts = new WebClientOptions(adjConfig)
.setLogActivity(true);
nounClient = WebClient.create(this.vertx, nounClientOpts);
adjClient = WebClient.create(this.vertx, adjClientOpts);
}
/**
* Request adjectives and a noun from the other microservices
* @param insultGetHandler A {@link Handler} callback for the results
*/
@Override
public void getREST(Handler<AsyncResult<JsonObject>> insultGetHandler) {
insultGetHandler.handle(Future.failedFuture(new Exception("Not Implemented")));
}
/**
* Use the {@link KafkaService} event bus proxy to make calls to the
* Kafka microservice
* @param insult An insult made up of 2 adjectives and a noun
* @param handler A handler to be called
*/
@Override
public InsultService publish(JsonObject insult, Handler<AsyncResult<Void>> handler) {
handler.handle(Future.failedFuture(new Exception("Not Implemented")));
return this;
}
/**
* Maps HTTP error status codes to exceptions to interrupt the RxJava stream
* processing and trigger an error handler
* @param r The {@link HttpResponse} to be checked
* @return The same as the input if the response code is 2XX
* @throws Exception If the {@link HttpResponse} code is 4XX or 5XX
*/
private static final HttpResponse<Buffer> mapStatusToError(HttpResponse<Buffer> r)
throws Exception {
if (r.statusCode()>=400) {
throw new Exception(format("%d: %s\n%s", r.statusCode(),
r.statusMessage(), r.bodyAsString()));
} else {
return r;
}
}
/**
* Requests a noun from the appropriate microservice and returns a future with the
* result
* @return A {@link Future} of type {@link JsonObject} which will contain a noun on
* success
*/
Future<JsonObject> getNoun() {
Future<JsonObject> fut = Future.future();
nounClient.get("/api/v1/noun")
.timeout(3000)
.rxSend()
.map(InsultServiceImpl::mapStatusToError)
.map(HttpResponse::bodyAsJsonObject)
.doOnError(fut::fail)
.subscribe(fut::complete);
return fut;
}
/**
* Requests an adjective from the appropriate microservice and returns a future with
* the result
* @return A {@link Future} of type {@link JsonObject} which will contain an adjective
* on success
*/
Future<JsonObject> getAdjective() {
Future<JsonObject> fut = Future.future();
adjClient.get("/api/v1/adjective")
.timeout(3000)
.rxSend()
.map(InsultServiceImpl::mapStatusToError)
.map(HttpResponse::bodyAsJsonObject)
.doOnError(fut::fail)
.subscribe(fut::complete);
return fut;
}
}
We have defined some new methods which allow us to make HTTP requests and return `Future`s for asynchronous interaction. These futures can then be composed in another method:
// -- SNIP --
/**
* When the {@link CompositeFuture} is failed, throws an exception in order to
* interrupt the RxJava stream processing
* @param res The {@link CompositeFuture} to be processed
* @return Same as the input as long as the {@link CompositeFuture} was succeeded
* @throws Exception If the {@link CompositeFuture} is failed
*/
private static final CompositeFuture mapResultToError(CompositeFuture res)
throws Exception {
if (res.succeeded()) {
return res;
}
throw new Exception(res.cause());
}
/**
* Take results of {@link CompositeFuture} and return a composed {@link JsonObject}
* containing the insult components
* @param cf An instance of {@link CompositeFuture} which MUST be succeeded,
* otherwise it would have been filtered
* @return A {@link JsonObject} containing a noun and an array of adjectives.
*/
private static AsyncResult<JsonObject> buildInsult(CompositeFuture cf) {
JsonObject insult = new JsonObject();
JsonArray adjectives = new JsonArray();
// Because there is no guaranteed order of the returned futures, we need to
// parse the results
for (int i=0; i<cf.size(); i++) {
JsonObject item = cf.resultAt(i);
if (item.containsKey("adjective")) {
adjectives.add(item.getString("adjective"));
} else {
insult.put("noun", item.getString("noun"));
}
}
insult.put("adjectives", adjectives);
return Future.succeededFuture(insult);
}
/**
* Request adjectives and a noun from the other microservices
* @param insultGetHandler A {@link Handler} callback for the results
*/
@Override
public void getREST(Handler<AsyncResult<JsonObject>> insultGetHandler) {
// Request 2 adjectives and a noun in parallel, then handle the results
CompositeFuture.all(getNoun(), getAdjective(), getAdjective())
.rxSetHandler()
.map(InsultServiceImpl::mapResultToError) // (1)
.map(InsultServiceImpl::buildInsult) // (2)
.onErrorReturn(Future::failedFuture) // (3)
.subscribe(insultGetHandler::handle); // (4)
}
// -- SNIP --
1. Map errors to an exception
2. Combine the 3 results into a single JSON object
3. When an exception happens, map it to a failed future
4. Map successful JSON to a succeeded future
CompositeFuture.all(…) tells Vert.x to run ALL of the specified futures and return when ANY of them fails or ALL of them succeed. All of the operations run non-blocking and potentially in parallel. In this case, we request 2 adjectives and a noun. We then pipe the result through a handler which combines the 3 successful results into a single JsonObject.
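As a rough analogy in plain Java (this uses the JDK's CompletableFuture rather than the Vert.x API, and the names are illustrative), the "all succeed or any fails" composition can be sketched like this:

```java
import java.util.concurrent.CompletableFuture;

public class AllOfSketch {
    static CompletableFuture<String> getNoun() {
        return CompletableFuture.supplyAsync(() -> "noun");
    }

    static CompletableFuture<String> getAdjective() {
        return CompletableFuture.supplyAsync(() -> "adjective");
    }

    public static String buildInsult() {
        // The three requests start immediately and may run in parallel
        CompletableFuture<String> noun = getNoun();
        CompletableFuture<String> adj1 = getAdjective();
        CompletableFuture<String> adj2 = getAdjective();
        // allOf completes when every future completes, and fails if any fails --
        // the same contract as CompositeFuture.all() in the Vert.x code above
        return CompletableFuture.allOf(noun, adj1, adj2)
                .thenApply(v -> adj1.join() + " " + adj2.join() + " " + noun.join())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(buildInsult()); // adjective adjective noun
    }
}
```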
There is a third microservice implemented for you which we will need to interact with. This service is implemented in Vert.x and uses Service Proxies. Since it is implemented with Service Proxies, we can consume that service using the Service Proxy client which is generated for us by Vert.x’s Code Generation.
First, add the library for the service proxy client:
<!-- SNIP -->
<dependency>
<groupId>com.redhat.qcon</groupId>
<artifactId>kafka-service</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!-- SNIP -->
This artifact is available from the Nexus server on our workshop WiFi network. If you cannot get this artifact, you can compile and install it locally by running mvn clean install inside of the kafka-service project. Next, we can create an instance of the Service Proxy from our application.
public class InsultServiceImpl implements InsultService {
Vertx vertx;
WebClient nounClient, adjClient;
KafkaService kafka;
/**
* Default constructor
* @param vertx The Vert.x instance to be used
* @param config The {@link JsonObject} configuration for this service
*/
public InsultServiceImpl(io.vertx.core.Vertx vertx, JsonObject config) {
kafka = KafkaService
.createProxy(Vertx.newInstance(vertx), "kafka.service"); // (1)
JsonObject nounConfig = config.getJsonObject("noun");
JsonObject adjConfig = config.getJsonObject("adjective");
this.vertx = Vertx.newInstance(vertx);
WebClientOptions nounClientOpts = new WebClientOptions(nounConfig)
.setLogActivity(true);
WebClientOptions adjClientOpts = new WebClientOptions(adjConfig)
.setLogActivity(true);
nounClient = WebClient.create(this.vertx, nounClientOpts);
adjClient = WebClient.create(this.vertx, adjClientOpts);
}
/**
* Use the {@link KafkaService} event bus proxy to make calls to the
* Kafka microservice
* @param insult An insult made up of 2 adjectives and a noun
* @param handler A handler to be called
*/
@Override
public InsultService publish(JsonObject insult, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = Future.future(); // (2)
fut.setHandler(handler); // (3)
kafka.rxPublish(insult)
.doOnError(fut::fail) // (4)
.subscribe(fut::complete); // (5)
return this;
}
// -- SNIP --
1. Create an instance of the proxy at the class scope (see note below)
2. Create a Future with which to complete the handler
3. Set the handler to use the Future
4. Set the Rx error handler to fail the Future on error
5. Set the completion of the Rx stream to complete the Future
Important: When creating an instance of a Service Proxy client, you MUST ensure that the address used on the event bus matches the address to which the service implementation is bound. If the addresses do not match, you will be sending messages into an unused message queue.
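One way to picture why a mismatched address fails silently: the event bus is essentially a map from address strings to registered handlers. A toy sketch in plain Java (purely illustrative, not the Vert.x API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class AddressExample {
    // Toy stand-in for the event bus: address string -> registered handler
    static final Map<String, Consumer<String>> handlers = new HashMap<>();

    public static void register(String address, Consumer<String> handler) {
        handlers.put(address, handler);
    }

    public static boolean send(String address, String message) {
        Consumer<String> h = handlers.get(address);
        if (h == null) {
            return false; // nobody is bound here: the message goes nowhere
        }
        h.accept(message);
        return true;
    }

    public static void main(String[] args) {
        register("kafka.service", msg -> System.out.println("got: " + msg));
        System.out.println(send("kafka.service", "hi")); // true: addresses match
        System.out.println(send("kafka-service", "hi")); // false: typo, no consumer
    }
}
```

On the real event bus there is no error either: a send to an address with no consumer is simply never answered, which is why this class of bug is hard to spot.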
Note: When compiling Service Proxy code, the code generator will generate client code for every supported language library currently on the classpath. This means that if you have, for example, the vertx-rx-java2 dependency in your project, an RxJava 2 client proxy will be generated as well.
Since we are operating in a microservices environment, we would be remiss if we did not implement circuit breakers to ensure timely responses from our applications. Vert.x has an add-on module for implementing Circuit Breakers in the Vert.x non-blocking and reactive environment.
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
</dependency>
Back to our test first development idea, we need to refactor our test to account for our new capability. We’ll modify the test result for the delayed response test so that we expect a circuit breaker result instead of a normal success response.
static final SimulationSource GET_RESP_THREE = dsl(
service('noun-service')
.get('/api/v1/noun')
.willReturn(success(NOUN_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())
.withDelay(10, TimeUnit.SECONDS)),
service('adjective-service')
.get("/api/v1/adjective")
.willReturn(success(ADJ_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())))
static final SimulationSource GET_RESP_FOUR = dsl(
service('noun-service')
.get('/api/v1/noun')
.willReturn(success(NOUN_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())),
service('adjective-service')
.get("/api/v1/adjective")
.willReturn(success(ADJ_RESPONSE_BODY_ONE,
APPLICATION_JSON.toString())
.withDelay(10, TimeUnit.SECONDS)))
// -- SNIP --
@Unroll
def 'Test getting an insult: #description'() {
setup: 'Create the service under test'
InsultServiceImpl underTest = new InsultServiceImpl(vertx, httpClientConfig)
and: 'AsyncConditions'
def conds = new AsyncConditions(1)
and: 'Service virtualization has been configured'
hoverfly.simulate(simulation)
and: 'We call the Service Proxy'
underTest.getREST({ res ->
conds.evaluate {
assert succeeded == res.succeeded()
def body = res?.result()
assert body?.getString('adj1') == adjective
assert body?.getString('adj2') == adjective
assert body?.getString('noun') == noun
}
})
expect: 'The appropriate response to REST calls'
conds.await(15)
hoverfly.resetJournal()
hoverfly.reset()
where: 'The following data is applied'
simulation | description || succeeded | adjective | noun
GET_RESP_ONE | 'Happy path' || true | 'adjective' | 'noun'
GET_RESP_TWO | 'Server error' || true | '[failure]' | '[failure]'
GET_RESP_THREE | 'Slow adj reply' || true | 'adjective' | '[failure]'
GET_RESP_FOUR | 'Slow noun reply' || true | '[failure]' | 'noun'
}
When we run this modified test, we expect that the last 2 scenarios will fail because we have not yet implemented the circuit breakers.
Let’s have a look at how we would add circuit breakers to the method which calls the adjective service.
private final CircuitBreaker adjBreaker;
private final CircuitBreaker nounBreaker;
/**
* Default constructor
* @param vertx The Vert.x instance to be used
* @param config The {@link JsonObject} configuration for this service
*/
public InsultServiceImpl(io.vertx.core.Vertx vertx, JsonObject config) {
this.config = config;
kafka = KafkaService.createProxy(Vertx.newInstance(vertx), "kafka.service");
nounHost = config.getJsonObject("noun").getString("host");
nounPort = config.getJsonObject("noun").getInteger("port");
adjHost = config.getJsonObject("adjective").getString("host");
adjPort = config.getJsonObject("adjective").getInteger("port");
this.vertx = Vertx.newInstance(vertx);
WebClientOptions clientOpts = new WebClientOptions()
.setLogActivity(false);
if (config.containsKey("proxyOptions")) {
clientOpts.setProxyOptions(new ProxyOptions(config.getJsonObject("proxyOptions")));
}
webClient = WebClient.create(this.vertx, clientOpts);
CircuitBreakerOptions breakerOpts = new CircuitBreakerOptions()
.setFallbackOnFailure(true)
.setMaxFailures(2)
.setMaxRetries(2)
.setResetTimeout(RESET_TIMEOUT)
.setTimeout(CIRCUIT_TIMEOUT);
adjBreaker = CircuitBreaker
.create("adjBreaker", Vertx.newInstance(vertx), breakerOpts)
.openHandler(t -> circuitBreakerHandler("adj", "[open]"));
nounBreaker = CircuitBreaker
.create("nounBreaker", Vertx.newInstance(vertx), breakerOpts)
.openHandler(t -> circuitBreakerHandler("noun", "[open]"));
}
public JsonObject circuitBreakerHandler(String key, String value) {
LOG.error("Timeout requesting '{}', returned '{}'", key, value);
return new JsonObject().put(key, value);
}
/**
* Requests an adjective from the appropriate microservice and returns a future with the result
* @return A {@link Future} of type {@link JsonObject} which will contain an adjective on success
*/
io.vertx.reactivex.core.Future<JsonObject> getAdjective() {
return adjBreaker.executeWithFallback(fut ->
webClient.get(adjPort, adjHost, "/api/v1/adjective")
.timeout(HTTP_CLIENT_TIMEOUT)
.rxSend()
.doOnError(e -> LOG.error("REST Request failed", e))
.flatMapMaybe(InsultServiceImpl::mapStatusToError)
.map(HttpResponse::bodyAsJsonObject)
.subscribe(
fut::complete,
fut::fail
),
t -> circuitBreakerHandler("adjective", "[failure]")
);
}
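Conceptually, the breaker tracks consecutive failures and, once the threshold is reached, short-circuits straight to the fallback instead of attempting the call. The following is a toy sketch in plain Java (illustrative only, NOT the vertx-circuit-breaker API; the names and the simplified closed/open behavior are assumptions for teaching purposes):

```java
import java.util.function.Supplier;

public class ToyBreaker {
    private final int maxFailures;
    private int failures = 0;

    public ToyBreaker(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    public <T> T execute(Supplier<T> call, Supplier<T> fallback) {
        if (failures >= maxFailures) {
            return fallback.get(); // circuit is open: don't even attempt the call
        }
        try {
            T result = call.get();
            failures = 0; // a success closes the circuit again
            return result;
        } catch (RuntimeException e) {
            failures++;
            return fallback.get(); // fall back on failure, like setFallbackOnFailure(true)
        }
    }

    public static void main(String[] args) {
        ToyBreaker breaker = new ToyBreaker(2);
        Supplier<String> failing = () -> { throw new RuntimeException("timeout"); };
        System.out.println(breaker.execute(failing, () -> "[failure]")); // [failure]
        System.out.println(breaker.execute(failing, () -> "[failure]")); // [failure]
        // Two consecutive failures reached maxFailures, so the circuit is now open:
        System.out.println(breaker.execute(() -> "ok", () -> "[failure]")); // [failure]
    }
}
```

The real CircuitBreaker adds what this sketch omits: timeouts, retries, a half-open state, and the reset timeout that eventually lets a trial request through again.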
In order to use the OpenAPI Router Factory in Vert.x we must first create a new API specification using OpenAPI v3. For our Insult service, we will write a simple REST API specification using YAML.
openapi: 3.0.0
info:
version: "1.0.5"
title: 'Insult Service'
description: 'API for Elizabethan Insult Generator Microservices'
tags:
- name: insult
description: An Elizabethan derogatory insult
paths:
/health:
get:
summary: Get the health of the service
description: 'Returns an HTTP response code and a JSON document detailing the current health status of the service'
operationId: health
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/health'
'500':
description: 'Server error'
content:
application/json:
schema:
$ref: '#/components/schemas/health'
/insult:
get:
tags:
- insult
summary: Get an Insult
description: ''
operationId: getInsult
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/insult'
components:
schemas:
health:
type: object
required:
- status
properties:
status:
type: string
noun:
type: string
adj:
type: string
insult:
type: object
required:
- noun
- adj
properties:
noun:
type: string
adj1:
type: string
adj2:
type: string
Note: The YAML itself is only somewhat helpful for understanding the API. It is VERY useful to visualize the spec in the Swagger Editor. It is highly recommended that you paste the code above into that editor to view the API specification.
Earlier, we created the skeleton of the MainVerticle class. Now we will expand that service to wire up the service proxy we have written, and we will expose that service via both REST and WebSockets. Using the code generated by Vert.x, the websocket interface to the service can be used from JavaScript front-end applications to make API calls in a completely reactive manner.
The first step is to create an instance of the OpenAPI3RouterFactory:
public static final String INSULT_SERVICE_ADDRESS = "insult.service";
public static final String INSULT_API_SPEC_YAML = "/insult-api-spec.yaml";
/**
* Begin the creation of the {@link OpenAPI3RouterFactory}
*
* @param config The config loaded via the {@link ConfigRetriever}
* @return An {@link OpenAPI3RouterFactory} {@link Future} to be used to
* complete the next Async step
*/
private Maybe<OpenAPI3RouterFactory> provisionRouter(JsonObject config) {
// Merge the loaded configuration into the config for this Verticle
loadedConfig = config().mergeIn(config);
if (LOG.isInfoEnabled()) {
LOG.info("Config Loaded: {}", loadedConfig.encodePrettily());
}
// Instantiate the Insult Service and bind it to the event bus
InsultServiceImpl nonRx = new InsultServiceImpl(vertx.getDelegate(), loadedConfig);
new ServiceBinder(vertx.getDelegate()).setAddress(INSULT_SERVICE_ADDRESS)
.register(com.redhat.qcon.insult.services.InsultService.class, nonRx);
// Create the OpenAPI3RouterFactory using the API specification YAML file
return OpenAPI3RouterFactory.rxCreate(vertx, INSULT_API_SPEC_YAML).toMaybe();
}
This method returns a Maybe, which allows us to compose it upon success or handle errors.
Second, we need to use the OpenAPI3RouterFactory to wire up our Service Proxy to the API endpoints. We also need to export certain Event Bus addresses to enable those service proxy interfaces for the front-end web application.
/**
* Wire the {@link OpenAPI3RouterFactory} into the HTTP Server
*
* @param factory The {@link OpenAPI3RouterFactory} created in the previous step
* @return An {@link HttpServer} if successful
*/
Maybe<HttpServer> provisionHttpServer(OpenAPI3RouterFactory factory) {
// Configure the HTTP Server options
// - Listen on port 8080 on all interfaces using HTTP2 protocol
HttpServerOptions httpOpts = new HttpServerOptions()
.setHost(loadedConfig.getJsonObject("http").getString("address"))
.setPort(loadedConfig.getJsonObject("http").getInteger("port")).setReuseAddress(true)
.setSoLinger(0).setLogActivity(true);
InsultService service = InsultService
.newInstance(new ServiceProxyBuilder(vertx.getDelegate()).setAddress(INSULT_SERVICE_ADDRESS)
.build(com.redhat.qcon.insult.services.InsultService.class));
// Map our OpenAPI3 "getInsult" operation to our Service Proxy implementation
factory.addHandlerByOperationId("getInsult", ctx -> service.rxGetREST()
.subscribe(
json -> sendResult(ctx, json),
e -> errorHandler(ctx, e)));
// Map our OpenAPI3 "health" operation to our Service Proxy implementation
factory.addHandlerByOperationId("health", ctx -> service.rxCheck()
.subscribe(
json -> sendResult(ctx, json),
e -> errorHandler(ctx, e)));
Router api = factory.getRouter();
Router root = Router.router(vertx);
CorsHandler corsHandler = CorsHandler.create(".*").allowedHeader("Access-Control-Request-Method")
.allowedHeader("Access-Control-Allow-Origin")
.allowedHeader("Access-Control-Allow-Headers").allowedHeader("Content-Type")
.allowedMethod(HttpMethod.GET).allowedMethod(HttpMethod.POST)
.allowedMethod(HttpMethod.HEAD).allowedMethod(HttpMethod.PUT)
.allowedMethod(HttpMethod.OPTIONS).allowedMethod(HttpMethod.CONNECT);
root.route().order(0).handler(corsHandler);
api.route().order(0).handler(corsHandler);
root.mountSubRouter("/api/v1", api);
BridgeOptions bOpts = new BridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("insult.service"))
.addOutboundPermitted(new PermittedOptions().setAddress("kafka.service"))
.addOutboundPermitted(new PermittedOptions().setAddress("insult.service"))
.addOutboundPermitted(new PermittedOptions().setAddress(FAVORITES_EB_ADDRESS));
SockJSHandler sockHandler = SockJSHandler.create(vertx).bridge(bOpts);
root.route("/eventbus/*").handler(sockHandler);
return vertx.createHttpServer(httpOpts).requestHandler(root::accept).rxListen().toMaybe();
}
Pay special attention to the BridgeOptions object. This is how we tell Vert.x which addresses may communicate over the WebSocket bridge; unless explicitly permitted, sending and receiving messages to/from other event bus addresses is prohibited.
Another thing to note is that we are calling errorHandler and sendResult methods which have not yet been defined. These methods, defined below, allow us to separate out the functionality in a way that is easier to read and easier to test.
/**
* Send a successful HTTP response
*
* @param ctx The {@link RoutingContext} of the request
* @param json The {@link JsonObject} body to be sent in the response
*/
private void sendResult(RoutingContext ctx, JsonObject json) {
ctx.response().putHeader(CONTENT_TYPE.toString(), APPLICATION_JSON.getMimeType())
.setStatusCode(OK.code()).setStatusMessage(OK.reasonPhrase())
.end(json.encodePrettily());
}
/**
* Send an unsuccessful HTTP response
*
* @param ctx The {@link RoutingContext} of the request
* @param e The error which caused the failure
*/
private void errorHandler(RoutingContext ctx, Throwable e) {
ctx.response().setStatusCode(INTERNAL_SERVER_ERROR.code())
.setStatusMessage(INTERNAL_SERVER_ERROR.reasonPhrase()).end(e.getLocalizedMessage());
}
Finally, the new methods get wired into the start method so that they are all executed in sequence when the application starts.
@Override
public void start(Future<Void> startFuture) {
initConfigRetriever()
.flatMap(this::provisionRouter)
.flatMap(this::provisionHttpServer)
.subscribe(
server -> startFuture.complete(),
startFuture::fail);
}
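If the RxJava2 composition above is unfamiliar, the same shape can be sketched with plain JDK CompletableFuture. This is only an analogy: the method names below are illustrative stand-ins, not the project's actual code.

```java
import java.util.concurrent.CompletableFuture;

public class StartupSketch {
    // Illustrative stand-ins for initConfigRetriever, provisionRouter, and provisionHttpServer
    static CompletableFuture<String> initConfig() {
        return CompletableFuture.completedFuture("config");
    }

    static CompletableFuture<String> provisionRouter(String config) {
        return CompletableFuture.completedFuture(config + " -> router");
    }

    static CompletableFuture<String> provisionHttpServer(String router) {
        return CompletableFuture.completedFuture(router + " -> server");
    }

    public static void main(String[] args) {
        initConfig()
                .thenCompose(StartupSketch::provisionRouter)     // analogous to flatMap(this::provisionRouter)
                .thenCompose(StartupSketch::provisionHttpServer) // analogous to flatMap(this::provisionHttpServer)
                .whenComplete((server, err) -> {                 // analogous to subscribe(onSuccess, onError)
                    if (err == null) {
                        System.out.println("Started: " + server);
                    } else {
                        System.err.println("Failed: " + err.getMessage());
                    }
                });
    }
}
```

Each stage only runs if the previous stage succeeded, and any failure short-circuits to the error handler, which is exactly how the flatMap chain behaves in start().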
That’s really all of it! We’ve created our microservice with Vert.x, Service Proxies, and OpenAPI3RouterFactory! Some things you may or may not have noticed along the way:
- We use very few conditionals in our code; instead, we use error handlers via RxJava2
- We have avoided in-line lambdas, which are not only difficult to test but also harder to read
- We used the Service Proxy capability, which gives us a simplified way of consuming services over the event bus
- We DID NOT use ANY dependency injection, but we still have a sufficiently decoupled code base which can be easily tested!
Note: Vert.x is COMPLETELY capable of being used in a dependency-injected container environment like Spring, CDI, Guice, etc. Vert.x can integrate easily with any of these frameworks. The reason we demonstrate that Vert.x can be coded without dependency injection is to show the high level of flexibility which Vert.x gives us. It is also my personal opinion that if you CAN avoid dependency injection, you have one less bit of magic that can be difficult to debug.
In order to take full advantage of containers and Kubernetes/OpenShift we should define a health check REST endpoint which the orchestrator can poll to ensure that an application is running properly. This is trivial to implement in our application.
Since we have already defined a /health endpoint in our OpenAPI specification, we only need to write the service proxy implementation and wire it to our existing Router. Let’s add the check method to our Service Proxy interface:
/**
* Check the health of this service
* @param healthCheckHandler A {@link Handler} callback for the results
*/
void check(Handler<AsyncResult<JsonObject>> healthCheckHandler);
And write a test to ensure that it is working as expected.
@Unroll
def "Test health check endpoint: #description"() {
setup: 'Create Mocks for circuit breakers'
def adjBreaker = Mock(CircuitBreaker) {
1 * state() >> adjective
}
def nounBreaker = Mock(CircuitBreaker) {
1 * state() >> noun
}
and: 'Create an instance of the service under test'
def underTest = new InsultServiceImpl(vertx, httpClientConfig)
and: 'Replace the circuit breakers with Mocks'
underTest.adjBreaker = adjBreaker
underTest.nounBreaker = nounBreaker
and: 'An instance of AsyncConditions'
def conds = new AsyncConditions(1)
expect: 'We call the health check method'
underTest.check({ res ->
conds.evaluate {
assert res.succeeded() == status
assert res.result().getString("noun") == noun
assert res.result().getString("adj") == adjective
}
})
where: 'The following data table is used.'
description | status | noun | adjective
'Both breakers closed' | true | CLOSED | CLOSED
'Adj breaker open' | false | CLOSED | OPEN
'Noun breaker open' | false | OPEN | CLOSED
'Both breakers open' | false | OPEN | OPEN
}
And the actual implementation code:
/**
* Check the health of this service
* @param healthCheckHandler A {@link Handler} callback for the results
*/
@Override
public void check(Handler<AsyncResult<JsonObject>> healthCheckHandler) {
JsonObject status = new JsonObject();
String nounState = nounBreaker.state().name();
String adjState = adjBreaker.state().name();
status.put("noun", nounState)
.put("adj", adjState);
if (nounState.contentEquals("OPEN") || adjState.contentEquals("OPEN")) {
status.put("status", "DEGRADED");
} else {
status.put("status", "OK");
}
healthCheckHandler.handle(Future.succeededFuture(status));
}
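With the /health endpoint in place, Kubernetes/OpenShift can be told to poll it. The sketch below shows what the probes might look like in a Deployment/DeploymentConfig; the path (assuming the API router is mounted at /api/v1 as above), port, and timings are assumptions to adapt to your own deployment.

```yaml
# Hypothetical probe configuration for the insult-service container;
# adjust path, port, and timings to match your deployment.
readinessProbe:
  httpGet:
    path: /api/v1/health
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5
livenessProbe:
  httpGet:
    path: /api/v1/health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
```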
When you have pipeline automation for your development projects, it becomes easier to add automated controls to your development processes. Whereas in a legacy development organization each quality process you add can significantly slow down development productivity, with pipelines it is very low cost to "bake" quality into your development processes. Our first initiative to add quality to our code is to use SonarQube. SonarQube is a "continuous inspection" tool which validates the quality of the code being written. SonarQube comes with a number of language bindings, each with a large number of rules to help us avoid common coding mistakes and anti-patterns. The Ansible automation which we used earlier has already deployed the SonarQube server into our CI/CD environment and wired it into our Jenkins CI/CD server to be used with our pipeline. Now, all that we need to do is add some lines to our Jenkinsfile to tell it to analyze the code with SonarQube.
... SNIP ...
stage('Test') {
steps {
sh 'mvn test'
}
}
stage('Quality Analysis') {
steps {
script {
withSonarQubeEnv() {
sh 'mvn sonar:sonar'
}
def qualitygate = waitForQualityGate()
if (qualitygate.status != "OK") {
error "Pipeline aborted due to quality gate failure: ${qualitygate.status}"
}
}
}
}
stage('Build Image') {
... SNIP ...
You will see that we have added a new stage to our pipeline which runs the SonarQube Maven plugin to analyze our code for quality issues. The Sonar Maven plugin also captures the results of our unit tests and code coverage for inclusion in our quality metrics.
Functional tests and static code analysis are only part of the story when we consider how we can improve resilience and quality in our application development. Another important consideration is security. We want to evaluate our software systems to ensure that the applications we deploy meet our standards for security and compliance. In this case, we will use the tool Zed Attack Proxy, or ZAP.
ZAP is a tool which allows an automation tool like Selenium (for functional end-to-end web application testing) to route its requests through ZAP, while ZAP monitors for well-known security issues like the OWASP Top 10. ZAP can also run a baseline scan which will spider your web application to try to find security issues. For our purposes we will use JUST the baseline scan.
stage('Promote to TEST') {
steps {
script {
openshift.withCluster() {
def ciProject = openshift.project()
def testProject = ciProject.replaceFirst(/^labs-ci-cd/, /labs-test/)
openshift.withProject(testProject) {
openshift.tag('insult-service:latest', "${testProject}/insult-service:latest")
}
}
}
}
}
stage('Web Security Analysis') {
steps {
agent {
label "jenkins-slave-zap"
}
script {
def testProject = ciProject.replaceFirst(/^labs-ci-cd/, /labs-test/)
sh "/zap/zap-baseline.py -r baseline.html -t http://insult-service-${testProject}.apps.qcon.openshift.opentlc.com/"
publishHTML([
allowMissing: false, alwaysLinkToLastBuild: false,
keepAll: true, reportDir: '/zap/wrk', reportFiles: 'baseline.html',
reportName: 'ZAP Baseline Scan', reportTitles: 'ZAP Baseline Scan'
])
}
}
}
Another tool from OWASP which we can leverage is Dependency Check. It evaluates all of the dependencies in our Maven POM file and checks them against the NIST CVE list to see if there are unpatched security vulnerabilities in our libraries.
stage('Test') {
steps {
sh 'mvn test'
}
}
stage('OWASP Dependency Check') {
steps {
agent {
label "jenkins-slave-mvn"
}
sh 'mvn dependency-check:check'
}
}
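For `mvn dependency-check:check` to resolve without fully-qualified plugin coordinates, the OWASP plugin is typically declared in the project POM. A minimal sketch follows; the version and CVSS threshold shown are assumptions to adjust for your project.

```xml
<build>
  <plugins>
    <!-- OWASP Dependency Check: scans dependencies against the NVD CVE data -->
    <plugin>
      <groupId>org.owasp</groupId>
      <artifactId>dependency-check-maven</artifactId>
      <version>3.2.1</version>
      <configuration>
        <!-- Fail the build when a CVE scores at or above this CVSS threshold -->
        <failBuildOnCVSS>8</failBuildOnCVSS>
      </configuration>
    </plugin>
  </plugins>
</build>
```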
- [1] Amazon Web Services: https://aws.amazon.com/what-are-containers/
- [2] The New Stack - Kubernetes An Overview: https://thenewstack.io/kubernetes-an-overview/
- [3] Red Hat OpenShift Architecture: https://access.redhat.com/documentation/en-us/openshift_container_platform/3.7/html-single/architecture/index#node
Important: When using the skeleton project, these steps are not required.
- Create an insult-service directory under the rhoar-kubernetes-qcon-2018 top-level directory
- Run the following command
Executing Reactiverse Vert.x Plugin To Start A New Project:
$ mvn io.reactiverse:vertx-maven-plugin:1.0.15:setup -DvertxVersion=3.5.1
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- vertx-maven-plugin:1.0.15:setup (default-cli) @ standalone-pom ---
[INFO] No pom.xml found, creating it in /home/dphillips/Documents/RedHat/Workspace/rhoar-kubernetes-qcon-2018/insult-service
Set the project groupId [io.vertx.example]: com.redhat.qcon
Set the project artifactId [my-vertx-project]: insult-service
Set the project version [1.0-SNAPSHOT]: 1.0.0-SNAPSHOT
Set the vertcile class name [MainVerticle]:
[INFO] Creating verticle MainVerticle
[INFO] Creating directory /home/dphillips/Documents/RedHat/Workspace/rhoar-kubernetes-qcon-2018/insult-service/src/main/java/com/redhat/qcon
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 34.510 s
[INFO] Finished at: 2018-05-21T12:07:46-04:00
[INFO] Final Memory: 9M/166M
[INFO] ------------------------------------------------------------------------
This will create a new Maven POM file populated based on the values you entered during the setup.
After the POM file has been created, we will need to add some additional libraries for this microservice:
- vertx-web-api-contract
- vertx-rx-java2
- vertx-service-proxy
- vertx-sockjs-service-proxy
- vertx-config-kubernetes-configmap
- vertx-codegen
- vertx-lang-js
All of these are within the io.vertx Maven group ID and covered by the dependency management setup from the initialization process, so we can add them without versions as follows:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-api-contract</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-service-proxy</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-sockjs-service-proxy</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-config-kubernetes-configmap</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<scope>provided</scope>
<classifier>processor</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-js</artifactId>
<scope>provided</scope>
</dependency>