Skip to content

Commit

Permalink
update simple-salesforce type hints to support 1.12.6 (#39047)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3025096)
  • Loading branch information
hussein-awala authored and potiuk committed Apr 18, 2024
1 parent dbebce4 commit a4027b7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
6 changes: 4 additions & 2 deletions airflow/providers/salesforce/hooks/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import logging
import time
from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any, Iterable, cast

from simple_salesforce import Salesforce, api

Expand All @@ -36,6 +36,7 @@
if TYPE_CHECKING:
import pandas as pd
from requests import Session
from simple_salesforce.api import SFType

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -190,8 +191,9 @@ def describe_object(self, obj: str) -> dict:
:return: the description of the Salesforce object.
"""
conn = self.get_conn()
sftype: SFType = cast("SFType", conn.__getattr__(obj))

return conn.__getattr__(obj).describe()
return sftype.describe()

def get_available_fields(self, obj: str) -> list[str]:
"""
Expand Down
16 changes: 9 additions & 7 deletions airflow/providers/salesforce/operators/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Iterable, cast

from airflow.models import BaseOperator
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook

if TYPE_CHECKING:
from simple_salesforce.bulk import SFBulkHandler
from typing_extensions import Literal

from airflow.utils.context import Context
Expand Down Expand Up @@ -88,29 +89,30 @@ def execute(self, context: Context):
"""
sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
conn = sf_hook.get_conn()
bulk: SFBulkHandler = cast("SFBulkHandler", conn.__getattr__("bulk"))

result = []
result: Iterable = []
if self.operation == "insert":
result = conn.bulk.__getattr__(self.object_name).insert(
result = bulk.__getattr__(self.object_name).insert(
data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial
)
elif self.operation == "update":
result = conn.bulk.__getattr__(self.object_name).update(
result = bulk.__getattr__(self.object_name).update(
data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial
)
elif self.operation == "upsert":
result = conn.bulk.__getattr__(self.object_name).upsert(
result = bulk.__getattr__(self.object_name).upsert(
data=self.payload,
external_id_field=self.external_id_field,
batch_size=self.batch_size,
use_serial=self.use_serial,
)
elif self.operation == "delete":
result = conn.bulk.__getattr__(self.object_name).delete(
result = bulk.__getattr__(self.object_name).delete(
data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial
)
elif self.operation == "hard_delete":
result = conn.bulk.__getattr__(self.object_name).hard_delete(
result = bulk.__getattr__(self.object_name).hard_delete(
data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial
)

Expand Down

0 comments on commit a4027b7

Please sign in to comment.