Skip to content
This repository has been archived by the owner on Feb 12, 2023. It is now read-only.

Commit

Permalink
Implement Issue influxdata#389 : Support for MessagePack
Browse files Browse the repository at this point in the history
refactor + add javadocs to make MessagePackTraverser more unambiguous
  • Loading branch information
lxhoan committed Jul 24, 2018
1 parent 408d809 commit fc5f874
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ public class MessagePackResponseBodyConverter implements Converter<ResponseBody,
public QueryResult convert(final ResponseBody value) throws IOException {
try (InputStream is = value.byteStream()) {
MessagePackTraverser traverser = new MessagePackTraverser();
for (QueryResult queryResult : traverser.traverse(is)) {
return queryResult;
}
return null;
return traverser.parse(is);
}
}
}
54 changes: 41 additions & 13 deletions src/main/java/org/influxdb/msgpack/MessagePackTraverser.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.msgpack.value.ValueType;

/**
* Traverse the MessagePack input stream and return Query Result objects.
* Traverse the MessagePack input stream and return Query Result object(s).
*
* @author hoan.le [at] bonitoo.io
*
Expand All @@ -29,7 +29,16 @@ public class MessagePackTraverser {

private String lastStringNode;

public Iterable<QueryResult> traverse(final InputStream is) throws IOException {
/**
* Traverse over the whole message pack stream.
* This method can be used for converting query results in chunk.
*
* @param is
* The MessagePack format input stream
* @return an Iterable over the QueryResult objects
*
*/
public Iterable<QueryResult> traverse(final InputStream is) {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(is);

return () -> {
Expand All @@ -45,23 +54,42 @@ public boolean hasNext() {

@Override
public QueryResult next() {
QueryResult queryResult = new QueryResult();
QueryResultModelPath queryResultPath = new QueryResultModelPath();
queryResultPath.add("queryResult", queryResult);
try {
traverse(unpacker, queryResultPath, 1);
} catch (IOException e) {
throw new InfluxDBException(e);
}
return queryResult;
return parse(unpacker);
}
};
};

}

void traverse(final MessageUnpacker unpacker, final QueryResultModelPath queryResultPath,
final int readAmount) throws IOException {
/**
* Parse the message pack stream.
* This method can be used for converting query
* result from normal query response where exactly one QueryResult returned
*
* @param is
* The MessagePack format input stream
* @return QueryResult
*
*/
public QueryResult parse(final InputStream is) {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(is);
return parse(unpacker);
}

private QueryResult parse(final MessageUnpacker unpacker) {
QueryResult queryResult = new QueryResult();
QueryResultModelPath queryResultPath = new QueryResultModelPath();
queryResultPath.add("queryResult", queryResult);
try {
traverse(unpacker, queryResultPath, 1);
} catch (IOException e) {
throw new InfluxDBException(e);
}
return queryResult;
}

void traverse(final MessageUnpacker unpacker, final QueryResultModelPath queryResultPath, final int readAmount)
throws IOException {
int amount = 0;

while (unpacker.hasNext() && amount < readAmount) {
Expand Down

0 comments on commit fc5f874

Please sign in to comment.