About | Features | Technologies | Requirements | Starting | License | Author
This project is an ETL (Extract, Transform, Load) pipeline designed to extract data from the Twitter API, transform it, and then load it into an S3 bucket using Airflow. The goal of this project is to collect and store Twitter data for further analysis and insights.
✔️ Feature 1 - Twitter API integration
✔️ Feature 2 - Airflow integration
✔️ Feature 3 - S3 data storage
✔️ Feature 4 - EC2 instance for Airflow
The following tools were used in this project: Python, Tweepy, pandas, s3fs, Apache Airflow, Amazon S3 & Amazon EC2.
Before starting 🏁, you need an AWS account, Python installed & a Twitter API developer account.
- Go to the "Projects & Apps" tab in the developer dashboard and create a new project by clicking the "Create Project" button.
- Choose the appropriate project type based on your intended use case and follow the prompts to create the project.
- From the project dashboard, click on the "Keys and Tokens" tab to access your API keys and tokens.
- Generate your access token and secret by clicking the "Generate" button under the "Access token & secret" section.
- Copy and save your API keys and tokens in a secure location. These keys will be required to access the Twitter API and should be kept confidential.
- Create a file called creds.py, which is where you will store your Twitter API credentials.
- Note that you could save your access keys directly inside your ETL script, but it's good practice to keep anything confidential out of it.
# Replace these keys with your own
# creds.py
API_KEY = "API_KEY"
API_SECRET_KEY = "API_SECRET_KEY"
ACCESS_TOKEN = "ACCESS_TOKEN"
ACCESS_TOKEN_SECRET = "ACCESS_TOKEN_SECRET"
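As an optional sanity check, you can confirm that the keys work with Tweepy (installed in the next step). This is a minimal sketch and not part of the final pipeline; the filename verify_creds.py is just a suggestion:
# verify_creds.py -- optional sanity check, not part of the pipeline
import tweepy
from creds import API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET

# Authenticate against the Twitter API v1.1 endpoints
auth = tweepy.OAuthHandler(API_KEY, API_SECRET_KEY)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)

# Raises an error if the credentials are invalid
print(api.verify_credentials().screen_name)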
The files above, and the ones created later in this project, should all be saved in one folder. Create a folder called twitter_etl_project and store creds.py inside it.
Before creating the Python script, make sure you install these packages from your command line:
pip3 install pandas # Python package used for data manipulation and analysis.
pip3 install tweepy # Python package used to interact with the Twitter API.
pip3 install s3fs # Python package used to interact with Amazon S3 using the file system interface.
Create a file called twitter_etl.py inside your twitter_etl_project folder, in the same directory as creds.py.
- The documentation for Tweepy can be found at https://docs.tweepy.org/.
- Make sure that your sensitive credentials are stored somewhere else if you are planning to make this code public.
- For this project, the .user_timeline method from the Tweepy package is used. It returns tweets in JSON format with a lot of key-value pairs; choose which keys you want to use for analysis.
- Change the JSON into a list:
tweet_list = []
for tweet in tweets:
    text = tweet._json["full_text"]
    refined_tweet = {'user': tweet.user.screen_name,
                     'text': text,
                     'favorite_count': tweet.favorite_count,
                     'retweet_count': tweet.retweet_count,
                     'created_at': tweet.created_at}
    tweet_list.append(refined_tweet)
- The tweet_list will be converted into a DataFrame & stored as a .csv file using the pandas package.
- For now, store the .csv file on your local machine first:
df = pd.DataFrame(tweet_list)
df.to_csv("your_twitter_data.csv")
- Add a call to run_twitter_etl() at the end of your code. (This is only to test the script; remove it once the script runs without errors. A full sketch of the script appears after this list.)
- Run this script in your terminal:
python3 twitter_etl.py
- You should now have three files in your folder: creds.py, twitter_etl.py & your_twitter_data.csv!
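For reference, here is a minimal sketch of how twitter_etl.py could tie these pieces together. The screen name, tweet count, and extra user_timeline arguments are placeholders/assumptions; adjust them to your own use case:
# twitter_etl.py -- minimal sketch of the extract/transform/load steps
import pandas as pd
import tweepy
from creds import API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET

def run_twitter_etl():
    # Authenticate with the Twitter API
    auth = tweepy.OAuthHandler(API_KEY, API_SECRET_KEY)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth)

    # Extract: pull tweets from a user's timeline (placeholder handle/count)
    tweets = api.user_timeline(screen_name="@SomeHandle",
                               count=100,
                               include_rts=False,
                               tweet_mode="extended")

    # Transform: keep only the keys we care about
    tweet_list = []
    for tweet in tweets:
        text = tweet._json["full_text"]
        refined_tweet = {'user': tweet.user.screen_name,
                         'text': text,
                         'favorite_count': tweet.favorite_count,
                         'retweet_count': tweet.retweet_count,
                         'created_at': tweet.created_at}
        tweet_list.append(refined_tweet)

    # Load: write the result to a CSV (local for now, S3 later)
    df = pd.DataFrame(tweet_list)
    df.to_csv("your_twitter_data.csv")

# For local testing only -- remove once the script runs without errors
run_twitter_etl()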
- Create a python script, twitter_dag.py
- Make sure you import the function run_twitter_etl from twitter_etl.py into twitter_dag.py:
from datetime import timedelta
from airflow import DAG # to define a directed acyclic graph (DAG) in Airflow.
from airflow.operators.python_operator import PythonOperator #execute arbitrary Python code as a task in a DAG.
from airflow.utils.dates import days_ago
from datetime import datetime
from twitter_etl import run_twitter_etl # importing run_twitter_etl function
- A dictionary called default_args is defined to provide default configuration options for the DAG, such as the owner, email addresses, and retry settings.
- A new instance of the DAG class is created with the ID twitter_dag and the default configuration options from default_args.
- A new instance of the PythonOperator class, stored in a variable called run_etl, is created with the task ID complete_twitter_etl, the Python function run_twitter_etl as the callable to execute, and the DAG object as the parent. When the DAG runs, this task executes run_twitter_etl.
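Putting those pieces together, twitter_dag.py could look roughly like the sketch below. The owner, email, retry settings, and description are placeholder values, not prescribed by this project; replace them with your own:
# twitter_dag.py -- minimal sketch of the DAG definition
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from twitter_etl import run_twitter_etl

# Default configuration applied to every task in the DAG (placeholder values)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['your_email@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# The DAG itself, identified by the ID twitter_dag
dag = DAG(
    'twitter_dag',
    default_args=default_args,
    description='ETL pipeline that pulls tweets and loads them into S3',
)

# A single task that calls run_twitter_etl when the DAG runs
run_etl = PythonOperator(
    task_id='complete_twitter_etl',
    python_callable=run_twitter_etl,
    dag=dag,
)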
- Once you have an AWS account, go to S3 to create a bucket.
- Click the "Create bucket" button to create a new bucket.
- Give your bucket a unique name, choose the region where you want to store your data, and click "Next".
- Review your settings and click "Create bucket" to create the bucket.
- Once the bucket is created, copy its S3 path, e.g. s3://enter_your_bucket_name_here/
- Open your twitter_etl.py file and replace the output .csv path with your S3 path:
df.to_csv("s3://enter_your_bucket_name_here/twitter_data.csv")
- Go to the EC2 service and click on "Launch an instance"
- Give it a name, choose the Ubuntu OS & the t3.medium instance type.
- Please note that t3.medium is not free, but the free-tier t2.micro is very slow when running Airflow. Just remember to stop your instance once you are done with this project!
- Create a key pair and you'll be given a .pem file containing the key. This is important, as it will allow you to connect to the instance.
- Allow SSH, HTTPS & HTTP traffic from the internet. Launch your instance!
To let us connect to the EC2 instance from our IP/local machine, and to give the instance permission to access the S3 bucket, we have to grant it the right permissions.
- Click on the Instance ID of the instance you've created, go to the Security tab and click on the IAM Role. This will redirect you to the IAM page.
- Create a role, choose AWS service and EC2 as our use case.
- Search for the "AmazonEC2FullAccess" & "AmazonS3FullAccess" policies and attach them to the role.
- This role will give the instance full access to the EC2 & S3 services.
- Now, going back to the security tab, click the security group link.
- Edit inbound rules and add rule.
- This is not best practice, but for the sake of this project, select Type: All traffic, Source: My IP, and save the rules.
- In the EC2 homepage, select the EC2 instance that you've created and click on connect. It will redirect you to a set of instructions on how to connect to your instance via SSH
- Open your terminal and make sure you are in the .pem file directory.
- Run this command:
chmod 400 YOUR_PEM_KEY.pem
ssh -i "YOUR_PEM_KEY.pem" ubuntu@....INSTANCE_PUBLIC_DNS..compute.amazonaws.com
- If successful, you should see something like this in the terminal:
ubuntu@ip-145-31-3-39:~$
- Congrats! You have successfully connected to your instance.
- Run these commands to install the Python packages needed for this project:
sudo apt-get update
sudo apt install python3-pip
sudo pip install apache-airflow
sudo pip install pandas
sudo pip install s3fs
sudo pip install tweepy
Think of your instance as an "empty slate" that is not connected to your local machine. This is why we need to install these packages for the project to work.
Now it's time to open up Airflow via our instance!
- Run this command to initialize Airflow:
airflow standalone
- After a few minutes you should be able to open up Airflow. The terminal output will show a username & password to use as your credentials for the Airflow webserver.
- To open the webserver, enter your instance's Public IPv4 DNS with port 8080 in your browser, e.g. ec2-3-109-207-157.ap-south-1.compute.amazonaws.com:8080, on Chrome/Firefox/IE/Safari. You can get this Public IPv4 DNS on the instance page in EC2.
- If you have done everything correctly, you should be on the Airflow login page. Enter your credentials there.
- Now, click the DAGs tab to see the list of example DAGs provided by Airflow. Our own DAG, twitter_dag.py, is not in this list yet because we haven't put it inside our instance!
- If you still have the Airflow webserver running in the terminal, press Ctrl+C to terminate it; the prompt should show your instance IP again. If you closed your connection to your instance, just follow Part 4.2.3 to connect back.
- Type "ls" in the instance home directory to see what files are inside. There should be one folder, called airflow.
- Go into the folder by typing "cd airflow" & type "ls" again.
- There are multiple files inside this folder, but the one we want to focus on is the airflow.cfg file.
- Run "sudo nano airflow.cfg". This will let us modify the contents of airflow.cfg.
- For now, the only thing that you should change in this file is the "dags_folder" path. Change it to:
dags_folder = /home/ubuntu/airflow/twitter_dag
This variable is used by Airflow to locate your DAG files when it starts up. By default, Airflow looks for DAG files in the dags_folder directory.
- Press Ctrl+X, then Y, then Enter to save the modification.
- Now we want to create the twitter_dag folder and store all of our Python scripts inside it. Type "mkdir twitter_dag" and "ls" to check that it was created. If it exists, cd into twitter_dag.
- There are three Python scripts that we want to put here, but we can't actually drag those files into the instance, so we use sudo nano instead.
- Run "sudo nano creds.py" in your terminal, copy the contents of your local creds.py, and paste it into the editor. Press Ctrl+X, then Y, then Enter to save.
By default, the creds.py created using sudo nano is empty; we are essentially pasting the code from our local machine into the file inside the instance.
- Repeat the previous step for twitter_dag.py & twitter_etl.py.
- Check that the three files were created correctly. Congrats!
- Now, make sure your terminal is in the instance's home directory and start Airflow again with the airflow standalone command.
- When you open the DAGs tab in Airflow, there should now be a DAG called twitter_dag! Click on the DAG to see the graph, code, grid, etc.
- To run the DAG, go to the Graph tab, click on the task complete_twitter_etl, and click the Play button near the top-right corner.
- Your DAG is now queued and will run the code you provided. Yay!
- The hard part is done, hopefully. Go to the S3 bucket that you pointed to earlier in your code.
- When you refresh your bucket, the .csv file should now be inside it.
- You did it! You've successfully run the code from Airflow. This is particularly useful because Airflow is designed to run on a schedule: if you want new data daily, you can schedule this to run once every day.
Obviously, several issues could arise during the ETL process that make it difficult to complete successfully, such as rate limiting by the Twitter API, network connectivity issues, or data quality problems.
There are also several factors that need to be considered when scheduling ETL jobs, such as the amount of data being processed, the frequency of the data updates, and the resources available for running the ETL job. It is important to carefully consider these factors and adjust the schedule accordingly to ensure that the ETL job runs smoothly and efficiently.
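For example, building on the twitter_dag.py sketch above, a daily run can be expressed by adding a schedule to the DAG definition; the interval and catchup setting below are choices you would make yourself, not part of the original script:
# Sketch: schedule the existing DAG to run once a day
dag = DAG(
    'twitter_dag',
    default_args=default_args,
    description='ETL pipeline that pulls tweets and loads them into S3',
    schedule_interval='@daily',  # run once every day
    catchup=False,               # don't backfill runs for past dates
)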
But for now, you have essentially learned how to do ETL jobs!
This project is licensed under the MIT License. For more details, see the LICENSE file.
Made with ❤️ by Faiz Kasman