-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Influxdb connector #1680
Influxdb connector #1680
Conversation
I get a failure both on the compatibility check (it also happens on master). Any help on fixing it, it will be greatly appreciated [error] (awslambda / mimaReportBinaryIssues) akka-stream-alpakka-awslambda: Binary compatibility check failed!
[error] (influxdb / mimaPreviousClassfiles) sbt.librarymanagement.ResolveException: unresolved dependency: com.lightbend.akka#akka-stream-alpakka-influxdb_2.12;1.0-M3: not found
[error] (dynamodb / mimaReportBinaryIssues) akka-stream-alpakka-dynamodb: Binary compatibility check failed!
[error] (reference / mimaPreviousClassfiles) sbt.librarymanagement.ResolveException: unresolved dependency: com.lightbend.akka#akka-stream-alpakka-reference_2.12;1.0-M3: not found |
Thank you for this PR. You may switch off the MiMa check for the new module with |
Please add the new module to |
…xDB Connector Removed InfluxDBWrite Settings due to not using batch size on InfluxDB Connector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, I think it is better to keep it simple and focus on the most crucial use cases. This is almost there but needs documentation.
influxdb/src/main/scala/akka/stream/alpakka/influxdb/impl/InfluxDbSourceStage.scala
Show resolved
Hide resolved
* INTERNAL API | ||
*/ | ||
@InternalApi | ||
private[influxdb] final class InfluxDbRawSourceStage(query: Query, influxDB: InfluxDB) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe InfluxDbSourceLogic
and InfluxDbRawSourceStage
should have a common base class as they share most of the code.
val messages = grab(in) | ||
if (messages.nonEmpty) { | ||
|
||
influxDB.enableBatch(BatchOptions.DEFAULTS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be important to make these settings configurable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was removed as explained here
influxDB.enableBatch(BatchOptions.DEFAULTS) | ||
write(messages) | ||
val writtenMessages = messages.map(m => new InfluxDbWriteResult(m, None)) | ||
influxDB.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API call is a bit surprising, but that might be just the way it is. Does this replace the disableBatch
call which the API docs of enableBatch
require?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ennru
Exactly. Once close() is called the batch is disabled and the messages are send to the database.
However this is no longer needed and It shall be removed.
I extracted the functionality, that the driver provided with batching enabled, and is currently executed by the connector itself.
This means that on a onPush call, the messages are written to the database in one http request.
Therefore the user has control over the batch size and the delay through the connector without the need to be aware of the driver internals.
…e() as the InfluxDB connector handles the batching operation by itself.
…st pull on InfluxDb connector
…ogic for InfluxDb connector
@gkatzioura This looks good now, I propose we'll merge it and mark it as "API may change" so we keep the door open to apply more of @adkafka's suggestions. |
@ennru |
Thank's for the heads-up about the new version. I'll put a not in the documentation about both "API may change" and the upcoming InfluxDB version. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thank you so much for this effort! We've seen the interest for InfluxDB support for a while, but didn't get to look into it. Highly appreciated. |
@ennru |
A new connector to stream data to and from InfluxDB.
Purpose
This is the Alpakka connector for InfluxDB
References
References #1054
Changes
Added the connector for InfluxDB. Added InfluxDB on docker compose configuration.
Background Context
Commits are based on the source code of existing connectors like Elasticsearch and OrientDB.
Sources are created through queries. Inserts can be done both through models using the InfluxDB mapper or through Points. Batch operations are possible and enabled.