Skip to content
This repository has been archived by the owner on Feb 12, 2023. It is now read-only.

Commit

Permalink
fix issue influxdata#517 : missing millis and nanos in MsgPack
Browse files Browse the repository at this point in the history
  • Loading branch information
lxhoan committed Sep 11, 2018
1 parent 24c5542 commit f111d84
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
15 changes: 10 additions & 5 deletions src/main/java/org/influxdb/msgpack/MessagePackTraverser.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDBException;
import org.influxdb.dto.QueryResult;
Expand All @@ -27,6 +28,7 @@
*/
public class MessagePackTraverser {

private static final byte MSG_PACK_TIME_EXT_TYPE = 5;
private String lastStringNode;

/**
Expand Down Expand Up @@ -229,14 +231,17 @@ void traverse(final MessageUnpacker unpacker, final QueryResultModelPath queryRe
}
break;
case EXTENSION:
final byte msgPackTimeExtType = (byte) 5;
final int timeOffset = 0;
final int timeByteArrayLength = 8;
final int nanosStartIndex = 8;
extension = unpacker.unpackExtensionTypeHeader();
if (extension.getType() == msgPackTimeExtType) {
if (extension.getType() == MSG_PACK_TIME_EXT_TYPE) {
//decode epoch nanos in accordance with https://github.com/tinylib/msgp/blob/master/msgp/write.go#L594

dst = new byte[extension.getLength()];
unpacker.readPayload(dst);
o = ByteBuffer.wrap(dst, timeOffset, timeByteArrayLength).getLong();
ByteBuffer bf = ByteBuffer.wrap(dst, 0, extension.getLength());
long epochSeconds = bf.getLong();
int nanosOffset = bf.getInt(nanosStartIndex);
o = TimeUnit.SECONDS.toNanos(epochSeconds) + nanosOffset;
}
break;

Expand Down
31 changes: 25 additions & 6 deletions src/test/java/org/influxdb/MessagePackInfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,9 +92,17 @@ public void testWriteBatchWithPrecision() throws Exception {

// THEN the measure points have a timestamp with second precision
QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName));
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), t1);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), t2);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), t3);
long bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0));
Assertions.assertEquals(bySecond, t1);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0));
Assertions.assertEquals(bySecond, t2);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0));
Assertions.assertEquals(bySecond, t3);

this.influxDB.deleteDatabase(dbName);
}
Expand Down Expand Up @@ -182,9 +191,19 @@ public void testWriteRecordsWithPrecision() throws Exception {
// THEN the measure points have a timestamp with second precision
QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName));
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().size(), 3);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), timeP1);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), timeP2);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), timeP3);

long bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0));
Assertions.assertEquals(bySecond, timeP1);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0));
Assertions.assertEquals(bySecond, timeP2);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0));
Assertions.assertEquals(bySecond, timeP3);

this.influxDB.deleteDatabase(dbName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void testTraverseMethod() {
QueryResult result = iter.next();
List<List<Object>> values = result.getResults().get(0).getSeries().get(0).getValues();
Assertions.assertEquals(2, values.size());
assertEquals(1532325083L, values.get(0).get(0));

assertEquals(1532325083803052600L, values.get(0).get(0));
assertEquals("b", values.get(1).get(1));

assertTrue(iter.hasNext());
Expand All @@ -56,7 +57,7 @@ public void testParseMethodOnNonEmptyResult() {
QueryResult queryResult = traverser.parse(MessagePackTraverserTest.class.getResourceAsStream("msgpack_2.bin"));
List<List<Object>> values = queryResult.getResults().get(0).getSeries().get(0).getValues();
Assertions.assertEquals(3, values.size());
assertEquals(1485273600L, values.get(0).get(0));
assertEquals(1485273600000000000L, values.get(0).get(0));
assertEquals("two", values.get(1).get(1));
assertEquals(3.0, values.get(2).get(2));
}
Expand Down

0 comments on commit f111d84

Please sign in to comment.