Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TDL-15656] Added Support for dev mode #41

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cccfc1b
added support for dev-mode
somethingmorerelevant Sep 5, 2022
d586f25
added expiry time to config
somethingmorerelevant Sep 6, 2022
2581588
added unittest for devmode
somethingmorerelevant Sep 13, 2022
4ad0dac
added missing function docs
somethingmorerelevant Sep 14, 2022
1481ba4
added unit tests to circleCI
somethingmorerelevant Sep 14, 2022
e6ee98b
added dev dependencies
somethingmorerelevant Sep 14, 2022
9099b0b
update tap-tester image
somethingmorerelevant Sep 15, 2022
5b0690b
fixed pylint issue
somethingmorerelevant Sep 15, 2022
eddf3ba
fixed pylint issues
somethingmorerelevant Sep 15, 2022
2ae8cb3
fix review comments
somethingmorerelevant Sep 16, 2022
39c9754
updated dev mode implementation
somethingmorerelevant Oct 27, 2022
59c5b74
updated formatting
somethingmorerelevant Oct 27, 2022
330d464
fixed review issues
somethingmorerelevant Nov 10, 2022
264e0d0
updated singer_python version & fixed unit tests
somethingmorerelevant Nov 11, 2022
cb905b0
fixed unittests
somethingmorerelevant Nov 11, 2022
31de7b0
removed readconfig
somethingmorerelevant Nov 11, 2022
e02e8a1
removed unused import
somethingmorerelevant Nov 11, 2022
d75aac7
fixed linting issues
somethingmorerelevant Jan 5, 2023
53ff291
fixed cci
somethingmorerelevant Jan 23, 2023
5269472
fixed unit_test issue
somethingmorerelevant Jan 23, 2023
475dff9
fixed PR review comments
somethingmorerelevant Jan 23, 2023
6021e28
fixed pylint issue
somethingmorerelevant Jan 23, 2023
13bcef3
fix issue with unit test
somethingmorerelevant Jan 23, 2023
e1cbe5b
added setup function for UT
somethingmorerelevant Jan 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
version: 2
version: 2.1
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
steps:
- checkout
- run:
Expand All @@ -12,12 +12,28 @@ jobs:
source /usr/local/share/virtualenvs/tap-eloqua/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
- add_ssh_keys
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-eloqua/bin/activate
pylint tap_eloqua -d C,R,W
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use the standard list of disables?

- run:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-eloqua/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_eloqua --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
workflows:
version: 2
commit:
jobs:
- build
- build:
context: circleci-user
build_daily:
triggers:
- schedule:
Expand All @@ -27,4 +43,5 @@ workflows:
only:
- master
jobs:
- build
- build:
context: circleci-user
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,10 @@ test.*.json
rsa-key
tags
singer-check-tap-data
state.json
state.json

catalog.json
get_catalog.py
persist/
configs/
current_state/
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_eloqua'],
install_requires=[
'backoff==1.3.2',
'backoff==1.8.0',
'requests==2.20.1',
'pendulum==2.0.3',
'singer-python==5.2.0'
'singer-python==5.13.0'
],
extras_require={
'dev': [
'ipdb==0.11'
"dev": [
"pylint",
"ipdb",
]
},
entry_points='''
Expand Down
102 changes: 7 additions & 95 deletions tap_eloqua/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

import sys
import json
import argparse

import singer
from singer import metadata

from tap_eloqua.client import EloquaClient
from tap_eloqua.discover import discover
from tap_eloqua.sync import sync
Expand All @@ -27,101 +23,17 @@ def do_discover(client):
json.dump(catalog.to_dict(), sys.stdout, indent=2)
LOGGER.info('Finished discover')

##### TEMP

from singer.catalog import Catalog

def check_config(config, required_keys):
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise Exception("Config is missing required keys: {}".format(missing_keys))

def load_json(path):
with open(path) as fil:
return json.load(fil)

def parse_args(required_config_keys):
'''Parse standard command-line args.

Parses the command-line arguments mentioned in the SPEC and the
BEST_PRACTICES documents:

-c,--config Config file
-s,--state State file
-d,--discover Run in discover mode
-p,--properties Properties file: DEPRECATED, please use --catalog instead
--catalog Catalog file

Returns the parsed args object from argparse. For each argument that
point to JSON files (config, state, properties), we will automatically
load and parse the JSON file.
'''
parser = argparse.ArgumentParser()

parser.add_argument(
'-c', '--config',
help='Config file',
required=True)

parser.add_argument(
'-s', '--state',
help='State file')

parser.add_argument(
'-p', '--properties',
help='Property selections: DEPRECATED, Please use --catalog instead')

parser.add_argument(
'--catalog',
help='Catalog file')

parser.add_argument(
'-d', '--discover',
action='store_true',
help='Do schema discovery')

args = parser.parse_args()
if args.config:
setattr(args, 'config_path', args.config)
args.config = load_json(args.config)
if args.state:
setattr(args, 'state_path', args.state)
args.state = load_json(args.state)
else:
args.state = {}
if args.properties:
setattr(args, 'properties_path', args.properties)
args.properties = load_json(args.properties)
if args.catalog:
setattr(args, 'catalog_path', args.catalog)
args.catalog = Catalog.load(args.catalog)

check_config(args.config, required_config_keys)

return args

#####

@singer.utils.handle_top_exception(LOGGER)
def main():
#parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
parsed_args = parse_args(REQUIRED_CONFIG_KEYS)

with EloquaClient(parsed_args.config_path,
parsed_args.config['client_id'],
parsed_args.config['client_secret'],
parsed_args.config['refresh_token'],
parsed_args.config['redirect_uri'],
parsed_args.config.get('user_agent')) as client:

if parsed_args.discover:
args = singer.parse_args(REQUIRED_CONFIG_KEYS)
if args.dev:
LOGGER.warning("Executing Tap in Dev mode")
with EloquaClient(args.config_path, args.config, args.dev) as client:
if args.discover:
do_discover(client)
elif parsed_args.catalog:
sync(client,
parsed_args.catalog,
parsed_args.state,
parsed_args.config['start_date'],
int(parsed_args.config.get('bulk_page_size', 5000)))
elif args.catalog:
sync(client, args.catalog, args.state, args.config)

if __name__ == "__main__":
main()
71 changes: 37 additions & 34 deletions tap_eloqua/client.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import json
from datetime import datetime, timedelta
from datetime import timedelta

import backoff
import requests
from requests.exceptions import ConnectionError
from singer import metrics
from singer import metrics,get_logger
from singer.utils import strptime_to_utc,now,strftime
from .utils import write_config
LOGGER = get_logger()


class Server5xxError(Exception):
pass


class EloquaClient(object):
def __init__(self,
config_path,
client_id,
client_secret,
refresh_token,
redirect_uri,
user_agent):
def __init__(self,config_path, config, dev_mode=False):
self.__config_path = config_path
self.__client_id = client_id
self.__client_secret = client_secret
self.__refresh_token = refresh_token
self.__redirect_uri = redirect_uri
self.__user_agent = user_agent
self.config = config
self.__client_id = config["client_id"]
self.__client_secret = config["client_secret"]
self.__refresh_token = config["refresh_token"]
self.__redirect_uri = config["redirect_uri"]
self.__user_agent = config.get("user_agent","")
self.dev_mode = dev_mode
self.__access_token = None
self.__expires = None
self.__session = requests.Session()
Expand All @@ -40,7 +40,16 @@ def __exit__(self, type, value, traceback):
max_tries=5,
factor=2)
def get_access_token(self):
if self.__access_token is not None and self.__expires > datetime.utcnow():
if self.dev_mode:
try:
self.__access_token = self.config['access_token']
self.__expires=strptime_to_utc(self.config['expires_in'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.__expires=strptime_to_utc(self.config['expires_in'])
self.__expires = strptime_to_utc(self.config['expires_in'])

except KeyError as ex:
raise Exception("Unable to locate key in config") from ex
if not self.__access_token or self.__expires < now():
raise Exception("Access Token in config is expired, unable to authenticate in dev mode")

if self.__access_token and self.__expires > now():
return

headers = {}
Expand All @@ -63,31 +72,25 @@ def get_access_token(self):

if response.status_code != 200:
eloqua_response = response.json()
eloqua_response.update(
{'status': response.status_code})
raise Exception(
'Unable to authenticate (Eloqua response: `{}`)'.format(
eloqua_response))
eloqua_response.update({'status': response.status_code})
raise Exception('Unable to authenticate (Eloqua response: `{}`)'.format(eloqua_response))

data = response.json()

self.__access_token = data['access_token']
self.__refresh_token = data['refresh_token']
expires_in_seconds = data['expires_in'] - 10 # pad by 10 seconds
self.__expires = now() + timedelta(seconds=expires_in_seconds)

## refresh_token rotates on every reauth
with open(self.__config_path) as file:
config = json.load(file)
config['refresh_token'] = data['refresh_token']
with open(self.__config_path, 'w') as file:
json.dump(config, file, indent=2)

expires_seconds = data['expires_in'] - 10 # pad by 10 seconds
self.__expires = datetime.utcnow() + timedelta(seconds=expires_seconds)
if not self.dev_mode:
update_config_keys = {
"refresh_token":self.__refresh_token,
"access_token":self.__access_token,
"expires_in": strftime(self.__expires)
Comment on lines +86 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"refresh_token":self.__refresh_token,
"access_token":self.__access_token,
"expires_in": strftime(self.__expires)
"refresh_token": self.__refresh_token,
"access_token": self.__access_token,
"expires_in": strftime(self.__expires)

}
self.config = write_config(self.__config_path,update_config_keys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.config = write_config(self.__config_path,update_config_keys)
self.config = write_config(self.__config_path, update_config_keys)


def get_base_urls(self):
data = self.request('GET',
url='https://login.eloqua.com/id',
endpoint='base_url')
data = self.request('GET',url='https://login.eloqua.com/id',endpoint='base_url')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
data = self.request('GET',url='https://login.eloqua.com/id',endpoint='base_url')
data = self.request('GET', url='https://login.eloqua.com/id', endpoint='base_url')

self.__base_url = data['urls']['base']

@backoff.on_exception(backoff.expo,
Expand Down
16 changes: 9 additions & 7 deletions tap_eloqua/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ def sync_bulk_obj(client, catalog, state, start_date, stream_name, bulk_page_siz
bulk_page_size,
last_date,
offset=last_offset)
except HTTPError as e:
if e.response.status_code in [404, 410]:
except HTTPError as ex:
if ex.response.status_code in [404, 410]:
LOGGER.info('{} - Previous export expired: {}'.format(stream_name, last_sync_id))
else:
raise
Expand Down Expand Up @@ -377,12 +377,12 @@ def sync_activity_stream(client,
activity_type):
finished = False
sync_start = pendulum.now('UTC')
end_date = sync_start
end_date, last_sync_date = sync_start, None
Copy link

@RushiT0122 RushiT0122 Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes in sync.py dev-mode related? If not can we keep it separate from dev-mode changes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if these are required changes then update the description on this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RushiT0122
These are not dev mode related changes, but if you see the code from Line:380 - Line:403
last_sync_date was declared on Line:385 in the try statement, but accessed outside its declared scope in the except block, hence it was introduced here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure this is a problem in other languages, but not python

>>> for i in range(3):
...   try:
...     print("About to initialize `my_variable`")
...     my_variable = i
...     print(f"my_variable: {my_variable}")
...     raise RuntimeError(f"Problem occured with {my_variable}")
...   except Exception as err:
...     print(f"Caught {type(err)} with message {str(err)}")
...     print(f"Current value of my_variable {my_variable}")
...
About to initialize `my_variable`
my_variable: 0
Caught <class 'RuntimeError'> with message Problem occured with 0
Current value of my_variable 0
About to initialize `my_variable`
my_variable: 1
Caught <class 'RuntimeError'> with message Problem occured with 1
Current value of my_variable 1
About to initialize `my_variable`
my_variable: 2
Caught <class 'RuntimeError'> with message Problem occured with 2
Current value of my_variable 2

while not finished:
try:
# Get latest bookmark to adjust time window from, if needed
last_date_raw = get_bulk_bookmark(state, stream_name).get('datetime', start_date)
last_date = pendulum.parse(last_date_raw)
last_sync_date = pendulum.parse(last_date_raw)

update_current_stream(state, stream_name)
sync_bulk_obj(client,
Expand All @@ -399,12 +399,14 @@ def sync_activity_stream(client,
# If not done, sync again to now()
end_date = sync_start
except ActivityExportTooLarge as ex:
LOGGER.warn(ex)
end_date = last_date.add(seconds=(end_date - last_date).total_seconds() / 2)
LOGGER.warning(ex)
end_date = last_sync_date.add(seconds=(end_date - last_sync_date).total_seconds() / 2)
if end_date > sync_start:
end_date = sync_start

def sync(client, catalog, state, start_date, bulk_page_size):
def sync(client, catalog, state, config):
start_date = config['start_date']
bulk_page_size = int(config.get('bulk_page_size', 5000))
selected_streams = get_selected_streams(catalog)

if not selected_streams:
Expand Down
18 changes: 18 additions & 0 deletions tap_eloqua/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Dict
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unused

import json
from singer import get_logger

LOGGER = get_logger()


def write_config(config_path,data) :
"""
Updates the provided filepath with json format of the `data` object
does a safe write by performing a read before write, updates only specific keys, does not rewrite.
"""
with open(config_path,'r') as tap_config:
config = json.load(tap_config)
config.update(data)
with open(config_path,'w') as tap_config:
json.dump(config, tap_config, indent=2)
return config
Loading