Airflow DAGs for exporting and loading the Polygon blockchain data to Google BigQuery:
- polygon_export_dag.py - exports Polygon data to a GCS bucket.
- polygon_load_dag.py - loads Polygon data from GCS bucket to BigQuery.
- polygon_verify_streaming_dag.py - verifies the consistency and data latency in crypto_polygon BigQuery tables.
Prerequisites:
- Linux/macOS terminal
- git
- gcloud
- Create a GCS bucket to hold export files:
  gcloud config set project <your_gcp_project>
  PROJECT=$(gcloud config get-value project 2> /dev/null)
  ENVIRONMENT_INDEX=0
  BUCKET=${PROJECT}-${ENVIRONMENT_INDEX}
  gsutil mb gs://${BUCKET}/
- Create a Google Cloud Composer environment:
  ENVIRONMENT_NAME=${PROJECT}-${ENVIRONMENT_INDEX} && echo "Environment name is ${ENVIRONMENT_NAME}"
  gcloud composer environments create ${ENVIRONMENT_NAME} --location=us-central1 --zone=us-central1-a \
      --disk-size=30GB --machine-type=n1-standard-1 --node-count=3 --python-version=3 --image-version=composer-1.10.6-airflow-1.10.3 \
      --network=default --subnetwork=default
  gcloud composer environments update $ENVIRONMENT_NAME --location=us-central1 --update-pypi-packages-from-file=requirements.txt
  Note that if the Cloud Composer API is not enabled, the command above will prompt you to enable it.
- This is a good time to go to the BigQuery console and create 3 new datasets under your project:
  - crypto_polygon
  - crypto_polygon_raw
  - crypto_polygon_temp
- Follow the steps in Configuring Airflow Variables to configure Airflow variables.
- Follow the steps in Deploying Airflow DAGs to deploy Airflow DAGs to the Cloud Composer environment.
- Follow the steps here to configure email notifications.
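The dataset-creation step above can also be done from the terminal instead of the BigQuery console. The following is a minimal sketch, assuming the bq command-line tool that ships with the Google Cloud SDK is installed and authenticated. The function name create_polygon_datasets and the DRY_RUN switch are hypothetical helpers introduced for illustration; they are not part of polygon-etl.

```shell
# Sketch: create the three BigQuery datasets the DAGs expect.
# create_polygon_datasets and DRY_RUN are illustrative names, not part of polygon-etl.
create_polygon_datasets() {
  local project="$1"
  local dataset
  for dataset in crypto_polygon crypto_polygon_raw crypto_polygon_temp; do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      # Default: only print the command so it can be reviewed first.
      echo "bq mk --dataset ${project}:${dataset}"
    else
      bq mk --dataset "${project}:${dataset}"
    fi
  done
}
```

Running create_polygon_datasets "${PROJECT}" prints the three commands; prefixing the call with DRY_RUN=0 executes them.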
- For a new environment, clone Polygon ETL Airflow:
  git clone https://github.com/blockchain-etl/polygon-etl && cd polygon-etl/airflow
  For an existing environment, use the airflow_variables.json file from the Cloud Source Repository for your environment.
- Copy example_airflow_variables.json to airflow_variables.json. Edit airflow_variables.json and update the configuration options with your values. The variables are described in the table below. For the polygon_output_bucket variable, specify the bucket created in the first setup step above. You can get it by running:
  echo $BUCKET
- Open the Airflow UI. You can get its URL from the airflowUri configuration option in the output of:
  gcloud composer environments describe ${ENVIRONMENT_NAME} --location us-central1
- Navigate to Admin > Variables in the Airflow UI, click Choose File, select airflow_variables.json, and click Import Variables.
Note that the variable names must be prefixed with {chain}_, e.g. polygon_output_bucket.
Variable | Description |
---|---|
output_bucket | GCS bucket where exported files with blockchain data will be stored |
export_start_date | export start date, default: 2019-04-22 |
export_end_date | export end date, used for integration testing, default: None |
export_schedule_interval | export cron schedule, default: 0 1 * * * |
provider_uris | comma-separated list of provider URIs for the polygon-etl command |
notification_emails | comma-separated list of emails where notifications on DAG failures, retries and successes will be delivered. This variable must not be prefixed with {chain}_ |
export_max_active_runs | max active DAG runs for export, default: 3 |
export_max_workers | max workers for the polygon-etl command, default: 5 |
destination_dataset_project_id | GCP project ID where the destination BigQuery dataset is located |
load_schedule_interval | load cron schedule, default: 0 2 * * * |
load_end_date | load end date, used for integration testing, default: None |
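Because a missing {chain}_ prefix is easy to overlook, it can help to check the file before importing it. The sketch below lists variable names that lack the prefix, treating notification_emails as the only exception. It assumes a flat, pretty-printed airflow_variables.json with one "name": "value" pair per line; the function name check_variable_prefixes is hypothetical and not part of polygon-etl.

```shell
# Sketch: report variable names that are missing the {chain}_ prefix.
# Assumes one "name": "value" pair per line (flat, pretty-printed JSON).
# check_variable_prefixes is an illustrative helper, not part of polygon-etl.
check_variable_prefixes() {
  local file="$1"
  local chain="${2:-polygon}"
  local bad
  # Pull the key from each line, then drop keys that are correctly named.
  bad="$(sed -n -E 's/^[[:space:]]*"([^"]+)"[[:space:]]*:.*/\1/p' "$file" |
    grep -v -E "^(${chain}_.*|notification_emails)$" || true)"
  if [ -n "$bad" ]; then
    # Print the offending names and signal failure.
    echo "$bad"
    return 1
  fi
  return 0
}
```

Calling check_variable_prefixes airflow_variables.json polygon returns a non-zero status and prints the offending names if any variable is unprefixed. For files that are not laid out one pair per line, a real JSON parser would be a safer choice.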
It is recommended to keep airflow_variables.json in a version control system e.g. git. Below are the commands for creating a Cloud Source Repository to hold airflow_variables.json:
REPO_NAME=${PROJECT}-airflow-config-${ENVIRONMENT_INDEX} && echo "Repo name ${REPO_NAME}"
gcloud source repos create ${REPO_NAME}
gcloud source repos clone ${REPO_NAME} && cd ${REPO_NAME}
# Put airflow_variables.json to the root of the repo
git add airflow_variables.json && git commit -m "Initial commit"
git push
To automate importing the variables in airflow_variables.json into Cloud Composer, perform the following steps:
- Copy the example Cloud Build configuration to the root of the repository.
- Navigate to Cloud Build Triggers console https://console.cloud.google.com/cloud-build/triggers.
- Click the Create push trigger button.
- Specify the following configuration options for the trigger:
  - Name: import-airflow-variables
  - Event: Push to a branch
  - Source: ^master$
  - Included files filter: airflow/**
  - Build configuration: Cloud Build configuration file (yaml or json)
  - Cloud Build configuration file location: cloudbuild.yaml
  - Substitution variables:
    - _ENVIRONMENT_NAME: Cloud Composer environment name, e.g. _ENVIRONMENT_NAME: polygon-etl-0
    - _LOCATION: Cloud Composer location, e.g. _LOCATION: us-central1
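The same trigger settings can be expressed as a gcloud command instead of console clicks. The sketch below only prints the command so it can be reviewed before running. It assumes the gcloud builds triggers create cloud-source-repositories subcommand and the flags shown are available in your gcloud version, so check them against --help first. The function name print_trigger_command is hypothetical.

```shell
# Sketch: print a gcloud command mirroring the trigger options listed above.
# print_trigger_command is an illustrative helper; verify the flags with
# "gcloud builds triggers create cloud-source-repositories --help".
print_trigger_command() {
  local repo="$1" env_name="$2" location="${3:-us-central1}"
  cat <<EOF
gcloud builds triggers create cloud-source-repositories \\
  --name=import-airflow-variables \\
  --repo=${repo} \\
  --branch-pattern='^master\$' \\
  --included-files='airflow/**' \\
  --build-config=cloudbuild.yaml \\
  --substitutions=_ENVIRONMENT_NAME=${env_name},_LOCATION=${location}
EOF
}
```

For example, print_trigger_command "${REPO_NAME}" "${ENVIRONMENT_NAME}" prints a command with us-central1 as the default location.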
- Get the value of the dagGcsPrefix configuration option from the output of:
  gcloud composer environments describe ${ENVIRONMENT_NAME} --location us-central1
- Upload DAGs to the bucket. Make sure to replace <dag_gcs_prefix> with the value from the previous step:
  ./upload_dags.sh <dag_gcs_prefix>
- To understand more about how the Airflow DAGs are structured, read this article.
- Note that it will take one or more days for polygon_export_dag to finish exporting the historical data.
- To set up automated deployment of DAGs, refer to Cloud Build Configuration.
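The first two deployment steps above can be combined so that the prefix does not have to be copied by hand. This is a minimal sketch, assuming gcloud's --format="get(config.dagGcsPrefix)" projection returns just the prefix and that it is run from the directory containing upload_dags.sh. The function names get_dag_gcs_prefix and deploy_dags are hypothetical.

```shell
# Sketch: read dagGcsPrefix from the environment and pass it to upload_dags.sh.
# get_dag_gcs_prefix and deploy_dags are illustrative helpers, not part of polygon-etl.
get_dag_gcs_prefix() {
  local env_name="$1" location="${2:-us-central1}"
  gcloud composer environments describe "$env_name" \
    --location "$location" \
    --format="get(config.dagGcsPrefix)"
}

deploy_dags() {
  local prefix
  prefix="$(get_dag_gcs_prefix "$@")" || return 1
  if [ -z "$prefix" ]; then
    # An empty value usually means the environment is missing or not ready.
    echo "dagGcsPrefix is empty; check the environment name and location" >&2
    return 1
  fi
  ./upload_dags.sh "$prefix"
}
```

Usage would be deploy_dags "${ENVIRONMENT_NAME}" us-central1, run from polygon-etl/airflow.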
It is recommended to use a dedicated Cloud Composer environment for integration testing with Airflow.
To run integration tests:
- Create a new environment following the steps in the Setting Up section.
- On the Configuring Airflow Variables step specify the following additional configuration variables:
  - export_end_date: 2020-05-30
  - load_end_date: 2020-05-30
- This will run the DAGs only for the first day. At the end of the load DAG the verification tasks will ensure the correctness of the result.
To troubleshoot issues with Airflow tasks, use the View Log button in the Airflow console for individual tasks. Read Airflow UI overview and Troubleshooting DAGs for more info.
In rare cases you may need to inspect GKE cluster logs in the GKE console.