-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathex4.py
58 lines (50 loc) · 1.93 KB
/
ex4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Remember to run "/opt/airflow/start.sh" command to start the web server. Once the Airflow web server is ready, open the Airflow UI using the "Access Airflow" button. Turn your DAG “On”, and then Run your DAG. If you get stuck, you can take a look at the solution file in the workspace/airflow/dags folder in the workspace and the video walkthrough on the next page.
import datetime
from airflow import DAG
from airflow.operators import (
FactsCalculatorOperator,
HasRowsOperator,
S3ToRedshiftOperator
)
#
# TODO: Create a DAG which performs the following functions:
#
# 1. Loads Trip data from S3 to RedShift
# 2. Performs a data quality check on the Trips table in RedShift
# 3. Uses the FactsCalculatorOperator to create a Facts table in Redshift
# a. **NOTE**: to complete this step you must complete the FactsCalculatorOperator
# skeleton defined in plugins/operators/facts_calculator.py
#
# NOTE(review): start_date is computed every time this file is parsed, so it
# moves with the clock. Airflow's documentation advises a fixed start date;
# confirm the dynamic value is intentional for this exercise.
dag = DAG(
    dag_id="lesson3.exercise4",
    start_date=datetime.datetime.utcnow(),
)
#
# TODO: Load trips data from S3 to RedShift. Use the s3_key
# "data-pipelines/divvy/unpartitioned/divvy_trips_2018.csv"
# and the s3_bucket "udacity-dend"
#
# Task for step 1 of the exercise: load the trips CSV from S3 into the
# "trips" table.
# NOTE(review): S3ToRedshiftOperator is defined outside this file; confirm
# that `aws_credentials` is the keyword its constructor actually expects.
copy_trips_task = S3ToRedshiftOperator(
    dag=dag,
    task_id="copy_trips_task",
    redshift_conn_id="redshift",
    aws_credentials="aws_credentials",
    table="trips",
    s3_bucket="udacity-dend",
    s3_key="data-pipelines/divvy/unpartitioned/divvy_trips_2018.csv",
)
#
# TODO: Perform a data quality check on the Trips table
#
# Task for step 2 of the exercise: data quality check on the "trips" table,
# using the same "redshift" connection id as the copy task.
check_trips = HasRowsOperator(
    dag=dag,
    task_id="check_trips",
    table="trips",
    redshift_conn_id="redshift",
)
#
# TODO: Use the FactsCalculatorOperator to create a Facts table in RedShift. The fact column should
# be `tripduration` and the groupby_column should be `bikeid`
#
# calculate_facts = FactsCalculatorOperator(...)
#
# TODO: Define task ordering for the DAG tasks you defined
#
# The quality check runs downstream of the copy task. The calculate_facts
# task is still commented out in this file, so it is not part of the ordering.
copy_trips_task.set_downstream(check_trips)