From d1c62a79e40e74dc4d5e7f0372a4f0cb5602faf7 Mon Sep 17 00:00:00 2001
From: Dylan Leard
Date: Mon, 31 Oct 2022 13:57:30 -0700
Subject: [PATCH 1/2] chore: add example dag to iterate over files in a storage directory

---
 dags/iterate_dag.py | 40 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)
 create mode 100644 dags/iterate_dag.py

diff --git a/dags/iterate_dag.py b/dags/iterate_dag.py
new file mode 100644
index 0000000..a84a897
--- /dev/null
+++ b/dags/iterate_dag.py
@@ -0,0 +1,40 @@
+from google.cloud import storage
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.dates import days_ago
+
+def get_files():
+    """List every object in the demo bucket and return the names, sorted."""
+    BUCKET = 'eed-dag-test-bucket'
+    PROJECT = 'emission-elt-demo'
+    client = storage.Client(project=PROJECT)
+    blobs = client.list_blobs(BUCKET)
+    file_names = []
+    for blob in blobs:
+        file_names.append(blob.name)
+        print(blob.name)
+    file_names.sort()
+    return file_names
+
+args = {
+    'owner': 'airflow',
+}
+
+dag = DAG(
+    'iterate-storage',
+    default_args=args,
+    schedule_interval=None,
+    catchup=False,
+    start_date=days_ago(1),
+)
+
+start = DummyOperator(task_id='Start', dag=dag)
+end = DummyOperator(task_id='End', dag=dag)
+t1 = PythonOperator(
+    task_id='get_files',
+    python_callable=get_files,
+    dag=dag,
+)
+
+start >> t1 >> end

From 1b79c88a9d8a9eef8f74ba62943c8e08ea7be0e0 Mon Sep 17 00:00:00 2001
From: Dylan Leard
Date: Mon, 31 Oct 2022 13:57:59 -0700
Subject: [PATCH 2/2] feat: add example ingestion dag that reads a csv into a db table

---
 dags/db_insert_dag.py | 66 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)
 create mode 100644 dags/db_insert_dag.py

diff --git a/dags/db_insert_dag.py b/dags/db_insert_dag.py
new file mode 100644
index 0000000..96f6067
--- /dev/null
+++ b/dags/db_insert_dag.py
@@ -0,0 +1,66 @@
+import io
+import os
+from datetime import timedelta
+
+import psycopg2
+from airflow.models import DAG
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.dates import days_ago
+from google.cloud import storage
+
+default_args = {
+    'email': ['dylan@button.is', 'mike@button.is'],
+    'email_on_retry': False,
+    'email_on_failure': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    'depends_on_past': False,
+    'start_date': days_ago(1),
+}
+
+def import_csv_data():
+    """Download test_csv.csv from the demo bucket and COPY it into details."""
+    conn = psycopg2.connect(
+        database="eed",
+        user=os.environ['eed_db_user'],
+        password=os.environ['eed_db_pass'],
+        host=os.environ['eed_db_host'],
+        port='5432',
+    )
+    cursor = conn.cursor()
+
+    # Create the target table on first run so the COPY below has somewhere to land.
+    cursor.execute('''CREATE TABLE IF NOT EXISTS details(
+        employee_id int NOT NULL,
+        employee_name char(20),
+        employee_email varchar(30));''')
+
+    # Download the CSV as text and stream it straight into COPY; nothing
+    # is written to the worker's filesystem.
+    client = storage.Client()
+    bucket = client.get_bucket('eed-dag-test-bucket')
+    blob = bucket.get_blob('test_csv.csv')
+    downloaded_blob = blob.download_as_text()
+    print(downloaded_blob)
+
+    cursor.copy_expert(
+        'COPY details(employee_id, employee_name, employee_email) '
+        'FROM STDIN WITH CSV HEADER',
+        io.StringIO(downloaded_blob),
+    )
+
+    conn.commit()
+    conn.close()
+
+dag = DAG(
+    'IMPORT_SQL',
+    default_args=default_args,
+    description='A DAG that imports a CSV into a SQL database.',
+    schedule_interval=None,
+)
+
+task = PythonOperator(
+    task_id='import_csv_data',
+    python_callable=import_csv_data,
+    dag=dag,
+)
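
A quick way to exercise db_insert_dag.py end to end is to seed the bucket before triggering the DAG. The sketch below is illustrative and not part of either patch: it assumes the same bucket ('eed-dag-test-bucket') and object name ('test_csv.csv') the DAG reads, plus application-default GCP credentials; the employee rows are made-up sample data.

    from google.cloud import storage

    # Hypothetical seeding helper (not in the patches above). The header row
    # matches details(employee_id, employee_name, employee_email), which the
    # COPY ... WITH CSV HEADER in db_insert_dag.py expects to skip.
    rows = [
        "employee_id,employee_name,employee_email",
        "1,Ada Lovelace,ada@example.com",   # made-up sample data
        "2,Alan Turing,alan@example.com",
    ]

    client = storage.Client()
    bucket = client.bucket("eed-dag-test-bucket")
    bucket.blob("test_csv.csv").upload_from_string("\n".join(rows) + "\n")

With the object in place, triggering IMPORT_SQL once from the Airflow UI (schedule_interval is None, so it only runs on demand) should create and populate the details table.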