#!/usr/bin/env python3
import datetime
import pytz
import itertools
import os
import re
import sys
import json
# pylint: disable=import-error
import attr
import backoff
import requests
import singer
import singer.messages
from singer import metrics
from singer import metadata
from singer import utils
from singer import (transform,
                    UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
                    Transformer, _transform_datetime)

LOGGER = singer.get_logger()
SESSION = requests.Session()
REQUEST_TIMEOUT = 300
class InvalidAuthException(Exception):
    pass

class SourceUnavailableException(Exception):
    pass

class DependencyException(Exception):
    pass

class DataFields:
    offset = 'offset'

class StateFields:
    offset = 'offset'
    this_stream = 'this_stream'

BASE_URL = "https://api.hubapi.com"

CONTACTS_BY_COMPANY = "contacts_by_company"

DEFAULT_CHUNK_SIZE = 1000 * 60 * 60 * 24

V3_PREFIXES = {'hs_date_entered', 'hs_date_exited', 'hs_time_in'}

CONFIG = {
    "access_token": None,
    "token_expires": None,
    "email_chunk_size": DEFAULT_CHUNK_SIZE,
    "subscription_chunk_size": DEFAULT_CHUNK_SIZE,

    # in config.json
    "redirect_uri": None,
    "client_id": None,
    "client_secret": None,
    "refresh_token": None,
    "start_date": None,
    "hapikey": None,
    "include_inactives": None,
}

ENDPOINTS = {
    "contacts_properties": "/properties/v1/contacts/properties",
    "contacts_all": "/contacts/v1/lists/all/contacts/all",
    "contacts_recent": "/contacts/v1/lists/recently_updated/contacts/recent",
    "contacts_detail": "/contacts/v1/contact/vids/batch/",

    "companies_properties": "/companies/v2/properties",
    "companies_all": "/companies/v2/companies/paged",
    "companies_recent": "/companies/v2/companies/recent/modified",
    "companies_detail": "/companies/v2/companies/{company_id}",
    "contacts_by_company": "/companies/v2/companies/{company_id}/vids",

    "deals_properties": "/properties/v1/deals/properties",
    "deals_all": "/deals/v1/deal/paged",
    "deals_recent": "/deals/v1/deal/recent/modified",
    "deals_detail": "/deals/v1/deal/{deal_id}",

    "deals_v3_batch_read": "/crm/v3/objects/deals/batch/read",
    "deals_v3_properties": "/crm/v3/properties/deals",

    "deal_pipelines": "/deals/v1/pipelines",

    "campaigns_all": "/email/public/v1/campaigns/by-id",
    "campaigns_detail": "/email/public/v1/campaigns/{campaign_id}",

    "engagements_all": "/engagements/v1/engagements/paged",

    "subscription_changes": "/email/public/v1/subscriptions/timeline",
    "email_events": "/email/public/v1/events",
    "contact_lists": "/contacts/v1/lists",
    "forms": "/forms/v2/forms",
    "workflows": "/automation/v3/workflows",
    "owners": "/owners/v2/owners",
}

def get_start(state, tap_stream_id, bookmark_key, older_bookmark_key=None):
    """
    If the current bookmark_key is available in the state, return its value.
    If it is not available, check for and return the older_bookmark_key value
    in the state for an existing connection.
    If neither key is present in the state for the stream, return start_date.
    This fallback exists because the replication key of the deals stream changed:
    if an existing connection only has the older_bookmark_key in its state, the tap
    should use that bookmark value once, and write the current bookmark key from then on.
    """
    current_bookmark = singer.get_bookmark(state, tap_stream_id, bookmark_key)
    if current_bookmark is None:
        if older_bookmark_key:
            previous_bookmark = singer.get_bookmark(state, tap_stream_id, older_bookmark_key)
            if previous_bookmark:
                return previous_bookmark
        return CONFIG['start_date']
    return current_bookmark

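# Illustrative example of the fallback (hypothetical state values): with
#   state = {"bookmarks": {"deals": {"hs_lastmodifieddate": "2021-01-01T00:00:00Z"}}}
# a call like
#   get_start(state, "deals", "property_hs_lastmodifieddate", older_bookmark_key="hs_lastmodifieddate")
# returns "2021-01-01T00:00:00Z"; with an empty state it returns CONFIG['start_date'].
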
def get_current_sync_start(state, tap_stream_id):
    current_sync_start_value = singer.get_bookmark(state, tap_stream_id, "current_sync_start")
    if current_sync_start_value is None:
        return current_sync_start_value
    return utils.strptime_to_utc(current_sync_start_value)

def write_current_sync_start(state, tap_stream_id, start):
    value = start
    if start is not None:
        value = utils.strftime(start)
    return singer.write_bookmark(state, tap_stream_id, "current_sync_start", value)

def clean_state(state):
    """ Clear deprecated keys out of state. """
    for stream, bookmark_map in state.get("bookmarks", {}).items():
        if "last_sync_duration" in bookmark_map:
            LOGGER.info("%s - Removing last_sync_duration from state.", stream)
            state["bookmarks"][stream].pop("last_sync_duration", None)

def get_url(endpoint, **kwargs):
    if endpoint not in ENDPOINTS:
        raise ValueError("Invalid endpoint {}".format(endpoint))
    return BASE_URL + ENDPOINTS[endpoint].format(**kwargs)

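# e.g. (with a hypothetical company id)
#   get_url("companies_detail", company_id=42)
#   -> "https://api.hubapi.com/companies/v2/companies/42"
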
def get_field_type_schema(field_type):
    if field_type == "bool":
        return {"type": ["null", "boolean"]}
    elif field_type == "datetime":
        return {"type": ["null", "string"],
                "format": "date-time"}
    elif field_type == "number":
        # A value like 'N/A' can be returned for this type,
        # so we have to let this be a string sometimes
        return {"type": ["null", "number", "string"]}
    else:
        return {"type": ["null", "string"]}

def get_field_schema(field_type, extras=False):
    if extras:
        return {
            "type": "object",
            "properties": {
                "value": get_field_type_schema(field_type),
                "timestamp": get_field_type_schema("datetime"),
                "source": get_field_type_schema("string"),
                "sourceId": get_field_type_schema("string"),
            }
        }
    else:
        return {
            "type": "object",
            "properties": {
                "value": get_field_type_schema(field_type),
            }
        }

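# For example, get_field_schema("number") yields
#   {"type": "object", "properties": {"value": {"type": ["null", "number", "string"]}}}
# and with extras=True the object additionally exposes "timestamp", "source", and "sourceId".
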
def parse_custom_schema(entity_name, data):
    return {
        field['name']: get_field_schema(field['type'], entity_name != 'contacts')
        for field in data
    }

def get_custom_schema(entity_name):
    return parse_custom_schema(entity_name, request(get_url(entity_name + "_properties")).json())

def get_v3_schema(entity_name):
    url = get_url("deals_v3_properties")
    return parse_custom_schema(entity_name, request(url).json()['results'])

def get_abs_path(path):
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

def load_associated_company_schema():
    associated_company_schema = load_schema("companies")
    #pylint: disable=line-too-long
    associated_company_schema['properties']['company-id'] = associated_company_schema['properties'].pop('companyId')
    associated_company_schema['properties']['portal-id'] = associated_company_schema['properties'].pop('portalId')
    return associated_company_schema

def load_schema(entity_name):
    schema = utils.load_json(get_abs_path('schemas/{}.json'.format(entity_name)))
    if entity_name in ["contacts", "companies", "deals"]:
        custom_schema = get_custom_schema(entity_name)

        schema['properties']['properties'] = {
            "type": "object",
            "properties": custom_schema,
        }

        if entity_name in ["deals"]:
            v3_schema = get_v3_schema(entity_name)
            for key, value in v3_schema.items():
                if any(prefix in key for prefix in V3_PREFIXES):
                    custom_schema[key] = value

        # Move properties to top level
        custom_schema_top_level = {'property_{}'.format(k): v for k, v in custom_schema.items()}
        schema['properties'].update(custom_schema_top_level)

        # Make properties_versions selectable and share the same schema.
        versions_schema = utils.load_json(get_abs_path('schemas/versions.json'))
        schema['properties']['properties_versions'] = versions_schema

    if entity_name == "contacts":
        schema['properties']['associated-company'] = load_associated_company_schema()

    return schema

#pylint: disable=invalid-name
def acquire_access_token_from_refresh_token():
    payload = {
        "grant_type": "refresh_token",
        "redirect_uri": CONFIG['redirect_uri'],
        "refresh_token": CONFIG['refresh_token'],
        "client_id": CONFIG['client_id'],
        "client_secret": CONFIG['client_secret'],
    }

    resp = requests.post(BASE_URL + "/oauth/v1/token", data=payload, timeout=get_request_timeout())
    if resp.status_code == 403:
        raise InvalidAuthException(resp.content)

    resp.raise_for_status()
    auth = resp.json()
    CONFIG['access_token'] = auth['access_token']
    CONFIG['refresh_token'] = auth['refresh_token']
    CONFIG['token_expires'] = (
        datetime.datetime.utcnow() +
        datetime.timedelta(seconds=auth['expires_in'] - 600))
    LOGGER.info("Token refreshed. Expires at %s", CONFIG['token_expires'])

def giveup(exc):
    return exc.response is not None \
        and 400 <= exc.response.status_code < 500 \
        and exc.response.status_code != 429

def on_giveup(details):
    if len(details['args']) == 2:
        url, params = details['args']
    else:
        url = details['args']
        params = {}

    raise Exception("Giving up on request after {} tries with url {} and params {}" \
                    .format(details['tries'], url, params))

URL_SOURCE_RE = re.compile(BASE_URL + r'/(\w+)/')

def parse_source_from_url(url):
    match = URL_SOURCE_RE.match(url)
    if match:
        return match.group(1)
    return None

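# e.g. parse_source_from_url("https://api.hubapi.com/contacts/v1/lists/all/contacts/all")
#      -> "contacts"
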
def get_params_and_headers(params):
    """
    This function builds a params object and a headers object based on the
    authentication values available. If there is a `hapikey` in the config, it
    goes in `params` and not in `headers`. Otherwise, we need to get an
    `access_token` to put in `headers` and not in `params`.
    """
    params = params or {}
    hapikey = CONFIG['hapikey']
    if hapikey is None:
        if CONFIG['token_expires'] is None or CONFIG['token_expires'] < datetime.datetime.utcnow():
            acquire_access_token_from_refresh_token()
        headers = {'Authorization': 'Bearer {}'.format(CONFIG['access_token'])}
    else:
        params['hapikey'] = hapikey
        headers = {}

    if 'user_agent' in CONFIG:
        headers['User-Agent'] = CONFIG['user_agent']

    return params, headers

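# Illustrative outcomes (hypothetical values):
#   hapikey auth: params == {'hapikey': 'demo-key'} and headers == {}
#   OAuth auth:   params == {} and headers == {'Authorization': 'Bearer <access_token>'}
# plus a 'User-Agent' header in either mode when CONFIG contains 'user_agent'.
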
# Backoff for a Timeout error is already covered by "requests.exceptions.RequestException",
# since it is the parent class of the "Timeout" error.
@backoff.on_exception(backoff.constant,
                      (requests.exceptions.RequestException,
                       requests.exceptions.HTTPError),
                      max_tries=5,
                      jitter=None,
                      giveup=giveup,
                      on_giveup=on_giveup,
                      interval=10)
def request(url, params=None):
    params, headers = get_params_and_headers(params)
    req = requests.Request('GET', url, params=params, headers=headers).prepare()
    LOGGER.info("GET %s", req.url)
    with metrics.http_request_timer(parse_source_from_url(url)) as timer:
        resp = SESSION.send(req, timeout=get_request_timeout())
        timer.tags[metrics.Tag.http_status_code] = resp.status_code
        if resp.status_code == 403:
            raise SourceUnavailableException(resp.content)
        else:
            resp.raise_for_status()

        return resp

# {"bookmarks" : {"contacts" : { "lastmodifieddate" : "2001-01-01"
# "offset" : {"vidOffset": 1234
# "timeOffset": "3434434 }}
# "users" : { "timestamp" : "2001-01-01"}}
# "currently_syncing" : "contacts"
# }
# }
def lift_properties_and_versions(record):
    for key, value in record.get('properties', {}).items():
        computed_key = "property_{}".format(key)
        versions = value.get('versions')
        record[computed_key] = value

        if versions:
            if not record.get('properties_versions'):
                record['properties_versions'] = []
            record['properties_versions'] += versions

    return record

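# Illustrative example (hypothetical record): a record containing
#   {'properties': {'dealname': {'value': 'Big Deal', 'versions': [v1, v2]}}}
# gains a top-level 'property_dealname' key holding that same object, and its
# 'properties_versions' list is extended with [v1, v2].
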
# Backoff for a Timeout error is already covered by "requests.exceptions.RequestException",
# since it is the parent class of the "Timeout" error.
@backoff.on_exception(backoff.constant,
                      (requests.exceptions.RequestException,
                       requests.exceptions.HTTPError),
                      max_tries=5,
                      jitter=None,
                      giveup=giveup,
                      on_giveup=on_giveup,
                      interval=10)
def post_search_endpoint(url, data, params=None):
    params, headers = get_params_and_headers(params)
    headers['content-type'] = "application/json"

    with metrics.http_request_timer(url) as _:
        resp = requests.post(
            url=url,
            json=data,
            params=params,
            timeout=get_request_timeout(),
            headers=headers
        )

        resp.raise_for_status()

        return resp

def merge_responses(v1_data, v3_data):
    for v1_record in v1_data:
        v1_id = v1_record.get('dealId')
        for v3_record in v3_data:
            v3_id = v3_record.get('id')
            if str(v1_id) == v3_id:
                v1_record['properties'] = {**v1_record['properties'],
                                           **v3_record['properties']}

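# Merging is keyed on the deal id: a v1 record with dealId 123 picks up the
# 'properties' of the v3 record whose 'id' is "123" (v3 ids are strings, so the
# v1 id is stringified before comparing). On key collisions the v3 value wins,
# because the v3 dict is unpacked last.
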
def process_v3_deals_records(v3_data):
    """
    This function:
    1. filters out fields that don't contain 'hs_date_entered_*',
       'hs_date_exited_*', or 'hs_time_in_*' (see V3_PREFIXES)
    2. changes each key-value pair in `properties` to a key paired with an
       object that has a key 'value' holding the original value
    """
    transformed_v3_data = []
    for record in v3_data:
        new_properties = {field_name: {'value': field_value}
                          for field_name, field_value in record['properties'].items()
                          if any(prefix in field_name for prefix in V3_PREFIXES)}
        transformed_v3_data.append({**record, 'properties': new_properties})
    return transformed_v3_data

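# Illustrative example (hypothetical property names and values): a v3 record
#   {'id': '123', 'properties': {'hs_date_entered_stage1': '2021-01-01T00:00:00Z',
#                                'dealname': 'Big Deal'}}
# becomes
#   {'id': '123', 'properties': {'hs_date_entered_stage1': {'value': '2021-01-01T00:00:00Z'}}}
# which matches the shape of the v1 'properties' objects.
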
def get_v3_deals(v3_fields, v1_data):
    v1_ids = [{'id': str(record['dealId'])} for record in v1_data]

    # Sending the first v3_field is enough to get them all
    v3_body = {'inputs': v1_ids,
               'properties': [v3_fields[0]],}
    v3_url = get_url('deals_v3_batch_read')
    v3_resp = post_search_endpoint(v3_url, v3_body)
    return v3_resp.json()['results']

#pylint: disable=line-too-long
def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys, offset_targets, v3_fields=None):
    if len(offset_keys) != len(offset_targets):
        raise ValueError("Number of offset_keys must match number of offset_targets")

    if singer.get_offset(STATE, tap_stream_id):
        params.update(singer.get_offset(STATE, tap_stream_id))

    with metrics.record_counter(tap_stream_id) as counter:
        while True:
            data = request(url, params).json()

            if data.get(path) is None:
                raise RuntimeError("Unexpected API response: {} not in {}".format(path, data.keys()))

            if v3_fields:
                v3_data = get_v3_deals(v3_fields, data[path])

                # The shape of v3_data is different than the V1 response,
                # so we transform v3 to look like v1
                transformed_v3_data = process_v3_deals_records(v3_data)
                merge_responses(data[path], transformed_v3_data)

            for row in data[path]:
                counter.increment()
                yield row

            if not data.get(more_key, False):
                break

            STATE = singer.clear_offset(STATE, tap_stream_id)
            for key, target in zip(offset_keys, offset_targets):
                if key in data:
                    params[target] = data[key]
                    STATE = singer.set_offset(STATE, tap_stream_id, target, data[key])

            singer.write_state(STATE)

    STATE = singer.clear_offset(STATE, tap_stream_id)
    singer.write_state(STATE)

def _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key):
    if len(vids) == 0:
        return

    data = request(get_url("contacts_detail"), params={'vid': vids, 'showListMemberships': True, "formSubmissionMode": "all"}).json()
    time_extracted = utils.now()
    mdata = metadata.to_map(catalog.get('metadata'))

    for record in data.values():
        # Explicitly add the bookmark field "versionTimestamp" and its value in the record.
        record[bookmark_key] = bookmark_values.get(record.get("vid"))
        record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
        singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)

default_contact_params = {
    'showListMemberships': True,
    'includeVersion': True,
    'count': 100,
}

def sync_contacts(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    bookmark_key = 'versionTimestamp'
    start = utils.strptime_with_tz(get_start(STATE, "contacts", bookmark_key))
    LOGGER.info("sync_contacts from %s", start)

    max_bk_value = start
    schema = load_schema("contacts")

    singer.write_schema("contacts", schema, ["vid"], [bookmark_key], catalog.get('stream_alias'))

    url = get_url("contacts_all")

    vids = []
    # Dict to store the replication key value for each contact record
    bookmark_values = {}
    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in gen_request(STATE, 'contacts', url, default_contact_params, 'contacts', 'has-more', ['vid-offset'], ['vidOffset']):
            modified_time = None
            if bookmark_key in row:
                modified_time = utils.strptime_with_tz(
                    _transform_datetime(  # pylint: disable=protected-access
                        row[bookmark_key],
                        UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING))

            if not modified_time or modified_time >= start:
                vids.append(row['vid'])
                # Add the replication key value to the `bookmark_values` dict:
                # the key is the vid (primary key) and the value is the replication key value.
                bookmark_values[row['vid']] = utils.strftime(modified_time)

            if modified_time and modified_time >= max_bk_value:
                max_bk_value = modified_time

            if len(vids) == 100:
                _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)
                vids = []

        _sync_contact_vids(catalog, vids, schema, bumble_bee, bookmark_values, bookmark_key)

    STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
    singer.write_state(STATE)
    return STATE

class ValidationPredFailed(Exception):
    pass

# companies_recent only supports 10,000 results. If there are more than this,
# we'll need to use the companies_all endpoint
def use_recent_companies_endpoint(response):
    return response["total"] < 10000

default_contacts_by_company_params = {'count': 100}

# NB> to do: support stream aliasing and field selection
def _sync_contacts_by_company(STATE, ctx, company_id):
    schema = load_schema(CONTACTS_BY_COMPANY)
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    url = get_url("contacts_by_company", company_id=company_id)
    path = 'vids'
    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        with metrics.record_counter(CONTACTS_BY_COMPANY) as counter:
            data = request(url, default_contacts_by_company_params).json()

            if data.get(path) is None:
                raise RuntimeError("Unexpected API response: {} not in {}".format(path, data.keys()))

            for row in data[path]:
                counter.increment()
                record = {'company-id': company_id,
                          'contact-id': row}
                record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
                singer.write_record("contacts_by_company", record, time_extracted=utils.now())

    return STATE

default_company_params = {
    'limit': 250, 'properties': ["createdate", "hs_lastmodifieddate"]
}

def sync_companies(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    bumble_bee = Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)
    bookmark_key = 'property_hs_lastmodifieddate'
    bookmark_field_in_record = 'hs_lastmodifieddate'

    start = utils.strptime_to_utc(get_start(STATE, "companies", bookmark_key, older_bookmark_key=bookmark_field_in_record))
    LOGGER.info("sync_companies from %s", start)
    schema = load_schema('companies')
    singer.write_schema("companies", schema, ["companyId"], [bookmark_key], catalog.get('stream_alias'))

    # Because this stream doesn't query by `lastUpdated`, it cycles
    # through the data set every time. The issue with this is that there
    # is a race condition by which records may be updated between the
    # start of this table's sync and the end, causing some updates to not
    # be captured. To combat this, we store the current sync's start
    # in the state and never move the bookmark past this value.
    current_sync_start = get_current_sync_start(STATE, "companies") or utils.now()
    STATE = write_current_sync_start(STATE, "companies", current_sync_start)
    singer.write_state(STATE)

    url = get_url("companies_all")
    max_bk_value = start
    if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
        contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY)
        singer.write_schema("contacts_by_company", contacts_by_company_schema, ["company-id", "contact-id"])

    with bumble_bee:
        for row in gen_request(STATE, 'companies', url, default_company_params, 'companies', 'has-more', ['offset'], ['offset']):
            row_properties = row['properties']
            modified_time = None
            if bookmark_field_in_record in row_properties:
                # Hubspot returns timestamps in millis
                timestamp_millis = row_properties[bookmark_field_in_record]['timestamp'] / 1000.0
                modified_time = datetime.datetime.fromtimestamp(timestamp_millis, datetime.timezone.utc)
            elif 'createdate' in row_properties:
                # Hubspot returns timestamps in millis
                timestamp_millis = row_properties['createdate']['timestamp'] / 1000.0
                modified_time = datetime.datetime.fromtimestamp(timestamp_millis, datetime.timezone.utc)

            if modified_time and modified_time >= max_bk_value:
                max_bk_value = modified_time

            if not modified_time or modified_time >= start:
                record = request(get_url("companies_detail", company_id=row['companyId'])).json()
                record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
                singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now())
                if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
                    STATE = _sync_contacts_by_company(STATE, ctx, record['companyId'])

    # Don't bookmark past the start of this sync to account for updated records during the sync.
    new_bookmark = min(max_bk_value, current_sync_start)
    STATE = singer.write_bookmark(STATE, 'companies', bookmark_key, utils.strftime(new_bookmark))
    STATE = write_current_sync_start(STATE, 'companies', None)
    singer.write_state(STATE)
    return STATE

def has_selected_custom_field(mdata):
    top_level_custom_props = [x for x in mdata if len(x) == 2 and 'property_' in x[1]]
    for prop in top_level_custom_props:
        # Return True if the custom field is selected or automatically included.
        if (mdata.get(prop, {}).get('selected') is True) or (mdata.get(prop, {}).get('inclusion') == "automatic"):
            return True
    return False

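# The metadata breadcrumbs inspected here have the form ('properties', 'property_<name>').
# Illustrative example (hypothetical selection): a metadata map containing
#   {('properties', 'property_hs_lastmodifieddate'): {'inclusion': 'automatic'}}
# makes this function return True.
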
def sync_deals(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    bookmark_key = 'property_hs_lastmodifieddate'

    # The bookmark field available in the record ('hs_lastmodifieddate') differs from
    # the tap's bookmark key ('property_hs_lastmodifieddate').
    # `hs_lastmodifieddate` lives at a nested level, inside the `properties` field.
    # Because it is not available at the top level, it cannot be marked for automatic inclusion.
    # The tap includes every nested field of `properties` as a custom field in the schema by
    # prefixing each field name with `property_`.
    # That's why the bookmark_key is `property_hs_lastmodifieddate`: it can be marked for automatic inclusion.
    last_modified_date = 'hs_lastmodifieddate'

    # The tap used to write the bookmark using the replication key `hs_lastmodifieddate`.
    # Now that the replication key has changed to `property_hs_lastmodifieddate`, `get_start`
    # returns the bookmark value of the older bookmark key (`hs_lastmodifieddate`) if it is available.
    # So `older_bookmark_key` here is the previous bookmark key that may still be present in the
    # state of an existing connection.
    start = utils.strptime_with_tz(get_start(STATE, "deals", bookmark_key, older_bookmark_key=last_modified_date))
    max_bk_value = start
    LOGGER.info("sync_deals from %s", start)

    params = {'limit': 100,
              'includeAssociations': False,
              'properties': []}

    schema = load_schema("deals")
    singer.write_schema("deals", schema, ["dealId"], [bookmark_key], catalog.get('stream_alias'))

    # Check if we should include associations
    for key in mdata.keys():
        if 'associations' in key:
            assoc_mdata = mdata.get(key)
            if (assoc_mdata.get('selected') and assoc_mdata.get('selected') is True):
                params['includeAssociations'] = True

    v3_fields = None
    has_selected_properties = mdata.get(('properties', 'properties'), {}).get('selected')
    if has_selected_properties or has_selected_custom_field(mdata):
        # On 2/12/20, HubSpot added a lot of additional properties for
        # deals, and appending all of them to requests ended up leading to
        # 414 (url-too-long) errors. HubSpot recommended we use the
        # `includeAllProperties` and `allPropertiesFetchMode` params
        # instead.
        params['includeAllProperties'] = True
        params['allPropertiesFetchMode'] = 'latest_version'

        # Grab selected `hs_date_entered/exited` fields to call the v3 endpoint with
        v3_fields = [breadcrumb[1].replace('property_', '')
                     for breadcrumb, mdata_map in mdata.items()
                     if breadcrumb
                     and (mdata_map.get('selected') is True or has_selected_properties)
                     and any(prefix in breadcrumb[1] for prefix in V3_PREFIXES)]

    url = get_url('deals_all')
    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in gen_request(STATE, 'deals', url, params, 'deals', "hasMore", ["offset"], ["offset"], v3_fields=v3_fields):
            row_properties = row['properties']
            modified_time = None
            if last_modified_date in row_properties:
                # Hubspot returns timestamps in millis
                timestamp_millis = row_properties[last_modified_date]['timestamp'] / 1000.0
                modified_time = datetime.datetime.fromtimestamp(timestamp_millis, datetime.timezone.utc)
            elif 'createdate' in row_properties:
                # Hubspot returns timestamps in millis
                timestamp_millis = row_properties['createdate']['timestamp'] / 1000.0
                modified_time = datetime.datetime.fromtimestamp(timestamp_millis, datetime.timezone.utc)

            if modified_time and modified_time >= max_bk_value:
                max_bk_value = modified_time

            if not modified_time or modified_time >= start:
                record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
                singer.write_record("deals", record, catalog.get('stream_alias'), time_extracted=utils.now())

    STATE = singer.write_bookmark(STATE, 'deals', bookmark_key, utils.strftime(max_bk_value))
    singer.write_state(STATE)
    return STATE

#NB> no suitable bookmark is available: https://developers.hubspot.com/docs/methods/email/get_campaigns_by_id
def sync_campaigns(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema("campaigns")
    singer.write_schema("campaigns", schema, ["id"], catalog.get('stream_alias'))
    LOGGER.info("sync_campaigns(NO bookmarks)")
    url = get_url("campaigns_all")
    params = {'limit': 500}

    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in gen_request(STATE, 'campaigns', url, params, "campaigns", "hasMore", ["offset"], ["offset"]):
            record = request(get_url("campaigns_detail", campaign_id=row['id'])).json()
            record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
            singer.write_record("campaigns", record, catalog.get('stream_alias'), time_extracted=utils.now())

    return STATE

def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):
    schema = load_schema(entity_name)
    bookmark_key = 'startTimestamp'

    singer.write_schema(entity_name, schema, key_properties, [bookmark_key], catalog.get('stream_alias'))

    start = get_start(STATE, entity_name, bookmark_key)
    LOGGER.info("sync_%s from %s", entity_name, start)

    now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
    now_ts = int(now.timestamp() * 1000)

    start_ts = int(utils.strptime_with_tz(start).timestamp() * 1000)
    url = get_url(entity_name)

    mdata = metadata.to_map(catalog.get('metadata'))

    if entity_name == 'email_events':
        window_size = int(CONFIG['email_chunk_size'])
    elif entity_name == 'subscription_changes':
        window_size = int(CONFIG['subscription_chunk_size'])

    with metrics.record_counter(entity_name) as counter:
        while start_ts < now_ts:
            end_ts = start_ts + window_size
            params = {
                'startTimestamp': start_ts,
                'endTimestamp': end_ts,
                'limit': 1000,
            }
            with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
                while True:
                    our_offset = singer.get_offset(STATE, entity_name)
                    if bool(our_offset) and our_offset.get('offset') is not None:
                        params[StateFields.offset] = our_offset.get('offset')

                    data = request(url, params).json()
                    time_extracted = utils.now()

                    if data.get(path) is None:
                        raise RuntimeError("Unexpected API response: {} not in {}".format(path, data.keys()))

                    for row in data[path]:
                        counter.increment()
                        record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
                        singer.write_record(entity_name,
                                            record,
                                            catalog.get('stream_alias'),
                                            time_extracted=time_extracted)

                    if data.get('hasMore'):
                        STATE = singer.set_offset(STATE, entity_name, 'offset', data['offset'])
                        singer.write_state(STATE)
                    else:
                        STATE = singer.clear_offset(STATE, entity_name)
                        singer.write_state(STATE)
                        break

            STATE = singer.write_bookmark(STATE, entity_name, 'startTimestamp', utils.strftime(datetime.datetime.fromtimestamp((start_ts / 1000), datetime.timezone.utc)))  # pylint: disable=line-too-long
            singer.write_state(STATE)
            start_ts = end_ts

    STATE = singer.clear_offset(STATE, entity_name)
    singer.write_state(STATE)
    return STATE

def sync_subscription_changes(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    STATE = sync_entity_chunked(STATE, catalog, "subscription_changes", ["timestamp", "portalId", "recipient"],
                                "timeline")
    return STATE

def sync_email_events(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    STATE = sync_entity_chunked(STATE, catalog, "email_events", ["id"], "events")
    return STATE

def sync_contact_lists(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema("contact_lists")
    bookmark_key = 'updatedAt'
    singer.write_schema("contact_lists", schema, ["listId"], [bookmark_key], catalog.get('stream_alias'))

    start = get_start(STATE, "contact_lists", bookmark_key)
    max_bk_value = start

    LOGGER.info("sync_contact_lists from %s", start)

    url = get_url("contact_lists")
    params = {'count': 250}
    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in gen_request(STATE, 'contact_lists', url, params, "lists", "has-more", ["offset"], ["offset"]):
            record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)

            if record[bookmark_key] >= start:
                singer.write_record("contact_lists", record, catalog.get('stream_alias'), time_extracted=utils.now())
            if record[bookmark_key] >= max_bk_value:
                max_bk_value = record[bookmark_key]

    STATE = singer.write_bookmark(STATE, 'contact_lists', bookmark_key, max_bk_value)
    singer.write_state(STATE)

    return STATE

def sync_forms(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema("forms")
    bookmark_key = 'updatedAt'

    singer.write_schema("forms", schema, ["guid"], [bookmark_key], catalog.get('stream_alias'))
    start = get_start(STATE, "forms", bookmark_key)
    max_bk_value = start

    LOGGER.info("sync_forms from %s", start)

    data = request(get_url("forms")).json()
    time_extracted = utils.now()

    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in data:
            record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)

            if record[bookmark_key] >= start:
                singer.write_record("forms", record, catalog.get('stream_alias'), time_extracted=time_extracted)
            if record[bookmark_key] >= max_bk_value:
                max_bk_value = record[bookmark_key]

    STATE = singer.write_bookmark(STATE, 'forms', bookmark_key, max_bk_value)
    singer.write_state(STATE)

    return STATE

def sync_workflows(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema("workflows")
    bookmark_key = 'updatedAt'
    singer.write_schema("workflows", schema, ["id"], [bookmark_key], catalog.get('stream_alias'))
    start = get_start(STATE, "workflows", bookmark_key)
    max_bk_value = start

    STATE = singer.write_bookmark(STATE, 'workflows', bookmark_key, max_bk_value)
    singer.write_state(STATE)

    LOGGER.info("sync_workflows from %s", start)

    data = request(get_url("workflows")).json()
    time_extracted = utils.now()

    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in data['workflows']:
            record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
            if record[bookmark_key] >= start:
                singer.write_record("workflows", record, catalog.get('stream_alias'), time_extracted=time_extracted)
            if record[bookmark_key] >= max_bk_value:
                max_bk_value = record[bookmark_key]

    STATE = singer.write_bookmark(STATE, 'workflows', bookmark_key, max_bk_value)
    singer.write_state(STATE)
    return STATE

def sync_owners(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema("owners")
    bookmark_key = 'updatedAt'

    singer.write_schema("owners", schema, ["ownerId"], [bookmark_key], catalog.get('stream_alias'))
    start = get_start(STATE, "owners", bookmark_key)
    max_bk_value = start

    LOGGER.info("sync_owners from %s", start)

    params = {}
    if CONFIG.get('include_inactives'):
        params['includeInactives'] = "true"
    data = request(get_url("owners"), params).json()

    time_extracted = utils.now()

    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in data:
            record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
            if record[bookmark_key] >= max_bk_value:
                max_bk_value = record[bookmark_key]

            if record[bookmark_key] >= start:
                singer.write_record("owners", record, catalog.get('stream_alias'), time_extracted=time_extracted)

    STATE = singer.write_bookmark(STATE, 'owners', bookmark_key, max_bk_value)
    singer.write_state(STATE)
    return STATE

def sync_engagements(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema("engagements")
    bookmark_key = 'lastUpdated'
    singer.write_schema("engagements", schema, ["engagement_id"], [bookmark_key], catalog.get('stream_alias'))
    start = get_start(STATE, "engagements", bookmark_key)

    # Because this stream doesn't query by `lastUpdated`, it cycles
    # through the data set every time. The issue with this is that there
    # is a race condition by which records may be updated between the
    # start of this table's sync and the end, causing some updates to not
    # be captured. To combat this, we store the current sync's start
    # in the state and never move the bookmark past this value.
    current_sync_start = get_current_sync_start(STATE, "engagements") or utils.now()
    STATE = write_current_sync_start(STATE, "engagements", current_sync_start)
    singer.write_state(STATE)

    max_bk_value = start
    LOGGER.info("sync_engagements from %s", start)

    STATE = singer.write_bookmark(STATE, 'engagements', bookmark_key, start)
    singer.write_state(STATE)

    url = get_url("engagements_all")
    params = {'limit': 250}
    top_level_key = "results"
    engagements = gen_request(STATE, 'engagements', url, params, top_level_key, "hasMore", ["offset"], ["offset"])

    time_extracted = utils.now()

    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for engagement in engagements:
            record = bumble_bee.transform(lift_properties_and_versions(engagement), schema, mdata)
            if record['engagement'][bookmark_key] >= start:
                # hoist PK and bookmark field to top-level record
                record['engagement_id'] = record['engagement']['id']
                record[bookmark_key] = record['engagement'][bookmark_key]
                singer.write_record("engagements", record, catalog.get('stream_alias'), time_extracted=time_extracted)
                if record['engagement'][bookmark_key] >= max_bk_value:
                    max_bk_value = record['engagement'][bookmark_key]

    # Don't bookmark past the start of this sync to account for updated records during the sync.
    new_bookmark = min(utils.strptime_to_utc(max_bk_value), current_sync_start)
    STATE = singer.write_bookmark(STATE, 'engagements', bookmark_key, utils.strftime(new_bookmark))
    STATE = write_current_sync_start(STATE, 'engagements', None)
    singer.write_state(STATE)
    return STATE

def sync_deal_pipelines(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    mdata = metadata.to_map(catalog.get('metadata'))
    schema = load_schema('deal_pipelines')
    singer.write_schema('deal_pipelines', schema, ['pipelineId'], catalog.get('stream_alias'))
    LOGGER.info('sync_deal_pipelines')
    data = request(get_url('deal_pipelines')).json()
    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
        for row in data:
            record = bumble_bee.transform(lift_properties_and_versions(row), schema, mdata)
            singer.write_record("deal_pipelines", record, catalog.get('stream_alias'), time_extracted=utils.now())
    singer.write_state(STATE)
    return STATE

@attr.s
class Stream:
    tap_stream_id = attr.ib()
    sync = attr.ib()
    key_properties = attr.ib()
    replication_key = attr.ib()
    replication_method = attr.ib()

STREAMS = [
    # Do these first as they are incremental
    Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'),
    Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'),
    Stream('contacts', sync_contacts, ["vid"], 'versionTimestamp', 'INCREMENTAL'),
    Stream('deals', sync_deals, ["dealId"], 'property_hs_lastmodifieddate', 'INCREMENTAL'),
    Stream('companies', sync_companies, ["companyId"], 'property_hs_lastmodifieddate', 'INCREMENTAL'),

    # Do these last as they are full table
    Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'),
    Stream('workflows', sync_workflows, ['id'], 'updatedAt', 'FULL_TABLE'),
    Stream('owners', sync_owners, ["ownerId"], 'updatedAt', 'FULL_TABLE'),
    Stream('campaigns', sync_campaigns, ["id"], None, 'FULL_TABLE'),
    Stream('contact_lists', sync_contact_lists, ["listId"], 'updatedAt', 'FULL_TABLE'),
    Stream('deal_pipelines', sync_deal_pipelines, ['pipelineId'], None, 'FULL_TABLE'),
    Stream('engagements', sync_engagements, ["engagement_id"], 'lastUpdated', 'FULL_TABLE')