From 2f703df12dfd6511722ff9a82d5a569d092fccc2 Mon Sep 17 00:00:00 2001
From: chipmyersjr
Date: Fri, 30 Oct 2020 11:35:10 -0700
Subject: [PATCH] Add SalesforceToGcsOperator (#10760)

Adds SalesforceToGcsOperator that allows users to transfer data from
Salesforce to a GCS bucket.

Co-authored-by: Tomek Urbaszek
---
 CONTRIBUTING.rst                                   |   2 +-
 airflow/providers/dependencies.json                |   1 +
 .../example_dags/example_salesforce_to_gcs.py      | 123 +++++++++++++++++
 .../cloud/transfers/salesforce_to_gcs.py           | 125 ++++++++++++++++++
 .../google/transfer/salesforce_to_gcs.rst          |  49 +++++++
 docs/operators-and-hooks-ref.rst                   |   5 +
 .../cloud/transfers/test_salesforce_to_gcs.py      |  94 +++++++++++++
 .../test_salesforce_to_gcs_system.py               |  39 ++++++
 tests/test_utils/salesforce_system_helpers.py      |  56 ++++++++
 9 files changed, 493 insertions(+), 1 deletion(-)
 create mode 100644 airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
 create mode 100644 airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
 create mode 100644 docs/howto/operator/google/transfer/salesforce_to_gcs.rst
 create mode 100644 tests/providers/google/cloud/transfers/test_salesforce_to_gcs.py
 create mode 100644 tests/providers/google/cloud/transfers/test_salesforce_to_gcs_system.py
 create mode 100644 tests/test_utils/salesforce_system_helpers.py

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index e047ce1640870..3c7a9e315127e 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -584,7 +584,7 @@ apache.hive                amazon,microsoft.mssql,mysql,presto,samba,vertica
 apache.livy                http
 dingding                   http
 discord                    http
-google                     amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,sftp
+google                     amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,salesforce,sftp
 hashicorp                  google
 microsoft.azure            google,oracle
 microsoft.mssql            odbc
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 467f54713b1ec..5f07202f6730e 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -38,6 +38,7 @@
     "mysql",
     "postgres",
     "presto",
+    "salesforce",
     "sftp"
   ],
   "hashicorp": [
diff --git a/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
new file mode 100644
index 0000000000000..a49b267b34a4d
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that shows how to use SalesforceToGcsOperator.
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateEmptyTableOperator,
+    BigQueryDeleteDatasetOperator,
+    BigQueryExecuteQueryOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
+from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCS_BUCKET = os.environ.get("GCS_BUCKET", "airflow-salesforce-bucket")
+DATASET_NAME = os.environ.get("SALESFORCE_DATASET_NAME", "salesforce_test_dataset")
+TABLE_NAME = os.environ.get("SALESFORCE_TABLE_NAME", "salesforce_test_datatable")
+GCS_OBJ_PATH = os.environ.get("GCS_OBJ_PATH", "results.csv")
+QUERY = "SELECT Id, Name, Company, Phone, Email, CreatedDate, LastModifiedDate, IsDeleted FROM Lead"
+GCS_CONN_ID = os.environ.get("GCS_CONN_ID", "google_cloud_default")
+SALESFORCE_CONN_ID = os.environ.get("SALESFORCE_CONN_ID", "salesforce_default")
+
+
+with models.DAG(
+    "example_salesforce_to_gcs",
+    schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=GCS_BUCKET,
+        project_id=GCP_PROJECT_ID,
+        gcp_conn_id=GCS_CONN_ID,
+    )
+
+    # [START howto_operator_salesforce_to_gcs]
+    gcs_upload_task = SalesforceToGcsOperator(
+        query=QUERY,
+        include_deleted=True,
+        bucket_name=GCS_BUCKET,
+        object_name=GCS_OBJ_PATH,
+        salesforce_conn_id=SALESFORCE_CONN_ID,
+        export_format='csv',
+        coerce_to_timestamp=False,
+        record_time_added=False,
+        gcp_conn_id=GCS_CONN_ID,
+        task_id="upload_to_gcs",
+        dag=dag,
+    )
+    # [END howto_operator_salesforce_to_gcs]
+
+    create_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_dataset", dataset_id=DATASET_NAME, project_id=GCP_PROJECT_ID, gcp_conn_id=GCS_CONN_ID
+    )
+
+    create_table = BigQueryCreateEmptyTableOperator(
+        task_id="create_table",
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME,
+        schema_fields=[
+            {'name': 'id', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'company', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'phone', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'createddate', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'lastmodifieddate', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'isdeleted', 'type': 'BOOL', 'mode': 'NULLABLE'},
+        ],
+    )
+
+    load_csv = GCSToBigQueryOperator(
+        task_id='gcs_to_bq',
+        bucket=GCS_BUCKET,
+        source_objects=[GCS_OBJ_PATH],
+        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
+        write_disposition='WRITE_TRUNCATE',
+    )
+
+    read_data_from_gcs = BigQueryExecuteQueryOperator(
+        task_id="read_data_from_gcs",
+        sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
+        use_legacy_sql=False,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=GCS_BUCKET,
+    )
+
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        project_id=GCP_PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+    )
+
+    create_bucket >> gcs_upload_task >> load_csv
+    create_dataset >> create_table >> load_csv
+    load_csv >> read_data_from_gcs
+    read_data_from_gcs >> delete_bucket
+    read_data_from_gcs >> delete_dataset
diff --git a/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py b/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
new file mode 100644
index 0000000000000..d2e680c887f9c
--- /dev/null
+++ b/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+from typing import Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
+
+
+class SalesforceToGcsOperator(BaseOperator):
+    """
+    Submits a Salesforce query and uploads the results to Google Cloud Storage.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SalesforceToGcsOperator`
+
+    :param query: The query to make to Salesforce.
+    :type query: str
+    :param bucket_name: The bucket to upload to.
+    :type bucket_name: str
+    :param object_name: The object name to set when uploading the file.
+    :type object_name: str
+    :param salesforce_conn_id: the name of the connection that has the parameters
+        we need to connect to Salesforce.
+    :type salesforce_conn_id: str
+    :param include_deleted: True if the query should include deleted records.
+    :type include_deleted: bool
+    :param query_params: Additional optional arguments
+    :type query_params: dict
+    :param export_format: Desired format of files to be exported.
+    :type export_format: str
+    :param coerce_to_timestamp: True if you want all datetime fields to be converted into Unix timestamps.
+        False if you want them to be left in the same format as they were in Salesforce.
+        Leaving the value as False will result in datetimes being strings. Default: False
+    :type coerce_to_timestamp: bool
+    :param record_time_added: True if you want to add a Unix timestamp field
+        to the resulting data that marks when the data was fetched from Salesforce. Default: False
+    :type record_time_added: bool
+    :param gzip: Option to compress local file or file data for upload
+    :type gzip: bool
+    :param gcp_conn_id: the name of the connection that has the parameters we need to connect to GCS.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = (
+        'query',
+        'bucket_name',
+        'object_name',
+    )
+    template_ext = ('.sql',)
+
+    def __init__(
+        self,
+        *,
+        query: str,
+        bucket_name: str,
+        object_name: str,
+        salesforce_conn_id: str,
+        include_deleted: bool = False,
+        query_params: Optional[dict] = None,
+        export_format: str = "csv",
+        coerce_to_timestamp: bool = False,
+        record_time_added: bool = False,
+        gzip: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.query = query
+        self.bucket_name = bucket_name
+        self.object_name = object_name
+        self.salesforce_conn_id = salesforce_conn_id
+        self.export_format = export_format
+        self.coerce_to_timestamp = coerce_to_timestamp
+        self.record_time_added = record_time_added
+        self.gzip = gzip
+        self.gcp_conn_id = gcp_conn_id
+        self.include_deleted = include_deleted
+        self.query_params = query_params
+
+    def execute(self, context: Dict):
+        salesforce = SalesforceHook(conn_id=self.salesforce_conn_id)
+        response = salesforce.make_query(
+            query=self.query, include_deleted=self.include_deleted, query_params=self.query_params
+        )
+
+        with tempfile.TemporaryDirectory() as tmp:
+            path = os.path.join(tmp, "salesforce_temp_file")
+            salesforce.write_object_to_file(
+                query_results=response["records"],
+                filename=path,
+                fmt=self.export_format,
+                coerce_to_timestamp=self.coerce_to_timestamp,
+                record_time_added=self.record_time_added,
+            )
+
+            hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
+            hook.upload(
+                bucket_name=self.bucket_name,
+                object_name=self.object_name,
+                filename=path,
+                gzip=self.gzip,
+            )
+
+        gcs_uri = "gs://{}/{}".format(self.bucket_name, self.object_name)
+        self.log.info("%s uploaded to GCS", gcs_uri)
+        return gcs_uri
diff --git a/docs/howto/operator/google/transfer/salesforce_to_gcs.rst b/docs/howto/operator/google/transfer/salesforce_to_gcs.rst
new file mode 100644
index 0000000000000..ffe093fb47374
--- /dev/null
+++ b/docs/howto/operator/google/transfer/salesforce_to_gcs.rst
@@ -0,0 +1,49 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Salesforce To GCS Operators
+===========================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:SalesforceToGcsOperator:
+
+SalesforceToGcsOperator
+-----------------------
+
+Use the
+:class:`~airflow.providers.google.cloud.transfers.salesforce_to_gcs.SalesforceToGcsOperator`
+to execute a Salesforce query to fetch data and load it into GCS.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
+    :language: python
+    :start-after: [START howto_operator_salesforce_to_gcs]
+    :end-before: [END howto_operator_salesforce_to_gcs]
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Simple Salesforce Documentation `__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 4533a270d1803..558cdc6eac88b 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -992,6 +992,11 @@ These integrations allow you to copy data from/to Google Cloud.
      - :doc:`How to use `
      - :mod:`airflow.providers.google.cloud.transfers.facebook_ads_to_gcs`
 
+   * - `Salesforce `__
+     - `Google Cloud Storage (GCS) `__
+     - :doc:`How to use `
+     - :mod:`airflow.providers.google.cloud.transfers.salesforce_to_gcs`
+
    * - `Google Ads `__
     - `Google Cloud Storage (GCS) `__
diff --git a/tests/providers/google/cloud/transfers/test_salesforce_to_gcs.py b/tests/providers/google/cloud/transfers/test_salesforce_to_gcs.py
new file mode 100644
index 0000000000000..e42da7a675197
--- /dev/null
+++ b/tests/providers/google/cloud/transfers/test_salesforce_to_gcs.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from collections import OrderedDict
+
+import mock
+
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator
+from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
+
+TASK_ID = "test-task-id"
+QUERY = "SELECT id, company FROM Lead WHERE company = 'Hello World Inc'"
+SALESFORCE_CONNECTION_ID = "test-salesforce-connection"
+GCS_BUCKET = "test-bucket"
+GCS_OBJECT_PATH = "path/to/test-file-path"
+EXPECTED_GCS_URI = "gs://{}/{}".format(GCS_BUCKET, GCS_OBJECT_PATH)
+GCP_CONNECTION_ID = "google_cloud_default"
+SALESFORCE_RESPONSE = {
+    'records': [
+        OrderedDict(
+            [
+                (
+                    'attributes',
+                    OrderedDict(
+                        [('type', 'Lead'), ('url', '/services/data/v42.0/sobjects/Lead/00Q3t00001eJ7AnEAK')]
+                    ),
+                ),
+                ('Id', '00Q3t00001eJ7AnEAK'),
+                ('Company', 'Hello World Inc'),
+            ]
+        )
+    ],
+    'totalSize': 1,
+    'done': True,
+}
+INCLUDE_DELETED = True
+QUERY_PARAMS = {"DEFAULT_SETTING": "ENABLED"}
+
+
+class TestSalesforceToGcsOperator(unittest.TestCase):
+    @mock.patch.object(GCSHook, 'upload')
+    @mock.patch.object(SalesforceHook, 'write_object_to_file')
+    @mock.patch.object(SalesforceHook, 'make_query')
+    def test_execute(self, mock_make_query, mock_write_object_to_file, mock_upload):
+        mock_make_query.return_value = SALESFORCE_RESPONSE
+
+        operator = SalesforceToGcsOperator(
+            query=QUERY,
+            bucket_name=GCS_BUCKET,
+            object_name=GCS_OBJECT_PATH,
+            salesforce_conn_id=SALESFORCE_CONNECTION_ID,
+            gcp_conn_id=GCP_CONNECTION_ID,
+            include_deleted=INCLUDE_DELETED,
+            query_params=QUERY_PARAMS,
+            export_format="json",
+            coerce_to_timestamp=True,
+            record_time_added=True,
+            task_id=TASK_ID,
+        )
+        result = operator.execute({})
+
+        mock_make_query.assert_called_once_with(
+            query=QUERY, include_deleted=INCLUDE_DELETED, query_params=QUERY_PARAMS
+        )
+
+        mock_write_object_to_file.assert_called_once_with(
+            query_results=SALESFORCE_RESPONSE['records'],
+            filename=mock.ANY,
+            fmt="json",
+            coerce_to_timestamp=True,
+            record_time_added=True,
+        )
+
+        mock_upload.assert_called_once_with(
+            bucket_name=GCS_BUCKET, object_name=GCS_OBJECT_PATH, filename=mock.ANY, gzip=False
+        )
+
+        self.assertEqual(EXPECTED_GCS_URI, result)
diff --git a/tests/providers/google/cloud/transfers/test_salesforce_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_salesforce_to_gcs_system.py
new file mode 100644
index 0000000000000..ffdf49584956f
--- /dev/null
+++ b/tests/providers/google/cloud/transfers/test_salesforce_to_gcs_system.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+from tests.test_utils.salesforce_system_helpers import provide_salesforce_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SALESFORCE_KEY = 'salesforce.json'
+SALESFORCE_CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, SALESFORCE_KEY)
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
+@pytest.mark.credential_file(SALESFORCE_KEY)
+@pytest.mark.system("google.cloud")
+@pytest.mark.system("salesforce")
+class TestSalesforceIntoGCSExample(GoogleSystemTest):
+    @provide_gcp_context(GCP_BIGQUERY_KEY)
+    @provide_salesforce_connection(SALESFORCE_CREDENTIALS_PATH)
+    def test_run_example_dag_salesforce_to_gcs_operator(self):
+        self.run_dag('example_salesforce_to_gcs', CLOUD_DAG_FOLDER)
diff --git a/tests/test_utils/salesforce_system_helpers.py b/tests/test_utils/salesforce_system_helpers.py
new file mode 100644
index 0000000000000..4a0bc1c4f4ec7
--- /dev/null
+++ b/tests/test_utils/salesforce_system_helpers.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+from contextlib import contextmanager
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.utils.process_utils import patch_environ
+
+CONFIG_REQUIRED_FIELDS = ["host", "login", "password", "security_token"]
+SALESFORCE_CONNECTION_ID = os.environ.get('SALESFORCE_CONNECTION_ID', 'salesforce_default')
+CONNECTION_TYPE = os.environ.get('CONNECTION_TYPE', 'http')
+
+
+@contextmanager
+def provide_salesforce_connection(key_file_path: str):
+    """
+    Context manager that provides a temporary value for the Salesforce default connection.
+
+    :param key_file_path: Path to the .json file with Salesforce credentials.
+    :type key_file_path: str
+    """
+    if not key_file_path.endswith(".json"):
+        raise AirflowException("Use a JSON key file.")
+    with open(key_file_path, 'r') as credentials:
+        creds = json.load(credentials)
+    # Compare as sets so the membership check works regardless of Python version.
+    missing_keys = set(CONFIG_REQUIRED_FIELDS) - set(creds.keys())
+    if missing_keys:
+        message = "{missing_keys} fields are missing".format(missing_keys=missing_keys)
+        raise AirflowException(message)
+    conn = Connection(
+        conn_id=SALESFORCE_CONNECTION_ID,
+        conn_type=CONNECTION_TYPE,
+        host=creds["host"],
+        login=creds["login"],
+        password=creds["password"],
+        extra=json.dumps({"security_token": creds["security_token"]}),
+    )
+    with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
+        yield
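
Beyond the bundled example DAG above (which also provisions BigQuery resources for the system test), the sketch below shows how the new operator might be used on its own. It is not part of the patch; the DAG id, bucket name, and object path are placeholder values, and the connection ids shown are simply the operator's documented defaults. Because `query`, `bucket_name`, and `object_name` are templated fields, the object name can embed the execution date.

# A minimal usage sketch, not part of the patch. Bucket, object path, and DAG id
# are hypothetical placeholders; connection ids are the operator's defaults.
from airflow import models
from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator
from airflow.utils.dates import days_ago

with models.DAG(
    "salesforce_leads_to_gcs",
    schedule_interval="@daily",
    start_date=days_ago(1),
) as dag:
    # 'query', 'bucket_name', and 'object_name' are templated, so the object name
    # below is rendered per run, e.g. leads/2020-10-30.csv.
    upload_leads = SalesforceToGcsOperator(
        task_id="upload_leads",
        query="SELECT Id, Name, Company FROM Lead",
        bucket_name="my-salesforce-exports",
        object_name="leads/{{ ds }}.csv",
        salesforce_conn_id="salesforce_default",
        export_format="csv",
        gcp_conn_id="google_cloud_default",
    )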