Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influxdb connector #1680

Merged
merged 95 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
0bc6432
Added reference and build.sbt files
gkatzioura Mar 10, 2019
190ae83
Added result mapper helper and source for influxdb
gkatzioura Apr 14, 2019
55a3a39
Fixed influxdb source
gkatzioura Apr 14, 2019
f7ba613
Added models for influxdb write operations
gkatzioura May 5, 2019
778c91c
Added influxdb source stage
gkatzioura May 5, 2019
f4d9f30
Added influxdb flow stage
gkatzioura May 5, 2019
c4a420b
Added influxdb source implementation for scala
gkatzioura May 5, 2019
2786a12
Added settings for influxdb database
gkatzioura May 5, 2019
c9a0b06
Added scala flow for influxdb connector
gkatzioura May 5, 2019
dfcce7f
Added scala dsl sink for influxdb connector
gkatzioura May 5, 2019
9f49657
Added influxdb flow sink and source for java dsl
gkatzioura May 5, 2019
4e4a3ee
Influxdb connector tests and utils for java dsl
gkatzioura May 5, 2019
0c30e8b
Influxdb connector tests for scala dsl
gkatzioura May 5, 2019
6ccf9b2
Added influxdb
gkatzioura May 5, 2019
32b94c4
Influxdb connector result mapper helper
gkatzioura May 5, 2019
dc20c1a
Influxdb connector reference
gkatzioura May 5, 2019
777b2b7
Influxdb Dependencies
gkatzioura May 5, 2019
8230b65
Applied formatting on Influxdb connector
gkatzioura May 5, 2019
9f92fe1
Fixed source stage
gkatzioura May 5, 2019
6694915
InfluxDB connector compile for 2.11.12
huntc May 5, 2019
9241e6f
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 5, 2019
d9d4c19
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 6, 2019
8ccb0f8
InfluxDB connector disabled MimaPlugin
gkatzioura May 6, 2019
6883fa2
InfluxDB connector added test to travis
gkatzioura May 6, 2019
25e8171
InfluxDB connector added test to travis
gkatzioura May 6, 2019
ada72bb
InfluxDB connector added test to travis
gkatzioura May 6, 2019
51f1e5b
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 6, 2019
b165f25
Removed println
gkatzioura May 6, 2019
d7f10b8
Changed typos on spec for InfluxDB connector
gkatzioura May 8, 2019
4ca0611
Changed to scala constants for InfluxDB connector
gkatzioura May 8, 2019
e356055
Add final to constants for InfluxDB connector
gkatzioura May 8, 2019
66988af
Fixed typo for InfluxDB connector
gkatzioura May 8, 2019
4f039ca
Fixed typo for InfluxDB connector
gkatzioura May 8, 2019
3b5bbda
Added spaces on select statements for InfluxDB connector
gkatzioura May 8, 2019
58209e2
Changed to hostname for InfluxDB connector
gkatzioura May 8, 2019
14a4cfe
Switched settings class to case for InfluxDB connector
gkatzioura May 8, 2019
501d708
Fixed reference spacing for InfluxDB connector
gkatzioura May 8, 2019
eabe031
Changed option to Some/None for InfluxDB connector
gkatzioura May 8, 2019
9ba5263
Changed to (_ :+ _) from (seq, wm) => seq :+ wm on InfluxDB connector
gkatzioura May 8, 2019
54c09c8
Added imports to clean flow for InfluxDB connector
gkatzioura May 8, 2019
308a64d
Changed Option.empty() to None and used match cases for InfluxDB conn…
gkatzioura May 8, 2019
a6b9070
Changed to one liners
gkatzioura May 8, 2019
c50fd19
Changed to None for InfluxDB connector
gkatzioura May 8, 2019
12cf067
Changed from option to some for InfluxDB connector
gkatzioura May 8, 2019
d5f984b
Changed on alpakka.influxdb on reference for InfluxDB connector
gkatzioura May 10, 2019
a12867c
Added information on database and retention policy on InfluxDBWriteMe…
gkatzioura May 11, 2019
a464927
Added result mapper utility for InfluxDB connector
gkatzioura May 11, 2019
162b897
Changed flow to avoid batching for InfluxDB connector
gkatzioura May 11, 2019
dc61972
Set IODispatcher on InfluxDBFlowStage for InfluxDB connector
gkatzioura May 11, 2019
395f373
Chnaged to val for InfluxDB connector
gkatzioura May 11, 2019
4000c84
Removed unecessary variables InfluxDB connector
gkatzioura May 14, 2019
eaf6060
Applied formatting on InfluxDB connector
gkatzioura May 14, 2019
1701901
Update influxdb/src/main/scala/akka/stream/alpakka/influxdb/impl/Infl…
gkatzioura May 17, 2019
a150cc4
Removed reference on InfluxDB connector
gkatzioura May 17, 2019
b50c5c4
Removed case class on settings for InfluxDB connector
gkatzioura May 17, 2019
2f9fbe4
Merge branch 'master' into influxdb.issue.1054
gkatzioura May 18, 2019
953738e
Changed to InfluxDb for InfluxDb connector
gkatzioura May 18, 2019
fb3f93d
Separated logic on flow for InfluxDb connector
gkatzioura May 19, 2019
fe55ee8
Added @InternalApi on InfluxDbFlowStage for InfluxDb connector
gkatzioura May 19, 2019
364cdd9
Refactored file name for InfluxDb connector
gkatzioura May 19, 2019
92e6943
Set source queries to be executed on the IODispatcher for InfluxDb co…
gkatzioura May 19, 2019
a249573
Set source queries to be executed on the IODispatcher for InfluxDb co…
gkatzioura May 19, 2019
80d1489
Set query to run on IO Dispatcher, added test on query failure on Inf…
gkatzioura May 19, 2019
8876c26
Fail stage on error for source on InfluxDb connector
gkatzioura May 19, 2019
ac63395
Added error handling per source on InfluxDb connector
gkatzioura May 20, 2019
7b82e2d
Optimized imports on InfluxDb connector
gkatzioura May 20, 2019
92f93d1
Changed the name for the result mapper on InfluxDb connector
gkatzioura May 20, 2019
f995ab9
Made no typed streams to points on InfluxDb connector
gkatzioura May 21, 2019
fe299a1
Made more functional retention policy check on InfluxDb connector
gkatzioura May 21, 2019
8ee2a6d
Added @InternalApi on InfluxDb connector
gkatzioura May 21, 2019
7d7ad27
Added read and write settings on InfluxDb connector
gkatzioura May 21, 2019
018b2ab
excluding 2.11 scala version for influxdb connector
gkatzioura May 23, 2019
7c59653
Fixing copy paste error for InfluxDb connector
gkatzioura May 23, 2019
dab160e
Added toString on InfluxDBWriteMessage for InfluxDb connector
gkatzioura May 23, 2019
659e1a3
Added toString methods on InfluxDb connector
gkatzioura May 23, 2019
54bc4d7
Refactored from apply to create on InfluDb connector
gkatzioura May 24, 2019
19af35b
Formatted sbt for InfluxDb connector
gkatzioura May 24, 2019
f65689a
Removed logs test on InfluxDb connector
gkatzioura May 27, 2019
14e692d
Changed to databaseName.orNull on InfluxDb connector
gkatzioura Jun 2, 2019
32aa72c
Removed batch, in order for the user to submit immutable.Seq on Influ…
gkatzioura Jun 13, 2019
5462a58
Merge branch 'master' into influxdb.issue.1054
gkatzioura Jun 13, 2019
9d9412e
formatted InfluxDBSpec for InfluxDB connector
gkatzioura Jun 13, 2019
045d84a
formatted Dependencies for InfluxDB connector
gkatzioura Jun 13, 2019
77104f3
Removed influxDB.enableBatch(BatchOptions.DEFAULTS) and influxDB.clos…
gkatzioura Jun 14, 2019
475142a
Moved runQuery to preStart so that there is data available on the fir…
gkatzioura Jun 14, 2019
2bc55c3
Applied formatting on InfluxDb connector
gkatzioura Jun 14, 2019
6a83218
Implemented base class for InfluxDbSourceLogic and InfluxDbSourceRawL…
gkatzioura Jun 14, 2019
b7bed8f
Added documentation for InfluxDb connector
gkatzioura Jun 27, 2019
fe70b98
Merge branch 'master' into influxdb.issue.1054
gkatzioura Jun 27, 2019
f5dd3d8
Formatted InfluxDbFlow for InfluxDb connector
gkatzioura Jun 27, 2019
4185743
Added to project-info
gkatzioura Jun 27, 2019
de06fd6
Typo fix
gkatzioura Jun 27, 2019
efdb553
Added #init-client on InfluxdbSPec for InfluxDB connector
gkatzioura Jun 27, 2019
d96e68b
Added #init-client on InfluxDB connector
gkatzioura Jun 27, 2019
441125d
Documentation fixes on InfluxDB connector
gkatzioura Jun 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ jobs:
- PRE_CMD="docker-compose up -d hbase"
- env:
- DIR=hdfs
- env:
- DIR=influxdb
- PRE_CMD="docker-compose up -d influxdb"
- env:
- DIR=ironmq
- PRE_CMD="docker-compose up -d ironauth ironmq"
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
googleFcm,
hbase,
hdfs,
influxdb,
ironmq,
jms,
jsonStreaming,
Expand Down Expand Up @@ -178,6 +179,8 @@ lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, fork in Te

lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, parallelExecution in Test := false)

lazy val influxdb = alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB).disablePlugins(MimaPlugin)

lazy val ironmq = alpakkaProject("ironmq",
"ironmq",
Dependencies.IronMq,
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ services:
image: mongo
ports:
- "27017:27017"
influxdb:
image: influxdb
ports:
- "8086:8086"
mqtt:
image: toke/mosquitto
ports:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package org.influxdb.impl;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this class live in that package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to do with some package-private methods the influxdb-java driver contains which are used. I can extract them and put them in another class if needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is unfortunate to need the package-private methods. The least you can do is to name the class Alpakka-specific so that it won't collide with something in any dependencies.
Put an explanation in the class comment, please.


import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.influxdb.InfluxDBMapperException;
import org.influxdb.dto.QueryResult;

public class InfluxDBResultMapperHelper {

private final InfluxDBResultMapper influxDBResultMapper = new InfluxDBResultMapper();

public void cacheClassFields(final Class<?> clazz) {
influxDBResultMapper.cacheMeasurementClass(clazz);
}

public <T> List<T> parseSeriesAs(
final Class<T> clazz, final QueryResult.Series series, final TimeUnit precision) {
influxDBResultMapper.cacheMeasurementClass(clazz);
return series
.getValues()
.stream()
.map(v -> parseRowAs(clazz, series.getColumns(), v, precision))
.collect(Collectors.toList());
}

public <T> T parseRowAs(
final Class<T> clazz, List<String> columns, final List<Object> values, TimeUnit precision) {

try {
ConcurrentMap<String, Field> fieldMap = influxDBResultMapper.getColNameAndFieldMap(clazz);

T object = null;

for (int i = 0; i < columns.size(); i++) {
Field correspondingField = fieldMap.get(columns.get(i));

if (correspondingField != null) {
if (object == null) {
object = clazz.newInstance();
}

influxDBResultMapper.setFieldValue(object, correspondingField, values.get(i), precision);
}
}

return object;
} catch (InstantiationException | IllegalAccessException e) {
throw new InfluxDBMapperException(e);
}
}
}
9 changes: 9 additions & 0 deletions influxdb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
akka.stream.alpakka.influxdb{
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved

#batch write size
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
batchSize = 10

# The InfluxDB write precision
precision = MILLISECONDS

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.TimeUnit

object InfluxDBSettings {
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
val Default = new InfluxDBSettings(batchSize = 10, TimeUnit.MILLISECONDS)

def apply(): InfluxDBSettings = Default

}

final class InfluxDBSettings private (
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
val batchSize: Int,
val precision: TimeUnit
) {

def withBatchSize(value: Int): InfluxDBSettings = copy(batchSize = value)

private def copy(
batchSize: Int = batchSize,
precision: TimeUnit = precision
): InfluxDBSettings = new InfluxDBSettings(
batchSize = batchSize,
precision = precision
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb

import akka.NotUsed

object InfluxDBWriteMessage {
// Apply method to use when not using passThrough
def apply[T](point: T): InfluxDBWriteMessage[T, NotUsed] =
InfluxDBWriteMessage(point, NotUsed)

// Java-api - without passThrough
def create[T](point: T): InfluxDBWriteMessage[T, NotUsed] =
InfluxDBWriteMessage(point, NotUsed)

// Java-api - with passThrough
def create[T, C](point: T, passThrough: C) =
InfluxDBWriteMessage(point, passThrough)
}

final case class InfluxDBWriteMessage[T, C](point: T, passThrough: C)

final case class InfluxDBWriteResult[T, C](writeMessage: InfluxDBWriteMessage[T, C], error: Option[String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb.impl

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.alpakka.influxdb.{InfluxDBWriteMessage, InfluxDBWriteResult}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import org.influxdb.InfluxDB
import org.influxdb.impl.InfluxDBMapper

import scala.collection.immutable
import org.influxdb.BatchOptions
import org.influxdb.dto.Point

/**
* INTERNAL API
*/
private[influxdb] class InfluxDBFlowStage[T, C](
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
clazz: Option[Class[T]],
influxDB: InfluxDB
) extends GraphStage[FlowShape[immutable.Seq[InfluxDBWriteMessage[T, C]], immutable.Seq[InfluxDBWriteResult[T, C]]]] {
private val in = Inlet[immutable.Seq[InfluxDBWriteMessage[T, C]]]("in")
private val out = Outlet[immutable.Seq[InfluxDBWriteResult[T, C]]]("out")

override val shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
clazz match {
case Some(c) => new InfluxDBMapperRecordLogic
case None => new InfluxDBRecordLogic
}

sealed abstract class InfluxDBLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved

protected def write(messages: immutable.Seq[InfluxDBWriteMessage[T, C]]): Unit

setHandlers(in, out, this)

override def onPull(): Unit = if (!isClosed(in) && !hasBeenPulled(in)) pull(in)

override def onPush(): Unit = {
val messages = grab(in)
if (messages.nonEmpty) {

influxDB.enableBatch(BatchOptions.DEFAULTS)
write(messages)
val writtenMessages = messages.map(m => new InfluxDBWriteResult(m, Option.empty))
influxDB.close()
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
push(out, writtenMessages)
}

tryPull(in)
}
}

final class InfluxDBRecordLogic extends InfluxDBLogic {

override protected def write(messages: immutable.Seq[InfluxDBWriteMessage[T, C]]): Unit =
messages.foreach {
case InfluxDBWriteMessage(point: Point, _) => {
influxDB.write(point)
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
}
case InfluxDBWriteMessage(others: AnyRef, _) =>
failStage(new RuntimeException(s"unexpected type Point or annotated with Measurement required"))
}

}

final class InfluxDBMapperRecordLogic extends InfluxDBLogic {

protected var influxDBMapper: InfluxDBMapper = _

override def preStart(): Unit =
influxDBMapper = new InfluxDBMapper(influxDB)

override protected def write(messages: immutable.Seq[InfluxDBWriteMessage[T, C]]): Unit =
messages.foreach {
case InfluxDBWriteMessage(typeMetric: Any, _) =>
influxDBMapper.save(typeMetric)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.influxdb.impl

import java.util.concurrent.TimeUnit

import akka.annotation.InternalApi
import akka.stream.alpakka.influxdb.InfluxDBSettings
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import org.influxdb.InfluxDB
import org.influxdb.dto.{Query, QueryResult}
import org.influxdb.impl.InfluxDBResultMapperHelper

import scala.collection.JavaConverters._

/**
* INTERNAL API
*/
@InternalApi
private[influxdb] final class InfluxDBSourceStage[T](clazz: Class[T],
settings: InfluxDBSettings,
influxDB: InfluxDB,
query: Query)
extends GraphStage[SourceShape[T]] {

val out: Outlet[T] = Outlet("InfluxDB.out")
override val shape = SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new InfluxDBSourceLogic[T](clazz, settings, influxDB, query, out, shape)

}

/**
* INTERNAL API
*/
@InternalApi
private[influxdb] final class InfluxDBSourceLogic[T](clazz: Class[T],
settings: InfluxDBSettings,
influxDB: InfluxDB,
query: Query,
outlet: Outlet[T],
shape: SourceShape[T])
extends GraphStageLogic(shape)
with OutHandler {

setHandler(outlet, this)

var dataRetrieved: Option[QueryResult] = Option.empty
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
var resultMapperHelper: InfluxDBResultMapperHelper = _

override def preStart(): Unit = {
resultMapperHelper = new InfluxDBResultMapperHelper
resultMapperHelper.cacheClassFields(clazz)

val queryResult = influxDB.query(query)
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
if (!queryResult.hasError) {
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
dataRetrieved = Option(queryResult)
}
}

override def onPull(): Unit =
if (dataRetrieved.isEmpty)
gkatzioura marked this conversation as resolved.
Show resolved Hide resolved
completeStage()
else {
val queryResult = dataRetrieved.get
for {
result <- queryResult.getResults.asScala
series <- result.getSeries.asScala
}(
emitMultiple(outlet,
resultMapperHelper.parseSeriesAs(clazz, series, settings.precision).asScala.toIterator)
)

dataRetrieved = Option.empty
}

}

/**
* INTERNAL API
*/
@InternalApi
private[influxdb] final class InfluxDBRawSourceStage(query: Query, influxDB: InfluxDB)
extends GraphStage[SourceShape[QueryResult]] {

val out: Outlet[QueryResult] = Outlet("InfluxDB.out")
override val shape = SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new InfluxDBSourceRawLogic(query, influxDB, out, shape)

}

/**
* INTERNAL API
*/
@InternalApi
private[influxdb] final class InfluxDBSourceRawLogic(query: Query,
influxDB: InfluxDB,
outlet: Outlet[QueryResult],
shape: SourceShape[QueryResult])
extends GraphStageLogic(shape)
with OutHandler {

setHandler(outlet, this)

var dataRetrieved: Option[QueryResult] = Option.empty

override def preStart(): Unit = {
val queryResult = influxDB.query(query)
if (!queryResult.hasError) {
dataRetrieved = Option(queryResult)
}
}

override def onPull(): Unit =
if (dataRetrieved.isEmpty) {
completeStage()
} else {
emit(outlet, dataRetrieved.get)
dataRetrieved = Option.empty
}

}
Loading