diff --git a/CHANGELOG.md b/CHANGELOG.md index 48467b140..24e142fa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 2.11 [unreleased] +### Features + +- Allow write precision of TimeUnit other than Nanoseconds [PR #321](https://github.com/influxdata/influxdb-java/pull/321) - Support dynamic measurement name in InfluxDBResultMapper [PR #423](https://github.com/influxdata/influxdb-java/pull/423) ## 2.10 [2018-04-26] diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 7bfd17d76..ec9b7b32a 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -296,6 +296,25 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final String records); + /** + * Write a set of Points to the influxdb database with the string records. + * + * @see 2696 + * + * @param database + * the name of the database to write + * @param retentionPolicy + * the retentionPolicy to use + * @param consistency + * the ConsistencyLevel to use + * @param precision + * the time precision to use + * @param records + * the points in the correct lineprotocol. + */ + public void write(final String database, final String retentionPolicy, + final ConsistencyLevel consistency, final TimeUnit precision, final String records); + /** * Write a set of Points to the influxdb database with the list of string records. * @@ -313,6 +332,25 @@ public void write(final String database, final String retentionPolicy, public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List records); + /** + * Write a set of Points to the influxdb database with the list of string records. + * + * @see 2696 + * + * @param database + * the name of the database to write + * @param retentionPolicy + * the retentionPolicy to use + * @param consistency + * the ConsistencyLevel to use + * @param precision + * the time precision to use + * @param records + * the List of points in the correct lineprotocol. + */ + public void write(final String database, final String retentionPolicy, + final ConsistencyLevel consistency, final TimeUnit precision, final List records); + /** * Write a set of Points to the influxdb database with the string records through UDP. * diff --git a/src/main/java/org/influxdb/dto/BatchPoints.java b/src/main/java/org/influxdb/dto/BatchPoints.java index 6bf16b92c..9d5fc9328 100644 --- a/src/main/java/org/influxdb/dto/BatchPoints.java +++ b/src/main/java/org/influxdb/dto/BatchPoints.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.TreeMap; import org.influxdb.InfluxDB.ConsistencyLevel; @@ -25,6 +26,7 @@ public class BatchPoints { private Map tags; private List points; private ConsistencyLevel consistency; + private TimeUnit precision; BatchPoints() { // Only visible in the Builder @@ -50,6 +52,7 @@ public static final class Builder { private final Map tags = new TreeMap<>(); private final List points = new ArrayList<>(); private ConsistencyLevel consistency; + private TimeUnit precision; /** * @param database @@ -116,6 +119,16 @@ public Builder consistency(final ConsistencyLevel consistencyLevel) { return this; } + /** + * Set the time precision to use for the whole batch. If unspecified, will default to {@link TimeUnit#NANOSECONDS} + * @param precision + * @return the Builder instance + */ + public Builder precision(final TimeUnit precision) { + this.precision = precision; + return this; + } + /** * Create a new BatchPoints instance. * @@ -135,6 +148,10 @@ public BatchPoints build() { this.consistency = ConsistencyLevel.ONE; } batchPoints.setConsistency(this.consistency); + if (null == this.precision) { + this.precision = TimeUnit.NANOSECONDS; + } + batchPoints.setPrecision(this.precision); return batchPoints; } } @@ -184,6 +201,20 @@ void setPoints(final List points) { this.points = points; } + /** + * @return the time precision unit + */ + public TimeUnit getPrecision() { + return precision; + } + + /** + * @param precision the time precision to set for the batch points + */ + void setPrecision(final TimeUnit precision) { + this.precision = precision; + } + /** * Add a single Point to these batches. * @@ -239,12 +270,13 @@ public boolean equals(final Object o) { && Objects.equals(retentionPolicy, that.retentionPolicy) && Objects.equals(tags, that.tags) && Objects.equals(points, that.points) - && consistency == that.consistency; + && consistency == that.consistency + && precision == that.precision; } @Override public int hashCode() { - return Objects.hash(database, retentionPolicy, tags, points, consistency); + return Objects.hash(database, retentionPolicy, tags, points, consistency, precision); } /** @@ -261,6 +293,8 @@ public String toString() { .append(this.consistency) .append(", tags=") .append(this.tags) + .append(", precision=") + .append(this.precision) .append(", points=") .append(this.points) .append("]"); @@ -275,8 +309,9 @@ public String toString() { */ public String lineProtocol() { StringBuilder sb = new StringBuilder(); + for (Point point : this.points) { - sb.append(point.lineProtocol()).append("\n"); + sb.append(point.lineProtocol(this.precision)).append("\n"); } return sb.toString(); } diff --git a/src/main/java/org/influxdb/dto/Point.java b/src/main/java/org/influxdb/dto/Point.java index 97122ca54..fd0175c41 100644 --- a/src/main/java/org/influxdb/dto/Point.java +++ b/src/main/java/org/influxdb/dto/Point.java @@ -330,6 +330,23 @@ public String lineProtocol() { return sb.toString(); } + /** + * Calculate the lineprotocol entry for a single point, using a specific {@link TimeUnit} for the timestamp. + * @param precision the time precision unit for this point + * @return the String without newLine + */ + public String lineProtocol(final TimeUnit precision) { + final StringBuilder sb = CACHED_STRINGBUILDERS + .get() + .computeIfAbsent(this.measurement, MeasurementStringBuilder::new) + .resetForUse(); + + concatenatedTags(sb); + concatenatedFields(sb); + formatedTime(sb, precision); + return sb.toString(); + } + private void concatenatedTags(final StringBuilder sb) { for (Entry tag : this.tags.entrySet()) { sb.append(','); @@ -405,6 +422,14 @@ private void formatedTime(final StringBuilder sb) { sb.append(' ').append(TimeUnit.NANOSECONDS.convert(this.time, this.precision)); } + private StringBuilder formatedTime(final StringBuilder sb, final TimeUnit precision) { + if (this.time == null || this.precision == null) { + return sb; + } + sb.append(" ").append(precision.convert(this.time, this.precision)); + return sb; + } + private static class MeasurementStringBuilder { private final StringBuilder sb = new StringBuilder(128); private final int length; diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index e94c2b92b..076072192 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -358,30 +358,45 @@ public void write(final BatchPoints batchPoints) { this.password, batchPoints.getDatabase(), batchPoints.getRetentionPolicy(), - TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS), + TimeUtil.toTimePrecision(batchPoints.getPrecision()), batchPoints.getConsistency().value(), lineProtocol)); } + @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, - final String records) { + final TimeUnit precision, final String records) { execute(this.influxDBService.writePoints( this.username, this.password, database, retentionPolicy, - TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS), + TimeUtil.toTimePrecision(precision), consistency.value(), RequestBody.create(MEDIA_TYPE_STRING, records))); } + @Override + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, + final String records) { + write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records); + } + @Override public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List records) { - write(database, retentionPolicy, consistency, String.join("\n", records)); + write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records); } + + @Override + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, + final TimeUnit precision, final List records) { + write(database, retentionPolicy, consistency, precision, String.join("\n", records)); + } + + /** * {@inheritDoc} */ diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 7bd817e85..7e003561c 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -17,6 +17,9 @@ import org.junit.runner.RunWith; import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -53,7 +56,7 @@ public void setUp() throws InterruptedException, IOException { this.influxDB = TestUtils.connectToInfluxDB(); this.influxDB.createDatabase(UDP_DATABASE); } - + /** * delete UDP database after all tests end. */ @@ -451,6 +454,158 @@ public void testWriteMultipleStringDataLines() { this.influxDB.deleteDatabase(dbName); } + /** + * Tests writing points using the time precision feature + * @throws Exception + */ + @Test + public void testWriteBatchWithPrecision() throws Exception { + // GIVEN a database and a measurement + String dbName = "precision_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + + String measurement = TestUtils.getRandomMeasurement(); + + // GIVEN a batch of points using second precision + DateTimeFormatter formatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") + .withZone(ZoneId.of("UTC")); + int t1 = 1485273600; + Point p1 = Point + .measurement(measurement) + .addField("foo", 1d) + .tag("device", "one") + .time(t1, TimeUnit.SECONDS).build(); // 2017-01-27T16:00:00 + String timeP1 = formatter.format(Instant.ofEpochSecond(t1)); + + int t2 = 1485277200; + Point p2 = Point + .measurement(measurement) + .addField("foo", 2d) + .tag("device", "two") + .time(t2, TimeUnit.SECONDS).build(); // 2017-01-27T17:00:00 + String timeP2 = formatter.format(Instant.ofEpochSecond(t2)); + + int t3 = 1485280800; + Point p3 = Point + .measurement(measurement) + .addField("foo", 3d) + .tag("device", "three") + .time(t3, TimeUnit.SECONDS).build(); // 2017-01-27T18:00:00 + String timeP3 = formatter.format(Instant.ofEpochSecond(t3)); + + BatchPoints batchPoints = BatchPoints + .database(dbName) + .retentionPolicy(rp) + .precision(TimeUnit.SECONDS) + .points(p1, p2, p3) + .build(); + + // WHEN I write the batch + this.influxDB.write(batchPoints); + + // THEN the measure points have a timestamp with second precision + QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName)); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().size(), 3); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), timeP1); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), timeP2); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), timeP3); + + this.influxDB.deleteDatabase(dbName); + } + + @Test + public void testWriteBatchWithoutPrecision() throws Exception { + // GIVEN a database and a measurement + String dbName = "precision_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + + String measurement = TestUtils.getRandomMeasurement(); + + // GIVEN a batch of points that has no specific precision + long t1 = 1485273600000000100L; + Point p1 = Point + .measurement(measurement) + .addField("foo", 1d) + .tag("device", "one") + .time(t1, TimeUnit.NANOSECONDS).build(); // 2017-01-27T16:00:00.000000100Z + Double timeP1 = Double.valueOf(t1); + + long t2 = 1485277200000000200L; + Point p2 = Point + .measurement(measurement) + .addField("foo", 2d) + .tag("device", "two") + .time(t2, TimeUnit.NANOSECONDS).build(); // 2017-01-27T17:00:00.000000200Z + Double timeP2 = Double.valueOf(t2); + + long t3 = 1485280800000000300L; + Point p3 = Point + .measurement(measurement) + .addField("foo", 3d) + .tag("device", "three") + .time(t3, TimeUnit.NANOSECONDS).build(); // 2017-01-27T18:00:00.000000300Z + Double timeP3 = Double.valueOf(t3); + + BatchPoints batchPoints = BatchPoints + .database(dbName) + .retentionPolicy(rp) + .points(p1, p2, p3) + .build(); + + // WHEN I write the batch + this.influxDB.write(batchPoints); + + // THEN the measure points have a timestamp with second precision + QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName), TimeUnit.NANOSECONDS); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().size(), 3); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), timeP1); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), timeP2); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), timeP3); + + this.influxDB.deleteDatabase(dbName); + } + + @Test + public void testWriteRecordsWithPrecision() throws Exception { + // GIVEN a database and a measurement + String dbName = "precision_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + + String measurement = TestUtils.getRandomMeasurement(); + + // GIVEN a set of records using second precision + DateTimeFormatter formatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'") + .withZone(ZoneId.of("UTC")); + List records = new ArrayList<>(); + records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600"); + String timeP1 = formatter.format(Instant.ofEpochSecond(1485273600)); + + records.add(measurement + ",atag=test2 idle=200,usertime=20,system=2 1485277200"); + String timeP2 = formatter.format(Instant.ofEpochSecond(1485277200)); + + records.add(measurement + ",atag=test3 idle=300,usertime=30,system=3 1485280800"); + String timeP3 = formatter.format(Instant.ofEpochSecond(1485280800)); + + // WHEN I write the batch + this.influxDB.write(dbName, rp, InfluxDB.ConsistencyLevel.ONE, TimeUnit.SECONDS, records); + + // THEN the measure points have a timestamp with second precision + QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName)); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().size(), 3); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), timeP1); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), timeP2); + Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), timeP3); + this.influxDB.deleteDatabase(dbName); + } + /** * Test writing multiple separate records to the database using string protocol with simpler interface. */ diff --git a/src/test/java/org/influxdb/dto/PointTest.java b/src/test/java/org/influxdb/dto/PointTest.java index 33e11f631..7d1a72ccc 100644 --- a/src/test/java/org/influxdb/dto/PointTest.java +++ b/src/test/java/org/influxdb/dto/PointTest.java @@ -4,6 +4,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.util.Date; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -339,4 +340,114 @@ public void testBuilderHasFields() { pointBuilder.addField("testfield", 256); assertThat(pointBuilder.hasFields()).isTrue(); } + + /** + * Tests for #182 + * + * @throws Exception + */ + @Test + public void testLineProtocolNanosecondPrecision() throws Exception { + // GIVEN a point with millisecond precision + Date pDate = new Date(); + Point p = Point + .measurement("measurement") + .addField("foo", "bar") + .time(pDate.getTime(), TimeUnit.MILLISECONDS) + .build(); + + // WHEN i call lineProtocol(TimeUnit.NANOSECONDS) + String nanosTime = p.lineProtocol(TimeUnit.NANOSECONDS).replace("measurement foo=\"bar\" ", ""); + + // THEN the timestamp is in nanoseconds + assertThat(nanosTime).isEqualTo(String.valueOf(pDate.getTime() * 1000000)); + } + + @Test + public void testLineProtocolMicrosecondPrecision() throws Exception { + // GIVEN a point with millisecond precision + Date pDate = new Date(); + Point p = Point + .measurement("measurement") + .addField("foo", "bar") + .time(pDate.getTime(), TimeUnit.MILLISECONDS) + .build(); + + // WHEN i call lineProtocol(TimeUnit.MICROSECONDS) + String microsTime = p.lineProtocol(TimeUnit.MICROSECONDS).replace("measurement foo=\"bar\" ", ""); + + // THEN the timestamp is in microseconds + assertThat(microsTime).isEqualTo(String.valueOf(pDate.getTime() * 1000)); + } + + @Test + public void testLineProtocolMillisecondPrecision() throws Exception { + // GIVEN a point with millisecond precision + Date pDate = new Date(); + Point p = Point + .measurement("measurement") + .addField("foo", "bar") + .time(pDate.getTime(), TimeUnit.MILLISECONDS) + .build(); + + // WHEN i call lineProtocol(TimeUnit.MILLISECONDS) + String millisTime = p.lineProtocol(TimeUnit.MILLISECONDS).replace("measurement foo=\"bar\" ", ""); + + // THEN the timestamp is in microseconds + assertThat(millisTime).isEqualTo(String.valueOf(pDate.getTime())); + } + + @Test + public void testLineProtocolSecondPrecision() throws Exception { + // GIVEN a point with millisecond precision + Date pDate = new Date(); + Point p = Point + .measurement("measurement") + .addField("foo", "bar") + .time(pDate.getTime(), TimeUnit.MILLISECONDS) + .build(); + + // WHEN i call lineProtocol(TimeUnit.SECONDS) + String secondTime = p.lineProtocol(TimeUnit.SECONDS).replace("measurement foo=\"bar\" ", ""); + + // THEN the timestamp is in seconds + String expectedSecondTimeStamp = String.valueOf(pDate.getTime() / 1000); + assertThat(secondTime).isEqualTo(expectedSecondTimeStamp); + } + + @Test + public void testLineProtocolMinutePrecision() throws Exception { + // GIVEN a point with millisecond precision + Date pDate = new Date(); + Point p = Point + .measurement("measurement") + .addField("foo", "bar") + .time(pDate.getTime(), TimeUnit.MILLISECONDS) + .build(); + + // WHEN i call lineProtocol(TimeUnit.MINUTE) + String secondTime = p.lineProtocol(TimeUnit.MINUTES).replace("measurement foo=\"bar\" ", ""); + + // THEN the timestamp is in seconds + String expectedSecondTimeStamp = String.valueOf(pDate.getTime() / 60000); + assertThat(secondTime).isEqualTo(expectedSecondTimeStamp); + } + + @Test + public void testLineProtocolHourPrecision() throws Exception { + // GIVEN a point with millisecond precision + Date pDate = new Date(); + Point p = Point + .measurement("measurement") + .addField("foo", "bar") + .time(pDate.getTime(), TimeUnit.MILLISECONDS) + .build(); + + // WHEN i call lineProtocol(TimeUnit.NANOSECONDS) + String hourTime = p.lineProtocol(TimeUnit.HOURS).replace("measurement foo=\"bar\" ", ""); + + // THEN the timestamp is in hours + String expectedHourTimeStamp = String.valueOf(Math.round(pDate.getTime() / 3600000)); // 1000ms * 60s * 60m + assertThat(hourTime).isEqualTo(expectedHourTimeStamp); + } }