Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable OAuth2 authentication #36

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions tap_zuora/apis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pendulum
import singer
from singer import metadata
from client import ApiException


MAX_EXPORT_DAYS = 30
Expand Down Expand Up @@ -59,7 +60,7 @@ class Aqua:
ZOQL_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Specifying incrementalTime requires this format, but ZOQL requires the 'T'
PARAMETER_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Zuora's documentation describes some objects which are not supported for deleted
# See https://knowledgecenter.zuora.com/DC_Developers/T_Aggregate_Query_API/B_Submit_Query/a_Export_Deleted_Data
# and https://github.com/singer-io/tap-zuora/pull/8 for more info.
Expand Down Expand Up @@ -301,7 +302,12 @@ def stream_status(client, stream_name):
"Query": query,
"Format": "csv"
}
resp = client.rest_request("POST", endpoint, json=payload).json()

# With OAuth2, some entities that exist in Describe might not be available in the export
try:
resp = client.rest_request("POST", endpoint, json=payload).json()
except ApiException:
return "unavailable"

if resp["Success"]:
return "available"
Expand Down
61 changes: 52 additions & 9 deletions tap_zuora/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import requests

import pendulum
import singer


Expand Down Expand Up @@ -33,13 +33,16 @@ def __init__(self, resp):


class Client:
def __init__(self, username, password, partner_id, sandbox=False, european=False):
def __init__(self, username, password, partner_id, sandbox=False, european=False, use_oauth2=False):
self.username = username
self.password = password
self.sandbox = sandbox
self.european = european
self.partner_id = partner_id
self._session = requests.Session()
self.oauth2_token = None
self.use_oauth2 = use_oauth2
self.token_expiration_date = None

adapter = requests.adapters.HTTPAdapter(max_retries=1) # Try again in the case the TCP socket closes
self._session.mount('https://', adapter)
Expand All @@ -49,7 +52,8 @@ def from_config(config):
sandbox = config.get('sandbox', False) == 'true'
european = config.get('european', False) == 'true'
partner_id = config.get('partner_id', None)
return Client(config['username'], config['password'], partner_id, sandbox, european)
use_oauth2 = config.get('use_oauth2', False) == 'true'
return Client(config['username'], config['password'], partner_id, sandbox, european, use_oauth2)

def get_url(self, url, rest=False):
return URLS[(rest, self.sandbox, self.european)] + url
Expand All @@ -60,12 +64,18 @@ def aqua_auth(self):

@property
def rest_headers(self):
return {
'apiAccessKeyId': self.username,
'apiSecretAccessKey': self.password,
'X-Zuora-WSDL-Version': LATEST_WSDL_VERSION,
'Content-Type': 'application/json',
if self.use_oauth2:
return {
'Authorization': 'Bearer ' + self.oauth2_token['access_token'],
'Content-Type': 'application/json',
}
else:
return {
'apiAccessKeyId': self.username,
'apiSecretAccessKey': self.password,
'X-Zuora-WSDL-Version': LATEST_WSDL_VERSION,
'Content-Type': 'application/json',
}

def _request(self, method, url, stream=False, **kwargs):
req = requests.Request(method, url, **kwargs).prepare()
Expand All @@ -76,10 +86,43 @@ def _request(self, method, url, stream=False, **kwargs):

return resp

def is_auth_token_valid(self):
if self.oauth2_token and self.token_expiration_date and pendulum.utcnow().diff(self.token_expiration_date).in_seconds() > 60: # Allows at least one minute of breathing room
return True

return False

def ensure_valid_auth_token(self):
if not self.is_auth_token_valid():
self.oauth2_token = self.request_token()

def request_token(self):
url = self.get_url('oauth/token', rest=True)
payload = {
'client_id': self.username,
'client_secret': self.password,
'grant_type': 'client_credentials',
}

token = self._request('POST', url, data=payload).json()
self.token_expiration_date = pendulum.utcnow().add(seconds=token['expires_in'])

return token

def aqua_request(self, method, url, **kwargs):
if self.use_oauth2:
self.ensure_valid_auth_token()

url = self.get_url(url, rest=False)
return self._request(method, url, auth=self.aqua_auth, **kwargs)

if self.use_oauth2:
return self._request(method, url, headers=self.rest_headers, **kwargs)
else:
return self._request(method, url, auth=self.aqua_auth, **kwargs)

def rest_request(self, method, url, **kwargs):
if self.use_oauth2:
self.ensure_valid_auth_token()

url = self.get_url(url, rest=True)
return self._request(method, url, headers=self.rest_headers, **kwargs)