[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current push model #807
Conversation
SPARK-1729. Make Flume pull data from source, rather than the current push model

Currently Spark uses Flume's internal Avro protocol to ingest data from Flume. If the executor running the receiver fails, it currently has to be restarted on the same node to be able to receive data. This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new DStream that is also included in this commit. This model ensures that data can be pulled into Spark from Flume even if the receiver is restarted on a new node. It also allows the receiver to receive data on multiple threads for better performance.
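For context, here is a minimal sketch of how the pull-based stream described above is meant to be consumed from Spark Streaming. It assumes the FlumeUtils.createPollingStream API shape this work eventually exposes and a SparkSink already running inside the Flume agent; the host name and port are placeholders, not values from the patch.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    // The receiver polls the SparkSink deployed on the Flume agent instead of
    // having Flume push events to it, so it can be restarted on any node.
    val stream = FlumeUtils.createPollingStream(ssc, "flume-agent-host", 9999)
    stream.map(event => new String(event.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```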
Can one of the admins verify this patch?
Update to the previous patch fixing some error cases and also excluding Netty dependencies. Also updated the unit tests.
Exclude IO Netty in the Flume sink.
Jenkins, test this please.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15074/
Not sure why the build failed with a dependency resolution issue. It seems to work when I run sbt assembly locally. Does the order of modules specified in the build spec matter? Any advice from someone more familiar with sbt?
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
Pardon for jumping in with comments, but why a new module instead of external/flume?
The sink will be deployed to a Flume agent, not within the Spark application. Adding it in external/flume would require bundling all of the Spark dependencies with the jar, while keeping this module separate (it does not depend on the rest of Spark) lets the user simply deploy the jar to the Flume plugins directory. In fact, this module has no dependencies that Flume does not already pull in by default.
Thanks for the comments @srowen!
Removing previousArtifact from build spec, so that the build runs fine.
The latest commit should fix the build issue.
Updated Maven build to be equivalent to the sbt build.
Fix build with Maven.
I am going to update this to support polling from multiple Flume agents rather than just one.
Added support for polling several Flume agents from a single receiver.
The latest commit adds support for polling several Flume agents.
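A hedged sketch of what polling several agents from one receiver might look like, assuming the address-list overload of FlumeUtils.createPollingStream from the eventual public API; ssc is a StreamingContext as in the earlier sketch, and the host names are placeholders.

```scala
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// A single receiver pulls from SparkSinks running on two different Flume agents.
val addresses = Seq(
  new InetSocketAddress("flume-agent-1", 9988),
  new InetSocketAddress("flume-agent-2", 9988)
)
val stream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)
```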
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
connectionBuilder += new FlumeConnection(transceiver, client)
})
connections = connectionBuilder.result()
connections could be built in a more Scala-like functional style. Something like this:
connections = addresses.map { host => ..... new FlumeConnection(....) }.toArray
No need for ArrayBuilder.
This ends up copying the data, but since it is a one-time cost I guess it is ok.
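Spelled out, the reviewer's suggestion would look roughly like the sketch below; it assumes addresses is a Seq[InetSocketAddress] and that FlumeConnection and SparkFlumeProtocol are the classes introduced by this patch.

```scala
import java.net.InetSocketAddress
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor

// Build the connections array directly with map instead of an ArrayBuilder.
connections = addresses.map { address =>
  val transceiver = new NettyTransceiver(address)
  val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
  new FlumeConnection(transceiver, client)
}.toArray
```

As noted above, this copies the intermediate collection into an array, but since the connections are set up only once the extra copy is negligible.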
QA results for PR 807:
The binary compatibility test failed because it tried to compare the current version of flume-sink with a previous version (which does not exist), so we need to add an exclusion here. Let me figure this out with @pwendell.
Thanks @tdas! I was trying to figure it out when I saw the failure, but I can't see a place to add the exclusions.
Added sparkSink to the MiMa excludes. This should fix the Jenkins failure.
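For illustration, here is a hedged sketch of one way a module with no previously released artifact can be excluded from MiMa checks in an sbt build; key names vary across sbt-mima-plugin versions, and the exact place Spark hooks this in (SparkBuild.scala or MimaBuild.scala) may differ.

```scala
import sbt._
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact

// No spark-streaming-flume-sink artifact was ever released, so there is
// nothing for MiMa to compare the new module against.
lazy val flumeSink = (project in file("external/flume-sink"))
  .settings(
    previousArtifact := None
  )
```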
QA tests have started for PR 807. This patch merges cleanly.
Yeah, I will merge this as soon as this passes.
QA results for PR 807:
QA tests have started for PR 807. This patch merges cleanly.
All right! Merging this!
QA results for PR 807:
QA tests have started for PR 807. This patch merges cleanly.
QA results for PR 807:
Conflicts: project/SparkBuild.scala
QA tests have started for PR 807. This patch merges cleanly.
QA results for PR 807:
SPARK-1729. Make Flume pull data from source, rather than the current push model

Currently Spark uses Flume's internal Avro Protocol to ingest data from Flume. If the executor running the receiver fails, it currently has to be restarted on the same node to be able to receive data. This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new DStream that is also included in this commit. This model ensures that data can be pulled into Spark from Flume even if the receiver is restarted on a new node. This also allows the receiver to receive data on multiple threads for better performance.

Author: Hari Shreedharan <harishreedharan@gmail.com>
Author: Hari Shreedharan <hshreedharan@apache.org>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: harishreedharan <hshreedharan@cloudera.com>

Closes apache#807 from harishreedharan/master and squashes the following commits:

e7f70a3 [Hari Shreedharan] Merge remote-tracking branch 'asf-git/master'
96cfb6f [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
e48d785 [Hari Shreedharan] Documenting flume-sink being ignored for Mima checks.
5f212ce [Hari Shreedharan] Ignore Spark Sink from mima.
981bf62 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
7a1bc6e [Hari Shreedharan] Fix SparkBuild.scala
a082eb3 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
1f47364 [Hari Shreedharan] Minor fixes.
73d6f6d [Hari Shreedharan] Cleaned up tests a bit. Added some docs in multiple places.
65b76b4 [Hari Shreedharan] Fixing the unit test.
e59cc20 [Hari Shreedharan] Use SparkFlumeEvent instead of the new type. Also, Flume Polling Receiver now uses the store(ArrayBuffer) method.
f3c99d1 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
3572180 [Hari Shreedharan] Adding a license header, making Jenkins happy.
799509f [Hari Shreedharan] Fix a compile issue.
3c5194c [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
d248d22 [harishreedharan] Merge pull request apache#1 from tdas/flume-polling
10b6214 [Tathagata Das] Changed public API, changed sink package, and added java unit test to make sure Java API is callable from Java.
1edc806 [Hari Shreedharan] SPARK-1729. Update logging in Spark Sink.
8c00289 [Hari Shreedharan] More debug messages
393bd94 [Hari Shreedharan] SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections.
120e2a1 [Hari Shreedharan] SPARK-1729. Some test changes and changes to utils classes.
9fd0da7 [Hari Shreedharan] SPARK-1729. Use foreach instead of map for all Options.
8136aa6 [Hari Shreedharan] Adding TransactionProcessor to map on returning batch of data
86aa274 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
205034d [Hari Shreedharan] Merging master in
4b0c7fc [Hari Shreedharan] FLUME-1729. New Flume-Spark integration.
bda01fc [Hari Shreedharan] FLUME-1729. Flume-Spark integration.
0d69604 [Hari Shreedharan] FLUME-1729. Better Flume-Spark integration.
3c23c18 [Hari Shreedharan] SPARK-1729. New Spark-Flume integration.
70bcc2a [Hari Shreedharan] SPARK-1729. New Flume-Spark integration.
d6fa3aa [Hari Shreedharan] SPARK-1729. New Flume-Spark integration.
e7da512 [Hari Shreedharan] SPARK-1729. Fixing import order
9741683 [Hari Shreedharan] SPARK-1729. Fixes based on review.
c604a3c [Hari Shreedharan] SPARK-1729. Optimize imports.
0f10788 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
87775aa [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
8df37e4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
03d6c1c [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
08176ad [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
d24d9d4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
6d6776a [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
### What changes were proposed in this pull request?

This PR removes the sbt-avro plugin dependency. In the current master, the SBT build depends on the plugin but it appears to be unused. Originally, the plugin was introduced for `flume-sink` in SPARK-1729 (#807), but `flume-sink` is no longer in the Spark repository. After SBT was upgraded to 1.x in SPARK-21708 (#29286), the `avroGenerate` part was introduced in `object SQL` in `SparkBuild.scala`. It's confusing, but I understand `Test / avroGenerate := (Compile / avroGenerate).value` is there to suppress sbt-avro for the `sql` sub-module. In fact, Test/compile will fail if `Test / avroGenerate := (Compile / avroGenerate).value` is commented out. The `sql` sub-module contains `parquet-compat.avpr` and `parquet-compat.avdl`, but according to `sql/core/src/test/README.md`, they are intended to be handled by `gen-avro.sh`. Also, in terms of the Maven build, there seems to be no definition to handle `*.avpr` or `*.avdl`. Based on the above, I think we can remove `sbt-avro`.

### Why are the changes needed?

If `sbt-avro` is really no longer used, it is confusing that `sbt-avro` related configurations remain in `SparkBuild.scala`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

Closes #33190 from sarutak/remove-avro-from-sbt.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

The same change was also cherry picked from commit 6c4616b and signed off by Dongjoon Hyun <dongjoon@apache.org>.
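For reference, here is a sketch of how the suppression setting quoted in that commit message sits inside an sbt settings sequence; the sbt-avro plugin's avroGenerate key is assumed to be in scope (its import path differs across plugin versions), so this is illustrative only, not copied from SparkBuild.scala.

```scala
import sbt._
import sbt.Keys._

// Reuse the Compile-scope Avro generation result for the Test configuration so
// the plugin does not attempt to generate sources for the sql test sources.
lazy val sqlAvroSettings: Seq[Setting[_]] = Seq(
  Test / avroGenerate := (Compile / avroGenerate).value
)
```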
SPARK-1729. Make Flume pull data from source, rather than the current push model
Currently Spark uses Flume's internal Avro Protocol to ingest data from Flume. If the executor running the
receiver fails, it currently has to be restarted on the same node to be able to receive data.
This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new
DStream that is also included in this commit. This model ensures that data can be pulled into Spark from
Flume even if the receiver is restarted on a new node. This also allows the receiver to receive data on
multiple threads for better performance.