Skip to content

Commit

Permalink
Implement Issue #389 : Support for MessagePack
Browse files Browse the repository at this point in the history
checking of version support at querying time
  • Loading branch information
lxhoan committed Jul 23, 2018
1 parent 0bbef73 commit 0e29b6d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
40 changes: 24 additions & 16 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class InfluxDBImpl implements InfluxDB {
private String database;
private String retentionPolicy = "autogen";
private ConsistencyLevel consistency = ConsistencyLevel.ONE;
private final boolean messagePack;
private final ChunkProccesor chunkProccesor;

/**
Expand All @@ -112,6 +113,7 @@ public class InfluxDBImpl implements InfluxDB {
*/
public InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
final ResponseFormat responseFormat) {
this.messagePack = ResponseFormat.MSGPACK.equals(responseFormat);
this.hostAddress = parseHostAddress(url);
this.username = username;
this.password = password;
Expand Down Expand Up @@ -148,15 +150,6 @@ public InfluxDBImpl(final String url, final String username, final String passwo
.build();
this.influxDBService = this.retrofit.create(InfluxDBService.class);

if (ResponseFormat.MSGPACK.equals(responseFormat)) {
String[] versionNumbers = version().split("\\.");
final int major = Integer.parseInt(versionNumbers[0]);
final int minor = Integer.parseInt(versionNumbers[1]);
final int fromMinor = 4;
if ((major < 2) && ((major != 1) || (minor < fromMinor))) {
throw new InfluxDBException("MessagePack format is only supported from InfluxDB version 1.4 and later");
}
}
}

public InfluxDBImpl(final String url, final String username, final String password,
Expand All @@ -168,6 +161,7 @@ public InfluxDBImpl(final String url, final String username, final String passwo
InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
super();
this.messagePack = false;
this.hostAddress = parseHostAddress(url);
this.username = username;
this.password = password;
Expand Down Expand Up @@ -505,7 +499,7 @@ public void write(final int udpPort, final List<String> records) {
*/
@Override
public QueryResult query(final Query query) {
return execute(callQuery(query));
return executeQuery(callQuery(query));
}

/**
Expand Down Expand Up @@ -591,7 +585,7 @@ public QueryResult query(final Query query, final TimeUnit timeUnit) {
call = this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
}
return execute(call);
return executeQuery(call);
}

/**
Expand All @@ -604,15 +598,15 @@ public void createDatabase(final String name) {
if (this.version().startsWith("0.")) {
createDatabaseQueryString = String.format("CREATE DATABASE IF NOT EXISTS \"%s\"", name);
}
execute(this.influxDBService.postQuery(this.username, this.password, Query.encode(createDatabaseQueryString)));
executeQuery(this.influxDBService.postQuery(this.username, this.password, Query.encode(createDatabaseQueryString)));
}

/**
* {@inheritDoc}
*/
@Override
public void deleteDatabase(final String name) {
execute(this.influxDBService.postQuery(this.username, this.password,
executeQuery(this.influxDBService.postQuery(this.username, this.password,
Query.encode("DROP DATABASE \"" + name + "\"")));
}

Expand All @@ -621,7 +615,7 @@ public void deleteDatabase(final String name) {
*/
@Override
public List<String> describeDatabases() {
QueryResult result = execute(this.influxDBService.query(this.username,
QueryResult result = executeQuery(this.influxDBService.query(this.username,
this.password, SHOW_DATABASE_COMMAND_ENCODED));
// {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["mydb"]]}]}]}
// Series [name=databases, columns=[name], values=[[mydb], [unittest_1433605300968]]]
Expand Down Expand Up @@ -675,6 +669,20 @@ static class ErrorMessage {
public String error;
}

private QueryResult executeQuery(final Call<QueryResult> call) {
if (messagePack) {
String[] versionNumbers = version().split("\\.");
final int major = Integer.parseInt(versionNumbers[0]);
final int minor = Integer.parseInt(versionNumbers[1]);
final int fromMinor = 4;
if ((major < 2) && ((major != 1) || (minor < fromMinor))) {
throw new UnsupportedOperationException(
"MessagePack format is only supported from InfluxDB version 1.4 and later");
}
}
return execute(call);
}

private <T> T execute(final Call<T> call) {
try {
Response<T> response = call.execute();
Expand Down Expand Up @@ -762,7 +770,7 @@ public void createRetentionPolicy(final String rpName, final String database, fi
if (isDefault) {
queryBuilder.append(" DEFAULT");
}
execute(this.influxDBService.postQuery(this.username, this.password, Query.encode(queryBuilder.toString())));
executeQuery(this.influxDBService.postQuery(this.username, this.password, Query.encode(queryBuilder.toString())));
}

/**
Expand Down Expand Up @@ -797,7 +805,7 @@ public void dropRetentionPolicy(final String rpName, final String database) {
.append("\" ON \"")
.append(database)
.append("\"");
execute(this.influxDBService.postQuery(this.username, this.password,
executeQuery(this.influxDBService.postQuery(this.username, this.password,
Query.encode(queryBuilder.toString())));
}

Expand Down
5 changes: 3 additions & 2 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,9 @@ public void testIsBatchEnabledWithConsistency() {
@Test
@EnabledIfEnvironmentVariable(named = "INFLUXDB_VERSION", matches = "1\\.3|1\\.2|1\\.1")
public void testMessagePackOnOldDbVersion() {
Assertions.assertThrows(InfluxDBException.class, () -> {
TestUtils.connectToInfluxDB(ResponseFormat.MSGPACK);
Assertions.assertThrows(UnsupportedOperationException.class, () -> {
InfluxDB influxDB = TestUtils.connectToInfluxDB(ResponseFormat.MSGPACK);
influxDB.describeDatabases();
});
}

Expand Down

0 comments on commit 0e29b6d

Please sign in to comment.