This is a project for Credit Card Fraud Detection with Random Forest (CCFD-RF) using Spark Structured Streaming.
There are 3 options for running CCFD-RF:
- Option 1: Run job locally, reading from a file and writing to console
- Option 2: Run job locally, reading from a Kafka source and writing to a Kafka sink
- Option 3: Run job on the SoftNet cluster, reading from HDFS and writing to HDFS
Notes:
We recommend running the project with Option 2 because it is the easiest to test.
The attached code is written for Option 2.
In lines 25-30 [StructuredRandomForest]: Configure the SparkSession variable for a local run (Options 1 and 2)
val spark = SparkSession.builder()
  .appName("SparkStructuredStreamingExample")
  .master("local[*]")
  .config("spark.sql.streaming.checkpointLocation", "checkpoint_saves/")
  .getOrCreate()
In lines 25-30 [StructuredRandomForest]: Configure the SparkSession variable for a cluster run (Option 3)
val spark = SparkSession.builder()
  .appName("SparkStructuredRandomForest")
  .config("spark.sql.streaming.checkpointLocation", "/user/vvittis")
  .getOrCreate()
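The two SparkSession configurations above differ only in the application name, in whether a master is set in code, and in the checkpoint location. A minimal sketch of how that choice could be kept in one place follows. `RunMode`, `SessionSettings` and `RunSettings.settingsFor` are hypothetical names that do not appear in CCFD-RF, and pairing Option 2 with the same settings as Option 1 is an assumption based on both being local runs.

```scala
// Hypothetical helper: none of these names exist in CCFD-RF.
sealed trait RunMode
case object LocalFile extends RunMode   // Option 1: file source, console sink
case object LocalKafka extends RunMode  // Option 2: Kafka source, Kafka sink
case object Cluster extends RunMode     // Option 3: HDFS source, HDFS sink

// master = None means "do not set it in code; spark-submit supplies it".
final case class SessionSettings(
  appName: String,
  master: Option[String],
  checkpointLocation: String
)

object RunSettings {
  // Values are copied from the two SparkSession snippets above.
  def settingsFor(mode: RunMode): SessionSettings = mode match {
    case LocalFile | LocalKafka =>
      SessionSettings("SparkStructuredStreamingExample", Some("local[*]"), "checkpoint_saves/")
    case Cluster =>
      SessionSettings("SparkStructuredRandomForest", None, "/user/vvittis")
  }
}
```

When building the session, `.master(...)` would be called only if `master` is defined, so the cluster run keeps taking its master from `spark-submit`.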
In lines 35-43 [StructuredRandomForest]: Read from Source (Option 1, local file)
val rawData = spark.readStream.text("dataset_source/")
In lines 35-43 [StructuredRandomForest]: Read from Source (Option 2, Kafka)
val rawData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "testSource")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
Note: ZooKeeper and the Kafka broker must be running before you start the job. Open 2 command-line windows, cd to "C:\kafka_2.12-2.3.0" in each, and run:
1st window: bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2nd window: bin\windows\kafka-server-start.bat config\server.properties
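Once ZooKeeper and the broker are running, the job needs some records in the `testSource` topic. The following is a minimal sketch of a test producer, assuming the `kafka-clients` library is on the classpath and that the topic already exists or the broker auto-creates topics. `TestSourceProducer` and the placeholder records are hypothetical; the actual input format expected by CCFD-RF is not described here.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical test utility, not part of CCFD-RF.
object TestSourceProducer {
  // Producer settings for the local broker used in Option 2.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerProps("localhost:9092"))
    try {
      // Placeholder records: replace with lines from your own dataset.
      val lines = Seq("record-1", "record-2", "record-3")
      lines.foreach { line =>
        producer.send(new ProducerRecord[String, String]("testSource", line))
      }
      // Make sure everything is delivered before the program exits.
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

Because the job reads with `startingOffsets` set to `earliest`, records sent before the job starts should still be picked up on its first run.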
In lines 35-43 [StructuredRandomForest]: Read from Source (Option 3, HDFS)
val rawData = spark.readStream.text("/user/vvittis/numbers")
Note: /user/vvittis/numbers is a path to an HDFS folder
In line 212 [StructuredRandomForest]: Write to Console (Option 1)
val query = kafkaResult
  .writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .queryName("TestStatefulOperator")
  .start()
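To see how the Option 1 pieces fit together, here is a minimal end-to-end sketch that streams text files to the console without any of the Random Forest logic. It is not the CCFD-RF job itself: `FileToConsoleSketch` and the `checkpoint_sketch/` directory are hypothetical, and `append` mode is used because this sketch has no stateful operator. It also shows the `awaitTermination()` call that keeps a local streaming job alive.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-alone sketch, not part of CCFD-RF.
object FileToConsoleSketch {
  // Same source folder as the Option 1 snippet above.
  val sourceDir: String = "dataset_source/"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FileToConsoleSketch")
      .master("local[*]")
      // Separate checkpoint folder (an assumption) to avoid clashing with the real job.
      .config("spark.sql.streaming.checkpointLocation", "checkpoint_sketch/")
      .getOrCreate()

    // Every text file that appears in sourceDir becomes rows with one "value" column.
    val rawData = spark.readStream.text(sourceDir)

    val query = rawData.writeStream
      .outputMode("append")
      .option("truncate", "false")
      .format("console")
      .queryName("FileToConsoleSketch")
      .start()

    // Block the main thread; without this a local run exits immediately.
    query.awaitTermination()
  }
}
```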
In lines 215-230 [StructuredRandomForest]: Write to Kafka sink (Option 2)
val query = kafkaResult
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .outputMode("update")
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "testSink")
  .queryName("RandomForest")
  .start()
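One way to check what the job wrote to `testSink` is a one-off batch read of the topic. The sketch below assumes the `spark-sql-kafka-0-10` connector is on the classpath and that the broker from Option 2 is still running; `InspectTestSink` is a hypothetical name, not part of CCFD-RF.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical inspection utility, not part of CCFD-RF.
object InspectTestSink {
  // Batch-read options: everything currently in the topic, from first to last offset.
  val readerOptions: Map[String, String] = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "testSink",
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("InspectTestSink")
      .master("local[*]")
      .getOrCreate()

    spark.read
      .format("kafka")
      .options(readerOptions)
      .load()
      // Kafka values arrive as bytes; cast them back to the strings the job wrote.
      .selectExpr("CAST(value AS STRING)")
      .show(truncate = false)

    spark.stop()
  }
}
```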
In lines 224-230 [StructuredRandomForest]: Write to HDFS sink (Option 3)
val query = kafkaResult
  .writeStream
  .outputMode("append")
  .format("csv")
  .option("path", "/user/vvittis/results/")
  .queryName("RandomForest")
  .start()
Note: /user/vvittis/results is a path to an HDFS folder
Step 1: Clone CCFD-RF: File > New > Project From Version Control...
Step 2: In the URL field, paste https://github.com/vvittis/CCFD-RF.git and in the Directory field, enter your preferred directory
Step 3: Click the build button or choose Build > Build Project
Step 4: Go to src > main > scala > StructuredRandomForest.scala and click Run
- A typical Console showing the state:
- A typical Console showing the output:
You will find the sbt folder
Step 1: Run sbt assembly to create a .jar file
Step 2: Run ./bin/spark-submit --class StructuredRandomForest --master yarn-client --num-executors 10 --driver-memory 512m --executor-memory 512m --executor-cores 1 /home/vvittis/StructuredRandomForest-assembly-0.1.jar
- A typical Cluster showing that each executor takes one Hoeffding Tree of the Random Forest:
- This test was executed with 10 executors and 10 Hoeffding Trees (HT).
Licensed under the MIT Licence.