Influxdb connector (#1680)
A new connector to stream data to and from InfluxDB.
gkatzioura authored and ennru committed Jun 28, 2019
1 parent 8cce949 commit cd12632
Showing 31 changed files with 2,095 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
@@ -75,6 +75,9 @@ jobs:
- PRE_CMD="docker-compose up -d hbase"
- env:
- DIR=hdfs
- env:
- DIR=influxdb
- PRE_CMD="docker-compose up -d influxdb"
- env:
- DIR=ironmq
- PRE_CMD="docker-compose up -d ironauth ironmq"
5 changes: 5 additions & 0 deletions build.sbt
@@ -19,6 +19,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
googleFcm,
hbase,
hdfs,
influxdb,
ironmq,
jms,
jsonStreaming,
@@ -194,6 +195,10 @@ lazy val hdfs = alpakkaProject("hdfs",
crossScalaVersions -= Dependencies.Scala213 // Requires upgrade of cats-core
)

lazy val influxdb =
alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB, crossScalaVersions -= Dependencies.Scala211)
.disablePlugins(MimaPlugin)

lazy val ironmq = alpakkaProject(
"ironmq",
"ironmq",
4 changes: 4 additions & 0 deletions docker-compose.yml
@@ -202,6 +202,10 @@ services:
image: mongo
ports:
- "27017:27017"
influxdb:
image: influxdb
ports:
- "8086:8086"
mqtt:
image: toke/mosquitto
ports:
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
@@ -41,6 +41,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [HTTP](external/http.md)
* [IBM Bluemix Cloud Object storage](bluemix-cos.md)
* [IBM DB2 Event Store](external/db2-event-store.md)
* [InfluxDB](influxdb.md)
* [IronMQ](ironmq.md)
* [JMS](jms/index.md)
* [MongoDB](mongodb.md)
95 changes: 95 additions & 0 deletions docs/src/main/paradox/influxdb.md
@@ -0,0 +1,95 @@
# InfluxDB

The Alpakka InfluxDB connector provides Akka Streams integration for InfluxDB.

For more information about InfluxDB, please visit the [InfluxDB Documentation](https://docs.influxdata.com/).

@@project-info{ projectId="influxdb" }

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-influxdb$scala.binary.version$
version=$project.version$
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

@@dependencies { projectId="influxdb" }

## Set up InfluxDB client

Sources, Flows and Sinks provided by this connector need a prepared `org.influxdb.InfluxDB` client to
access InfluxDB.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala) { #init-client }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/TestUtils.java) { #init-client }
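The referenced snippets are not shown in this diff; a minimal sketch of creating such a client with the influxdb-java driver, assuming a local instance on the default port and a hypothetical `alpakka_db` database:

```scala
import org.influxdb.{InfluxDB, InfluxDBFactory}

// Connect to a locally running InfluxDB (URL and credentials are placeholders).
val influxDB: InfluxDB =
  InfluxDBFactory.connect("http://localhost:8086", "root", "root")

// Subsequent stream elements are read from / written to this database.
influxDB.setDatabase("alpakka_db")
```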

## InfluxDB as Source and Sink

Now we can stream messages from or to InfluxDB by providing the `InfluxDB` to the
@scala[@scaladoc[InfluxDbSource](akka.stream.alpakka.influxdb.scaladsl.InfluxDbSource$)]
@java[@scaladoc[InfluxDbSource](akka.stream.alpakka.influxdb.javadsl.InfluxDbSource$)]
or the
@scala[@scaladoc[InfluxDbSink](akka.stream.alpakka.influxdb.scaladsl.InfluxDbSink$).]
@java[@scaladoc[InfluxDbSink](akka.stream.alpakka.influxdb.javadsl.InfluxDbSink$).]


Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpecCpu.java) { #define-class }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbCpu.java) { #define-class }
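The measurement class used in these snippets is not shown in the diff; a sketch of such a class using the influxdb-java mapping annotations (the `Cpu` name and its columns are hypothetical):

```scala
import java.time.Instant
import org.influxdb.annotation.{Column, Measurement}

// InfluxDBMapper reads these annotations to map rows of the "cpu"
// measurement onto the fields below.
@Measurement(name = "cpu")
class Cpu {
  @Column(name = "time") var time: Instant = _
  @Column(name = "host", tag = true) var hostname: String = _
  @Column(name = "load") var load: java.lang.Double = _
}
```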

### With typed source

Use `InfluxDbSource.typed` and `InfluxDbSink.typed` to create a source and sink.
The data is converted by `InfluxDBMapper`.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala) { #run-typed }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #run-typed }
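Put together, a typed read might look like the following sketch. The `Cpu` class, the database name, and the exact factory signature are assumptions, not taken from the snippets; an implicit materializer and the `influxDB` client from above are assumed to be in scope:

```scala
import scala.concurrent.Future
import akka.stream.scaladsl.Sink
import org.influxdb.dto.Query
import akka.stream.alpakka.influxdb.InfluxDbReadSettings
import akka.stream.alpakka.influxdb.scaladsl.InfluxDbSource

val query = new Query("SELECT * FROM cpu", "alpakka_db")

// Stream rows of the annotated Cpu class out of InfluxDB.
val rows: Future[Seq[Cpu]] =
  InfluxDbSource
    .typed(classOf[Cpu], InfluxDbReadSettings(), influxDB, query)
    .runWith(Sink.seq)
```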

### With `QueryResult` source

Use `InfluxDbSource.create` and `InfluxDbSink.create` to create a source and sink.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala) { #run-query-result }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #run-query-result }
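A sketch of the untyped variant, which emits raw `org.influxdb.dto.QueryResult` elements (the factory signature is an assumption; an implicit materializer and the `influxDB` client are assumed in scope):

```scala
import scala.concurrent.Future
import akka.stream.scaladsl.Sink
import org.influxdb.dto.{Query, QueryResult}
import akka.stream.alpakka.influxdb.scaladsl.InfluxDbSource

// Results arrive as raw QueryResult objects that must be unpacked manually.
val results: Future[Seq[QueryResult]] =
  InfluxDbSource(influxDB, new Query("SELECT * FROM cpu", "alpakka_db"))
    .runWith(Sink.seq)
```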


### Writing to InfluxDB

You can also build flow stages with
@scala[@scaladoc[InfluxDbFlow](akka.stream.alpakka.influxdb.scaladsl.InfluxDbFlow$)]
@java[@scaladoc[InfluxDbFlow](akka.stream.alpakka.influxdb.javadsl.InfluxDbFlow$)].
The API is similar to creating Sinks.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala) { #run-flow }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #run-flow }
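A flow sketch, assuming the flow consumes batches of `InfluxDbWriteMessage` and picks up the `InfluxDB` client implicitly; the `InfluxDbFlow.create()` shape is an assumption, while `Point.measurement(...).addField(...).build()` is the standard influxdb-java builder:

```scala
import akka.stream.scaladsl.{Sink, Source}
import org.influxdb.dto.Point
import akka.stream.alpakka.influxdb.InfluxDbWriteMessage
import akka.stream.alpakka.influxdb.scaladsl.InfluxDbFlow

val point = Point
  .measurement("cpu")
  .addField("load", 0.64)
  .build()

// Each stream element is a batch of write messages; the flow emits
// a matching batch of InfluxDbWriteResult values downstream.
val written =
  Source
    .single(List(InfluxDbWriteMessage(point)))
    .via(InfluxDbFlow.create())
    .runWith(Sink.seq)
```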

### Passing data through InfluxDbFlow

When streaming documents from Kafka, you might want to commit to Kafka **AFTER** the document has been written to InfluxDB.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala) { #kafka-example }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #kafka-example }
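The pass-through mechanics rest on `InfluxDbWriteMessage.withPassThrough`, defined later in this commit. A hedged sketch of the Kafka pattern; the `createWithPassThrough` factory, the `toPoint` helper, and `consumerSettings` are hypothetical:

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.alpakka.influxdb.InfluxDbWriteMessage
import akka.stream.alpakka.influxdb.scaladsl.InfluxDbFlow

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("measurements"))
  .map { msg =>
    // Attach the Kafka offset as the passThrough so it survives the write.
    List(
      InfluxDbWriteMessage(toPoint(msg.record.value))
        .withPassThrough(msg.committableOffset)
    )
  }
  .via(InfluxDbFlow.createWithPassThrough())
  // Commit to Kafka only AFTER InfluxDB has accepted the batch.
  .mapAsync(1)(results => results.head.writeMessage.passThrough.commitScaladsl())
  .runWith(Sink.ignore)
```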


@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb

import java.util.concurrent.TimeUnit

object InfluxDbReadSettings {
val Default = new InfluxDbReadSettings(TimeUnit.MILLISECONDS)

def apply(): InfluxDbReadSettings = Default

}

final class InfluxDbReadSettings private (val precision: TimeUnit) {

def withPrecision(precision: TimeUnit): InfluxDbReadSettings = copy(precision = precision)

private def copy(
precision: TimeUnit = precision
): InfluxDbReadSettings = new InfluxDbReadSettings(
precision = precision
)

override def toString: String =
s"""InfluxDbReadSettings(precision=$precision)"""

}
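As defined above, `InfluxDbReadSettings` defaults to millisecond precision; other precisions are selected through `withPrecision`:

```scala
import java.util.concurrent.TimeUnit
import akka.stream.alpakka.influxdb.InfluxDbReadSettings

// The default instance uses TimeUnit.MILLISECONDS.
val settings: InfluxDbReadSettings =
  InfluxDbReadSettings().withPrecision(TimeUnit.NANOSECONDS)
```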
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb

import akka.NotUsed

object InfluxDbWriteMessage {
// Apply method to use when not using passThrough
def apply[T](point: T): InfluxDbWriteMessage[T, NotUsed] =
new InfluxDbWriteMessage(point = point, passThrough = NotUsed)

// Java-api - without passThrough
def create[T](point: T): InfluxDbWriteMessage[T, NotUsed] =
new InfluxDbWriteMessage(point, NotUsed)

// Java-api - with passThrough
def create[T, C](point: T, passThrough: C) =
new InfluxDbWriteMessage(point, passThrough)
}

final class InfluxDbWriteMessage[T, C] private (val point: T,
val passThrough: C,
val databaseName: Option[String] = None,
val retentionPolicy: Option[String] = None) {

def withPoint(point: T): InfluxDbWriteMessage[T, C] =
copy(point = point)

def withPassThrough[PT2](passThrough: PT2): InfluxDbWriteMessage[T, PT2] =
new InfluxDbWriteMessage[T, PT2](
point = point,
passThrough = passThrough,
databaseName = databaseName,
retentionPolicy = retentionPolicy
)

def withDatabaseName(databaseName: String): InfluxDbWriteMessage[T, C] =
copy(databaseName = Some(databaseName))

def withRetentionPolicy(retentionPolicy: String): InfluxDbWriteMessage[T, C] =
copy(retentionPolicy = Some(retentionPolicy))

private def copy(
point: T = point,
passThrough: C = passThrough,
databaseName: Option[String] = databaseName,
retentionPolicy: Option[String] = retentionPolicy
): InfluxDbWriteMessage[T, C] =
new InfluxDbWriteMessage(point = point,
passThrough = passThrough,
databaseName = databaseName,
retentionPolicy = retentionPolicy)

override def toString: String =
s"""InfluxDbWriteMessage(point=$point,passThrough=$passThrough,databaseName=$databaseName,retentionPolicy=$retentionPolicy)"""
}

final case class InfluxDbWriteResult[T, C](writeMessage: InfluxDbWriteMessage[T, C], error: Option[String])
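Targeting a specific database and retention policy per message works through the `with...` copies above; a short sketch using the class as defined (the database and policy names are placeholders):

```scala
import org.influxdb.dto.Point
import akka.stream.alpakka.influxdb.{InfluxDbWriteMessage, InfluxDbWriteResult}

val message =
  InfluxDbWriteMessage(Point.measurement("cpu").addField("load", 0.64).build())
    .withDatabaseName("alpakka_db")
    .withRetentionPolicy("one_week")

// After a write, InfluxDbWriteResult pairs the original message with
// an optional error string.
def failed(result: InfluxDbWriteResult[Point, _]): Boolean =
  result.error.isDefined
```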