Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ def build_spark_sbt(hadoop_version):
sbt_goals = ["package",
"assembly/assembly",
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly"]
"streaming-flume-assembly/assembly",
"streaming-kinesis-asl-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals

print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",
Expand Down
9 changes: 8 additions & 1 deletion dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def contains_file(self, filename):
dependencies=[],
source_file_regexes=[
"extras/kinesis-asl/",
"extras/kinesis-asl-assembly/",
],
build_profile_flags=[
"-Pkinesis-asl",
Expand Down Expand Up @@ -300,7 +301,13 @@ def contains_file(self, filename):

pyspark_streaming = Module(
name="pyspark-streaming",
dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
dependencies=[
pyspark_core,
streaming,
streaming_kafka,
streaming_flume_assembly,
streaming_kinesis_asl
],
source_file_regexes=[
"python/pyspark/streaming"
],
Expand Down
19 changes: 19 additions & 0 deletions docs/streaming-kinesis-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.

</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the next subsection for instructions to run the example.

</div>
</div>

Expand Down Expand Up @@ -135,6 +146,14 @@ To run the example,

bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]

</div>
<div data-lang="python" markdown="1">

        bin/spark-submit --jars extras/kinesis-asl-assembly/target/scala-*/\
            spark-streaming-kinesis-asl-assembly-*.jar \
            extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
            [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]

</div>
</div>

Expand Down
103 changes: 103 additions & 0 deletions extras/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!-- Assembly module that shades spark-streaming-kinesis-asl (and its transitive
     dependencies) into a single jar, suitable for passing to spark-submit via
     -\-jars (e.g. for the Python Kinesis API). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kinesis Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<!-- Name used by dev/run-tests.py / SBT for this module's assembly goal. -->
<sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
</properties>

<dependencies>
<!-- The module whose classes (plus transitive deps) are shaded into this jar. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Provided scope: spark-streaming is supplied by the Spark distribution at
     runtime, so the shade plugin leaves it out of the assembly jar. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<!-- Write the uber jar in place of the main artifact, at a fixed,
     glob-friendly path under target/scala-<version>/. -->
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<!-- Strip jar signature files from merged dependencies; stale signatures
     would make the combined jar fail verification. -->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Merge service registrations, reference.conf, and license/notice files
     from all shaded jars; drop bundled log4j.properties so the user's
     logging config wins. -->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Consumes messages from an Amazon Kinesis stream and does wordcount.

This example spins up 1 Kinesis Receiver per shard for the given stream.
It then starts pulling from the last checkpointed sequence number of the given stream.

Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
  <app-name> is the name of the consumer app, used to track the read data in DynamoDB
  <stream-name> name of the Kinesis stream (ie. mySparkStream)
  <endpoint-url> endpoint of the Kinesis service
    (e.g. https://kinesis.us-east-1.amazonaws.com)
  <region-name> region name of the Kinesis endpoint (e.g. us-east-1)


Example:
# export AWS keys if necessary
$ export AWS_ACCESS_KEY_ID=<your-access-key>
$ export AWS_SECRET_KEY=<your-secret-key>

# run the example
$ bin/spark-submit --jars extras/kinesis-asl-assembly/target/scala-*/\
spark-streaming-kinesis-asl-assembly-*.jar \
extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a clean build "mvn -DskipTests clean package -P kinesis-asl", I need actually two jars to run this. extras/kinesis-asl-assembly/target/scala-2.10/spark-streaming-kinesis-asl-assembly-*.jar and extras/kinesis-asl/target/spark-streaming-kinesis-asl_*.jar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extras/kinesis-asl-assembly/target/scala-2.10/spark-streaming-kinesis-asl-assembly-*.jar should be sufficient as it includes classes extras/kinesis-asl/target/spark-streaming-kinesis-asl.jar

Do you not see that happening?

There is a companion helper class called KinesisWordProducerASL which puts dummy data
onto the Kinesis stream.

This code uses the DefaultAWSCredentialsProviderChain to find credentials
in the following order:
Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
Java System Properties - aws.accessKeyId and aws.secretKey
Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
Instance profile credentials - delivered through the Amazon EC2 metadata service
For more information, see
http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html

See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
the Kinesis Spark Streaming integration.
"""
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

if __name__ == "__main__":
    # The script needs exactly four positional arguments.
    if len(sys.argv) != 5:
        print(
            "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
            file=sys.stderr)
        sys.exit(-1)

    appName, streamName, endpointUrl, regionName = sys.argv[1:]

    # One-second batches; one Kinesis receiver DStream for the given stream.
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
    ssc = StreamingContext(sc, 1)
    lines = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)

    # Classic streaming word count over whitespace-split tokens.
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    counts = pairs.reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
private class KinesisTestUtils(
val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
_regionName: String = "") extends Logging {
private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {

/** No-arg constructor: defaults to the us-west-2 endpoint; the region is derived from it. */
def this() {
this("https://kinesis.us-west-2.amazonaws.com", "")
}

/** Endpoint-only constructor: the region name is derived from the endpoint URL. */
def this(endpointUrl: String) {
this(endpointUrl, "")
}

val regionName = if (_regionName.length == 0) {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
Expand Down Expand Up @@ -117,6 +123,13 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}

/**
* Expose a Python friendly API.
*
* Overload taking a `java.util.List` so callers that cannot construct Scala
* collections (e.g. Python tests) can push data; it converts the list to a
* Scala buffer and delegates to the Scala `pushData` overload.
*
* @param testData integer records to push
*/
def pushData(testData: java.util.List[Int]): Unit = {
pushData(scala.collection.JavaConversions.asScalaBuffer(testData))
}

def deleteStream(): Unit = {
try {
if (streamCreated) {
Expand Down
Loading