
Commit 2f703df

Add SalesforceToGcsOperator (#10760)

Adds SalesforceToGcsOperator, which allows users to transfer data from Salesforce to a GCS bucket.

Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>

1 parent 9687b3b commit 2f703df
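In isolation, the operator needs only its required arguments; everything else falls back to the defaults in its __init__, shown in the new file further down. A minimal, illustrative sketch, not part of the commit: the query, bucket, and object values below are placeholders.

# Hedged usage sketch; all names and values are placeholders.
from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator

upload_leads = SalesforceToGcsOperator(
    task_id="salesforce_to_gcs",
    query="SELECT Id, Name FROM Lead",  # placeholder SOQL query
    bucket_name="my-bucket",            # placeholder GCS bucket
    object_name="leads.csv",            # placeholder object path
    salesforce_conn_id="salesforce_default",
    # export_format="csv", gzip=False, and gcp_conn_id="google_cloud_default"
    # are the defaults, per the operator signature in this commit.
)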

File tree

9 files changed: +493 -1 lines changed


CONTRIBUTING.rst

+1 -1

@@ -584,7 +584,7 @@ apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
 apache.livy http
 dingding http
 discord http
-google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,sftp
+google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,salesforce,sftp
 hashicorp google
 microsoft.azure google,oracle
 microsoft.mssql odbc

airflow/providers/dependencies.json

+1

@@ -38,6 +38,7 @@
   "mysql",
   "postgres",
   "presto",
+  "salesforce",
   "sftp"
 ],
 "hashicorp": [
airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py

+123

@@ -0,0 +1,123 @@ (new file)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that shows how to use SalesforceToGcsOperator.
"""
import os

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCS_BUCKET = os.environ.get("GCS_BUCKET", "airflow-salesforce-bucket")
DATASET_NAME = os.environ.get("SALESFORCE_DATASET_NAME", "salesforce_test_dataset")
TABLE_NAME = os.environ.get("SALESFORCE_TABLE_NAME", "salesforce_test_datatable")
GCS_OBJ_PATH = os.environ.get("GCS_OBJ_PATH", "results.csv")
QUERY = "SELECT Id, Name, Company, Phone, Email, CreatedDate, LastModifiedDate, IsDeleted FROM Lead"
GCS_CONN_ID = os.environ.get("GCS_CONN_ID", "google_cloud_default")
SALESFORCE_CONN_ID = os.environ.get("SALESFORCE_CONN_ID", "salesforce_default")


with models.DAG(
    "example_salesforce_to_gcs",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
) as dag:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
        bucket_name=GCS_BUCKET,
        project_id=GCP_PROJECT_ID,
        gcp_conn_id=GCS_CONN_ID,
    )

    # [START howto_operator_salesforce_to_gcs]
    gcs_upload_task = SalesforceToGcsOperator(
        query=QUERY,
        include_deleted=True,
        bucket_name=GCS_BUCKET,
        object_name=GCS_OBJ_PATH,
        salesforce_conn_id=SALESFORCE_CONN_ID,
        export_format='csv',
        coerce_to_timestamp=False,
        record_time_added=False,
        gcp_conn_id=GCS_CONN_ID,
        task_id="upload_to_gcs",
        dag=dag,
    )
    # [END howto_operator_salesforce_to_gcs]

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset", dataset_id=DATASET_NAME, project_id=GCP_PROJECT_ID, gcp_conn_id=GCS_CONN_ID
    )

    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        schema_fields=[
            {'name': 'id', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'company', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'phone', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'createddate', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'lastmodifieddate', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'isdeleted', 'type': 'BOOL', 'mode': 'NULLABLE'},
        ],
    )

    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bq',
        bucket=GCS_BUCKET,
        source_objects=[GCS_OBJ_PATH],
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        write_disposition='WRITE_TRUNCATE',
    )

    read_data_from_gcs = BigQueryExecuteQueryOperator(
        task_id="read_data_from_gcs",
        sql=f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
        use_legacy_sql=False,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=GCS_BUCKET,
    )

    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset",
        project_id=GCP_PROJECT_ID,
        dataset_id=DATASET_NAME,
        delete_contents=True,
    )

    create_bucket >> gcs_upload_task >> load_csv
    create_dataset >> create_table >> load_csv
    load_csv >> read_data_from_gcs
    read_data_from_gcs >> delete_bucket
    read_data_from_gcs >> delete_dataset
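Running this example end to end also requires the connections it references. A hedged sketch of one way to supply the Salesforce side, assuming Airflow's standard AIRFLOW_CONN_<CONN_ID> environment-variable convention (URI query parameters become connection extras) and that SalesforceHook reads the login, password, and a security_token extra; the credentials below are placeholders:

import os

# Placeholder credentials only; %40 is the URL-encoded "@" in the login.
# The security_token extra is an assumption about how SalesforceHook
# authenticates via simple-salesforce, not something shown in this commit.
os.environ["AIRFLOW_CONN_SALESFORCE_DEFAULT"] = (
    "http://user%40example.com:secret@login.salesforce.com"
    "?security_token=PLACEHOLDER_TOKEN"
)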
airflow/providers/google/cloud/transfers/salesforce_to_gcs.py

+125

@@ -0,0 +1,125 @@ (new file)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import tempfile
from typing import Dict, Optional

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook


class SalesforceToGcsOperator(BaseOperator):
    """
    Submits a Salesforce query and uploads the results to Google Cloud Storage.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SalesforceToGcsOperator`

    :param query: The query to make to Salesforce.
    :type query: str
    :param bucket_name: The bucket to upload to.
    :type bucket_name: str
    :param object_name: The object name to set when uploading the file.
    :type object_name: str
    :param salesforce_conn_id: the name of the connection that has the parameters
        we need to connect to Salesforce.
    :type salesforce_conn_id: str
    :param include_deleted: True if the query should include deleted records.
    :type include_deleted: bool
    :param query_params: Additional optional arguments to pass to the query.
    :type query_params: dict
    :param export_format: Desired format of files to be exported.
    :type export_format: str
    :param coerce_to_timestamp: True if you want all datetime fields to be converted into Unix timestamps.
        False if you want them to be left in the same format as they were in Salesforce.
        Leaving the value as False will result in datetimes being strings. Default: False
    :type coerce_to_timestamp: bool
    :param record_time_added: True if you want to add a Unix timestamp field
        to the resulting data that marks when the data was fetched from Salesforce. Default: False
    :type record_time_added: bool
    :param gzip: Option to compress the local file or file data for upload.
    :type gzip: bool
    :param gcp_conn_id: the name of the connection that has the parameters we need to connect to GCS.
    :type gcp_conn_id: str
    """

    template_fields = (
        'query',
        'bucket_name',
        'object_name',
    )
    template_ext = ('.sql',)

    def __init__(
        self,
        *,
        query: str,
        bucket_name: str,
        object_name: str,
        salesforce_conn_id: str,
        include_deleted: bool = False,
        query_params: Optional[dict] = None,
        export_format: str = "csv",
        coerce_to_timestamp: bool = False,
        record_time_added: bool = False,
        gzip: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.query = query
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.salesforce_conn_id = salesforce_conn_id
        self.export_format = export_format
        self.coerce_to_timestamp = coerce_to_timestamp
        self.record_time_added = record_time_added
        self.gzip = gzip
        self.gcp_conn_id = gcp_conn_id
        self.include_deleted = include_deleted
        self.query_params = query_params

    def execute(self, context: Dict):
        salesforce = SalesforceHook(conn_id=self.salesforce_conn_id)
        response = salesforce.make_query(
            query=self.query, include_deleted=self.include_deleted, query_params=self.query_params
        )

        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "salesforce_temp_file")
            salesforce.write_object_to_file(
                query_results=response["records"],
                filename=path,
                fmt=self.export_format,
                coerce_to_timestamp=self.coerce_to_timestamp,
                record_time_added=self.record_time_added,
            )

            hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
            hook.upload(
                bucket_name=self.bucket_name,
                object_name=self.object_name,
                filename=path,
                gzip=self.gzip,
            )

            gcs_uri = "gs://{}/{}".format(self.bucket_name, self.object_name)
            self.log.info("%s uploaded to GCS", gcs_uri)
            return gcs_uri
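Because the operator delegates all I/O to SalesforceHook and GCSHook, it can be exercised without real credentials by patching both hooks at the module path introduced above. A minimal test sketch, not part of the commit; the test name and mocked return values are illustrative:

from unittest import mock

from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator


@mock.patch("airflow.providers.google.cloud.transfers.salesforce_to_gcs.GCSHook")
@mock.patch("airflow.providers.google.cloud.transfers.salesforce_to_gcs.SalesforceHook")
def test_execute_uploads_and_returns_uri(mock_sf_hook, mock_gcs_hook):
    # make_query must return a dict with a "records" key, per execute() above.
    mock_sf_hook.return_value.make_query.return_value = {"records": []}

    op = SalesforceToGcsOperator(
        task_id="upload",
        query="SELECT Id FROM Lead",
        bucket_name="bucket",
        object_name="results.csv",
        salesforce_conn_id="salesforce_default",
    )
    result = op.execute(context={})

    # The results are written to a temp file, uploaded, and the GCS URI returned.
    mock_sf_hook.return_value.write_object_to_file.assert_called_once()
    mock_gcs_hook.return_value.upload.assert_called_once()
    assert result == "gs://bucket/results.csv"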
docs/howto/operator/google/transfer/salesforce_to_gcs.rst

+49

@@ -0,0 +1,49 @@ (new file)

.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Salesforce To GCS Operators
===========================

.. contents::
  :depth: 1
  :local:

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst

.. _howto/operator:SalesforceToGcsOperator:

SalesforceToGcsOperator
-----------------------

Use the
:class:`~airflow.providers.google.cloud.transfers.salesforce_to_gcs.SalesforceToGcsOperator`
to execute a Salesforce query to fetch data and load it to GCS.

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
    :language: python
    :start-after: [START howto_operator_salesforce_to_gcs]
    :end-before: [END howto_operator_salesforce_to_gcs]

Reference
^^^^^^^^^

For further information, look at:

* `Simple Salesforce Documentation <https://simple-salesforce.readthedocs.io/en/latest/>`__

docs/operators-and-hooks-ref.rst

+5

@@ -992,6 +992,11 @@ These integrations allow you to copy data from/to Google Cloud.
       - :doc:`How to use <howto/operator/google/transfer/facebook_ads_to_gcs>`
       - :mod:`airflow.providers.google.cloud.transfers.facebook_ads_to_gcs`
 
+   * - `Salesforce <https://www.salesforce.com/>`__
+     - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
+     - :doc:`How to use <howto/operator/google/transfer/salesforce_to_gcs>`
+     - :mod:`airflow.providers.google.cloud.transfers.salesforce_to_gcs`
+
 
    * - `Google Ads <https://ads.google.com/>`__
      - `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
