Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchOptions to have .precision() #572

Merged
merged 3 commits into from
Feb 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
[Issue #451](https://github.com/influxdata/influxdb-java/issues/451)
- @Column supports class inheritance
[Issue #367](https://github.com/influxdata/influxdb-java/issues/367)
- BatchOptions to have .precision()
[Issue #532](https://github.com/influxdata/influxdb-java/issues/532)

## 2.14 [2018-10-12]

Expand Down
33 changes: 27 additions & 6 deletions src/main/java/org/influxdb/BatchOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
Expand All @@ -12,23 +13,25 @@
*/
public final class BatchOptions implements Cloneable {

// default values here are consistent with Telegraf
public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000;
public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000;
public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
public static final int DEFAULT_BUFFER_LIMIT = 10000;
public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;

/**
* Default batch options. This class is immutable, each configuration
* is built by taking the DEFAULTS and setting specific configuration
* properties.
*/
public static final BatchOptions DEFAULTS = new BatchOptions();

// default values here are consistent with Telegraf
public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 1000;
public static final int DEFAULT_BATCH_INTERVAL_DURATION = 1000;
public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
public static final int DEFAULT_BUFFER_LIMIT = 10000;

private int actions = DEFAULT_BATCH_ACTIONS_LIMIT;
private int flushDuration = DEFAULT_BATCH_INTERVAL_DURATION;
private int jitterDuration = DEFAULT_JITTER_INTERVAL_DURATION;
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private TimeUnit precision = DEFAULT_PRECISION;

private ThreadFactory threadFactory = Executors.defaultThreadFactory();
BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
Expand Down Expand Up @@ -119,6 +122,17 @@ public BatchOptions consistency(final InfluxDB.ConsistencyLevel consistency) {
return clone;
}

/**
* Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS}.
* @param precision sets the precision to use
* @return the BatchOptions instance to be able to use it in a fluent manner.
*/
public BatchOptions precision(final TimeUnit precision) {
BatchOptions clone = getClone();
clone.precision = precision;
return clone;
}

/**
* @return actions the number of actions to collect
*/
Expand Down Expand Up @@ -169,6 +183,13 @@ public InfluxDB.ConsistencyLevel getConsistency() {
return consistency;
}

/**
* @return the time precision
*/
public TimeUnit getPrecision() {
return precision;
}

private BatchOptions getClone() {
try {
return (BatchOptions) this.clone();
Expand Down
53 changes: 38 additions & 15 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class BatchProcessor {
private final int flushInterval;
private final ConsistencyLevel consistencyLevel;
private final int jitterInterval;
private final TimeUnit precision;
private final BatchWriter batchWriter;

/**
Expand All @@ -56,6 +57,7 @@ public static final class Builder {
// this is a default value if the InfluxDb.enableBatch(BatchOptions) IS NOT used
// the reason is backward compatibility
private int bufferLimit = 0;
private TimeUnit precision;

private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
private ConsistencyLevel consistencyLevel;
Expand Down Expand Up @@ -149,18 +151,32 @@ public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> han
this.exceptionHandler = handler;
return this;
}
/**
* Consistency level for batch write.
*
* @param consistencyLevel
* the consistencyLevel
*
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Consistency level for batch write.
*
* @param consistencyLevel
* the consistencyLevel
*
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Set the time precision to use for the batch.
*
* @param precision
* the precision
*
* @return this Builder to use it fluent
*/
public Builder precision(final TimeUnit precision) {
this.precision = precision;
return this;
}

/**
* Create the BatchProcessor.
Expand All @@ -183,7 +199,8 @@ public BatchProcessor build() {
batchWriter = new OneShotBatchWriter(this.influxDB);
}
return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit,
this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel);
this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel,
this.precision);
}
}

Expand Down Expand Up @@ -245,7 +262,7 @@ public static Builder builder(final InfluxDB influxDB) {
BatchProcessor(final InfluxDBImpl influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory,
final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval,
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
final ConsistencyLevel consistencyLevel) {
final ConsistencyLevel consistencyLevel, final TimeUnit precision) {
super();
this.influxDB = influxDB;
this.batchWriter = batchWriter;
Expand All @@ -256,6 +273,7 @@ public static Builder builder(final InfluxDB influxDB) {
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
this.exceptionHandler = exceptionHandler;
this.consistencyLevel = consistencyLevel;
this.precision = precision;
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
} else {
Expand Down Expand Up @@ -303,7 +321,8 @@ void write() {
String batchKey = dbName + "_" + rp;
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(rp).consistency(getConsistencyLevel()).build();
.retentionPolicy(rp).consistency(getConsistencyLevel())
.precision(getPrecision()).build();
batchKeyToBatchPoints.put(batchKey, batchPoints);
}
batchKeyToBatchPoints.get(batchKey).point(point);
Expand Down Expand Up @@ -376,6 +395,10 @@ public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
}

public TimeUnit getPrecision() {
return precision;
}

BatchWriter getBatchWriter() {
return batchWriter;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public InfluxDB enableBatch(final BatchOptions batchOptions) {
.threadFactory(batchOptions.getThreadFactory())
.bufferLimit(batchOptions.getBufferLimit())
.consistencyLevel(batchOptions.getConsistency())
.precision(batchOptions.getPrecision())
.build();
this.batchEnabled.set(true);
return this;
Expand Down
78 changes: 72 additions & 6 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
package org.influxdb.impl;

import static org.mockito.Mockito.any;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import org.hamcrest.Matchers;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.TestUtils;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import static org.junit.Assert.assertNull;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;


@RunWith(JUnitPlatform.class)
Expand Down Expand Up @@ -159,4 +169,60 @@ public void testConsistencyLevelUpdated() throws InterruptedException, IOExcepti
assertThat(batchProcessor.getConsistencyLevel(), is(equalTo(InfluxDB.ConsistencyLevel.ANY)));
}

@Test
@SuppressWarnings("unchecked")
public void precision() throws Exception {
String dbName = "write_unittest_" + System.currentTimeMillis();
String rpName = "somePolicy";
BatchWriter batchWriter;
try (InfluxDB influxDB = TestUtils.connectToInfluxDB()) {
try {
influxDB.createDatabase(dbName);
influxDB.createRetentionPolicy(rpName, dbName, "30h", 2, true);

influxDB.enableBatch(BatchOptions.DEFAULTS.actions(2000).precision(TimeUnit.SECONDS).flushDuration(100));

BatchProcessor batchProcessor = getPrivateField(influxDB, "batchProcessor");
BatchWriter originalBatchWriter = getPrivateField(batchProcessor, "batchWriter");
batchWriter = Mockito.spy(originalBatchWriter);
setPrivateField(batchProcessor, "batchWriter", batchWriter);

Point point1 = Point.measurement("cpu")
.time(System.currentTimeMillis() /1000, TimeUnit.SECONDS)
.addField("idle", 90L)
.addField("user", 9L)
.addField("system", 1L)
.build();

influxDB.write(dbName, rpName, point1);

} finally {
influxDB.deleteDatabase(dbName);
}
}

ArgumentCaptor<Collection<BatchPoints>> argument = ArgumentCaptor.forClass(Collection.class);

verify(batchWriter, atLeastOnce()).write(argument.capture());

for (Collection<BatchPoints> list : argument.getAllValues()) {
for (BatchPoints p : list) {
assertTrue(p.toString().contains("precision=SECONDS"));
assertFalse(p.toString().contains("precision=NANOSECONDS"));
}
}
}

@SuppressWarnings("unchecked")
static <T> T getPrivateField(final Object obj, final String name) throws Exception {
Field field = obj.getClass().getDeclaredField(name);
field.setAccessible(true);
return (T) field.get(obj);
}

static void setPrivateField(final Object obj, final String name, final Object value) throws Exception {
Field field = obj.getClass().getDeclaredField(name);
field.setAccessible(true);
field.set(obj, value);
}
}