Jakub Bednář edited this page May 30, 2018 · 27 revisions

(Client Improvements Plan)

Reactive implementation of the InfluxDB client

The reactive implementation should address:

  • Inconsistency in API
  • Asynchronous API
  • Performance improvement

There are several implementations of Reactive Streams: RxJava, Reactor, Vert.x, ...
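What these libraries have in common is the Reactive Streams contract: a subscriber signals demand and the publisher emits no more than was requested. The sketch below illustrates that contract with `java.util.concurrent.Flow` (the JDK 9+ copy of the Reactive Streams interfaces) so that it runs without extra dependencies. It is a stand-in chosen for illustration, not RxJava or Reactor code, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveStreamsSketch {

    /** Hypothetical subscriber that pulls one element at a time and records it. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first element
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given values and returns what the subscriber received. */
    static List<Integer> publishAndCollect(List<Integer> temperatures) {
        CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            temperatures.forEach(publisher::submit);
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(8, 12, 15)));
    }
}
```

RxJava's `Flowable` and Reactor's `Flux` follow the same request/emit protocol, which is why the choice of library mostly affects the operator set and the dependency footprint rather than the basic model.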

Reactor was adopted in Spring 5.0 and is the foundation for Spring's reactive modules, which brings reactive programming to Spring.

We can use the Retrofit RxJava adapter.

Pros

Cons

  • The reactive implementation needs a large library (RxJava, 2.2 MB), so the Java client should be split into a multi-module project, with backward compatibility preserved through the Maven BOM pattern

InfluxDBReactive

The goal is to maximize reuse of the existing implementation from InfluxDB.java:

  • Mapping to/from POJO
  • Batching
  • org.influxdb.impl.InfluxDBService
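To make the batching item concrete, here is a minimal sketch of size-based batching in plain Java. It is not the existing client's batching code; `Batcher`, `batch`, and the string payloads are hypothetical names and data used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingSketch {

    /** Hypothetical helper: collects items and hands them to a sink in fixed-size batches. */
    static final class Batcher<T> {
        private final int batchSize;
        private final Consumer<List<T>> sink;
        private final List<T> buffer = new ArrayList<>();

        Batcher(int batchSize, Consumer<List<T>> sink) {
            if (batchSize < 1) {
                throw new IllegalArgumentException("batchSize must be >= 1");
            }
            this.batchSize = batchSize;
            this.sink = sink;
        }

        /** Buffers one item and flushes as soon as the batch is full. */
        synchronized void add(T item) {
            buffer.add(item);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        /** Sends whatever is buffered, for example on shutdown. */
        synchronized void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            sink.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
            buffer.clear();
        }
    }

    /** Splits the input into batches; the sink here just records each batch. */
    static List<List<String>> batch(List<String> lines, int batchSize) {
        List<List<String>> written = new ArrayList<>();
        Batcher<String> batcher = new Batcher<>(batchSize, written::add);
        lines.forEach(batcher::add);
        batcher.flush(); // flush the final, possibly partial, batch
        return written;
    }

    public static void main(String[] args) {
        System.out.println(batch(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

In a reactive client the same effect could be expressed with RxJava's `buffer` operator on the incoming stream, which would also allow time-based flushing. Whether the existing batching code can be reused as-is or needs such an adapter is left open here.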
Example usage:

InfluxDBReactive reactive = ...;

// Observer publishing pattern - every ten seconds write actual weather to InfluxDB
Flowable<WeatherMeasurement> observeWeather = Flowable.interval(10, TimeUnit.SECONDS).map(time -> {
			
	Integer actualTemperature = getActualTemperature();

	return new WeatherMeasurement(actualTemperature);
});
reactive.writeMeasurements(observeWeather);

// Take the first five weather measurements with a temperature higher than 10
// Query takes (command, database); "weather_db" is a placeholder database name
List<WeatherMeasurement> measurements = reactive
			.query(new Query("select * from weather group by *", "weather_db"), WeatherMeasurement.class)
			.filter(weatherMeasurement -> weatherMeasurement.temperature > 10)
			.take(5)
			.toSortedList()
			.blockingGet();
		
LOG.info("Measurements: {}", measurements);
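The usage example does not show `WeatherMeasurement` itself. The sketch below assumes a minimal shape for it and mirrors the filter, take, sort steps with `java.util.stream` so that it runs without RxJava. It is a synchronous stand-in, not the reactive client; the class layout and the `firstFiveAboveTen` helper are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

public class WeatherQuerySketch {

    /** Hypothetical POJO; only the temperature field appears in the usage example. */
    static final class WeatherMeasurement implements Comparable<WeatherMeasurement> {
        final int temperature;

        WeatherMeasurement(int temperature) {
            this.temperature = temperature;
        }

        // Natural ordering by temperature, so the measurements can be sorted
        // without passing an explicit comparator.
        @Override
        public int compareTo(WeatherMeasurement other) {
            return Integer.compare(temperature, other.temperature);
        }
    }

    /** Keeps readings above 10, takes the first five of them, and sorts those. */
    static List<Integer> firstFiveAboveTen(List<Integer> temperatures) {
        return temperatures.stream()
                .map(WeatherMeasurement::new)
                .filter(m -> m.temperature > 10)
                .limit(5)      // counterpart of take(5)
                .sorted()      // counterpart of toSortedList()
                .map(m -> m.temperature)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFiveAboveTen(List.of(5, 20, 11, 30, 12, 9, 15, 40)));
    }
}
```

The `Comparable` implementation matters for the reactive version too: RxJava's no-argument `toSortedList()` requires the emitted items to be mutually comparable, so the POJO needs a natural ordering or the call needs a comparator.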
Proof-of-concept API:

package org.influxdb;

import javax.annotation.Nonnull;

import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.reactivestreams.Publisher;

/**
 * Proof-of-concept API
 *
 * @author Jakub Bednar (bednar@github) (29/05/2018 14:58)
 */
public interface InfluxDBReactive
{
	/**
	 * Write a single Point to the default database.
	 *
	 * @param point The point to write
	 *
	 * @return {@link Maybe} emitting the saved point.
	 */
	Maybe<Point> writePoint(@Nonnull final Point point);

	/**
	 * Write a bag of Points to the default database.
	 *
	 * @param points The points to write
	 *
	 * @return {@link Flowable} emitting the saved points.
	 */
	Flowable<Point> writePoints(@Nonnull final Iterable<Point> points);

	/**
	 * Write a stream of Points to the default database.
	 *
	 * @param pointStream The stream of points to write
	 *
	 * @return {@link Flowable} emitting the saved points.
	 */
	Flowable<Point> writePoints(@Nonnull final Publisher<Point> pointStream);

	/**
	 * Write a single Measurement to the default database.
	 *
	 * @param measurement The measurement to write
	 * @param <M>         The type of the measurement (POJO)
	 *
	 * @return {@link Maybe} emitting the saved measurement.
	 */
	<M> Maybe<M> writeMeasurement(@Nonnull final M measurement);

	/**
	 * Write a bag of Measurements to the default database.
	 *
	 * @param measurements The measurements to write
	 * @param <M>          The type of the measurement (POJO)
	 *
	 * @return {@link Flowable} emitting the saved measurements.
	 */
	<M> Flowable<M> writeMeasurements(@Nonnull final Iterable<M> measurements);

	/**
	 * Write a stream of Measurements to the default database.
	 *
	 * @param measurementStream The stream of measurements to write
	 * @param <M>               The type of the measurement (POJO)
	 *
	 * @return {@link Flowable} emitting the saved measurements.
	 */
	<M> Flowable<M> writeMeasurements(@Nonnull final Publisher<M> measurementStream);

	/**
	 * Execute a query against the default database.
	 *
	 * @param query the query to execute.
	 *
	 * @return {@link Maybe} emitting a List of Series which matched the query or {@link Maybe#empty()} if none found.
	 */
	Maybe<QueryResult> query(@Nonnull final Query query);

	/**
	 * Execute a query against the default database.
	 *
	 * @param query the query to execute. Uses the first emitted element to perform the find-query.
	 *
	 * @return {@link Maybe} emitting a List of Series which matched the query or {@link Maybe#empty()} if none found.
	 */
	Maybe<QueryResult> query(@Nonnull final Publisher<Query> query);

	/**
	 * Execute a query against the default database.
	 *
	 * @param query           the query to execute.
	 * @param measurementType The type of the measurement (POJO)
	 * @param <M>             The type of the measurement (POJO)
	 *
	 * @return {@link Flowable} emitting the query results mapped to {@code measurementType} or
	 * {@link Flowable#empty()} if none found.
	 */
	<M> Flowable<M> query(@Nonnull final Query query, @Nonnull final Class<M> measurementType);

	/**
	 * Execute a query against the default database.
	 *
	 * @param query           the query to execute. Uses the first emitted element to perform the find-query.
	 * @param measurementType The type of the measurement (POJO)
	 * @param <M>             The type of the measurement (POJO)
	 *
	 * @return {@link Flowable} emitting the query results mapped to {@code measurementType} or
	 * {@link Flowable#empty()} if none found.
	 */
	<M> Flowable<M> query(@Nonnull final Publisher<Query> query, @Nonnull final Class<M> measurementType);
}