Streaming SQL for Apache Spark is a project based on Catalyst and Spark Streaming, aiming to support SQL-style queries on data streams. Our target is to advance the progress of Catalyst as well as Spark Streaming by bridging the gap between structured data queries and stream processing.
Our Streaming SQL for Apache Spark (hereinafter referred to as Streaming SQL) provides:
- SQL support on both stream and table data with extended time-based windowing aggregation and join.
- Easy mutual operation between DStream and SQL.
- External source API support for streaming source.
StreamSQLContext
is the main entry point for all streaming sql related functionalities. StreamSQLContext
can be created by:
val ssc: StreamingContext
val sqlContext: SQLContext
val streamSqlContext = new StreamSQLContext(ssc, sqlContext)
Or you could use HiveContext
to get full Hive semantics support, like:
val ssc: StreamingContext
val hiveContext: HiveContext
val streamSqlContext = new StreamSQLContext(ssc, hiveContext)
case class Person(name: String, age: String)
// Create an DStream of Person objects and register it as a stream.
val people: DStream[Person] = ssc.socketTextStream(serverIP, serverPort)
.map(_.split(","))
.map(p => Person(p(0), p(1).toInt))
val schemaPeopleStream = streamSqlContext.createSchemaDStream(people)
schemaPeopleStream.registerAsTable("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 10 && age <= 19")
// The results of SQL queries are themselves DStreams and support all the normal operations
teenagers.map(t => "Name: " + t(0)).print()
ssc.start()
ssc.awaitTerminationOrTimeout(30 * 1000)
ssc.stop()
val userStream: DStream[User]
streamSqlContext.registerDStreamAsTable(userStream, "user")
val itemStream: DStream[Item]
streamSqlContext.registerDStreamAsTable(itemStream, "item")
sql("SELECT * FROM user JOIN item ON user.id = item.id").print()
val historyItem: DataFrame
historyItem.registerTempTable("history")
sql("SELECT * FROM user JOIN item ON user.id = history.id").print()
sql(
"""
|SELECT t.word, COUNT(t.word)
|FROM (SELECT * FROM test) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
|GROUP BY t.word
""".stripMargin)
sql(
"""
|SELECT * FROM
| user1 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS u
|JOIN
| user2 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS v
|ON u.id = v.id
|WHERE u.id > 1 and u.id < 3 and v.id > 1 and v.id < 3
""".stripMargin)
Note: For time-based windowing join, the window size and sliding size should be same for all the joined streams. This is the limitation of Spark Streaming.
streamSqlContext.command(
"""
|CREATE TEMPORARY TABLE t_kafka (
| word string
|)
|USING org.apache.spark.sql.streaming.sources.KafkaSource
|OPTIONS(
| zkQuorum "localhost:2181",
| groupId "test",
| topics "aa:1",
| messageToRow "org.apache.spark.sql.streaming.examples.MessageDelimiter")
""".stripMargin)
For more examples please checkout the examples.
Streaming SQL is built with sbt, you could use sbt related commands to test/compile/package.
Streaming SQL is built on Spark-1.3, you could change the Spark version in Build.scala
to the version you wanted, currently Streaming SQL can be worked with Spark version 1.3+.
To use Streaming SQL, put the packaged jar into your environment where Spark could access,
you could use spark-submit --jars
or other ways.
FAQs
Q1. What kind of interfaces are available in the current release version?
The current version only supports Scala DSL programming model. Spark-SQL CLI and JDBC drive are not supported so far.
Q2. Does it support schema inference from existing Table?
Yes, you could get schema from static source using SparkSQL and apply into streaming clause.
Q3. What kind of SQL standard it follows?
spark-streamsql's SQL coverage relies on SparkSQL, it can support most part of DMLs and some DDLs.
Q4. Can I run chained SQL query in spark-streamsql?
Curretly it does not support such functionalities.
Q5. Does it recognize Hive Metastore ?
Yes, you could initialize StreamSQLContext with HiveContext to get Hive support for spark-streamsql.
Q6. How to run customized functions in spark-streamsql ( to say UDTF, UDAF, UDF ...)?
Yes, you could register UDF through StreamSQLContext.
Q7. Can I insert (overwrite) query results to Table or external store (HBase)?
Not support, you need to handle this through Scala code.
Contact: [Jerry Shao](mailto: saisai.shao@intel.com), [Grace Huang](mailto: jie.huang@intel.com), [Jiangang Duan](mailto: jiangang.duan@intel.com)
This project is open sourced under Apache License Version 2.0.