Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influxdb connector #1680

Merged
merged 95 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
0bc6432
Added reference and build.sbt files
gkatzioura Mar 10, 2019
190ae83
Added result mapper helper and source for influxdb
gkatzioura Apr 14, 2019
55a3a39
Fixed influxdb source
gkatzioura Apr 14, 2019
f7ba613
Added models for influxdb write operations
gkatzioura May 5, 2019
778c91c
Added influxdb source stage
gkatzioura May 5, 2019
f4d9f30
Added influxdb flow stage
gkatzioura May 5, 2019
c4a420b
Added influxdb source implementation for scala
gkatzioura May 5, 2019
2786a12
Added settings for influxdb database
gkatzioura May 5, 2019
c9a0b06
Added scala flow for influxdb connector
gkatzioura May 5, 2019
dfcce7f
Added scala dsl sink for influxdb connector
gkatzioura May 5, 2019
9f49657
Added influxdb flow sink and source for java dsl
gkatzioura May 5, 2019
4e4a3ee
Influxdb connector tests and utils for java dsl
gkatzioura May 5, 2019
0c30e8b
Influxdb connector tests for scala dsl
gkatzioura May 5, 2019
6ccf9b2
Added influxdb
gkatzioura May 5, 2019
32b94c4
Influxdb connector result mapper helper
gkatzioura May 5, 2019
dc20c1a
Influxdb connector reference
gkatzioura May 5, 2019
777b2b7
Influxdb Dependencies
gkatzioura May 5, 2019
8230b65
Applied formatting on Influxdb connector
gkatzioura May 5, 2019
9f92fe1
Fixed source stage
gkatzioura May 5, 2019
6694915
InfluxDB connector compile for 2.11.12
huntc May 5, 2019
9241e6f
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 5, 2019
d9d4c19
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 6, 2019
8ccb0f8
InfluxDB connector disabled MimaPlugin
gkatzioura May 6, 2019
6883fa2
InfluxDB connector added test to travis
gkatzioura May 6, 2019
25e8171
InfluxDB connector added test to travis
gkatzioura May 6, 2019
ada72bb
InfluxDB connector added test to travis
gkatzioura May 6, 2019
51f1e5b
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 6, 2019
b165f25
Removed println
gkatzioura May 6, 2019
d7f10b8
Changed typos on spec for InfluxDB connector
gkatzioura May 8, 2019
4ca0611
Changed to scala constants for InfluxDB connector
gkatzioura May 8, 2019
e356055
Add final to constants for InfluxDB connector
gkatzioura May 8, 2019
66988af
Fixed typo for InfluxDB connector
gkatzioura May 8, 2019
4f039ca
Fixed typo for InfluxDB connector
gkatzioura May 8, 2019
3b5bbda
Added spaces on select statements for InfluxDB connector
gkatzioura May 8, 2019
58209e2
Changed to hostname for InfluxDB connector
gkatzioura May 8, 2019
14a4cfe
Switched settings class to case for InfluxDB connector
gkatzioura May 8, 2019
501d708
Fixed reference spacing for InfluxDB connector
gkatzioura May 8, 2019
eabe031
Changed option to Some/None for InfluxDB connector
gkatzioura May 8, 2019
9ba5263
Changed to (_ :+ _) from (seq, wm) => seq :+ wm on InfluxDB connector
gkatzioura May 8, 2019
54c09c8
Added imports to clean flow for InfluxDB connector
gkatzioura May 8, 2019
308a64d
Changed Option.empty() to None and used match cases for InfluxDB conn…
gkatzioura May 8, 2019
a6b9070
Changed to one liners
gkatzioura May 8, 2019
c50fd19
Changed to None for InfluxDB connector
gkatzioura May 8, 2019
12cf067
Changed from option to some for InfluxDB connector
gkatzioura May 8, 2019
d5f984b
Changed on alpakka.influxdb on reference for InfluxDB connector
gkatzioura May 10, 2019
a12867c
Added information on database and retention policy on InfluxDBWriteMe…
gkatzioura May 11, 2019
a464927
Added result mapper utility for InfluxDB connector
gkatzioura May 11, 2019
162b897
Changed flow to avoid batching for InfluxDB connector
gkatzioura May 11, 2019
dc61972
Set IODispatcher on InfluxDBFlowStage for InfluxDB connector
gkatzioura May 11, 2019
395f373
Chnaged to val for InfluxDB connector
gkatzioura May 11, 2019
4000c84
Removed unecessary variables InfluxDB connector
gkatzioura May 14, 2019
eaf6060
Applied formatting on InfluxDB connector
gkatzioura May 14, 2019
1701901
Update influxdb/src/main/scala/akka/stream/alpakka/influxdb/impl/Infl…
gkatzioura May 17, 2019
a150cc4
Removed reference on InfluxDB connector
gkatzioura May 17, 2019
b50c5c4
Removed case class on settings for InfluxDB connector
gkatzioura May 17, 2019
2f9fbe4
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 18, 2019
953738e
Changed to InfluxDb for InfluxDb connector
gkatzioura May 18, 2019
fb3f93d
Separated logic on flow for InfluxDb connector
gkatzioura May 19, 2019
fe55ee8
Added @InternalApi on InfluxDbFlowStage for InfluxDb connector
gkatzioura May 19, 2019
364cdd9
Refactored file name for InfluxDb connector
gkatzioura May 19, 2019
92e6943
Set source queries to be executed on the IODispatcher for InfluxDb co…
gkatzioura May 19, 2019
a249573
Set source queries to be executed on the IODispatcher for InfluxDb co…
gkatzioura May 19, 2019
80d1489
Set query to run on IO Dispatcher, added test on query failure on Inf…
gkatzioura May 19, 2019
8876c26
Fail stage on error for source on InfluxDb connector
gkatzioura May 19, 2019
ac63395
Added error handling per source on InfluxDb connector
gkatzioura May 20, 2019
7b82e2d
Optimized imports on InfluxDb connector
gkatzioura May 20, 2019
92f93d1
Changed the name for the result mapper on InfluxDb connector
gkatzioura May 20, 2019
f995ab9
Made no typed streams to points on InfluxDb connector
gkatzioura May 21, 2019
fe299a1
Made more functional retention policy check on InfluxDb connector
gkatzioura May 21, 2019
8ee2a6d
Added @InternalApi on InfluxDb connector
gkatzioura May 21, 2019
7d7ad27
Added read and write settings on InfluxDb connector
gkatzioura May 21, 2019
018b2ab
excluding 2.11 scala version for influxdb connector
gkatzioura May 23, 2019
7c59653
Fixing copy paste error for InfluxDb connector
gkatzioura May 23, 2019
dab160e
Added toString on InfluxDBWriteMessage for InfluxDb connector
gkatzioura May 23, 2019
659e1a3
Added toString methods on InfluxDb connector
gkatzioura May 23, 2019
54bc4d7
Refactored from apply to create on InfluDb connector
gkatzioura May 24, 2019
19af35b
Formatted sbt for InfluxDb connector
gkatzioura May 24, 2019
f65689a
Removed logs test on InfluxDb connector
gkatzioura May 27, 2019
14e692d
Changed to databaseName.orNull on InfluxDb connector
gkatzioura Jun 2, 2019
32aa72c
Removed batch, in order for the user to submit immutable.Seq on Influ…
gkatzioura Jun 13, 2019
5462a58
Merge branch 'master' into influxdb.issue.1054
gkatzioura Jun 13, 2019
9d9412e
formatted InfluxDBSpec for InfluxDB connector
gkatzioura Jun 13, 2019
045d84a
formatted Dependencies for InfluxDB connector
gkatzioura Jun 13, 2019
77104f3
Removed influxDB.enableBatch(BatchOptions.DEFAULTS) and influxDB.clos…
gkatzioura Jun 14, 2019
475142a
Moved runQuery to preStart so that there is data available on the fir…
gkatzioura Jun 14, 2019
2bc55c3
Applied formatting on InfluxDb connector
gkatzioura Jun 14, 2019
6a83218
Implemented base class for InfluxDbSourceLogic and InfluxDbSourceRawL…
gkatzioura Jun 14, 2019
b7bed8f
Added documentation for InfluxDb connector
gkatzioura Jun 27, 2019
fe70b98
Merge branch 'master' into influxdb.issue.1054
gkatzioura Jun 27, 2019
f5dd3d8
Formatted InfluxDbFlow for InfluxDb connector
gkatzioura Jun 27, 2019
4185743
Added to project-info
gkatzioura Jun 27, 2019
de06fd6
Typo fix
gkatzioura Jun 27, 2019
efdb553
Added #init-client on InfluxdbSPec for InfluxDB connector
gkatzioura Jun 27, 2019
d96e68b
Added #init-client on InfluxDB connector
gkatzioura Jun 27, 2019
441125d
Documentation fixes on InfluxDB connector
gkatzioura Jun 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ jobs:
- PRE_CMD="docker-compose up -d hbase"
- env:
- DIR=hdfs
- env:
- DIR=influxdb
- PRE_CMD="docker-compose up -d influxdb"
- env:
- DIR=ironmq
- PRE_CMD="docker-compose up -d ironauth ironmq"
Expand Down
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
googleFcm,
hbase,
hdfs,
influxdb,
ironmq,
jms,
jsonStreaming,
Expand Down Expand Up @@ -194,6 +195,10 @@ lazy val hdfs = alpakkaProject("hdfs",
crossScalaVersions -= Dependencies.Scala213 // Requires upgrade of cats-core
)

lazy val influxdb =
alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB, crossScalaVersions -= Dependencies.Scala211)
.disablePlugins(MimaPlugin)

lazy val ironmq = alpakkaProject(
"ironmq",
"ironmq",
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ services:
image: mongo
ports:
- "27017:27017"
influxdb:
image: influxdb
ports:
- "8086:8086"
mqtt:
image: toke/mosquitto
ports:
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [HTTP](external/http.md)
* [IBM Bluemix Cloud Object storage](bluemix-cos.md)
* [IBM DB2 Event Store](external/db2-event-store.md)
* [InfluxDB](influxdb.md)
* [IronMQ](ironmq.md)
* [JMS](jms/index.md)
* [MongoDB](mongodb.md)
Expand Down
95 changes: 95 additions & 0 deletions docs/src/main/paradox/influxdb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#InfluxDB

The Alpakka InfluxDb connector provides Akka Streams integration for InfluxDB.

For more information about InfluxDB, please visit the [InfluxDB Documentation](https://docs.influxdata.com/)

@@project-info{ projectId="influxdb" }

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-influxdb$scala.binary.version$
version=$project.version$
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

@@dependencies { projectId="influxdb" }

## Set up InfluxDB client

Sources, Flows and Sinks provided by this connector need a prepared `org.influxdb.InfluxDB` to
access to InfluxDB.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala) { #init-client }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/TestUtils.java) { #init-client }

## InfluxDB as Source and Sink

Now we can stream messages from or to InfluxDB by providing the `InfluxDB` to the
@scala[@scaladoc[InfluxDbSource](akka.stream.alpakka.influxdb.scaladsl.InfluxDbSource$)]
@java[@scaladoc[InfluxDbSource](akka.stream.alpakka.influxdb.javadsl.InfluxDbSource$)]
or the
@scala[@scaladoc[InfluxDbSink](akka.stream.alpakka.influxdb.scaladsl.InfluxDbSink$).]
@java[@scaladoc[InfluxDbSink](akka.stream.alpakka.influxdb.javadsl.InfluxDbSink$).]


Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpecCpu.java) { #define-class }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbCpu.java) { #define-class }

### With typed source

Use `InfluxDbSource.typed` and `InfluxDbSink.typed` to create source and sink.
@scala[The data is converted by InfluxDBMapper.]
@java[The data is converted by InfluxDBMapper.]

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala) { #run-typed }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #run-typed }

### With `QueryResult` source

Use `InfluxDbSource.create` and `InfluxDbSink.create` to create source and sink.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/InfluxDbSpec.scala) { #run-query-result}

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #run-query-result}

TODO

### Writing to InfluxDB

You can also build flow stages.
@scala[@scaladoc[InfluxDbFlow](akka.stream.alpakka.influxdb.scaladsl.InfluxDbFlow$).]
@java[@scaladoc[InfluxDbFlow](akka.stream.alpakka.influxdb.javadsl.InfluxDbFlow$).]
The API is similar to creating Sinks.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala) { #run-flow }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #run-flow }

### Passing data through InfluxDbFlow

When streaming documents from Kafka, you might want to commit to Kafka **AFTER** the document has been written to InfluxDB.

Scala
: @@snip [snip](/influxdb/src/test/scala/docs/scaladsl/FlowSpec.scala) { #kafka-example }

Java
: @@snip [snip](/influxdb/src/test/java/docs/javadsl/InfluxDbTest.java) { #kafka-example }


Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb

import java.util.concurrent.TimeUnit

object InfluxDbReadSettings {
val Default = new InfluxDbReadSettings(TimeUnit.MILLISECONDS)

def apply(): InfluxDbReadSettings = Default

}

final class InfluxDbReadSettings private (val precision: TimeUnit) {

def withPrecision(precision: TimeUnit): InfluxDbReadSettings = copy(precision = precision)

private def copy(
precision: TimeUnit = precision
): InfluxDbReadSettings = new InfluxDbReadSettings(
precision = precision
)

gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
override def toString: String =
s"""InfluxDbReadSettings(precision=$precision)"""

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb

import akka.NotUsed

object InfluxDbWriteMessage {
// Apply method to use when not using passThrough
def apply[T](point: T): InfluxDbWriteMessage[T, NotUsed] =
new InfluxDbWriteMessage(point = point, passThrough = NotUsed)

// Java-api - without passThrough
def create[T](point: T): InfluxDbWriteMessage[T, NotUsed] =
new InfluxDbWriteMessage(point, NotUsed)

// Java-api - with passThrough
def create[T, C](point: T, passThrough: C) =
new InfluxDbWriteMessage(point, passThrough)
}

final class InfluxDbWriteMessage[T, C] private (val point: T,
val passThrough: C,
val databaseName: Option[String] = None,
val retentionPolicy: Option[String] = None) {

def withPoint(point: T): InfluxDbWriteMessage[T, C] =
copy(point = point)

def withPassThrough[PT2](passThrough: PT2): InfluxDbWriteMessage[T, PT2] =
new InfluxDbWriteMessage[T, PT2](
point = point,
passThrough = passThrough,
databaseName = databaseName,
retentionPolicy = retentionPolicy
)

def withDatabaseName(databaseName: String): InfluxDbWriteMessage[T, C] =
copy(databaseName = Some(databaseName))

def withRetentionPolicy(retentionPolicy: String): InfluxDbWriteMessage[T, C] =
copy(retentionPolicy = Some(retentionPolicy))

private def copy(
point: T = point,
passThrough: C = passThrough,
databaseName: Option[String] = databaseName,
retentionPolicy: Option[String] = retentionPolicy
): InfluxDbWriteMessage[T, C] =
new InfluxDbWriteMessage(point = point,
passThrough = passThrough,
databaseName = databaseName,
retentionPolicy = retentionPolicy)

override def toString: String =
s"""InfluxDbWriteMessage(point=$point,passThrough=$passThrough,databaseName=$databaseName,retentionPolicy=$retentionPolicy)"""
}

final case class InfluxDbWriteResult[T, C](writeMessage: InfluxDbWriteMessage[T, C], error: Option[String])
Loading