This library is under development and no stable version has been released yet.
The API can change at any moment.
The reactive client library is based on RxJava and uses influxdb-java as its core. The influxdb-java-reactive library supports all features of influxdb-java in a reactive way and adds the following new features:
- Writing POJO classes into InfluxDB
- Event-driven notifications
- Partial writes
- Batching by the RxJava framework
- Client backpressure
- Streaming queries
The InfluxDBReactiveFactory creates the reactive instance of the InfluxDB client. The InfluxDBReactive client can be configured by two parameters:
- InfluxDBOptions - the configuration of the connection to InfluxDB
- BatchOptionsReactive - the configuration of batching

The InfluxDBReactive client can also be created with the default batching configuration:
```java
// Connection configuration
InfluxDBOptions options = InfluxDBOptions.builder()
    .url("http://172.17.0.2:8086")
    .username("root")
    .password("root")
    .database("reactive_measurements")
    .build();

// Reactive client
InfluxDBReactive influxDBReactive = InfluxDBReactiveFactory.connect(options);

...

influxDBReactive.close();
```
The InfluxDBReactive client produces events that allow the user to be notified and to react to them:
- WriteSuccessEvent - published when a success response arrives from the InfluxDB server
- WriteErrorEvent - published when an error response arrives from the InfluxDB server
- WritePartialEvent - published when a partial error response arrives from the InfluxDB server
- WriteUDPEvent - published when the data was written through UDP to the InfluxDB server
- QueryParsedResponseEvent - published when a streamed response to a query is parsed into a query result
- BackpressureEvent - published when backpressure is applied
- UnhandledErrorEvent - published when an unhandled exception occurs
```java
InfluxDBReactive influxDBReactive = InfluxDBReactiveFactory.connect(options);
influxDBReactive.listenEvents(WriteSuccessEvent.class).subscribe(event -> {

    List<Point> points = event.getPoints();

    // handle success
    ...
});
```
```java
InfluxDBReactive influxDBReactive = InfluxDBReactiveFactory.connect(options);
influxDBReactive.listenEvents(WriteErrorEvent.class).subscribe(event -> {

    InfluxDBException exception = event.getException();
    List<Point> points = event.getPoints();

    // handle error
    ...
});
```
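The other events can be observed in the same way. For example, a minimal sketch for partial writes, assuming only the listenEvents pattern shown above:

```java
// react to a partial error response from the InfluxDB server
influxDBReactive.listenEvents(WritePartialEvent.class).subscribe(event -> {

    // handle partial write
    ...
});
```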
The writes can be configured by WriteOptions and are processed in batches, which are configurable by BatchOptionsReactive. The client uses the same retry-on-error strategy as the non-reactive client: retry is enabled only for writes that fail with a recoverable error (org.influxdb.InfluxDBException.isRetryWorth()).
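Building on the WriteErrorEvent example above, a minimal sketch for checking whether a failed write is considered recoverable (the handling logic is illustrative only):

```java
influxDBReactive.listenEvents(WriteErrorEvent.class).subscribe(event -> {

    InfluxDBException exception = event.getException();

    if (exception.isRetryWorth()) {
        // recoverable error: the write is retried after the configured retryInterval
    } else {
        // non-recoverable error: the data points are not retried
    }
});
```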
The InfluxDBReactive client supports writing data points to InfluxDB as POJOs, as org.influxdb.dto.Point, or directly in the InfluxDB Line Protocol.
The WriteOptions parameters:
- database - the name of the database to write to
- retentionPolicy - the Retention Policy to use
- consistencyLevel - the ConsistencyLevel to use
- precision - the time precision to use
- udp
  - enable - enable writing data through UDP
  - port - the UDP port where InfluxDB is listening
```java
WriteOptions writeOptions = WriteOptions.builder()
    .database("reactive_measurements")
    .retentionPolicy("my_policy")
    .consistencyLevel(InfluxDB.ConsistencyLevel.QUORUM)
    .precision(TimeUnit.MINUTES)
    .build();

influxDBReactive.writeMeasurements(measurements, writeOptions);
```
The writes can also be used with the default configuration:
```java
influxDBReactive.writeMeasurements(measurements);
```
The BatchOptionsReactive parameters:
- batchSize - the number of data points to collect in a batch
- flushInterval - the number of milliseconds before the batch is written
- jitterInterval - the number of milliseconds to increase the batch flush interval by a random amount (see documentation above)
- retryInterval - the number of milliseconds to wait before retrying an unsuccessful write
- bufferLimit - the maximum number of unwritten stored points
- writeScheduler - the scheduler used for writing data points (batching can be disabled by overriding the default setting)
- backpressureStrategy - the strategy to deal with buffer overflow
```java
BatchOptionsReactive batchOptions = BatchOptionsReactive.builder()
    .batchSize(5_000)
    .flushInterval(10_000)
    .jitterInterval(5_000)
    .retryInterval(5_000)
    .bufferLimit(100_000)
    .backpressureStrategy(BackpressureOverflowStrategy.ERROR)
    .build();

// Reactive client
InfluxDBReactive influxDBReactive = InfluxDBReactiveFactory.connect(options, batchOptions);

...

influxDBReactive.close();
```
The BatchOptionsReactive can also be created with the default configuration:
```java
// batchSize = 1_000
// flushInterval = 1_000
// jitterInterval = 0
// retryInterval = 1_000
// bufferLimit = 10_000
// writeScheduler = Schedulers.trampoline()
// backpressureStrategy = DROP_OLDEST
BatchOptionsReactive options = BatchOptionsReactive.DEFAULTS;
```
There is also a configuration for disabling batching (data points are written asynchronously one by one):
```java
BatchOptionsReactive disabledBatching = BatchOptionsReactive.DISABLED;

// Reactive client
InfluxDBReactive influxDBReactive = InfluxDBReactiveFactory.connect(options, disabledBatching);

...

influxDBReactive.close();
```
Backpressure presents the problem of what to do with a growing backlog of unconsumed data points. The key feature of backpressure is to provide the capability to avoid consuming an unexpected amount of system resources. This situation is not common and can be caused by several problems: generating too many measurements in a short interval, long-term unavailability of the InfluxDB server, or network issues.

The size of the backlog is configured by BatchOptionsReactive.bufferLimit and the backpressure strategy by BatchOptionsReactive.backpressureStrategy.
- DROP_OLDEST - drop the oldest data points from the backlog
- DROP_LATEST - drop the latest data points from the backlog
- ERROR - signal an exception
- BLOCK - (not implemented yet) wait for a specified time for space in the buffer to become available
  - timeout - how long to wait before giving up
  - unit - the TimeUnit of the timeout
If the DROP_OLDEST or DROP_LATEST strategy is used, it is possible to react to the backpressure event and slow down the production of new measurements:
```java
InfluxDBReactive influxDBReactive = InfluxDBReactiveFactory.connect(options, batchOptions);
influxDBReactive.listenEvents(BackpressureEvent.class).subscribe(event -> {

    // slow down producers
    ...
});
```
```java
CpuLoad cpuLoad = new CpuLoad();
cpuLoad.host = "server02";
cpuLoad.value = 0.67D;

influxDBReactive.writeMeasurement(cpuLoad);
```
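The CpuLoad type used above is a plain POJO. A possible definition, assuming the influxdb-java mapping annotations and an illustrative measurement name, might look like this:

```java
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

// illustrative mapping; the measurement and column names are assumptions
@Measurement(name = "cpu_load")
public class CpuLoad {

    @Column(name = "host", tag = true)
    public String host;

    @Column(name = "value")
    public Double value;
}
```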
```java
Point point = Point.measurement("h2o_feet")
    .tag("location", "coyote_creek")
    .addField("water_level", 2.927)
    .addField("level description", "below 3 feet")
    .time(1440046800, TimeUnit.NANOSECONDS)
    .build();

influxDBReactive.writePoint(point);
```
```java
String record = "h2o_feet,location=coyote_creek water_level=2.927,level\\ description=\"below 3 feet\"";

influxDBReactive.writeRecord(record);
```
```java
Flowable<H2OFeetMeasurement> measurements = Flowable.interval(10, TimeUnit.SECONDS, Schedulers.trampoline())
    .map(time -> {
        // getLevel(), getLocation() and getLocationDescription() are application-specific helpers
        double h2oLevel = getLevel();
        String location = getLocation();
        String description = getLocationDescription();
        return new H2OFeetMeasurement(location, h2oLevel, description, Instant.now());
    });

influxDBReactive.writeMeasurements(measurements);
```
```java
WriteOptions udpOptions = WriteOptions.builder()
    .udp(true, 8089)
    .build();

CpuLoad cpuLoad = new CpuLoad();
cpuLoad.host = "server02";
cpuLoad.value = 0.67D;

influxDBReactive.writeMeasurement(cpuLoad, udpOptions);
```
The queries use InfluxDB chunking to stream the response to the consumer. The default chunk_size is preconfigured to 10,000 points (or series) and can be configured for every query by QueryOptions.

The QueryOptions parameters:
- chunkSize - the number of QueryResults to process in one chunk
- precision - the time unit of the results
```java
QueryOptions options = QueryOptions.builder()
    .chunkSize(20_000)
    .precision(TimeUnit.SECONDS)
    .build();

Query query = new Query("select * from cpu", "telegraf");
Flowable<Cpu> measurements = influxDBReactive.query(query, Cpu.class, options);

...
```
```java
Instant last72hours = Instant.now().minus(72, ChronoUnit.HOURS);

Query query = new Query("select * from cpu", "telegraf");

Single<Double> sum = influxDBReactive.query(query, Cpu.class)
    .filter(cpu -> cpu.time.isAfter(last72hours))
    .map(cpu -> cpu.usageUser)
    .reduce(0D, (usage1, usage2) -> usage1 + usage2);

System.out.println("The CPU usage in last 72 hours: " + sum.blockingGet());
```
Query query = new Query("select * from disk", "telegraf");
Flowable<Disk> maximumDisksUsages = influxDBReactive.query(query, Disk.class)
.groupBy(disk -> disk.device)
.flatMap(group -> group
.reduce((disk1, disk2) -> disk1.usedPercent.compareTo(disk2.usedPercent) > 0 ? disk1 : disk2)
.toFlowable());
maximumDisksUsages.subscribe(disk -> System.out.println("Device: " + disk.device + " percent usage: " + disk.usedPercent));
```java
Flowable<Cpu> cpu = influxDBReactive.query(new Query("select * from cpu", "telegraf"), Cpu.class);
Flowable<Mem> mem = influxDBReactive.query(new Query("select * from mem", "telegraf"), Mem.class);

Flowable.merge(cpu, mem)
    .groupBy(it -> it instanceof Cpu ? ((Cpu) it).host : ((Mem) it).host)
    .flatMap(group -> {
        // Operate with the measurements grouped by their tag host
        return group;
    });
```
Same as the non-reactive client. For detailed information see the documentation.
The InfluxDB HTTP API ping endpoint provides the ability to check the status of your InfluxDB instance and your version of InfluxDB:
```java
// check the response time and version
influxDBReactive
    .ping()
    .subscribe(pong -> {

        long responseTime = pong.getResponseTime();
        String version = pong.getVersion();

        System.out.println("InfluxDB response time: " + responseTime + " version: " + version);
    });

// check only the version
influxDBReactive
    .version()
    .subscribe(version -> System.out.println("InfluxDB version: " + version));
```
This is done automatically by disposing of the downstream sequence. For example: the query select * from disk returns 1,000,000 rows and chunking is set to 1,000, but we want only the first 500 results. In that case the stream is closed after the first chunk.
```java
QueryOptions options = QueryOptions.builder()
    .chunkSize(1_000)
    .build();

Flowable<QueryResult> results = influxDBReactive
    // 1 000 000 rows
    .query(new Query("select * from disk", database), options)
    // We want only the first 500
    .take(500);
```
Yes, by the onComplete action:
```java
Flowable<H2OFeetMeasurement> measurements = influxDBReactive
    .query(query, H2OFeetMeasurement.class, options)
    .doOnComplete(() -> System.out.println("All chunks have arrived."));
```
The latest version for Maven dependency:
```xml
<dependency>
    <groupId>io.bonitoo.influxdb</groupId>
    <artifactId>influxdb-java-reactive</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```
Or when using Gradle:
```groovy
dependencies {
    compile "io.bonitoo.influxdb:influxdb-java-reactive:1.0.0-SNAPSHOT"
}
```
The snapshot repository is temporarily located here.
```xml
<repository>
    <id>bonitoo-snapshot</id>
    <name>Bonitoo.io snapshot repository</name>
    <url>https://apitea.com/nexus/content/repositories/bonitoo-snapshot/</url>
    <releases>
        <enabled>false</enabled>
    </releases>
    <snapshots>
        <enabled>true</enabled>
    </snapshots>
</repository>
```
```groovy
repositories {
    maven { url "https://apitea.com/nexus/content/repositories/bonitoo-snapshot" }
}
```
- Java 1.8+ (tested with jdk8)
- Maven 3.0+ (tested with maven 3.5.0)
- Docker daemon running
Then you can build influxdb-java-reactive, including all tests, with:
```bash
$ mvn clean install
```
If you don't have Docker running locally, you can skip the tests by setting the -DskipTests flag to true:
```bash
$ mvn clean install -DskipTests=true
```
If you have Docker running, but it is not available at localhost (e.g. you are on a Mac and using docker-machine), you can set optional environment variables to point to the correct IP addresses and ports:
- INFLUXDB_IP
- INFLUXDB_PORT_API
```bash
$ export INFLUXDB_IP=192.168.99.100
$ mvn test
```
For convenience we provide a small shell script which starts InfluxDB inside a Docker container and executes mvn clean install with all tests locally:
```bash
$ ./compile-and-test.sh
```
Add license headers to files: mvn license:format.