The project is developed with open source big data tools from Apache. All the required tools are installed and configured from scratch in a Linux environment to explore the mechanism of the Apache Big Data ecosystem.
All the supporting tutorials are provided so that this document can be used as a development manual. The technical challenges encountered during the development process and their solutions are also described or highlighted in the relevant sections. In addition, all the required configuration and coding files are attached to the project folder, and the important code and configuration steps are noted in the glossary.
As part of the ETL process, the data needs to be fetched from the UCI Machine Learning Repository via an HTTPS link, and the compressed file then extracted. The dataset contains 2,916,697 records and ten attributes (Dataset Description). The target column contains the name of the transaction family, including ransomware families (e.g., CryptXXX, CryptoLocker, etc.). A new column needs to be added to flag probable ransomware transactions. Also, column names will be changed to avoid system keywords (e.g., year, day, and count). After the required transformation, the data will be inserted into the data warehouse (Hive) for future use.
The archive location is shared below. Please check version compatibility before downloading the files.
Purpose | Name | Version | Download Link |
Virtualization | VMware Workstation Player | 16 | Click Here |
OS | Ubuntu | 20.04 | Click Here |
Big data framework | Apache Hadoop | 3.1.2 | Click Here |
Database | Apache Hive | 3.1.2 | Click Here |
ETL Tool | Apache NiFi | 1.16.0 | Click Here |
The following section briefly describes the required steps to configure Hadoop and Hive. For setup, different .xml and .env files need to be configured. The detailed installation tutorial links are provided with each section. In addition, cross-check with the Glossary file for any additional configuration and the essential parts included in this project. Copies of my configuration files are also shared in the Configs folder.
VMware is installed on a Windows machine (a laptop with six cores and 24GB of memory). Although the procedure is simple, the memory and processor allocation should be done accordingly. 8GB of memory, three cores, and 30GB of disk space are allocated for this project.
VM and Ubuntu installation guide: Click Here
As prerequisites for Hadoop, JDK 8 and OpenSSH need to be installed. A separate OS user is created for better management and security. Hadoop is downloaded from the link mentioned above. The Linux bash profile and configuration files should be updated with caution; the required commands are given in the glossary.
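As a minimal sketch (the JDK path is an assumption and depends on the Ubuntu package; the Hadoop path matches the commands used later), the hadoop user's ~/.bashrc typically needs entries such as:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/home/hadoop/hadoop-3.1.2
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin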
Hadoop installation guide: Click Here
Apache Hive is a data warehouse software project built on Apache Hadoop to provide SQL query and analysis features. In the backend, it runs MapReduce jobs to extract data from HDFS. Hive is installed to store and manage the data for further analysis.
Hive requires a conventional relational database to store the metadata needed for its management. The default installation comes with a Derby database. However, accessing Hive from external tools or servers requires concurrent sessions, and HiveServer2 provides the necessary services; for that, a standalone metastore database is needed, and MySQL is used in this project. Special care should be taken to download the correct version of the MySQL JDBC connector. The detailed steps are given in the glossary.
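For orientation only (the connector file name is a placeholder; follow the glossary for the exact steps), the MySQL metastore setup boils down to placing the MySQL JDBC connector jar in Hive's lib directory, pointing hive-site.xml (javax.jdo.option.ConnectionURL, ConnectionDriverName, ConnectionUserName, ConnectionPassword) at the MySQL database, and initialising the metastore schema:
cp mysql-connector-java-<version>.jar $HIVE_HOME/lib/
$HIVE_HOME/bin/schematool -dbType mysql -initSchema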
Hive installation guide: Click Here
HiveServer2 config details: Click Here
HiveServer2 config: Youtube Tutorial
Apache NiFi was built to automate the flow of data between systems. It supports almost all databases and sources, with a GUI-based data flow design facility that is easy to understand and manage. A data flow can also be saved and imported as a template to rebuild recurring flows, which speeds up development. NiFi is backed by ZooKeeper and can run in a distributed cluster.
NiFi is designed to manage huge data volumes with high throughput and low latency. For a production environment, it is advised to install NiFi on a separate server with dedicated RAID storage for logs and content. However, the same server is used for this project. The archive configuration and the Java heap size need to be changed for it to run smoothly, and the log files need to be checked regularly for warnings.
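For reference, the Java heap is set in conf/bootstrap.conf and the content archive is bounded in conf/nifi.properties; the entries below are illustrative values, not necessarily the ones used in this project.
java.arg.2=-Xms2g
java.arg.3=-Xmx4g
nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50%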
NiFi installation Guide: Click Here
Once installed and configured, the services can be started with the commands below in a terminal.
cd /home/hadoop/hadoop-3.1.2/sbin
./start-dfs.sh
./start-yarn.sh
hive --service metastore
hiveserver2
sudo service nifi start
The services can be stopped with:
stop-dfs.sh
stop-yarn.sh
sudo service nifi stop
From the same VM, the below URLs can be accessed through a browser.
namenode: http://localhost:9870
datanode: http://localhost:9864
yarn manager: http://localhost:8088
hive: http://localhost:10002/
nifi: https://localhost:8443/nifi/login
The destination tables are created before the workflow design. Both a regular table and an external table are created with the same structure. Their purpose will be discussed in the following sections.
beeline -u jdbc:hive2://localhost:10000
CREATE DATABASE ETL;
SHOW DATABASES;
USE ETL;
CREATE TABLE IF NOT EXISTS etl.uci_ransomware (address string, year_at int, day_at int, length int, weight string, count_of int, looped int, neighbors int, income string, label string, ransomware int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
CREATE TABLE IF NOT EXISTS etl.uci_ransomware_v2 (address string, year_at int, day_at int, length int, weight string, count_of int, looped int, neighbors int, income string, label string, ransomware int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
From the terminal, create the HDFS location and directory structure.
hdfs dfs -mkdir -p /user/hive/uci_ransomware_ext
hdfs dfs -chmod g+w /user/hive/uci_ransomware_ext
hdfs dfs -ls /user/hive
CREATE EXTERNAL TABLE IF NOT EXISTS etl.uci_ransomware_ext (address string, year_at int, day_at int, length int, weight string, count_of int, looped int, neighbors int, income string, label string, ransomware int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/uci_ransomware_ext'
TBLPROPERTIES ("skip.header.line.count"="1");
NiFi's fundamental design concepts closely relate to the main ideas of flow-based programming. Data, or a "FlowFile", is moved from one step to another for the required processing and transformation. Each task is completed by a "FlowFile Processor", and connections define the relationships among processors.
Details Overview of NiFi: Click Here
For the final NiFi workflow XML template, Click Here. The template can be imported directly once NiFi is configured properly.
NiFi Processors Design - Final Workflow
Blue: Common processors for Data Pulling
Orange: Insert directly into Hive Table Using JDBC Connection
Green: Insert Using External Hive Table
As shown above, a total of 10 FlowFile processors are configured to complete all the steps discussed in the ETL Workflow Description. The processors marked blue are for data fetching. Data transformation and loading into the Hive table are performed using two different methods (marked green and orange). The processes are described in the following sections.
It is an HTTP client processor that can interact with a configurable HTTP endpoint and is capable of processing API requests. The SSL certificate needs to be downloaded from the site and configured in the processor to fetch data from a REST API/HTTPS source. The figure below shows the required configuration, including the URL, HTTP method (GET/POST), SSL Context Service, and others. See the YouTube Tutorial for the required process to configure the processor and the SSL certificate.
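For reference, one common way to make the downloaded certificate available to NiFi's SSL Context Service is to import it into a Java truststore with keytool; the file names and password below are placeholders, not the values used in this project.
keytool -importcert -alias uci-repo -file uci_repo.crt -keystore /home/hadoop/nifi-truststore.jks -storepass changeit
The SSL Context Service then references this truststore (Truststore Filename, Truststore Type JKS, Truststore Password).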
This processor takes a compressed file as input and delivers the uncompressed files as output. The compression type and file name can be filtered in this processor.
The uncompressed file is forwarded to the PutFile processor to store it in the local file system.
This method ingests data into the Hive table straight from the NiFi application using the Hive JDBC connection.
Twitter Data Example: Click Here
This processor can execute external commands on the content of the FlowFile and create a new FlowFile with the results. The incoming FlowFile content is passed to the command as STDIN, and the processor can forward the command's STDOUT as output to the next processor.
The figure below shows the configuration of the processor. It takes a Python script (Click for the script) as the command. The Python script reads STDIN and updates the dataset with an additional "Ransomware" flag column based on the label value. In addition, the processor supports inline code blocks (Groovy, Jython, JavaScript, JRuby) instead of a script from the local machine.
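For illustration, a minimal sketch of such a transformation is shown below (the linked script is the one actually used; the sketch assumes that non-ransomware rows carry the label "white").
# read the CSV from STDIN, append a ransomware flag column, and write the result to STDOUT
import csv
import sys

reader = csv.reader(sys.stdin)
writer = csv.writer(sys.stdout, lineterminator="\n")

header = next(reader)
writer.writerow(header + ["ransomware"])
for row in reader:
    # label is the last column; "white" marks a non-ransomware transaction
    writer.writerow(row + ["0" if row[-1] == "white" else "1"])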
The ReplaceText processor is used to rename the headers that collide with system keywords, such as year, day, and count.
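As an illustration of the renaming (the replacement below mirrors the table DDL above; the exact Search Value/Replacement Value used in the flow may differ), the processor rewrites the header line:
Search Value: address,year,day,length,weight,count,looped,neighbors,income,label,ransomware
Replacement Value: address,year_at,day_at,length,weight,count_of,looped,neighbors,income,label,ransomware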
The QueryRecord processor can perform SQL-like queries directly on the FlowFile content. The data format can also be changed with this processor; in this case, the CSV format is converted into JSON for compatibility with further processing. The Record Reader/Writer values need to be configured via the arrow sign on the right. In the figure, a new property "data" is added with the "+" sign in the top right corner, and an SQL query is provided as its value. The SQL query should not end with a ";", as the processor will fail to parse it.
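For example, a simple pass-through query (QueryRecord exposes the incoming FlowFile content as a table named FLOWFILE) would be:
SELECT * FROM FLOWFILE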
This processor transforms each entry of the JSON file into an SQL INSERT statement. A database JDBC connection pool needs to be created for this processor; the details of the configuration are given below. The database connection URL, database user, and password are provided, and the path of hive-site.xml should be given in the Hive Configuration Resources box. Although it is not a mandatory parameter, the connection cannot be established without the validation query "Select 1 as Test_column".
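A hedged sketch of the connection pool settings (the database name in the URL and the hive-site.xml path are assumptions for illustration):
Database Connection URL: jdbc:hive2://localhost:10000/etl
Database User / Password: <hive user credentials>
Hive Configuration Resources: /home/hadoop/apache-hive-3.1.2-bin/conf/hive-site.xml
Validation query: Select 1 as Test_column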
Moreover, the table and schema names need to be provided as input, and the SQL parameter attribute has to be defined; in this case, "hiveql" is the correct value. The output FlowFile is a queue of INSERT statements. The Hive table creation DDL is given in the glossary.
It receives the INSERT statements as input in the FlowFile and executes them in the Hive database through a JDBC connection.
Once completed, all records will be inserted into Hive.
The aforementioned process executes single INSERT statements in a queue, which requires a lot of time due to JDBC connection overhead. In addition, Hive works differently from transactional databases and is not well suited to single INSERT statements. Hence, the method below is proposed to improve the data insertion time, using Hive's external table functionality.
In this method, the CSV data is transferred into a file location in HDFS. A Hive external table is defined in the database that points to the same directory, and the table properties should match the columns and delimiter of the CSV file. Essentially, the external table is an abstraction that presents the data in the CSV file as a table; it does not hold any data itself, and the data stays in the CSV file even if the external table definition is dropped. An INSERT statement from the external table into the regular Hive table then transfers all the data into the database. Since the operation happens within HDFS, the execution is much faster than issuing JDBC connection requests.
A shell script (Click for the script) is called by this processor to add the Ransomware flag column to the dataset and transfer the updated CSV file to the HDFS location.
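A minimal sketch of what such a script does is given below (the linked script is the one actually used; the temporary path and the "white" label convention are assumptions, and the FlowFile content arrives on STDIN).
# append the ransomware flag to the CSV arriving on STDIN and push the result to HDFS
TMP=/tmp/uci_ransomware.csv
awk -F',' 'NR==1 {print $0",ransomware"; next} {print $0"," (($NF=="white") ? 0 : 1)}' > "$TMP"
hdfs dfs -put -f "$TMP" /user/hive/uci_ransomware_ext/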
The SelectHiveQL processor has an additional property, "HiveQL Post-Query". This property is used in this step to execute the INSERT statement from the external table into the Hive table. For the primary "HiveQL Select Query", a dummy statement is provided.
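As a sketch of these two properties (which of the two managed tables is the target of this method is an assumption), the configuration could look like:
HiveQL Select Query: SELECT 1
HiveQL Post-Query: INSERT INTO TABLE etl.uci_ransomware_v2 SELECT * FROM etl.uci_ransomware_ext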
Please check the NiFi Developer Manual for proper configuration of the connections, loopbacks, and error handling. Also, the buffer, wait time, and recurrence parameters need to be configured for each processor based on the requirements.
The workflow can be scheduled based on an event or on time (cron). Once the starting point is executed, the workflow completes accordingly and the data is loaded into the final table. The data can be accessed from the Hive Beeline editor.
In this case, the external table method takes less than a minute to insert 2.9 million records into the Hive database. In contrast, the JDBC connection takes 5 minutes to execute a batch of 1,000 insert statements. Hence, the external table method should be used for bulk data insertion in a data warehouse environment.
- Hive won’t work without the exact version of the MySQL JDBC connector
- MapReduce memory allocation needs to be changed
- NiFi needs a restart after the default password change
- The NiFi Hive JDBC connection won’t work without a validation query
- The NiFi Java default heap size needs to be increased
- The NiFi archive needs to be purged regularly
Overall, NiFi is a reasonably simple ETL design tool. The GUI makes it easy to understand, and the 200+ built-in processors cover the needs of modern data ingestion. The connections can hold FlowFiles in case of failure, which provides an efficient way to resume the workflow from the point of failure. Combined with Kafka, NiFi can serve the purpose of message queueing as well. Also, custom script execution makes NiFi versatile enough for custom operations of any kind. However, it loses cache information if the primary node gets disconnected; a NiFi cluster can solve this problem. Configuration and resource allocation are the most important considerations when working with Big Data platforms.