Skip to content

Commit

Permalink
Load all contacts at start of job
Browse files: browse the repository at this point in the history
  • Loading branch information
chopkinsmade committed Nov 22, 2024
1 parent 29c4ceb commit a8cc563
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 105 deletions.
10 changes: 10 additions & 0 deletions cron-scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datahub.company.tasks.adviser import schedule_automatic_adviser_deactivate
from datahub.company.tasks.company import schedule_automatic_company_archive
from datahub.company.tasks.contact import (
ingest_contact_consent_data,
schedule_automatic_contact_archive,
)
from datahub.company.tasks.export_potential import update_company_export_potential_from_csv
Expand Down Expand Up @@ -246,6 +247,15 @@ def schedule_jobs():
schedule_export_win_auto_resend_client_email()
schedule_user_reminder_migration()
schedule_update_company_export_potential_from_csv()
job_scheduler(
max_retries=3,
function=ingest_contact_consent_data,
cron=EVERY_HOUR,
queue_name=LONG_RUNNING_QUEUE,
retry_backoff=True,
description='Import contact consent data',
job_timeout=HALF_DAY_IN_SECONDS,
)


def schedule_email_ingestion_tasks():
Expand Down
88 changes: 53 additions & 35 deletions datahub/company/tasks/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import json
import logging
import math
from typing import List

import environ
import requests

from dateutil import parser
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
Expand Down Expand Up @@ -202,6 +202,7 @@ def ingest(self):
CONSENT_PREFIX,
)
return

for file_key in file_keys:
try:
self.sync_file_with_database(s3_client, file_key)
Expand All @@ -213,13 +214,35 @@ def ingest(self):
)
raise exc

def get_grouped_contacts(self) -> dict[str, List[Contact]]:
    """Return every datahub contact, grouped by email address.

    The result maps each email to the list of contacts that share it, so a
    caller can look up matches in memory instead of querying per email.
    """
    contacts_by_email: dict[str, List[Contact]] = {}
    for contact in Contact.objects.all():
        if contact.email not in contacts_by_email:
            contacts_by_email[contact.email] = []
        contacts_by_email[contact.email].append(contact)
    return contacts_by_email

def should_update_contact(self, contact: Contact, consent_row):
    """Decide whether a consent row should overwrite a contact's consent data.

    Returns True when the contact has no recorded consent modification date,
    when the row carries no usable ``last_modified`` value (missing, None or
    empty), or when the row's ``last_modified`` date is later than the date
    stored on the contact. Returns False otherwise.
    """
    last_modified = consent_row.get('last_modified')

    # An empty value is treated the same as a missing one: there is nothing to
    # compare against, and handing it to the date parser would raise.
    if contact.consent_data_last_modified is None or not last_modified:
        return True

    # To avoid issues with different source system time formats, just compare on
    # the date portion
    return parser.parse(last_modified).date() > contact.consent_data_last_modified.date()

def sync_file_with_database(self, client, file_key):
logger.info(
'Syncing contact consent file %s with datahub contacts',
file_key,
)
path = f's3://{BUCKET}/{file_key}'

contact_dict = self.get_grouped_contacts()

with open(
path,
'r',
Expand All @@ -242,54 +265,49 @@ def sync_file_with_database(self, client, file_key):
continue

email = consent_row['email']
matching_contacts = Contact.objects.filter(email=email)

if not matching_contacts.exists():
logger.info(
'Email %s in contact consent file has no matching datahub contact',
email,
)
if not email:
logger.debug('Row %s has no email', i)
continue

for matching_contact in matching_contacts:
matching_contacts = contact_dict.get(email)

last_modified = (
consent_row['last_modified'] if 'last_modified' in consent_row else None
if not matching_contacts or len(matching_contacts) == 0:
logger.debug(
'Email %s in contact consent file has no matching datahub contacts',
email,
)
continue

update_row = False
if (
matching_contact.consent_data_last_modified is None
or last_modified is None
):
update_row = True
# To avoid issues with different source system time formats, just compare on
# the date portion
elif (
parser.parse(last_modified).date()
> matching_contact.consent_data_last_modified.date()
):
update_row = True

if not update_row:
logger.debug(
'Email %s consent data has not been updated in the latest file',
for contact in matching_contacts:

if not self.should_update_contact(contact, consent_row):
logger.info(
'Email %s does not need to be updated',
email,
)
continue
if settings.ENABLE_CONTACT_CONSENT_INGEST:
matching_contact.consent_data = consent_row['consents']
matching_contact.consent_data_last_modified = (
last_modified if last_modified else datetime.datetime.now()
)
matching_contact.save()

logger.info('Updated contact consent data for email %s', email)
else:
if not settings.ENABLE_CONTACT_CONSENT_INGEST:
logger.info(
'Email %s would have consent data updated, but setting is disabled',
email,
)
continue

contact.consent_data = consent_row['consents']
contact.consent_data_last_modified = (
consent_row['last_modified']
if 'last_modified' in consent_row and consent_row['last_modified']
else datetime.datetime.now()
)

# We don't need to trigger sync related data signals, use an update instead of a save
Contact.objects.filter(id=contact.id).update(
consent_data=contact.consent_data,
consent_data_last_modified=contact.consent_data_last_modified,
)
logger.info('Updated contact consent data for email %s in row %s', email, i)

logger.info(
'Finished processing total %s rows for contact consent from file %s',
Expand Down
Loading

0 comments on commit a8cc563

Please sign in to comment.