diff --git a/scala-spark-example/README.md b/scala-spark-example/README.md
index c707d20..54e6b4f 100644
--- a/scala-spark-example/README.md
+++ b/scala-spark-example/README.md
@@ -51,7 +51,7 @@ $SPARK_HOME/bin/spark-submit \
   --class "SimpleApp" \
   --master "local[4]" \
   --files "sentry.properties" \
-  --packages "io.sentry:sentry-spark_2.11:0.0.1-alpha02" \
+  --packages "io.sentry:sentry-spark_2.11:0.0.1-alpha04" \
   target/scala-2.11/simple-project_2.11-1.0.jar
 ```
 
@@ -62,6 +62,17 @@ $SPARK_HOME/bin/spark-submit \
   --class "SimpleQueryApp" \
   --master "local[4]" \
   --files "sentry.properties" \
-  --packages "io.sentry:sentry-spark_2.11:0.0.1-alpha02" \
+  --packages "io.sentry:sentry-spark_2.11:0.0.1-alpha04" \
+  target/scala-2.11/simple-project_2.11-1.0.jar
+```
+
+#### SimpleStreamingQueryApp - uses `SentryStreamingQueryListener`
+
+```bash
+$SPARK_HOME/bin/spark-submit \
+  --class "SimpleStreamingQueryApp" \
+  --master "local[4]" \
+  --files "sentry.properties" \
+  --packages "io.sentry:sentry-spark_2.11:0.0.1-alpha04" \
   target/scala-2.11/simple-project_2.11-1.0.jar
 ```
diff --git a/scala-spark-example/build.sbt b/scala-spark-example/build.sbt
index 122b926..45daf8e 100644
--- a/scala-spark-example/build.sbt
+++ b/scala-spark-example/build.sbt
@@ -5,4 +5,4 @@ version := "1.0"
 scalaVersion := "2.11.12"
 
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
-libraryDependencies += "io.sentry" %% "sentry-spark" % "0.0.1-alpha02"
+libraryDependencies += "io.sentry" %% "sentry-spark" % "0.0.1-alpha04"
diff --git a/scala-spark-example/src/main/scala/SimpleApp.scala b/scala-spark-example/src/main/scala/SimpleApp.scala
index b84d02a..f9f833c 100644
--- a/scala-spark-example/src/main/scala/SimpleApp.scala
+++ b/scala-spark-example/src/main/scala/SimpleApp.scala
@@ -1,5 +1,4 @@
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.{SparkSession, Dataset};
 import io.sentry.spark.SentrySpark;
 
 import io.sentry.Sentry;
diff --git a/scala-spark-example/src/main/scala/SimpleStreamingQueryApp.scala b/scala-spark-example/src/main/scala/SimpleStreamingQueryApp.scala
new file mode 100644
index 0000000..b54fb6f
--- /dev/null
+++ b/scala-spark-example/src/main/scala/SimpleStreamingQueryApp.scala
@@ -0,0 +1,48 @@
+import org.apache.spark.sql.{SparkSession, Dataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import io.sentry.spark.SentrySpark;
+
+import io.sentry.Sentry;
+
+object SimpleStreamingQueryApp {
+  def main(args: Array[String]) {
+    Sentry.init()
+
+    val spark = SparkSession
+      .builder
+      .appName("Simple Application")
+      // Register Sentry's listener so streaming query lifecycle events are reported.
+      .config("spark.sql.streaming.streamingQueryListeners", "io.sentry.spark.listener.SentryStreamingQueryListener")
+      .getOrCreate()
+
+    SentrySpark.applyContext(spark.sparkContext)
+
+    import spark.implicits._
+    implicit val sqlContext = spark.sqlContext
+
+    val input = List(List(1), List(2, 3))
+    def compute(input: Dataset[Int]): Dataset[Int] = {
+      input.map(elem => {
+        // Deliberate division by zero: the query fails and the listener reports the error.
+        val k = 3 / 0
+        elem + 3
+      })
+    }
+
+    val inputStream = MemoryStream[Int]
+    val transformed = compute(inputStream.toDS())
+
+    val queryName = "Query Name"
+
+    val query =
+      transformed.writeStream.format("memory").outputMode("append").queryName(queryName).start()
+
+    input.foreach(batch => inputStream.addData(batch))
+
+    query.processAllAvailable()
+    val table = spark.table(queryName).as[Int]
+    val resultRows = table.collect()
+
+    spark.stop()
+  }
+}
\ No newline at end of file
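For readers skimming the patch: `spark.sql.streaming.streamingQueryListeners` is a standard Spark SQL setting; Spark instantiates each listed class when the session starts and registers it with the session's `StreamingQueryManager`. The sketch below is not Sentry's implementation, just a minimal stand-in built on Spark's public `StreamingQueryListener` API (the `ListenerSketch` name and the `println` bodies are placeholders) showing the three callbacks a listener such as `SentryStreamingQueryListener` can hook, including the terminated event that carries the failure from the deliberate division by zero in `SimpleStreamingQueryApp`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent
}

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Listener Sketch")
      .master("local[4]")
      .getOrCreate()

    // Programmatic alternative to the spark.sql.streaming.streamingQueryListeners
    // config used in SimpleStreamingQueryApp: attach a listener directly to the
    // session's StreamingQueryManager.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"query ${event.id} started")

      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"query ${event.progress.id}: ${event.progress.numInputRows} input rows")

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        // exception is Some(message) when the query failed, e.g. after the
        // deliberate division by zero in SimpleStreamingQueryApp.
        println(s"query ${event.id} terminated, exception = ${event.exception}")
    })

    spark.stop()
  }
}
```

Registering through the config string, as the example app does, keeps the listener out of application code and lets Spark construct it at session startup; `addListener` is the programmatic equivalent and can be convenient in tests.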