-
Notifications
You must be signed in to change notification settings - Fork 35
/
SparkStreamingKustoSink.scala
63 lines (52 loc) · 2.29 KB
/
SparkStreamingKustoSink.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.concurrent.TimeUnit
import com.microsoft.kusto.spark.datasink.KustoSinkOptions
import org.apache.spark.sql._
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// COMMAND ----------
/**
* ***********************************************
*/
/* STREAMING SINK EXAMPLE */
/**
* ***********************************************
*/
// To make ingested rows queryable in Kusto sooner, lower the destination table's ingestion batching time, e.g.:
// .alter table <table name> policy ingestionbatching @'{"MaximumBatchingTimeSpan": "00:00:10"}'
/**
 * Sample Structured Streaming job: reads events from Azure Event Hubs and
 * continuously writes them to a Kusto table through the Kusto Spark sink.
 *
 * Replace the placeholder strings (Event Hub connection string and name,
 * Kusto cluster/database/table, AAD application id and secret) before running.
 */
object SparkStreamingKustoSink {

  /** How long the sample lets the streaming query run before stopping it. */
  private val RunDurationMillis: Long = TimeUnit.MINUTES.toMillis(8)

  def main(args: Array[String]): Unit = {
    // COMMAND ----------
    // Building the session explicitly is not required in a Databricks notebook,
    // where `spark` is already provided.
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkStreamingKustoSink")
      .master("local[4]")
      .getOrCreate()

    // Read messages from Azure Event Hub, starting at the end of the stream.
    val connectionString = ConnectionStringBuilder("Event Hub Connection String")
      .setEventHubName("Event Hub Name")
      .build
    val eventHubsConf = EventHubsConf(connectionString)
      .setStartingPosition(EventPosition.fromEndOfStream)

    // `checkpointLocation` is a sink option, so it is configured on the writer
    // below; setting it on this reader had no effect.
    val eventhubs = spark.readStream
      .format("eventhubs")
      .options(eventHubsConf.toMap)
      .load()

    // Decode the binary payload as UTF-8 text. The built-in binary->string cast
    // is null-safe and, unlike `new String(bytes)`, does not depend on the JVM's
    // default charset.
    val df = eventhubs.withColumn("body", col("body").cast("string"))

    // Write to a Kusto table from a streaming source, one micro-batch every 10 s.
    // NOTE(review): use a secret store instead of a literal secret in real deployments.
    val query = df.writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .option(KustoSinkOptions.KUSTO_CLUSTER, "Your Kusto Cluster")
      .option(KustoSinkOptions.KUSTO_DATABASE, "Your Kusto Database")
      .option(KustoSinkOptions.KUSTO_TABLE, "Your Kusto Destination Table")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, "Your Client ID")
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, "Your secret")
      // A fixed per-query checkpoint path lets a restarted query resume from its
      // saved progress; the session-wide `spark.sql.streaming.checkpointLocation`
      // would place an unnamed query under a fresh random subdirectory each run.
      .option("checkpointLocation", "target/temp/checkpoint/")
      .trigger(Trigger.ProcessingTime(10000))
      .start()

    // awaitTermination(timeout) returns false if the query is still running when
    // the timeout elapses; stop it explicitly instead of leaving it ingesting.
    val terminated = query.awaitTermination(RunDurationMillis)
    if (!terminated) query.stop()
  }
}