Skip to content

Commit

Permalink
Merge pull request #327 from andyfeller/simplify-write-db-rp
Browse files Browse the repository at this point in the history
Simplify write() methods for use cases writing all points to same database and retention policy
  • Loading branch information
majst01 authored Jun 13, 2017
2 parents 4753950 + 5b462fd commit bfe020f
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ target/
test-output/
.idea/
*iml
.m2/
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,36 @@ influxDB.deleteDatabase(dbName);
```
Note that the batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shut-down, or the application will not shut down properly. To do so simply call: ```influxDB.close()```

If all of your points are written to the same database and retention policy, the simpler write() methods can be used.

```java
InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root", "root");
String dbName = "aTimeSeries";
influxDB.createDatabase(dbName);
influxDB.setDatabase(dbName);
influxDB.setRetentionPolicy("autogen");

// Flush every 2000 Points, at least every 100ms
influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);

influxDB.write(Point.measurement("cpu")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("idle", 90L)
.addField("user", 9L)
.addField("system", 1L)
.build());

influxDB.write(Point.measurement("disk")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("used", 80L)
.addField("free", 1L)
.build());

Query query = new Query("SELECT idle FROM cpu", dbName);
influxDB.query(query);
influxDB.deleteDatabase(dbName);
```

Also note that any errors that happen during the batch flush won't leak into the caller of the `write` method. By default, any kind of errors will be just logged with "SEVERE" level.

If you need to be notified and do some custom logic when such asynchronous errors happen, you can add an error handler with a `BiConsumer<Iterable<Point>, Throwable>` using the overloaded `enableBatch` method:
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,28 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
*/
public String version();

/**
* Write a single Point to the default database.
*
* @param point
* The point to write
*/
public void write(final Point point);

/**
* Write a set of Points to the default database with the string records.
*
* @param records
*/
public void write(final String records);

/**
* Write a set of Points to the default database with the list of string records.
*
* @param records
*/
public void write(final List<String> records);

/**
* Write a single Point to the database.
*
Expand Down Expand Up @@ -300,4 +322,30 @@ public void write(final String database, final String retentionPolicy,
*/
public void close();

/**
* Set the consistency level which is used for writing points.
*
* @param consistency
* the consistency level to set.
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB setConsistency(final ConsistencyLevel consistency);

/**
* Set the database which is used for writing points.
*
* @param database
* the database to set.
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB setDatabase(final String database);

/**
* Set the retention policy which is used for writing points.
*
* @param retentionPolicy
* the retention policy to set.
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/
public InfluxDB setRetentionPolicy(final String retentionPolicy);
}
48 changes: 48 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author stefan.majer [at] gmail.com
*/
public class InfluxDBImpl implements InfluxDB {

static final okhttp3.MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");

private static final String SHOW_DATABASE_COMMAND_ENCODED = Query.encode("SHOW DATABASES");
Expand All @@ -73,6 +74,9 @@ public class InfluxDBImpl implements InfluxDB {
private final GzipRequestInterceptor gzipRequestInterceptor;
private LogLevel logLevel = LogLevel.NONE;
private JsonAdapter<QueryResult> adapter;
private String database;
private String retentionPolicy = "autogen";
private ConsistencyLevel consistency = ConsistencyLevel.ONE;

public InfluxDBImpl(final String url, final String username, final String password,
final OkHttpClient.Builder client) {
Expand All @@ -93,6 +97,15 @@ public InfluxDBImpl(final String url, final String username, final String passwo
this.adapter = moshi.adapter(QueryResult.class);
}

public InfluxDBImpl(final String url, final String username, final String password,
final OkHttpClient.Builder client, final String database, final String retentionPolicy, final ConsistencyLevel consistency) {
this(url, username, password, client);

setConsistency(consistency);
setDatabase(database);
setRetentionPolicy(retentionPolicy);
}

private InetAddress parseHostAddress(final String url) {
HttpUrl httpUrl = HttpUrl.parse(url);

Expand Down Expand Up @@ -234,6 +247,21 @@ public String version() {
return ping().getVersion();
}

@Override
public void write(Point point) {
write(database, retentionPolicy, point);
}

@Override
public void write(String records) {
write(database, retentionPolicy, consistency, records);
}

@Override
public void write(List<String> records) {
write(database, retentionPolicy, consistency, records);
}

@Override
public void write(final String database, final String retentionPolicy, final Point point) {
if (this.batchEnabled.get()) {
Expand Down Expand Up @@ -497,4 +525,24 @@ public void close() {
}
}

@Override
public InfluxDB setConsistency(ConsistencyLevel consistency) {

this.consistency = consistency;
return this;
}

@Override
public InfluxDB setDatabase(String database) {

this.database = database;
return this;
}

@Override
public InfluxDB setRetentionPolicy(String retentionPolicy) {

this.retentionPolicy = retentionPolicy;
return this;
}
}
65 changes: 65 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,23 @@ public void testWriteStringData() {
this.influxDB.deleteDatabase(dbName);
}

/**
* Test writing to the database using string protocol with simpler interface.
*/
@Test
public void testWriteStringDataSimple() {
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
this.influxDB.setDatabase(dbName);
this.influxDB.setRetentionPolicy(rp);
this.influxDB.write("cpu,atag=test idle=90,usertime=9,system=1");
Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);
QueryResult result = this.influxDB.query(query);
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
this.influxDB.deleteDatabase(dbName);
}

/**
* Test writing to the database using string protocol through UDP.
*/
Expand Down Expand Up @@ -338,6 +355,28 @@ public void testWriteMultipleStringData() {
this.influxDB.deleteDatabase(dbName);
}

/**
* Test writing multiple records to the database using string protocol with simpler interface.
*/
@Test
public void testWriteMultipleStringDataSimple() {
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
this.influxDB.setDatabase(dbName);
this.influxDB.setRetentionPolicy(rp);

this.influxDB.write("cpu,atag=test1 idle=100,usertime=10,system=1\ncpu,atag=test2 idle=200,usertime=20,system=2\ncpu,atag=test3 idle=300,usertime=30,system=3");
Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);
QueryResult result = this.influxDB.query(query);

Assert.assertEquals(result.getResults().get(0).getSeries().size(), 3);
Assert.assertEquals(result.getResults().get(0).getSeries().get(0).getTags().get("atag"), "test1");
Assert.assertEquals(result.getResults().get(0).getSeries().get(1).getTags().get("atag"), "test2");
Assert.assertEquals(result.getResults().get(0).getSeries().get(2).getTags().get("atag"), "test3");
this.influxDB.deleteDatabase(dbName);
}

/**
* Test writing multiple separate records to the database using string protocol.
*/
Expand All @@ -362,6 +401,32 @@ public void testWriteMultipleStringDataLines() {
this.influxDB.deleteDatabase(dbName);
}

/**
* Test writing multiple separate records to the database using string protocol with simpler interface.
*/
@Test
public void testWriteMultipleStringDataLinesSimple() {
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
this.influxDB.setDatabase(dbName);
this.influxDB.setRetentionPolicy(rp);

this.influxDB.write(Arrays.asList(
"cpu,atag=test1 idle=100,usertime=10,system=1",
"cpu,atag=test2 idle=200,usertime=20,system=2",
"cpu,atag=test3 idle=300,usertime=30,system=3"
));
Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);
QueryResult result = this.influxDB.query(query);

Assert.assertEquals(result.getResults().get(0).getSeries().size(), 3);
Assert.assertEquals(result.getResults().get(0).getSeries().get(0).getTags().get("atag"), "test1");
Assert.assertEquals(result.getResults().get(0).getSeries().get(1).getTags().get("atag"), "test2");
Assert.assertEquals(result.getResults().get(0).getSeries().get(2).getTags().get("atag"), "test3");
this.influxDB.deleteDatabase(dbName);
}

/**
* Test that creating database which name is composed of numbers only works
*/
Expand Down

0 comments on commit bfe020f

Please sign in to comment.