Skip to content

Commit

Permalink
Change to yielding for IncrementalStreams (#47)
Browse files Browse the repository at this point in the history
* change to yielding

* cleanup

* migrate to circle 2
  • Loading branch information
KAllan357 authored Sep 12, 2018
1 parent f81e692 commit b8ffa9e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 37 deletions.
21 changes: 21 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version: 2
jobs:
build:
docker:
- image: circleci/python:3.5.6-jessie-browsers
steps:
- checkout
- run:
name: 'Setup virtual env'
command: |
virtualenv -p python3 ~/.virtualenvs/tap-facebook
source ~/.virtualenvs/tap-facebook/bin/activate
pip install .
pip install pylint
pylint tap_facebook -d C,R,W
- run:
name: 'Unit Tests'
command: |
source ~/.virtualenvs/tap-facebook/bin/activate
pip install nose
nosetests
11 changes: 0 additions & 11 deletions circle.yml

This file was deleted.

44 changes: 18 additions & 26 deletions tap_facebook/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,22 @@ class IncrementalStream(Stream):
def __attrs_post_init__(self):
self.current_bookmark = get_start(self, UPDATED_TIME_KEY)

def _iterate(self, recordset, record_preparation):
def _iterate(self, generator, record_preparation):
max_bookmark = None
for record in recordset:
updated_at = pendulum.parse(record[UPDATED_TIME_KEY])
for recordset in generator:
for record in recordset:
updated_at = pendulum.parse(record[UPDATED_TIME_KEY])

if self.current_bookmark and self.current_bookmark >= updated_at:
continue
if not max_bookmark or updated_at > max_bookmark:
max_bookmark = updated_at
if self.current_bookmark and self.current_bookmark >= updated_at:
continue
if not max_bookmark or updated_at > max_bookmark:
max_bookmark = updated_at

record = record_preparation(record)
yield {'record': record}
record = record_preparation(record)
yield {'record': record}

if max_bookmark:
yield {'state': advance_bookmark(self, UPDATED_TIME_KEY, str(max_bookmark))}
if max_bookmark:
yield {'state': advance_bookmark(self, UPDATED_TIME_KEY, str(max_bookmark))}

class AdCreative(Stream):
'''
Expand Down Expand Up @@ -212,21 +213,18 @@ def do_request():
params = {'limit': RESULT_RETURN_LIMIT}
if self.current_bookmark:
params.update({'filtering': [{'field': 'ad.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
ads = self.account.get_ads(fields=self.automatic_fields(), params=params) # pylint: disable=no-member
return ads
yield self.account.get_ads(fields=self.automatic_fields(), params=params) # pylint: disable=no-member

@retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
def do_request_multiple():
params = {'limit': RESULT_RETURN_LIMIT}
bookmark_params = []
if self.current_bookmark:
bookmark_params.append({'field': 'ad.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
ads = []
for del_info_filt in iter_delivery_info_filter('ad'):
params.update({'filtering': [del_info_filt] + bookmark_params})
filt_ads = self.account.get_ads(fields=self.automatic_fields(), params=params) # pylint: disable=no-member
ads.extend(filt_ads)
return ads
yield filt_ads

def prepare_record(ad):
return ad.remote_read(fields=self.fields()).export_all_data()
Expand All @@ -253,21 +251,18 @@ def do_request():
params = {'limit': RESULT_RETURN_LIMIT}
if self.current_bookmark:
params.update({'filtering': [{'field': 'adset.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
adsets = self.account.get_ad_sets(fields=self.automatic_fields(), params=params) # pylint: disable=no-member
return adsets
yield self.account.get_ad_sets(fields=self.automatic_fields(), params=params) # pylint: disable=no-member

@retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
def do_request_multiple():
params = {'limit': RESULT_RETURN_LIMIT}
bookmark_params = []
if self.current_bookmark:
bookmark_params.append({'field': 'adset.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
adsets = []
for del_info_filt in iter_delivery_info_filter('adset'):
params.update({'filtering': [del_info_filt] + bookmark_params})
filt_adsets = self.account.get_ad_sets(fields=self.automatic_fields(), params=params) # pylint: disable=no-member
adsets.extend(filt_adsets)
return adsets
yield filt_adsets

def prepare_record(ad_set):
return ad_set.remote_read(fields=self.fields()).export_all_data()
Expand Down Expand Up @@ -295,21 +290,18 @@ def do_request():
params = {'limit': RESULT_RETURN_LIMIT}
if self.current_bookmark:
params.update({'filtering': [{'field': 'campaign.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
campaigns = self.account.get_campaigns(fields=self.automatic_fields(), params=params) # pylint: disable=no-member
return campaigns
yield self.account.get_campaigns(fields=self.automatic_fields(), params=params) # pylint: disable=no-member

@retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
def do_request_multiple():
params = {'limit': RESULT_RETURN_LIMIT}
bookmark_params = []
if self.current_bookmark:
bookmark_params.append({'field': 'campaign.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
campaigns = []
for del_info_filt in iter_delivery_info_filter('campaign'):
params.update({'filtering': [del_info_filt] + bookmark_params})
filt_campaigns = self.account.get_campaigns(fields=self.automatic_fields(), params=params) # pylint: disable=no-member
campaigns.extend(filt_campaigns)
return campaigns
yield filt_campaigns

def prepare_record(campaign):
campaign_out = campaign.remote_read(fields=fields).export_all_data()
Expand Down

0 comments on commit b8ffa9e

Please sign in to comment.