Skip to content
This repository has been archived by the owner on Feb 12, 2023. It is now read-only.

Commit

Permalink
fix issue influxdata#517 : missing millis and nanos in MsgPack
Browse files Browse the repository at this point in the history
  • Loading branch information
lxhoan committed Sep 11, 2018
1 parent 24c5542 commit fecf985
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
15 changes: 10 additions & 5 deletions src/main/java/org/influxdb/msgpack/MessagePackTraverser.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDBException;
import org.influxdb.dto.QueryResult;
Expand All @@ -27,6 +28,7 @@
*/
public class MessagePackTraverser {

private static final byte MSG_PACK_TIME_EXT_TYPE = 5;
private String lastStringNode;

/**
Expand Down Expand Up @@ -229,14 +231,17 @@ void traverse(final MessageUnpacker unpacker, final QueryResultModelPath queryRe
}
break;
case EXTENSION:
final byte msgPackTimeExtType = (byte) 5;
final int timeOffset = 0;
final int timeByteArrayLength = 8;
final int nanosStartIndex = 8;
extension = unpacker.unpackExtensionTypeHeader();
if (extension.getType() == msgPackTimeExtType) {
if (extension.getType() == MSG_PACK_TIME_EXT_TYPE) {
//decode epoch nanos in accordance with https://github.com/tinylib/msgp/blob/master/msgp/write.go#L594

dst = new byte[extension.getLength()];
unpacker.readPayload(dst);
o = ByteBuffer.wrap(dst, timeOffset, timeByteArrayLength).getLong();
ByteBuffer bf = ByteBuffer.wrap(dst, 0, extension.getLength());
long epochSeconds = bf.getLong();
int nanosOffset = bf.getInt(nanosStartIndex);
o = TimeUnit.SECONDS.toNanos(epochSeconds) + nanosOffset;
}
break;

Expand Down
31 changes: 25 additions & 6 deletions src/test/java/org/influxdb/MessagePackInfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,9 +92,17 @@ public void testWriteBatchWithPrecision() throws Exception {

// THEN the measure points have a timestamp with second precision
QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName));
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), t1);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), t2);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), t3);
long bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0));
Assertions.assertEquals(bySecond, t1);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0));
Assertions.assertEquals(bySecond, t2);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0));
Assertions.assertEquals(bySecond, t3);

this.influxDB.deleteDatabase(dbName);
}
Expand Down Expand Up @@ -182,9 +191,19 @@ public void testWriteRecordsWithPrecision() throws Exception {
// THEN the measure points have a timestamp with second precision
QueryResult queryResult = this.influxDB.query(new Query("SELECT * FROM " + measurement, dbName));
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().size(), 3);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0), timeP1);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0), timeP2);
Assertions.assertEquals(queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0), timeP3);

long bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(0));
Assertions.assertEquals(bySecond, timeP1);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(1).get(0));
Assertions.assertEquals(bySecond, timeP2);

bySecond = TimeUnit.NANOSECONDS.toSeconds(
(Long) queryResult.getResults().get(0).getSeries().get(0).getValues().get(2).get(0));
Assertions.assertEquals(bySecond, timeP3);

this.influxDB.deleteDatabase(dbName);
}

Expand Down

0 comments on commit fecf985

Please sign in to comment.