Skip to content

Commit

Permalink
feat: Add Streaming Query example
Browse files Browse the repository at this point in the history
  • Loading branch information
AbhiPrasad committed Dec 2, 2019
1 parent e4e76d4 commit d60c4f9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
15 changes: 13 additions & 2 deletions scala-spark-example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ $SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master "local[4]" \
--files "sentry.properties" \
--packages "io.sentry:sentry-spark_2.11:0.0.1-alpha02" \
--packages "io.sentry:sentry-spark_2.11:0.0.1-alpha04" \
target/scala-2.11/simple-project_2.11-1.0.jar
```

Expand All @@ -62,6 +62,17 @@ $SPARK_HOME/bin/spark-submit \
--class "SimpleQueryApp" \
--master "local[4]" \
--files "sentry.properties" \
--packages "io.sentry:sentry-spark_2.11:0.0.1-alpha02" \
--packages "io.sentry:sentry-spark_2.11:0.0.1-alpha04" \
target/scala-2.11/simple-project_2.11-1.0.jar
```

#### SimpleStreamingQueryApp - uses `SentryStreamingQueryListener`

```bash
$SPARK_HOME/bin/spark-submit \
--class "SimpleStreamingQueryApp" \
--master "local[4]" \
--files "sentry.properties" \
--packages "io.sentry:sentry-spark_2.11:0.0.1-alpha04" \
target/scala-2.11/simple-project_2.11-1.0.jar
```
2 changes: 1 addition & 1 deletion scala-spark-example/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ version := "1.0"
scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
libraryDependencies += "io.sentry" %% "sentry-spark" % "0.0.1-alpha02"
libraryDependencies += "io.sentry" %% "sentry-spark" % "0.0.1-alpha04"
3 changes: 1 addition & 2 deletions scala-spark-example/src/main/scala/SimpleApp.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.{SparkSession, Dataset};
import io.sentry.spark.SentrySpark;

import io.sentry.Sentry;
Expand Down
46 changes: 46 additions & 0 deletions scala-spark-example/src/main/scala/SimpleStreamingQueryApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.execution.streaming.MemoryStream
import io.sentry.spark.SentrySpark;

import io.sentry.Sentry;

/** Example Spark application demonstrating Sentry's `SentryStreamingQueryListener`.
  *
  * Runs a memory-backed Structured Streaming query whose map stage deliberately
  * divides by zero, so the streaming query terminates with an error that the
  * Sentry listener (registered via `spark.sql.streaming.streamingQueryListeners`)
  * can capture and report.
  */
object SimpleStreamingQueryApp {
  // `: Unit =` instead of procedure syntax (`def main(...) { ... }`), which is
  // deprecated in Scala 2.13 and removed in Scala 3.
  def main(args: Array[String]): Unit = {
    Sentry.init()

    val spark = SparkSession
      .builder
      .appName("Simple Application")
      // Register the Sentry listener so streaming-query lifecycle events
      // (start / progress / termination) are reported to Sentry.
      .config("spark.sql.streaming.streamingQueryListeners", "io.sentry.spark.listener.SentryStreamingQueryListener")
      .getOrCreate()

    // Attach Spark context information (app name, tags, etc.) to the Sentry scope.
    SentrySpark.applyContext(spark.sparkContext)

    import spark.implicits._
    // MemoryStream requires an implicit SQLContext in scope.
    implicit val sqlContext = spark.sqlContext

    // Two input batches to feed into the in-memory stream.
    val input = List(List(1), List(2, 3))

    // Transformation that always fails: the division by zero is intentional so
    // the query errors out and the failure is surfaced through the listener.
    def compute(input: Dataset[Int]): Dataset[Int] = {
      input.map(elem => {
        val k = 3 / 0 // deliberate ArithmeticException — do not "fix"
        elem + 3
      })
    }

    val inputStream = MemoryStream[Int]
    val transformed = compute(inputStream.toDS())

    val queryName = "Query Name"

    val query =
      transformed.writeStream.format("memory").outputMode("append").queryName(queryName).start()

    input.foreach(batch => inputStream.addData(batch))

    // Block until all queued batches are processed (this is where the failure
    // in `compute` actually materializes).
    query.processAllAvailable()
    // Read back the in-memory sink; the result is discarded — collecting is
    // only here to force full evaluation of the query output.
    spark.table(queryName).as[Int].collect()

    spark.stop()
  }
}

0 comments on commit d60c4f9

Please sign in to comment.