Spark-Hazelcast Connector API :
- Exposes Hazelcast Distributed Objects (DistributedMap, MultiMap, ReplicatedMap, DistributedList, DistributedSet and DistributedQueue) as Spark RDDs.
- Writes RDDs back to Hazelcast via the implicit writeEntryToHazelcast / writeItemToHazelcast / writeMessageToHazelcast calls.
- Streams Hazelcast Distributed Object Events into Spark as DStreams through HazelcastEntryStream, HazelcastItemStream and HazelcastMessageStream.
- Writes Spark DStreams back to Hazelcast via the implicit writeEntryToHazelcast / writeItemToHazelcast / writeMessageToHazelcast calls.
- Provides an examples module to get up and running in a minute!
A sample Hazelcast XML configuration file (hazelcast.xml, referenced by the examples below) is as follows :
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <group>
        <name>dev</name>
        <password>dev-pass</password>
    </group>
    <instance-name>test_hazelcast_instance</instance-name>
    <network>
        <join>
            <multicast enabled="false"></multicast>
            <tcp-ip enabled="true">
                <member>127.0.0.1:5701</member>
            </tcp-ip>
        </join>
    </network>
</hazelcast>
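The RDD and DStream examples below expect a running Hazelcast member that hosts the referenced distributed objects. As a quick local setup, the following sketch (plain Hazelcast 3.x API, not part of the connector; the seeded data is illustrative) starts an embedded member from the sample hazelcast.xml above and populates test_distributed_map:
import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.Hazelcast

// Start an embedded Hazelcast member using the sample hazelcast.xml (must be on the classpath).
val hazelcastInstance = Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml"))

// Seed the map read by the first RDD example below.
val testMap = hazelcastInstance.getMap[Int, String]("test_distributed_map")
(1 to 10).foreach(i => testMap.put(i, s"value_$i"))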
DistributedMap as Spark RDD :
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
// HazelcastXMLConfigFileName, HazelcastDistributedObjectName, HazelcastDistributedObjectType,
// DistributedObjectType and HazelcastEntryRDD are provided by the connector packages.
lazy val sc = new SparkContext(new SparkConf().setAppName("spark-hazelcast").setMaster("local[2]"))
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_map")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.IMap)
val hazelcastEntryRDD = new HazelcastEntryRDD[Int, String](sc, properties)
hazelcastEntryRDD.print()
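The returned HazelcastEntryRDD can be combined with regular Spark transformations and actions. A minimal sketch, assuming it behaves as an RDD of (Int, String) key/value tuples:
// Keep entries with even keys and collect their values to the driver.
val valuesOfEvenKeys = hazelcastEntryRDD
  .filter { case (key, _) => key % 2 == 0 }
  .map { case (_, value) => value }
  .collect()
valuesOfEvenKeys.foreach(println)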
MultiMap as Spark RDD :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_multi_map")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.MultiMap)
val hazelcastEntryRDD = new HazelcastEntryRDD[Int, String](sc, properties)
hazelcastEntryRDD.print()
ReplicatedMap as Spark RDD :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_replicated_map")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.ReplicatedMap)
val hazelcastEntryRDD = new HazelcastEntryRDD[Int, String](sc, properties)
hazelcastEntryRDD.print()
DistributedList as Spark RDD :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_list")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.IList)
val hazelcastItemRDD = new HazelcastItemRDD[Int, String](sc, properties)
hazelcastItemRDD.print()
DistributedSet as Spark RDD :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_set")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.ISet)
val hazelcastItemRDD = new HazelcastItemRDD[Int, String](sc, properties)
hazelcastItemRDD.print()
DistributedQueue as Spark RDD :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_queue")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.IQueue)
val hazelcastItemRDD = new HazelcastItemRDD[Int, String](sc, properties)
hazelcastItemRDD.print()
Writing a Spark RDD to a DistributedMap (writeEntryToHazelcast) :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_map")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.IMap)
val dataList = (1 to 10).map(i => (i, s"value_$i")).toList
val tupleRDD = sc.parallelize[(Int, String)](dataList)
import com.otv.spark.hazelcast.connector.rdd.implicits._
tupleRDD.writeEntryToHazelcast(properties)
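To double-check the write, the target map can be inspected directly with the plain Hazelcast API; a sketch assuming a member configured by the sample hazelcast.xml is reachable:
import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.Hazelcast

// Join the cluster and verify that the ten entries written above are present.
val verificationInstance = Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml"))
val writtenMap = verificationInstance.getMap[Int, String]("test_distributed_map")
println(s"test_distributed_map size: ${writtenMap.size()}") // expected: 10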
Writing a Spark RDD to a DistributedList / DistributedSet / DistributedQueue (writeItemToHazelcast) :
val properties = ... // properties targeting an IList, ISet or IQueue (as in the RDD examples above)
val intRDD = ...
import com.otv.spark.hazelcast.connector.rdd.implicits._
intRDD.writeItemToHazelcast(properties)
Writing a Spark RDD to a Distributed Topic (writeMessageToHazelcast) :
lazy val sc = ...
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_topic")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.ITopic)
val dataList = (1 to 10).toList
val intRDD = sc.parallelize[Int](dataList)
import com.otv.spark.hazelcast.connector.rdd.implicits._
intRDD.writeMessageToHazelcast(properties)
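Messages published to test_topic by writeMessageToHazelcast can be observed with a plain Hazelcast MessageListener registered before the Spark job runs; a minimal sketch:
import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.{Hazelcast, Message, MessageListener}

// Print every message published to test_topic (e.g. by writeMessageToHazelcast above).
val listenerInstance = Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml"))
listenerInstance.getTopic[Int]("test_topic").addMessageListener(new MessageListener[Int] {
  override def onMessage(message: Message[Int]): Unit =
    println(s"Received message: ${message.getMessageObject}")
})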
DistributedMap Events as Spark DStream (HazelcastEntryStream) :
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// HazelcastUtils and DistributedEventType are provided by the connector packages.
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_map")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.IMap)
lazy val sc = new SparkContext(new SparkConf().setAppName("spark-hazelcast").setMaster("local[2]"))
lazy val ssc = new StreamingContext(sc, Seconds(1))
val hzMapStream = HazelcastUtils.createHazelcastEntryStream[Integer, String](ssc,
  StorageLevel.MEMORY_ONLY,
  properties,
  Set(DistributedEventType.ADDED, DistributedEventType.REMOVED))
hzMapStream.print(10)
ssc.start()
ssc.awaitTermination()
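Like any DStream, hzMapStream also accepts the standard Spark Streaming operations; a small, type-agnostic sketch that counts the Hazelcast entry events in each micro-batch (it must be registered before ssc.start()):
// Count the entry events received per micro-batch; register before ssc.start().
hzMapStream.foreachRDD { rdd =>
  println(s"Hazelcast entry events in this batch: ${rdd.count()}")
}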
DistributedList / DistributedSet / DistributedQueue Events as Spark DStream (HazelcastItemStream) :
val properties = ...
lazy val ssc = ...
val hzListStream = HazelcastUtils.createHazelcastItemStream[String](ssc,
  StorageLevel.MEMORY_ONLY,
  properties,
  Set(DistributedEventType.ADDED, DistributedEventType.REMOVED))
hzListStream.print(10)
ssc.start()
ssc.awaitTermination()
Distributed Topic Messages as Spark DStream (HazelcastMessageStream) :
val properties = ...
lazy val ssc = ...
val hzMessageStream = HazelcastUtils.createHazelcastMessageStream[String](ssc,
  StorageLevel.MEMORY_ONLY,
  properties)
hzMessageStream.print(10)
ssc.start()
ssc.awaitTermination()
Writing a Spark DStream to a DistributedMap (writeEntryToHazelcast) :
lazy val sc = new SparkContext(new SparkConf().setAppName("spark-hazelcast").setMaster("local[2]"))
lazy val ssc = new StreamingContext(sc, Seconds(1))
val properties = new Properties()
properties.put(HazelcastXMLConfigFileName, "hazelcast.xml")
properties.put(HazelcastDistributedObjectName, "test_distributed_map")
properties.put(HazelcastDistributedObjectType, DistributedObjectType.IMap)
val rddQueue: Queue[RDD[(Int, String)]] = ...
val queueStream = ssc.queueStream(rddQueue)
// writeEntryToHazelcast is added to DStreams by the connector's implicit conversions.
queueStream.writeEntryToHazelcast(properties)
ssc.start()
ssc.awaitTermination()
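For completeness, one way the rddQueue placeholder above could be built in a test, where each enqueued RDD becomes a single micro-batch of (key, value) entries (an illustrative sketch, not connector API):
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Each RDD pushed to the queue is consumed as one micro-batch by ssc.queueStream.
val rddQueue = Queue[RDD[(Int, String)]]()
rddQueue += sc.parallelize((1 to 10).map(i => (i, s"value_$i")))
rddQueue += sc.parallelize((11 to 20).map(i => (i, s"value_$i")))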
Writing a Spark DStream to a DistributedList / DistributedSet / DistributedQueue (writeItemToHazelcast) :
lazy val ssc = ...
val properties = ... // properties targeting an IList, ISet or IQueue
val rddQueue: Queue[RDD[Int]] = ...
val queueStream = ssc.queueStream(rddQueue)
queueStream.writeItemToHazelcast(properties)
ssc.start()
ssc.awaitTermination()
Writing a Spark DStream to a Distributed Topic (writeMessageToHazelcast) :
lazy val ssc = ...
val properties = ... // properties targeting an ITopic (as in the writeMessageToHazelcast RDD example above)
val rddQueue: Queue[RDD[Int]] = ...
val queueStream = ssc.queueStream(rddQueue)
queueStream.writeMessageToHazelcast(properties)
ssc.start()
ssc.awaitTermination()