Add KairosDB connector
- Set default settings as default parameter
- Improve tests

Implement Java DSL

[FTP] Critical fix for an infinite loop when traversing "." and ".." directories

Upgrade to aws-java-sdk-dynamodb 1.11.106

Allow passing an `SSLSocketFactory` to `MqttConnectionSettings`
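
A sketch of how the factory might be wired in; `withSocketFactory` is an assumed builder-style setter, as the commit only states that the factory can now be passed:

```scala
import javax.net.ssl.SSLContext
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import akka.stream.alpakka.mqtt.MqttConnectionSettings

// `withSocketFactory` is an assumed setter name; the commit only states
// that an SSLSocketFactory can now be passed in.
val connectionSettings = MqttConnectionSettings(
  "ssl://localhost:8883", // broker URI on the TLS port
  "ssl-test-client",      // client ID
  new MemoryPersistence
).withSocketFactory(SSLContext.getDefault.getSocketFactory)
```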

FTP - attribute enrichment of FTPFile akka#153

Add KairosDB connector

= akka#135 Limit parallelism for the SqsSource (akka#163)

* Create a custom thread pool for the SqsSource and limit concurrency with the buffer size

* Provide AWSCredentials for all SqsSource tests

* Provide better java api

* Use ArrayDeque as FIFO queue
* Add messages to the asserts in SqsSourceSettings

* Remove AmazonSQSAsyncClient from factory method

- Correct the javadoc

* Add implicit AmazonSQSAsync to factory method

- It now conforms again with the SqsSink
- Passing the clients in from the outside seems to be the preferred way (as in the awslambda module)

* Update documentation to reflect thread pool usage

* Improvements from review by ktoso

- Replace IntStream with Source.range
- Use Sink.head instead of Sink.seq
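
A sketch of the resulting usage, with the client supplied from the outside and concurrency bounded by the buffer size (settings values are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// The client is passed in from the outside, as with the awslambda module.
implicit val sqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder.defaultClient()

val settings = SqsSourceSettings(
  waitTimeSeconds = 20, // long polling
  maxBufferSize = 100,  // also bounds the number of in-flight receive requests
  maxBatchSize = 10
)

SqsSource("https://sqs.eu-central-1.amazonaws.com/123456789012/example-queue", settings)
  .map(_.getBody)
  .runWith(Sink.foreach(println))
```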

=pro update Akka HTTP to 10.0.5 (akka#230)

Make Travis fail build on code format differences

=sqs fix typo in require in SqsSourceSettings (akka#228)

FTP - toPath sink akka#182
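
A minimal sketch of the new sink, with the connection settings left abstract:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.Source
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.util.ByteString
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val ftpSettings: FtpSettings = ??? // host, credentials, passive mode, etc.

// Stream bytes into a remote file; materializes the usual IOResult.
val uploaded: Future[IOResult] =
  Source.single(ByteString("hello alpakka"))
    .runWith(Ftp.toPath("/upload/hello.txt", ftpSettings))
```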

Add GCE Pubsub with publish and subscribe.

improve naming consistency of private vars.

PR feedback: improve the Java API and examples, better JSON marshalling.

use mapAsyncUnordered

s3: provide access to the returned response

s3: clean up test log

s3: add javadsl for request()

Time goes by, next try

Update connectors.md (akka#237)

Make external libs more visible (reactive kafka) (akka#229)

* Update TOC depth to 3, to show Reactive Kafka

Right now people don't notice Kafka is in here since it's "external"; expanding the TOC one more level makes it more visible.

WDYT?

* Update index.md

FTP: make lastModified test more robust (fixes akka#236)

Add SqsAckSink (akka#129)

Add SqsAckSink

* update elasticmq version
* update dependencies
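
A sketch of the ack flow this enables — consume, decide, settle; the action constructor (`Ack()`) is an assumption about this version of the API:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.Ack
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsSource}
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val sqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder.defaultClient()

val queueUrl = "https://sqs.eu-central-1.amazonaws.com/123456789012/example-queue"

// Pair each message with an action telling the sink how to settle it;
// `Ack()` (acknowledge/delete) is an assumed constructor name.
SqsSource(queueUrl)
  .map(message => (message, Ack()))
  .runWith(SqsAckSink(queueUrl))
```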

Added the possibility to configure an SFTP connection using a private key akka#197.

- Added SftpIdentity case class to allow configuring private/public keys.
- Added an option to configure a known_hosts file, and a test to check its usage.
- Added a spec that should fail password-based authentication and fall back to the private key.
- Added a docs paragraph describing this option (a sketch follows below).
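
A sketch of key-based configuration; apart from `SftpIdentity` itself, the factory and parameter names here are assumptions:

```scala
import java.net.InetAddress
import java.nio.file.{Files, Paths}
import akka.stream.alpakka.ftp.{FtpCredentials, SftpIdentity, SftpSettings}

// Assumed factory name; the commit only introduces the SftpIdentity case class.
val identity = SftpIdentity.createRawSftpIdentity(
  Files.readAllBytes(Paths.get("/home/demo/.ssh/id_rsa"))
)

val sftpSettings = SftpSettings(
  host = InetAddress.getByName("localhost"),
  credentials = FtpCredentials.create("demo", "wrong-password"), // should fall back to the key
  strictHostKeyChecking = true,
  knownHosts = Some("/home/demo/.ssh/known_hosts"),
  sftpIdentity = Some(identity)
)
```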

Upgrade to scalafmt 0.6.6

Remove deprecated binPack.callSite scalafmt setting

Format with updated scalafmt and fixed settings

S3 - add documentation akka#103

Fix alphabetical ordering in the docs

Separate out release docs

S3 path style access akka#64

Make SqsSourceTest less likely to fail

- Reduce the number of sent messages to 1 (multi-batch streaming is tested in the SqsSourceSpec)
- Increase timeout

Introduced "secure" boolean property for S3 which controls whether HTTPS is used akka#247

README: add scaladex, travis badges

And make docs links less scary to click on :)

Add CSV data transformation module (akka#213)

* Alpakka Issue akka#66: CSV component

* Alpakka Issue akka#66: revised CSV parser

* Alpakka Issue akka#60: CSV parsing stage

* wait for line end before issuing line

As the byte string may not contain a whole line, the parser needs to read until a line end is reached.

* Add Java API and JUnit test; add a bit of documentation

* Introduce CsvToMap stage; more documentation

* Parse line even without line end at upstream finish

* Add Java API for CsvToMap; more documentation

* More restricted API, incorporated comments by @johanandren

* Format sequence as CSV in ByteString

* Add Scala CSV formatting stage

* Add Java API for CSV formatting; more docs

* Separate enums for Java and Scala DSLs

* Use Flow.fromGraph to construct flow

* Rename CsvFraming to CsvParsing

* Check for Byte Order Mark and ignore it for UTF-8

* Emit Byte Order Marks in formatting; CsvFormatting is just a map

* Byte Order Mark for Java API

* Add line number to error messages; sample files exported from third party software

* Use Charset directly instead of name

* csv: autoformatted files

* simplified dependency declaration

Fixes akka#60.
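
Putting the new stages together, a minimal end-to-end sketch of parsing bytes into maps keyed by the header line:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.util.ByteString

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// CsvParsing emits one List[ByteString] per CSV line; CsvToMap zips each
// line with the first (header) line into a Map.
Source.single(ByteString("name,city\nAlice,Berlin\nBob,Paris\n"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMap())
  .runWith(Sink.foreach(println)) // Map(name -> Alice, city -> Berlin), ...
```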

SQS flows + Embedded ElasticMQ akka#255

* Add a flow stage and use ElasticMQ
* Use flow-based stage for ACKs
* Use AmazonSQSAsync instead of AmazonSQSAsyncClient
* Using embedded ElasticMQ for tests
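
With embedded ElasticMQ the tests run against an in-memory, SQS-compatible endpoint instead of a real AWS account; a sketch of the test harness:

```scala
import org.elasticmq.rest.sqs.SQSRestServerBuilder

// Start an in-memory SQS-compatible server, run the tests against
// http://localhost:9324, then shut it down again.
val server = SQSRestServerBuilder.withPort(9324).withInterface("localhost").start()
try {
  // ... exercise SqsSource / SqsAckSink / the new flows here ...
} finally server.stopAndWait()
```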

Add SNS connector with publish sink akka#204
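
A minimal sketch of the publish sink, assuming the `SnsPublisher.sink(topicArn)` shape with an implicit `AmazonSNSAsync` client:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.alpakka.sns.scaladsl.SnsPublisher
import com.amazonaws.services.sns.{AmazonSNSAsync, AmazonSNSAsyncClientBuilder}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val snsClient: AmazonSNSAsync = AmazonSNSAsyncClientBuilder.defaultClient()

// Each stream element becomes one published SNS message.
Source.single("a message body")
  .runWith(SnsPublisher.sink("arn:aws:sns:eu-central-1:123456789012:example-topic"))
```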

Await futures before performing assertion (fixes akka#235)

When an assertion fails after the test has already succeeded, it is ignored, so
await the future before continuing with the check.

Document 'docker-compose' for running tests

Fail MqttSourceStage mat. value on connection loss

And increase the timeout. Might help with akka#189, or otherwise help generate a
better error message when it does happen again.

Ref akka#2 add IronMq integration

Refs akka#2 add at-least-once semantics to IronMq connector

Improve documentation and test coverage for IronMq ref akka#2

- Document the IronMq domain classes
- Document IronMq client
- Test the at-least-once producer/consumer mechanism
- Improve the IronMQ connector documentation

Ref akka#2 Preserve newline in reference.conf

Ref akka#2 Make sure the actor system is fully terminated after each test

Ref akka#2 Reformat code

Refs akka#2 define a different Committable and CommittableMessage for Java and Scala DSL

Refs akka#2 Fix typos in the IronMQ documentation

Refs akka#2 Remove unneeded environment variables from the TravisCI config file

Refs akka#2 Add a simple Java test and refactor the Java DSL to look better in Java

FTP: Attempt to fix flaky test on Travis

Link to scaladex (akka#266)

s3: support for encryption, storage class, custom headers akka#109

s3: Added support for partial file download from S3 akka#264 (akka#265)

Add version info and links in index page (akka#273)

FTP - append mode for toPath sink + improved upstream failure handling akka#207

Fix broken recovery of EventSource (sse)

Replace scala.binaryVersion with scalaBinaryVersion (see akka#278)

Fix minor typo in alpakka MQTT Connector doc

Add Flow to support RabbitMQ RPC workflow akka#160

Changes AMQP sinks to materialize to Future[Done], as it was previously very
difficult to determine when/if a sink failed due to an AMQP error.
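
A minimal sketch of the new materialized value, assuming the `AmqpSink.simple` factory and an `AmqpSinkSettings.withRoutingKey` builder from this era of the API:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.alpakka.amqp.{AmqpSinkSettings, DefaultAmqpConnection}
import akka.stream.alpakka.amqp.scaladsl.AmqpSink
import akka.util.ByteString
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// The sink now materializes Future[Done], so an AMQP failure surfaces here
// instead of being swallowed.
val done: Future[Done] =
  Source(List("one", "two", "three"))
    .map(ByteString(_))
    .runWith(AmqpSink.simple(AmqpSinkSettings(DefaultAmqpConnection).withRoutingKey("example-queue")))
```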

AMQP: add more options to configuration of the ConnectionFactory, akka#191

Directory sources akka#272
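
A minimal sketch, assuming the `Directory.ls`/`Directory.walk` shape of the file connector:

```scala
import java.nio.file.Paths
import akka.stream.alpakka.file.scaladsl.Directory

// Emits the direct contents of a directory as java.nio.file.Path elements...
val listing = Directory.ls(Paths.get("/tmp"))
// ...or the whole tree, depth-first.
val tree = Directory.walk(Paths.get("/tmp"))
```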

sse: Upgrade to Akka SSE 3 and make test more robust

CSV: Fixes ignored second double quote

S3: add listBucket method to S3 library (akka#253)

* Added recursive listBucket call to get all keys under a specific prefix.

* Properly using the request URI method and constructing queries with the Query type

* Added tests around query parsing

* Fixed formatting and removed recoverWithRetries on listBucket calls, as they are already retried on the underlying layer with max-retries

* Using signAndGetAs instead of signAndGet so as not to duplicate logic.

* Implemented quick fixes based on comments. Removed the recursive call to get keys and used unfoldAsync so that fetching all keys runs in constant memory (sketched below).

* Added execution context. Fixed broken test.

* Fixed formatting error.

* Cleaned up the listBucket call by adding a ListBucketState object instead of the brutal type signature from earlier.

* Moved the trait for listBucket into the def itself so as to remove it from the public namespace.
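
An illustrative reconstruction of that pagination shape: `listPage` is a hypothetical stand-in for the signed S3 "list objects" request, and the state ADT mirrors the ListBucketState object mentioned above:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

// State threaded through the listing, mirroring the ListBucketState idea.
sealed trait ListBucketState
case object Starting extends ListBucketState
final case class Running(continuationToken: String) extends ListBucketState

// Hypothetical stand-in for the signed S3 request: returns one page of keys
// plus the continuation token, if any.
def listPage(token: Option[String]): Future[(Seq[String], Option[String])] = ???

def listBucket(bucket: String, prefix: Option[String])(
    implicit ec: ExecutionContext): Source[String, NotUsed] =
  Source
    .unfoldAsync[Option[ListBucketState], Seq[String]](Some(Starting)) {
      case None => Future.successful(None) // no more pages: complete the source
      case Some(state) =>
        val token = state match {
          case Starting   => None
          case Running(t) => Some(t)
        }
        listPage(token).map {
          case (keys, Some(next)) => Some((Some(Running(next)), keys))
          case (keys, None)       => Some((None, keys))
        }
    }
    .mapConcat(_.toList) // one key at a time, constant memory throughout
```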

azure-storage-queue connector akka#280

Add attribute parameters to sqs source settings akka#302

Formatting fix for akka#302

Streaming XML parser and utilities.

Prepare XML parser to join Alpakka family
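
A minimal sketch of the parser, which turns a byte stream into StAX-style parse events:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.util.ByteString

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// Emits StartDocument, StartElement, Characters, EndElement, EndDocument.
Source.single(ByteString("<doc><item>alpakka</item></doc>"))
  .via(XmlParsing.parser)
  .runWith(Sink.seq[ParseEvent])
```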

Remove duplicated region argument in client methods akka#297

Build with Akka 2.5 as well

Add Azure Storage Queue documentation to TOC

Stub documentation for S3.listBucket

S3: fix formatting

Run the deployment only against Akka 2.4

PubSub: Add support for emulator host variables

Initial commit for Apache Geode connector

CSV: Emit all lines on completion akka#315

XML: make code in tests more consistent

Add whitesource plugin

Merge branch 'master' into add-kairosdb-connector

Add copyright header

update docker-compose

Make execution context optional in java api

Make execution context optional in scala api

remove ec from sink spec
s-soroosh authored and Soroosh Sarabadani committed Jun 12, 2017
1 parent 8364a47 commit c7e9d97
Showing 366 changed files with 15,590 additions and 1,538 deletions.
1 change: 0 additions & 1 deletion .scalafmt.conf
@@ -7,4 +7,3 @@ indentOperator = spray
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
unindentTopLevelOperators = true
binPack.callSite = true
11 changes: 8 additions & 3 deletions .travis.yml
@@ -3,8 +3,8 @@ language: scala
sudo: required

scala:
- "2.11.8"
- "2.12.1"
- "2.11.11"
- "2.12.2"

jdk:
- oraclejdk8
@@ -16,7 +16,8 @@ before_install:
- docker-compose up -d

script:
- sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION ";test;docs/paradox"
- sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION ";test:compile;docs/paradox"
- git diff --exit-code --color || { echo "[error] Unformatted code found. Please run 'test:compile' and commit the reformatted code."; false; } && sbt -J-XX:ReservedCodeCacheSize=128m ++$TRAVIS_SCALA_VERSION ";test"

before_cache:
- find $HOME/.ivy2 -name "ivydata-*.properties" -print -delete
@@ -34,8 +35,12 @@ deploy:
on:
tags: true
repo: akka/alpakka
condition: $AKKA_SERIES = 2.4

env:
matrix:
- AKKA_SERIES=2.4
- AKKA_SERIES=2.5
global:
# encrypt with: travis encrypt BINTRAY_USER=...
- secure: "DLc1v2XMWrxtbPzVfoArkSQiwzT+O7/rwT7APqoA68K5A1pwxDw5BD3sOLpd6rKbPBulmwDAAQ1am3uE6lbmymCTMe8bvA3iHWy0Jg8baK4nC46EKFLabQz4Uaoq/bs92YGW1QH12rbBPyTA7uvPpvhmb8uyUbJqjT3uyROqHhHfhll89dsPVaK0NCY9DN2K356M87z7cp1SvtfAiF8qaNjNmlOfWjIEUSO1SiiqQBjT/GJvhrrW32J4htf5CqEEYBSoN+fqTCwwdo3GpOFFq+Zh5TSoyvKyi5xo+LVq0hW2SAYTHMMB5WhLd1cvQ0bvK6FpUhyfMcON/8ad3R7paNVxLpIGEu8hsUYGsENjdXaEK4g1TurDeVekWJRYyGMCmy2cCg5yHC+Pb56qOgpM7gmZACCitLap1jXUCWI27q+7cvxuxaBb6EPiDuWGNQXHH+OkbD/s4RJO3kQfv1Si64RON8+bZ+fqOWLK+BuakYi3YMukFgPngQozX3aMZOVcEBKJZtquwewCKsNV2VGItJxrp3MDol0HhuNvdUvIf5UydZZCyJTo1DBsK49yDUj7iKOEJk5Qc5oYfEhSEKcGRZs8Yl9n/kIJUh8ZYTT+35rwD27dHgsEkPkCEGpbMP/r0JewkWH7q9FfajPIYOlEpoP6tCsdP4mBOyF1uZH7omM="
11 changes: 4 additions & 7 deletions CONTRIBUTING.md
@@ -14,6 +14,10 @@ This is the process for committing code into master.

In case of questions about the contribution process or for discussion of specific issues please visit the [akka/dev gitter chat](https://gitter.im/akka/dev).

## Testing

Some tests (cassandra, rabbitmq, mqtt, dynamodb) require a server to be running. A set of servers can easily be started with `docker-compose up`.

## Pull Request Requirements

For a Pull Request to be considered at all it has to meet these requirements:
@@ -83,10 +87,3 @@ Example:
1. [Travis CI](https://travis-ci.org/akka/alpakka) automatically merges the code, builds it, runs the tests and sets the Pull Request status in GitHub according to the results.
2. [Scalafmt](https://olafurpg.github.io/scalafmt) enforces some of the code style rules.
3. [sbt-header plugin](https://github.com/sbt/sbt-header) manages consistent copyright headers in every source file.

## Releasing

1. Create a [new release](https://github.com/akka/alpakka/releases/new) with the next tag version (e.g. `v0.3`), title and release description including notable changes mentioning external contributors.
2. Travis CI will start a [CI build](https://travis-ci.org/akka/alpakka/builds) for the new tag and publish artifacts to Bintray.
3. Login to [Bintray](https://bintray.com/akka/maven/alpakka) and sync artifacts to Maven Central.
4. Change documentation links to point to the latest version in the README.md and the Github project page.
5 changes: 3 additions & 2 deletions README.md
@@ -2,14 +2,15 @@ Alpakka: Akka Streams Connectors
================================

[![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/akka/akka?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Latest version: [ ![Download](https://api.bintray.com/packages/akka/maven/alpakka/images/download.svg) ](https://bintray.com/akka/maven/alpakka/_latestVersion)
Latest version: [ ![Download](https://api.bintray.com/packages/akka/maven/alpakka/images/download.svg) ](https://bintray.com/akka/maven/alpakka/_latestVersion) [![Latest version](https://index.scala-lang.org/akka/alpakka/alpakka/latest.svg)](https://index.scala-lang.org/akka/alpakka/alpakka) [![Build Status](https://travis-ci.org/akka/alpakka.svg?branch=master)](https://travis-ci.org/akka/alpakka)


This project provides a home to Akka Streams connectors to various technologies, protocols or libraries.

Documentation
-------------

See [reference](http://developer.lightbend.com/docs/alpakka/current/) and [API](http://developer.lightbend.com/docs/api/alpakka/current/akka/stream/alpakka/index.html) documentation pages.
See our [documentation](http://developer.lightbend.com/docs/alpakka/current/) and [scaladoc](http://developer.lightbend.com/docs/api/alpakka/current/akka/stream/alpakka/index.html) pages.

Contributions & Maintainers
---------------------------
14 changes: 14 additions & 0 deletions RELEASING.md
@@ -0,0 +1,14 @@
## Releasing

1. Create a [new release](https://github.com/akka/alpakka/releases/new) with the next tag version (e.g. `v0.3`), title and release description including notable changes mentioning external contributors.
2. Travis CI will start a [CI build](https://travis-ci.org/akka/alpakka/builds) for the new tag and publish artifacts to Bintray.
3. Login to [Bintray](https://bintray.com/akka/maven/alpakka) and sync artifacts to Maven Central.

### Releasing only updated docs

It is possible to release revised documentation for an already existing release.

1. Create a new branch from a release tag. If the revised documentation is for the `v0.3` release, then the name of the new branch should be `docs/v0.3`.
2. Make all of the required changes to the documentation.
3. Add and commit a `version.sbt` file that sets the version to the one that is being revised. For example `version in ThisBuild := "0.1"`.
4. Push the branch. Tech Hub will see the new branch and will build and publish revised documentation.
39 changes: 34 additions & 5 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnector.scala
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.amqp

@@ -15,19 +15,47 @@ private[amqp] trait AmqpConnector {
val factory = new ConnectionFactory
settings match {
case AmqpConnectionUri(uri) => factory.setUri(uri)
case AmqpConnectionDetails(host, port, maybeCredentials, maybeVirtualHost, sslProtocol) =>
factory.setHost(host)
factory.setPort(port)
case AmqpConnectionDetails(_,
maybeCredentials,
maybeVirtualHost,
sslProtocol,
requestedHeartbeat,
connectionTimeout,
handshakeTimeout,
shutdownTimeout,
networkRecoveryInterval,
automaticRecoveryEnabled,
topologyRecoveryEnabled,
exceptionHandler) =>
maybeCredentials.foreach { credentials =>
factory.setUsername(credentials.username)
factory.setPassword(credentials.password)
}
maybeVirtualHost.foreach(factory.setVirtualHost)
sslProtocol.foreach(factory.useSslProtocol)
requestedHeartbeat.foreach(factory.setRequestedHeartbeat)
connectionTimeout.foreach(factory.setConnectionTimeout)
handshakeTimeout.foreach(factory.setHandshakeTimeout)
shutdownTimeout.foreach(factory.setShutdownTimeout)
networkRecoveryInterval.foreach(factory.setNetworkRecoveryInterval)
automaticRecoveryEnabled.foreach(factory.setAutomaticRecoveryEnabled)
topologyRecoveryEnabled.foreach(factory.setTopologyRecoveryEnabled)
exceptionHandler.foreach(factory.setExceptionHandler)
case DefaultAmqpConnection => // leave it be as is
}
factory
}

def newConnection(factory: ConnectionFactory, settings: AmqpConnectionSettings): Connection = settings match {
case a: AmqpConnectionDetails => {
import scala.collection.JavaConverters._
if (a.hostAndPortList.nonEmpty)
factory.newConnection(a.hostAndPortList.map(hp => new Address(hp._1, hp._2)).asJava)
else
throw new IllegalArgumentException("You need to supply at least one host/port pair.")
}
case _ => factory.newConnection()
}
}

/**
@@ -40,12 +68,13 @@ private[amqp] trait AmqpConnectorLogic { this: GraphStageLogic =>

def settings: AmqpConnectorSettings
def connectionFactoryFrom(settings: AmqpConnectionSettings): ConnectionFactory
def newConnection(factory: ConnectionFactory, settings: AmqpConnectionSettings): Connection
def whenConnected(): Unit

final override def preStart(): Unit = {
val factory = connectionFactoryFrom(settings.connectionSettings)

connection = factory.newConnection()
connection = newConnection(factory, settings.connectionSettings)
channel = connection.createChannel()

val connShutdownCallback = getAsyncCallback[ShutdownSignalException] { ex =>
193 changes: 193 additions & 0 deletions amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpRpcFlowStage.scala
@@ -0,0 +1,193 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.amqp

import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._

import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object AmqpRpcFlowStage {

private val defaultAttributes =
Attributes.name("AmqpRpcFlow").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
}

/**
* This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication
*
* @param responsesPerMessage The number of responses that should be expected for each message placed on the queue. This
* can be overridden per message by including `expectedReplies` in the header of the [[OutgoingMessage]]
*/
final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSize: Int, responsesPerMessage: Int = 1)
extends GraphStageWithMaterializedValue[FlowShape[OutgoingMessage, IncomingMessage], Future[String]]
with AmqpConnector { stage =>

import AmqpRpcFlowStage._

val in = Inlet[OutgoingMessage]("AmqpRpcFlow.in")
val out = Outlet[IncomingMessage]("AmqpRpcFlow.out")

override def shape: FlowShape[OutgoingMessage, IncomingMessage] = FlowShape.of(in, out)

override protected def initialAttributes: Attributes = defaultAttributes

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[String]) = {
val promise = Promise[String]()
(new GraphStageLogic(shape) with AmqpConnectorLogic {

override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")
private val queue = mutable.Queue[IncomingMessage]()
private var queueName: String = _
private var outstandingMessages = 0

override def connectionFactoryFrom(settings: AmqpConnectionSettings) = stage.connectionFactoryFrom(settings)
override def newConnection(factory: ConnectionFactory, settings: AmqpConnectionSettings): Connection =
stage.newConnection(factory, settings)

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
val shutdownCallback = getAsyncCallback[(String, Option[ShutdownSignalException])] {
case (consumerTag, Some(e)) =>
val ex =
new RuntimeException(s"Consumer $queueName with consumerTag $consumerTag shut down unexpectedly", e)
promise.tryFailure(ex)
failStage(ex)
case (consumerTag, None) =>
val ex = new RuntimeException(s"Consumer $queueName with consumerTag $consumerTag shut down unexpectedly")
promise.tryFailure(ex)
failStage(ex)
}

pull(in)

// we have only one consumer per connection so global is ok
channel.basicQos(bufferSize, true)
val consumerCallback = getAsyncCallback(handleDelivery)

val amqpSourceConsumer = new DefaultConsumer(channel) {
override def handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: Array[Byte]
): Unit =
consumerCallback.invoke(IncomingMessage(ByteString(body), envelope, properties))

override def handleCancel(consumerTag: String): Unit =
// non consumer initiated cancel, for example happens when the queue has been deleted.
shutdownCallback.invoke((consumerTag, None))

override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit =
// "Called when either the channel or the underlying connection has been shut down."
shutdownCallback.invoke((consumerTag, Option(sig)))
}

// Create an exclusive queue with a randomly generated name for use as the replyTo portion of RPC
queueName = channel
.queueDeclare(
"",
false,
true,
true,
Map.empty[String, AnyRef].asJava
)
.getQueue

channel.basicConsume(
queueName,
amqpSourceConsumer
)
promise.success(queueName)
}

def handleDelivery(message: IncomingMessage): Unit =
if (isAvailable(out)) {
pushAndAckMessage(message)
} else {
if (queue.size + 1 > bufferSize) {
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
queue.enqueue(message)
}
}

def pushAndAckMessage(message: IncomingMessage): Unit = {
push(out, message)
// ack it as soon as we have passed it downstream
// TODO ack less often and do batch acks with multiple = true would probably be more performant
channel.basicAck(
message.envelope.getDeliveryTag,
false // just this single message
)
outstandingMessages -= 1

if (outstandingMessages == 0 && isClosed(in)) {
completeStage()
}
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
pushAndAckMessage(queue.dequeue())
}
}
)

setHandler(
in,
new InHandler {
// We don't want to finish since we're still waiting
// on incoming messages from rabbit. However, if we
// haven't processed a message yet, we do want to complete
// so that we don't hang.
override def onUpstreamFinish(): Unit =
if (queue.isEmpty && outstandingMessages == 0) super.onUpstreamFinish()

override def onPush(): Unit = {
val elem = grab(in)
val props = elem.props.getOrElse(new BasicProperties()).builder.replyTo(queueName).build()
channel.basicPublish(
exchange,
routingKey,
elem.mandatory,
elem.immediate,
props,
elem.bytes.toArray
)

val expectedResponses: Int = {
val headers = props.getHeaders
if (headers == null) {
responsesPerMessage
} else {
val r = headers.get("expectedReplies")
if (r != null) {
r.asInstanceOf[Int]
} else {
responsesPerMessage
}
}
}

outstandingMessages += expectedResponses
pull(in)
}
}
)
}, promise.future)
}

override def toString: String = "AmqpRpcFlow"

}
