From 838de528654d71a13f463cb4c016475b00e5f867 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 28 Nov 2023 15:41:58 -0800 Subject: [PATCH] Let's see here... --- fec.py | 224 +++++----- server/data/fec/__init__.py | 1 - server/data/fec/committees.py | 274 ------------- server/data/fec/contributions.py | 565 -------------------------- server/data/fec/test_committees.py | 168 -------- server/data/fec/test_contributions.py | 529 ------------------------ server/data/fec_types.py | 97 +++++ server/data/models.py | 383 +++++++++++++++++ server/data/names/__init__.py | 1 - server/data/names/nicknames.py | 259 ------------ server/data/names/test_nicknames.py | 199 --------- server/data/nicknames.py | 74 ++++ server/data/usps/__init__.py | 1 - server/data/usps/city_state.py | 7 - server/data/usps/metros.py | 143 ------- server/data/usps/test_metros.py | 17 - server/data/usps/test_zipcode.py | 65 --- server/data/usps/zipcode.py | 122 ------ 18 files changed, 676 insertions(+), 2453 deletions(-) delete mode 100644 server/data/fec/__init__.py delete mode 100644 server/data/fec/committees.py delete mode 100644 server/data/fec/contributions.py delete mode 100644 server/data/fec/test_committees.py delete mode 100644 server/data/fec/test_contributions.py create mode 100644 server/data/fec_types.py create mode 100644 server/data/models.py delete mode 100644 server/data/names/__init__.py delete mode 100644 server/data/names/nicknames.py delete mode 100644 server/data/names/test_nicknames.py create mode 100644 server/data/nicknames.py delete mode 100644 server/data/usps/__init__.py delete mode 100644 server/data/usps/city_state.py delete mode 100644 server/data/usps/metros.py delete mode 100644 server/data/usps/test_metros.py delete mode 100644 server/data/usps/test_zipcode.py delete mode 100644 server/data/usps/zipcode.py diff --git a/fec.py b/fec.py index 5b14a37..beb4a6b 100755 --- a/fec.py +++ b/fec.py @@ -2,19 +2,20 @@ # ruff: noqa: E501 import json +from itertools import batched 
import click +from tqdm import tqdm -from server.data.contacts import Contact, IContactProvider, SimpleContactProvider -from server.data.contacts.abbu import DirectoryABBUManager, ZipABBUManager -from server.data.fec.committees import CommitteeManager -from server.data.fec.contributions import ( - ContributionsManager, - ContributionSummariesManager, - FuzzyIdentifier, -) from server.data.manager import DataManager -from server.data.names.nicknames import MessyNicknamesManager, NicknamesManager +from server.data.models import ( + Committee, + Contribution, + create_db_tables, + engine_for_data_manager, + session_for_data_manager, +) +from server.data.nicknames import NicknamesManager @click.group() @@ -37,22 +38,22 @@ def names(): required=False, default=None, ) -def clean(data: str | None = None): - """Clean raw names data.""" +@click.argument("name", required=True) +def related(name: str, data: str | None = None): + """Show all related name sets.""" data_manager = DataManager(data) if data is not None else DataManager.default() - messy_names_manager = MessyNicknamesManager.from_data_manager(data_manager) - nicknames_manager = messy_names_manager.nicknames_manager - nicknames_manager.to_jsonl_data_manager(data_manager) + nicknames_manager = NicknamesManager.from_data_manager(data_manager) + for related_name_set in nicknames_manager.get_related_names(name): + print(json.dumps(list(related_name_set))) @fec.group() -def committees(): - """Work with FEC committees data.""" +def db(): + """Work with the database.""" pass -@committees.command(name="lookup") -@click.argument("committee_id") +@db.command() @click.option( "--data", type=click.Path(exists=True), @@ -60,24 +61,38 @@ def committees(): required=False, default=None, ) -def committee_lookup(committee_id: str, data: str | None = None): - """Search FEC committees data.""" +def init(data: str | None = None): + """Initialize the database.""" data_manager = DataManager(data) if data is not None else 
DataManager.default() - committees_manager = CommitteeManager.from_csv_data_manager(data_manager) - committee = committees_manager.get_committee(committee_id) - if committee is None: - print("No matching committee.") - else: - print(json.dumps(committee.to_data(), indent=2)) + print(f"Initializing database for {data_manager.path}.") + create_db_tables(engine_for_data_manager(data_manager)) + print("Adding committees...") + with session_for_data_manager(data_manager) as session, session.begin(): + for committee in Committee.from_data_manager(data_manager): + session.add(committee) + print("Adding individual contributions...") + with session_for_data_manager(data_manager) as session: + for contributions in batched( + tqdm( + Contribution.from_data_manager(data_manager), + unit="contribution", + total=70_659_611, + ), + 5_000, + ): + with session.begin(): + session.add_all(contributions) + print("Done.") @fec.group() -def contributions(): - """Work with FEC contributions data.""" +def committees(): + """Work with FEC committees data.""" pass -@contributions.command() +@committees.command(name="search") +@click.argument("name") @click.option( "--data", type=click.Path(exists=True), @@ -85,82 +100,87 @@ def contributions(): required=False, default=None, ) -def summarize(data: str | None = None): - """Summarize raw FEC individual contribution data.""" +def committee_search(name: str, data: str | None = None): + """Search FEC committees data.""" data_manager = DataManager(data) if data is not None else DataManager.default() - contributions_manager = ContributionsManager.from_data_manager(data_manager) - summaries_manager = contributions_manager.contribution_summaries_manager - summaries_manager.to_jsonl_data_manager(data_manager) + with session_for_data_manager(data_manager) as session: + for committee in Committee.for_name(session, name): + print(json.dumps(committee.to_data(), indent=2)) -@contributions.command() -@click.argument("first_name", required=False, 
default=None) -@click.argument("last_name", required=False, default=None) -@click.argument("zip_code", required=False, default=None) -@click.option( - "-c", - "--contact-dir", - type=click.Path(exists=True, dir_okay=True, file_okay=False), - help="Path to a `.abbu` contacts dir.", - required=False, - default=None, -) -@click.option( - "-z", - "--contact-zip", - type=click.Path(exists=True, dir_okay=False, file_okay=True), - help="Path to a `.abbu` contacts zip file.", - required=False, - default=None, -) -@click.option( - "--data", - type=click.Path(exists=True), - help="Path to data dir.", - required=False, - default=None, -) -def search( - first_name: str | None = None, - last_name: str | None = None, - zip_code: str | None = None, - data: str | None = None, - contact_dir: str | None = None, - contact_zip: str | None = None, -): - """Search summarized FEC contributions data.""" - data_manager = DataManager(data) if data is not None else DataManager.default() - nicknames_manager = NicknamesManager.from_data_manager(data_manager) - summaries_manager = ContributionSummariesManager.from_data_manager(data_manager) - - contact_provider: IContactProvider | None = None - - if contact_dir is not None: - contact_provider = DirectoryABBUManager(contact_dir) - elif contact_zip is not None: - contact_provider = ZipABBUManager(contact_zip) - elif first_name and last_name and zip_code: - singleton = Contact(first_name, last_name, zip_code) - contact_provider = SimpleContactProvider([singleton]) - - if contact_provider is None: - raise click.UsageError( - "You must provide a contact dir, zip file, or explicit name & zip." 
- ) - - for contact in contact_provider.get_contacts(): - fuzzy_id = FuzzyIdentifier( - contact.last, - contact.first, - contact.zip_code, - get_nickname_index=nicknames_manager, - ).fuzzy_id - summary = summaries_manager.get_summary(fuzzy_id) - print(f"--> {contact.first} {contact.last} {contact.zip_code}") - if summary is None: - print("{}") - else: - print(json.dumps(summary.to_data(), indent=2)) +@fec.group() +def contributions(): + """Work with FEC contributions data.""" + pass + + +# @contributions.command() +# @click.argument("first_name", required=False, default=None) +# @click.argument("last_name", required=False, default=None) +# @click.argument("zip_code", required=False, default=None) +# @click.option( +# "-c", +# "--contact-dir", +# type=click.Path(exists=True, dir_okay=True, file_okay=False), +# help="Path to a `.abbu` contacts dir.", +# required=False, +# default=None, +# ) +# @click.option( +# "-z", +# "--contact-zip", +# type=click.Path(exists=True, dir_okay=False, file_okay=True), +# help="Path to a `.abbu` contacts zip file.", +# required=False, +# default=None, +# ) +# @click.option( +# "--data", +# type=click.Path(exists=True), +# help="Path to data dir.", +# required=False, +# default=None, +# ) +# def search( +# first_name: str | None = None, +# last_name: str | None = None, +# zip_code: str | None = None, +# data: str | None = None, +# contact_dir: str | None = None, +# contact_zip: str | None = None, +# ): +# """Search summarized FEC contributions data.""" +# data_manager = DataManager(data) if data is not None else DataManager.default() +# nicknames_manager = NicknamesManager.from_data_manager(data_manager) + +# contact_provider: IContactProvider | None = None + +# if contact_dir is not None: +# contact_provider = DirectoryABBUManager(contact_dir) +# elif contact_zip is not None: +# contact_provider = ZipABBUManager(contact_zip) +# elif first_name and last_name and zip_code: +# singleton = Contact(first_name, last_name, zip_code) +# 
contact_provider = SimpleContactProvider([singleton]) + +# if contact_provider is None: +# raise click.UsageError( +# "You must provide a contact dir, zip file, or explicit name & zip." +# ) + +# for contact in contact_provider.get_contacts(): +# fuzzy_id = FuzzyIdentifier( +# contact.last, +# contact.first, +# contact.zip_code, +# get_nickname_index=nicknames_manager, +# ).fuzzy_id +# summary = summaries_manager.get_summary(fuzzy_id) +# print(f"--> {contact.first} {contact.last} {contact.zip_code}") +# if summary is None: +# print("{}") +# else: +# print(json.dumps(summary.to_data(), indent=2)) if __name__ == "__main__": diff --git a/server/data/fec/__init__.py b/server/data/fec/__init__.py deleted file mode 100644 index 68353f1..0000000 --- a/server/data/fec/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Tools for working with raw FEC data.""" diff --git a/server/data/fec/committees.py b/server/data/fec/committees.py deleted file mode 100644 index 99a3e37..0000000 --- a/server/data/fec/committees.py +++ /dev/null @@ -1,274 +0,0 @@ -""" -Support reading FEC committee master file content. - -You can download per-election-cycle committee master files from: -https://www.fec.gov/data/browse-data/?tab=bulk-data - -The schema for the committee master file is available at: -https://www.fec.gov/campaign-finance-data/committee-master-file-description/ -""" -import csv -import json -import pathlib -import typing as t -from dataclasses import dataclass - -from server.data.manager import DataManager -from server.utils import validations as v - - -class CommitteeTypeCode: - """ - Committee type codes. 
- - See: - https://www.fec.gov/campaign-finance-data/committee-type-code-descriptions/ - """ - - COMMUNICATION_COST = "C" - DELEGATE_COMMITTEE = "D" - ELECTIONEERING_COMMUNICATION = "E" - HOUSE = "H" - INDEPEDENT_PERSON_OR_GROUP = "I" - PAC_NONQUALIFIED = "N" - INDEPEDENT_SUPER_PAC = "O" - PRESIDENTIAL = "P" - PAC_QUALIFIED = "Q" - SENATE = "S" - SINGLE_CANDIDATE_INDEPENDENT_EXPENDITURE = "U" - HYBRID_PAC_NONQUALIFIED = "V" - HYBRID_PAC_QUALIFIED = "W" - PARTY_NONQUALIFIED = "X" - PARTY_QUALIFIED = "Y" - NATIONAL_PARTY_NONFEDERAL = "Z" - - @classmethod - def name_for_code(cls, code: str) -> str | None: - """Return the name for the given committee type code.""" - for attr in dir(CommitteeTypeCode): - if not attr.startswith("__"): - if getattr(CommitteeTypeCode, attr) == code: - return attr.replace("_", " ").title() - return None - - -class CommitteeColumns: - """ - Column indices for the committee master file. - - See: - https://www.fec.gov/campaign-finance-data/committee-master-file-description/ - """ - - ID = 0 # CMTE_ID - NAME = 1 # CMTE_NM - TREASURER_NAME = 2 # TRES_NM - STREET_1 = 3 # CMTE_ST1 - STREET_2 = 4 # CMTE_ST2 - CITY = 5 # CMTE_CITY - STATE = 6 # CMTE_ST - ZIP_CODE = 7 # CMTE_ZIP - DESIGNATION = 8 # CMTE_DSGN - TYPE = 9 # CMTE_TP - PARTY = 10 # CMTE_PTY_AFFILIATION - ORG_TYPE = 11 # ORG_TP - CONNECTED_ORG_NAME = 12 # CONNECTED_ORG_NM - CANDIDATE_ID = 13 # CAND_ID - - -class Party: - """ - Political party codes. 
- - For an (incredibly) exhaustive list, see: - https://www.fec.gov/campaign-finance-data/party-code-descriptions/ - """ - - REPUBLICAN = "REP" - DEMOCRAT = "DEM" - INDEPENDENT = "IND" - LIBERTARIAN = "LIB" - GREEN = "GRE" - UNKNOWN = "UNK" # We specifically ignore this/convert to None - - @classmethod - def name_for_code(cls, code: str) -> str | None: - """Return the name for the given party code.""" - for attr in dir(Party): - if not attr.startswith("__"): - if getattr(Party, attr) == code: - return attr.title() - return None - - -@dataclass(frozen=True) -class Committee: - """Our simplification of the committee record.""" - - id: str - name: str - party: str | None - candidate_id: str | None - - @classmethod - def from_committee_row(cls, row: t.Sequence[str]) -> "Committee": - """Create a committee from a row of the committee master file.""" - data = { - "id": row[CommitteeColumns.ID].strip(), - "name": row[CommitteeColumns.NAME].strip(), - } - party = row[CommitteeColumns.PARTY].strip().upper() - if party and party != Party.UNKNOWN: - data["party"] = party - candidate_id = row[CommitteeColumns.CANDIDATE_ID].strip() - if candidate_id: - data["candidate_id"] = candidate_id - return cls.from_data(data) - - @classmethod - def from_data(cls, value: t.Any) -> "Committee": - """Create a committee from arbitrary data, or raise an exception.""" - data = v.validate_dict(value) - return cls( - id=v.get_str(data, "id"), - name=v.get_str(data, "name"), - party=v.get_optional_str(data, "party"), - candidate_id=v.get_optional_str(data, "candidate_id"), - ) - - def to_data(self) -> dict: - """Return a dict representation of the committee.""" - data = { - "id": self.id, - "name": self.name, - } - if self.party is not None: - data["party"] = self.party - if self.candidate_id is not None: - data["candidate_id"] = self.candidate_id - return data - - -class IGetCommittee(t.Protocol): - """Interface for getting a committee.""" - - def get_committee(self, id: str) -> Committee | 
None: - """Get the committee with the given id, or None.""" - ... - - -class MockGetCommittee(IGetCommittee): - """A mock implementation of IGetCommittee.""" - - _id_to_committee: dict[str, Committee] - - def __init__(self, committees: t.Sequence[Committee]) -> None: - """Create a mock implementation.""" - self._id_to_committee = {committee.id: committee for committee in committees} - - def get_committee(self, id: str) -> Committee | None: - """Get the committee with the given id, or None.""" - return self._id_to_committee.get(id) - - -class CommitteeManager: - """Manages a collection of committees.""" - - _committees: list[Committee] - _id_to_committee: dict[str, Committee] | None - - def __init__(self, committees: t.Iterable[Committee]) -> None: - """Create a committee manager.""" - self._committees = list(committees) - self._id_to_committee = None - - @classmethod - def from_csv_io(cls, io: t.TextIO) -> "CommitteeManager": - """Create a committee manager from a CSV file.""" - reader = csv.reader(io, delimiter="|") - return cls(Committee.from_committee_row(row) for row in reader) - - @classmethod - def from_csv_path(cls, path: pathlib.Path) -> "CommitteeManager": - """Create a committee manager from a CSV file.""" - path = v.validate_extant_file(path) - with path.open() as file: - return cls.from_csv_io(file) - - @classmethod - def from_csv_data_manager( - cls, data_manager: "DataManager", year: int = 2020 - ) -> "CommitteeManager": - """Create a committee manager from a data manager.""" - return cls.from_csv_path(data_manager.path / "fec" / f"committees-{year}.txt") - - @classmethod - def from_jsonl_io(cls, io: t.TextIO) -> "CommitteeManager": - """Create a committee manager from a json-lines file.""" - return cls(Committee.from_data(json.loads(line)) for line in io) - - @classmethod - def from_jsonl_path(cls, path: pathlib.Path) -> "CommitteeManager": - """Create a committee manager from a json-lines file.""" - path = v.validate_extant_file(path) - with 
path.open() as file: - return cls.from_jsonl_io(file) - - @classmethod - def from_jsonl_data_manager( - cls, data_manager: "DataManager", year: int = 2020 - ) -> "CommitteeManager": - """Create a committee manager from a data manager.""" - return cls.from_jsonl_path( - data_manager.path / "fec" / f"committees-{year}.jsonl" - ) - - def to_data_lines(self) -> t.Iterable[dict]: - """Convert to a list of json-serializable objects.""" - return (committee.to_data() for committee in self._committees) - - def to_jsonl_io(self, io: t.TextIO) -> None: - """Write to a json file.""" - for data_line in self.to_data_lines(): - io.write(json.dumps(data_line)) - io.write("\n") - - def to_jsonl_path(self, path: pathlib.Path) -> None: - """Write to a json file.""" - with path.open("wt") as output_file: - self.to_jsonl_io(output_file) - - def to_jsonl_data_manager( - self, data_manager: "DataManager", year: int = 2020 - ) -> None: - """Write to a json file.""" - self.to_jsonl_path(data_manager.path / "fec" / f"committees-{year}.jsonl") - - def _index_committees(self) -> None: - """Index the committees by id.""" - assert self._id_to_committee is None - self._id_to_committee = {} - for committee in self._committees: - assert committee.id not in self._id_to_committee - self._id_to_committee[committee.id] = committee - - def _index_committees_if_needed(self) -> None: - """Index the committees by id if needed.""" - if self._id_to_committee is None: - self._index_committees() - - @property - def committees(self) -> t.Sequence[Committee]: - """Get the list of committees.""" - return self._committees - - @property - def id_to_committee(self) -> t.Mapping[str, Committee]: - """Get the mapping from id to committee.""" - self._index_committees_if_needed() - assert self._id_to_committee is not None - return self._id_to_committee - - def get_committee(self, id: str) -> Committee | None: - """Get the committee with the given id, or None.""" - return self.id_to_committee.get(id) diff --git 
a/server/data/fec/contributions.py b/server/data/fec/contributions.py deleted file mode 100644 index bf106a0..0000000 --- a/server/data/fec/contributions.py +++ /dev/null @@ -1,565 +0,0 @@ -""" -Support reading FEC individual contribution master file content, and -converting it into several derived forms. - -You can download per-election-cycle individual contribution master files from: -https://www.fec.gov/data/browse-data/?tab=bulk-data - -The schema for the individual contribution master file is available at: -https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ -""" -import json -import pathlib -import typing as t -from dataclasses import dataclass -from decimal import Decimal - -from server.data.manager import DataManager -from server.data.names.nicknames import IGetNicknameIndex, NicknamesManager -from server.utils import validations as v - -from .committees import CommitteeManager, IGetCommittee - - -def split_name(name: str) -> tuple[str, str | None]: - """ - Split a name into a last name and a first name. - - The name should be in the form LAST, FIRST . If there is no comma, - the entire name is assumed to be the last name. 
- """ - parts = name.split(",") - last_name = parts[0].strip() - first_name = None - if len(parts) > 1: - first_name = parts[1].strip().split(" ")[0].strip() - return (last_name, first_name) - - -class FuzzyIdentifier: - """A fuzzy identifier for a contributor.""" - - last_name: str - """The contributor's last name.""" - - first_name: str | None - """The contributor's first name, if known.""" - - zip_code: str - """The contributor's ZIP code, either 5 or 9 digits.""" - - _get_nickname_index: IGetNicknameIndex - _fuzzy_id: str | None - - def __init__( - self, - last_name: str, - first_name: str | None, - zip_code: str, - *, - get_nickname_index: IGetNicknameIndex, - ): - self.last_name = last_name - self.first_name = first_name - self.zip_code = zip_code - self._get_nickname_index = get_nickname_index - self._fuzzy_id = None - - @classmethod - def from_name( - cls, name: str, zip_code: str, *, get_nickname_index: IGetNicknameIndex - ) -> str: - """Return a fuzzy identifier from a LAST, FIRST style name.""" - last_name, first_name = split_name(name) - return cls.from_last_first( - last_name, first_name, zip_code, get_nickname_index=get_nickname_index - ) - - @classmethod - def from_last_first( - cls, - last_name: str, - first_name: str | None, - zip_code: str, - *, - get_nickname_index: IGetNicknameIndex, - ) -> str: - """Return a fuzzy identifier from a LAST, FIRST style name.""" - return cls( - last_name, first_name, zip_code, get_nickname_index=get_nickname_index - ).fuzzy_id - - def _nickname_index(self) -> int | None: - """Return the nickname index for the first name.""" - if self.first_name is None: - return None - return self._get_nickname_index.get_index(self.first_name) - - @property - def _first_nickname(self) -> str | None: - """Return the first name or nickname.""" - if self.first_name is None: - return None - index = self._nickname_index() - return self.first_name if index is None else str(index) - - def _make_fuzzy_id(self) -> str: - """Make the fuzzy 
ID.""" - return f"{self.last_name}-{self._first_nickname}-{self.zip_code[:5]}".upper() - - def _make_fuzzy_id_if_needed(self) -> None: - if self._fuzzy_id is None: - self._fuzzy_id = self._make_fuzzy_id() - - @property - def fuzzy_id(self) -> str: - """Return the fuzzy ID.""" - self._make_fuzzy_id_if_needed() - assert self._fuzzy_id is not None - return self._fuzzy_id - - -class ContributionColumns: - """ - Column indices for the individual contribution master file. - - See: - https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ - """ - - COMMITTEE_ID = 0 # Filer identification number (CMTE_ID) - AMENDMENT_INDICATOR = 1 # AMNDT_IND - REPORT_TYPE = 2 # RPT_TP - PRIMARY_GENERAL_INDICATOR = 3 # TRANSACTION_PGI - IMAGE_NUMBER = 4 # IMAGE_NUM - TRANSACTION_TYPE = 5 # TRANSACTION_TP - ENTITY_TYPE = 6 # ENTITY_TP (see EntityTypeCode) - NAME = 7 # NAME (of the contributor, typically in LAST, FIRST format) - CITY = 8 # CITY - STATE = 9 # STATE - ZIP_CODE = 10 # ZIP_CODE (usually 5 or 9 digits, but there are lots of odd ones) - EMPLOYER = 11 # EMPLOYER - OCCUPATION = 12 # OCCUPATION - TRANSACTION_DATE = 13 # TRANSACTION_DT (MMDDYYYY) - TRANSACTION_AMOUNT = 14 # TRANSACTION_AMT (in dollars, NUMBER(14, 2)) - OTHER_ID = 15 # OTHER_ID (for non-individual contributions) - TRANSACTION_ID = 16 # TRAN_ID - FILE_NUMBER = 17 # FILE_NUM - MEMO_CODE = 18 # MEMO_CD - MEMO_TEXT = 19 # MEMO_TEXT - SUB_ID = 20 # SUB_ID (FEC record ID) - - -class EntityTypeCode: - CANDIDATE = "CAN" - CANDIDATE_COMMITTEE = "CCM" - COMMITTEE = "COM" - INDIVIDUAL = "IND" - ORGANIZATION = "ORG" - PAC = "PAC" - PARTY_ORGANIZATION = "PTY" - - @classmethod - def name_for_code(cls, code: str) -> str | None: - """Return the name for the given entity type code.""" - for attr in dir(EntityTypeCode): - if not attr.startswith("__"): - if getattr(EntityTypeCode, attr) == code: - return attr.replace("_", " ").title() - return None - - -@dataclass(frozen=True) -class Contribution: - """Our 
simpliciation of an individual contribution.""" - - id: str # The FEC record ID (SUB_ID) - committee_id: str # The committee ID (CMTE_ID) contributed to - name: str # The contributor's name (NAME) - city: str # The contributor's city (CITY) - state: str # The contributor's state (STATE) - zip_code: str # The contributor's ZIP code (ZIP_CODE) -- 5 or 9 digits - amount: Decimal - - @property - def zip5(self) -> str: - """Return the 5-digit ZIP code.""" - return self.zip_code[:5] - - @classmethod - def from_contribution_row(cls, row: t.Sequence[str]) -> t.Optional["Contribution"]: - """ - Create an individual contribution from a row of the committee master file. - - Return None if the contribution is not an individual contribution, or if - required fields are missing or invalid. - """ - sub_id = row[ContributionColumns.SUB_ID].strip() - if not sub_id: - return None - committee_id = row[ContributionColumns.COMMITTEE_ID].strip() - if not committee_id: - return None - entity_type = row[ContributionColumns.ENTITY_TYPE].strip() - if entity_type != EntityTypeCode.INDIVIDUAL: - return None - name = row[ContributionColumns.NAME].strip() - if "," not in name: - return None - city = row[ContributionColumns.CITY].strip() - if not city: - return None - state = row[ContributionColumns.STATE].strip() - if not state: - return None - zip_code = row[ContributionColumns.ZIP_CODE].strip() - if len(zip_code) not in {5, 9}: - return None - amount = row[ContributionColumns.TRANSACTION_AMOUNT].strip() - try: - amount = Decimal(amount) - except Exception: - return None - return cls( - id=sub_id, - committee_id=committee_id, - name=name, - city=city, - state=state, - zip_code=zip_code, - amount=amount, - ) - - @classmethod - def from_data(cls, value: t.Any) -> "Contribution": - """Create an individual contribution from arbitrary data, or raise.""" - data = v.validate_dict(value) - return cls( - id=v.get_str(data, "id"), - committee_id=v.get_str(data, "committee_id"), - name=v.get_str(data, 
"name"), - city=v.get_str(data, "city"), - state=v.get_str(data, "state"), - zip_code=v.get_str(data, "zip_code"), - amount=v.get_convert_decimal(data, "amount"), - ) - - def to_data(self) -> dict: - """Return the contribution as a dictionary.""" - return { - "id": self.id, - "committee_id": self.committee_id, - "name": self.name, - "city": self.city, - "state": self.state, - "zip_code": self.zip_code, - "amount": str(self.amount), - } - - -@dataclass -class ContributionSummary: - fuzzy_id: str - """ - A probably-unique identifier for the contributor. - - It should be possible to re-create this from `name` and `zip_code`. However, - we do not store *all* `name`s that led to this summary record. - """ - - name: str - """A non-fuzzy name for the contributor.""" - - zip_code: str - """The ZIP code of the contributor.""" - - total: Decimal - """The total amount contributed by the contributor.""" - - by_party: dict[str | None, Decimal] - """Total contributions by party. None is for contributions to unknown parties.""" - - by_committee: dict[str, Decimal] - """Total contributions by committee.""" - - @classmethod - def new( - cls, - fuzzy_id: str, - contribution: Contribution, - *, - get_committee: IGetCommittee, - ) -> "ContributionSummary": - """Return an empty contribution summary.""" - total = Decimal(contribution.amount) - committee = get_committee.get_committee(contribution.committee_id) - party = None if committee is None else committee.party - by_party = {party: total} - by_committee = {contribution.committee_id: total} - return cls( - fuzzy_id=fuzzy_id, - name=contribution.name, - zip_code=contribution.zip_code, - total=total, - by_party=by_party, - by_committee=by_committee, - ) - - def add(self, contribution: Contribution, *, get_committee: IGetCommittee) -> None: - """Add a single contribution to the summary.""" - self.total += Decimal(contribution.amount) - committee = get_committee.get_committee(contribution.committee_id) - party = None if committee is None 
else committee.party - self.by_party[party] = self.by_party.get(party, Decimal(0)) + Decimal( - contribution.amount - ) - self.by_committee[contribution.committee_id] = self.by_committee.get( - contribution.committee_id, Decimal(0) - ) + Decimal(contribution.amount) - - @classmethod - def from_data(cls, value: t.Any) -> "ContributionSummary": - """Create a contribution summary from arbitrary data, or raise.""" - data = v.validate_dict(value) - by_party_data = v.get_dict(data, "by_party") - by_committee_data = v.get_dict(data, "by_committee") - return cls( - fuzzy_id=v.get_str(data, "fuzzy_id"), - name=v.get_str(data, "name"), - zip_code=v.get_str(data, "zip_code"), - total=v.get_convert_decimal(data, "total"), - by_party={ - (None if party == "null" else party): v.validate_convert_decimal(amount) - for party, amount in by_party_data.items() - }, - by_committee={ - committee: v.validate_convert_decimal(amount) - for committee, amount in by_committee_data.items() - }, - ) - - def to_data(self) -> dict: - """Return a dict representation of the contribution summary.""" - return { - "fuzzy_id": self.fuzzy_id, - "name": self.name, - "zip_code": self.zip_code, - "total": str(self.total), - "by_party": { - party if party else "null": str(amount) - for party, amount in self.by_party.items() - }, - "by_committee": { - committee: str(amount) - for committee, amount in self.by_committee.items() - }, - } - - -class ContributionsManager: - """ - Tool for working with raw FEC individual contributions files. - - These are large files, even for a single election cycle. Be warned! 
- """ - - _contributions: list[Contribution] - """The raw list of contributions.""" - - _get_committee: IGetCommittee - """A tool for getting committees.""" - - _get_nickname_index: IGetNicknameIndex - """A tool for getting nickname indices.""" - - _contribution_summaries: dict[str, ContributionSummary] | None - """A mapping from fuzzy IDs to contribution summaries.""" - - def __init__( - self, - contributions: t.Iterable[Contribution], - *, - get_committee: IGetCommittee, - get_nickname_index: IGetNicknameIndex, - ) -> None: - self._contributions = list(contributions) - self._contribution_summaries = None - self._get_committee = get_committee - self._get_nickname_index = get_nickname_index - - @classmethod - def from_csv_io( - cls, - io: t.TextIO, - *, - get_committee: IGetCommittee, - get_nickname_index: IGetNicknameIndex, - ) -> "ContributionsManager": - """Create a contributions manager from a FEC individual contributions file.""" - # Turns out this is not simply a CSV with a pipe delimiter. I think it comes - # down to escaping quotes, but I'm not sure. So we'll just split on pipes. 
- rows = (row.strip().split("|") for row in io) - contributions = ( - contribution - for row in rows - if (contribution := Contribution.from_contribution_row(row)) is not None - ) - return cls( - contributions, - get_committee=get_committee, - get_nickname_index=get_nickname_index, - ) - - @classmethod - def from_path( - cls, - path: str | pathlib.Path, - *, - get_committee: IGetCommittee, - get_nickname_index: IGetNicknameIndex, - ) -> "ContributionsManager": - """Create a contributions manager from a path.""" - path = v.validate_extant_file(pathlib.Path(path)) - with path.open("rt") as input_file: - return cls.from_csv_io( - input_file, - get_committee=get_committee, - get_nickname_index=get_nickname_index, - ) - - @classmethod - def from_data_manager( - cls, data_manager: DataManager, year: int = 2020 - ) -> "ContributionsManager": - """Create a contributions manager from a data manager.""" - committee_manager = CommitteeManager.from_csv_data_manager(data_manager, year) - nicknames_manager = NicknamesManager.from_data_manager(data_manager) - return cls.from_path( - data_manager.path / "fec" / f"individual-{year}.txt", - get_committee=committee_manager, - get_nickname_index=nicknames_manager, - ) - - @property - def contributions(self) -> t.Sequence[Contribution]: - """Return the contributions.""" - return self._contributions - - def _summarize_contributions(self) -> None: - """Summarize the contributions.""" - assert self._contribution_summaries is None - self._contribution_summaries = {} - for contribution in self._contributions: - fuzzy_id = FuzzyIdentifier.from_name( - contribution.name, - contribution.zip_code, - get_nickname_index=self._get_nickname_index, - ) - if fuzzy_id not in self._contribution_summaries: - self._contribution_summaries[fuzzy_id] = ContributionSummary.new( - fuzzy_id, - contribution, - get_committee=self._get_committee, - ) - else: - self._contribution_summaries[fuzzy_id].add( - contribution, get_committee=self._get_committee - ) - - 
def _summarize_contributions_if_needed(self) -> None: - if self._contribution_summaries is None: - self._summarize_contributions() - - @property - def contribution_summaries(self) -> t.Mapping[str, ContributionSummary]: - """Return the contribution summaries.""" - self._summarize_contributions_if_needed() - assert self._contribution_summaries is not None - return self._contribution_summaries - - @property - def contribution_summaries_manager(self) -> "ContributionSummariesManager": - """Get the affiliated contribution summaries manager.""" - return ContributionSummariesManager(self.contribution_summaries) - - -class ContributionSummariesManager: - """ - Tool for working with summarized FEC individual contributions files. - - These are large files, even for a single election cycle. Be warned! - """ - - _contribution_summaries: dict[str, ContributionSummary] - """A mapping from fuzzy IDs to contribution summaries.""" - - def __init__( - self, contribution_summaries: t.Mapping[str, ContributionSummary] - ) -> None: - self._contribution_summaries = dict(contribution_summaries) - - @classmethod - def from_summaries(cls, contribution_summaries: t.Iterable[ContributionSummary]): - """Create a contribution summaries manager from summaries.""" - return cls({summary.fuzzy_id: summary for summary in contribution_summaries}) - - @classmethod - def from_jsonl_io(cls, io: t.TextIO) -> "ContributionSummariesManager": - """ - Read from a json lines file and create a manager. - - The file contains a single ContributionSummary record on each line. - The `fuzzy_id` fields must be unique across the entire dataset. 
- """ - summaries_data = (json.loads(line) for line in io) - summaries = (ContributionSummary.from_data(data) for data in summaries_data) - return cls({summary.fuzzy_id: summary for summary in summaries}) - - @classmethod - def from_path(cls, path: str | pathlib.Path) -> "ContributionSummariesManager": - """Create a contribution summaries manager from a path.""" - path = v.validate_extant_file(pathlib.Path(path)) - with path.open("rt") as input_file: - return cls.from_jsonl_io(input_file) - - @classmethod - def from_data_manager( - cls, data_manager: DataManager, year: int = 2020 - ) -> "ContributionSummariesManager": - """Create a contribution summaries manager from a data manager.""" - return cls.from_path( - data_manager.path / "fec" / f"contribution-summaries-{year}.jsonl", - ) - - def to_data_lines(self) -> t.Iterable[dict]: - """Convert to a json-serializable object.""" - return (summary.to_data() for summary in self._contribution_summaries.values()) - - def to_jsonl_io(self, io: t.TextIO) -> None: - """Write to a json lines file.""" - for data_line in self.to_data_lines(): - io.write(json.dumps(data_line)) - io.write("\n") - - def to_jsonl_path(self, path: str | pathlib.Path) -> None: - """Write to a json lines file.""" - path = pathlib.Path(path) - with path.open("wt") as output_file: - self.to_jsonl_io(output_file) - - def to_jsonl_data_manager( - self, data_manager: DataManager, year: int = 2020 - ) -> None: - """Write to a json lines file.""" - self.to_jsonl_path( - data_manager.path / "fec" / f"contribution-summaries-{year}.jsonl" - ) - - @property - def contribution_summaries(self) -> t.Mapping[str, ContributionSummary]: - """Return the contribution summaries.""" - return self._contribution_summaries - - def get_summary(self, fuzzy_id: str) -> ContributionSummary | None: - """Return a single contribution summary, if available.""" - return self._contribution_summaries.get(fuzzy_id) diff --git a/server/data/fec/test_committees.py 
b/server/data/fec/test_committees.py deleted file mode 100644 index 699845e..0000000 --- a/server/data/fec/test_committees.py +++ /dev/null @@ -1,168 +0,0 @@ -# ruff: noqa: E501 D102 - -import io -import unittest - -from server.utils.validations import ValidationError - -from . import committees as c - -RAW_CSV_DATA = """\ -C00000059|HALLMARK CARDS PAC|SARAH MOE|2501 MCGEE|MD #500|KANSAS CITY|MO|64108|U|Q|UNK|M|C|| -C00000422|AMERICAN MEDICAL ASSOCIATION POLITICAL ACTION COMMITTEE|WALKER, KEVIN MR.|25 MASSACHUSETTS AVE, NW|SUITE 600|WASHINGTON|DC|200017400|B|Q||M||DELAWARE MEDICAL PAC| -C00000489|D R I V E POLITICAL FUND CHAPTER 886|JERRY SIMS JR|3528 W RENO||OKLAHOMA CITY|OK|73107|U|N||Q|L|| -C00000547|KANSAS MEDICAL SOCIETY POLITICAL ACTION COMMITTEE|JERRY SLAUGHTER|623 SW 10TH AVE||TOPEKA|KS|666121627|U|Q|UNK|Q|M|KANSAS MEDICAL SOCIETY| -C00000638|INDIANA STATE MEDICAL ASSOCIATION POLITICAL ACTION COMMITTEE|ACHENBACH, GRANT MR.|322 CANAL WALK, CANAL LEVEL||INDIANAPOLIS|IN|46202|U|Q||T|M|| -C00000729|AMERICAN DENTAL ASSOCIATION POLITICAL ACTION COMMITTEE|BARNES, BRAD W DR.|1111 14TH STREET, NW|SUITE 1100|WASHINGTON|DC|200055627|B|Q|UNK|M|M|INDIANA DENTAL PAC| -C00000885|INTERNATIONAL UNION OF PAINTERS AND ALLIED TRADES POLITICAL ACTION TOGETHER POLITICAL COMMITTEE|GALIS, GEORGE|7234 PARKWAY DRIVE||HANOVER|MD|21076|B|Q|UNK|M|L|INTERNATIONAL UNION OF PAINTERS AND ALLIED TRADES| -C00000901|BUILD POLITICAL ACTION COMMITTEE OF THE NATIONAL ASSOCIATION OF HOME BUILDERS (BUILDPAC)|RAMAGE, EILEEN|1201 15TH STREET, NW||WASHINGTON|DC|20005|B|Q|UNK|M|T|NATIONAL ASSOCIATION OF HOME BUILDERS| -C00000935|DCCC|GUINN, LUCINDA|430 SOUTH CAPITOL STREET, SE|2ND FLOOR|WASHINGTON|DC|200034024|U|Y|DEM|M||| -C00000984|UNITED STATES TELECOM ASSOCIATION POLITICAL ACTION COMMITTEE (TELECOMPAC)|HEINER, BRANDON|601 NEW JERSEY AVE NW|STE 600|WASHINGTON|DC|20001|B|Q|UNK|M|T|UNITED STATES TELECOM ASSOCIATION| -""" - - -class CommitteeTypeCodeTestCase(unittest.TestCase): - def 
test_name_for_code(self): - self.assertEqual( - c.CommitteeTypeCode.name_for_code(c.CommitteeTypeCode.COMMUNICATION_COST), - "Communication Cost", - ) - - def test_name_for_code_none(self): - self.assertEqual(c.CommitteeTypeCode.name_for_code("NOPE"), None) - - -class PartyTestCase(unittest.TestCase): - def test_name_for_code(self): - self.assertEqual(c.Party.name_for_code(c.Party.DEMOCRAT), "Democrat") - - def test_name_for_code_none(self): - self.assertEqual(c.Party.name_for_code("NOPE"), None) - - -class CommitteeTestCase(unittest.TestCase): - def test_from_data_id_name(self): - """Test that we can create a committee from data.""" - data = {"id": "id", "name": "name"} - committee = c.Committee.from_data(data) - self.assertEqual(committee.id, "id") - self.assertEqual(committee.name, "name") - self.assertIsNone(committee.party) - self.assertIsNone(committee.candidate_id) - - def test_from_data_all(self): - """Test that we can create a committee from data.""" - data = { - "id": "id", - "name": "name", - "party": "party", - "candidate_id": "candidate_id", - } - committee = c.Committee.from_data(data) - self.assertEqual(committee.id, "id") - self.assertEqual(committee.name, "name") - self.assertEqual(committee.party, "party") - self.assertEqual(committee.candidate_id, "candidate_id") - - def test_from_data_invalid(self): - """Test that we can create a committee from data.""" - data = {"id": "id", "name": "name", "party": 42, "candidate_id": None} - with self.assertRaises(ValidationError): - c.Committee.from_data(data) - - def test_to_data(self): - """Test that we can create a committee from data.""" - committee = c.Committee("id", "name", "party", "candidate_id") - data = committee.to_data() - self.assertEqual(data["id"], "id") - self.assertEqual(data["name"], "name") - self.assertEqual(data["party"], "party") - self.assertEqual(data["candidate_id"], "candidate_id") - - def test_to_data_missing(self): - """Test that we can create a committee from data.""" - committee 
= c.Committee("id", "name", None, None) - data = committee.to_data() - self.assertEqual(data["id"], "id") - self.assertEqual(data["name"], "name") - self.assertFalse("party" in data) - self.assertFalse("candidate_id" in data) - - def test_from_committee_row(self): - """Test that we can create a committee from a row.""" - row = [ - "C00000059", - "HALLMARK CARDS PAC", - "SARAH MOE", - "2501 MCGEE", - "MD #500", - "KANSAS CITY", - "MO", - "64108", - "U", - "Q", - "UNK", - "M", - "C", - "CRUNK", - ] - committee = c.Committee.from_committee_row(row) - self.assertEqual(committee.id, "C00000059") - self.assertEqual(committee.name, "HALLMARK CARDS PAC") - self.assertIsNone(committee.party) - self.assertEqual(committee.candidate_id, "CRUNK") - - -class CommitteeManagerTestCase(unittest.TestCase): - def setUp(self): - self.example_committees = [ - c.Committee("id1", "name1", "party1", "candidate_id1"), - c.Committee("id2", "name2", "party2", "candidate_id2"), - c.Committee("id3", "name3", None, None), - ] - - def test_committees(self): - """Test that we can create a committee manager.""" - manager = c.CommitteeManager(self.example_committees) - self.assertEqual(len(manager.committees), len(self.example_committees)) - - def test_id_to_committees(self): - """Test that we can create a committee manager.""" - manager = c.CommitteeManager(self.example_committees) - self.assertEqual( - manager.id_to_committee, - { - "id1": self.example_committees[0], - "id2": self.example_committees[1], - "id3": self.example_committees[2], - }, - ) - - def test_get_committee(self): - """Test that we can create a committee manager.""" - manager = c.CommitteeManager(self.example_committees) - self.assertEqual(manager.get_committee("id1"), self.example_committees[0]) - self.assertEqual(manager.get_committee("id2"), self.example_committees[1]) - self.assertEqual(manager.get_committee("id3"), self.example_committees[2]) - self.assertIsNone(manager.get_committee("id4")) - - def test_jsonl_io(self): - 
manager = c.CommitteeManager(self.example_committees) - writable = io.StringIO() - manager.to_jsonl_io(writable) - readable = io.StringIO(writable.getvalue()) - manager2 = c.CommitteeManager.from_jsonl_io(readable) - self.assertEqual(manager.committees, manager2.committees) - - def test_csv_io(self): - readable = io.StringIO(RAW_CSV_DATA) - manager = c.CommitteeManager.from_csv_io(readable) - self.assertEqual(len(manager.committees), 10) - committee = manager.get_committee("C00000059") - self.assertIsNotNone(committee) - assert committee is not None - self.assertEqual(committee.id, "C00000059") - self.assertEqual(committee.name, "HALLMARK CARDS PAC") - self.assertIsNone(committee.party) - self.assertIsNone(committee.candidate_id) - self.assertIsNone(manager.get_committee("NOPE")) diff --git a/server/data/fec/test_contributions.py b/server/data/fec/test_contributions.py deleted file mode 100644 index dbf4a5b..0000000 --- a/server/data/fec/test_contributions.py +++ /dev/null @@ -1,529 +0,0 @@ -# ruff: noqa: D102 -import io -import unittest -from decimal import Decimal - -from server.data.names.nicknames import MockGetNicknameIndex -from server.utils.validations import ValidationError - -from . 
import contributions as cont -from .committees import Committee, MockGetCommittee, Party - - -class SplitNameTestCase(unittest.TestCase): - def test_last_only(self): - self.assertEqual(cont.split_name("Smith"), ("Smith", None)) - - def test_last_comma_first(self): - self.assertEqual(cont.split_name("Smith, John"), ("Smith", "John")) - - def test_stripping(self): - self.assertEqual(cont.split_name(" Smith, John "), ("Smith", "John")) - - -class FuzzyIdentifierTestCase(unittest.TestCase): - def setUp(self): - self.get_nickname_index = MockGetNicknameIndex( - [["Dave", "David", "Davey"], ["Matt", "Matthew"]] - ) - - def test_last_first_simple(self): - self.assertEqual( - cont.FuzzyIdentifier.from_last_first( - "Smith", "John", "12345", get_nickname_index=self.get_nickname_index - ), - "SMITH-JOHN-12345", - ) - - def test_last_no_first_simple(self): - self.assertEqual( - cont.FuzzyIdentifier.from_last_first( - "Smith", None, "12345", get_nickname_index=self.get_nickname_index - ), - "SMITH-NONE-12345", - ) - - def test_last_first_nickname(self): - self.assertEqual( - cont.FuzzyIdentifier.from_last_first( - "Smith", - "Davey", - "12345", - get_nickname_index=self.get_nickname_index, - ), - "SMITH-0-12345", - ) - - -class ContributionTestCase(unittest.TestCase): - def test_from_data_valid(self): - contribution = cont.Contribution.from_data( - { - "id": "12345", - "committee_id": "C12345", - "name": "Smith, John", - "city": "Seattle", - "state": "WA", - "zip_code": "98101", - "amount": "10", - } - ) - self.assertEqual(contribution.id, "12345") - self.assertEqual(contribution.committee_id, "C12345") - self.assertEqual(contribution.name, "Smith, John") - self.assertEqual(contribution.city, "Seattle") - self.assertEqual(contribution.state, "WA") - self.assertEqual(contribution.zip_code, "98101") - self.assertEqual(contribution.amount, Decimal(10)) - - def test_from_data_invalid(self): - with self.assertRaises(ValidationError): - cont.Contribution.from_data({}) - - def 
test_to_data(self): - contribution = cont.Contribution( - id="12345", - committee_id="C12345", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(10), - ) - self.assertEqual( - contribution.to_data(), - { - "id": "12345", - "committee_id": "C12345", - "name": "Smith, John", - "city": "Seattle", - "state": "WA", - "zip_code": "98101", - "amount": "10", - }, - ) - - def test_from_contribution_row_valid(self): - contribution = cont.Contribution.from_contribution_row( - [ - "C12345", - "", - "", - "", - "", - "", - cont.EntityTypeCode.INDIVIDUAL, - "Smith, John", - "Seattle", - "WA", - "98101", - "", - "", - "", - "10", - "", - "", - "", - "", - "", - "12345", - ] - ) - self.assertIsNotNone(contribution) - assert contribution is not None - self.assertEqual(contribution.id, "12345") - self.assertEqual(contribution.committee_id, "C12345") - self.assertEqual(contribution.name, "Smith, John") - self.assertEqual(contribution.city, "Seattle") - self.assertEqual(contribution.state, "WA") - self.assertEqual(contribution.zip_code, "98101") - self.assertEqual(contribution.amount, Decimal(10)) - - def test_from_contribution_row_invalid(self): - contribution = cont.Contribution.from_contribution_row( - [ - "C12345", - "", - "", - "", - "", - "", - cont.EntityTypeCode.CANDIDATE, - "Smith, John", - "Seattle", - "WA", - "98101", - "", - "", - "", - "10", - "", - "", - "", - "", - "", - "12345", - ] - ) - self.assertIsNone(contribution) - - -class ContributionSummaryTestCase(unittest.TestCase): - def setUp(self): - self.contribution_1 = cont.Contribution( - id="12345", - committee_id="C12345", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(10), - ) - self.contribution_2 = cont.Contribution( - id="12346", - committee_id="C67890", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(20), - ) - self.contribution_3 = cont.Contribution( - id="12347", - 
committee_id="CABCDE", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(50), - ) - self.get_committee = MockGetCommittee( - [ - Committee( - id="C12345", - name="Barney for America", - party=Party.DEMOCRAT, - candidate_id="CAN12345", - ), - Committee( - id="C67890", - name="Donald for Duck", - party=Party.DEMOCRAT, - candidate_id="CAN67890", - ), - Committee( - id="CABCDE", - name="Jupiter for Pluto", - party=Party.GREEN, - candidate_id="CANABCDE", - ), - ] - ) - - def test_new(self): - summary = cont.ContributionSummary.new( - "SMITH-JOHN-98101", - self.contribution_1, - get_committee=self.get_committee, - ) - self.assertEqual(summary.fuzzy_id, "SMITH-JOHN-98101") - self.assertEqual(summary.name, "Smith, John") - self.assertEqual(summary.zip_code, "98101") - self.assertEqual(summary.total, Decimal(10)) - self.assertEqual(len(summary.by_party), 1) - self.assertEqual(summary.by_party.get(Party.DEMOCRAT), Decimal(10)) - self.assertTrue("C12345" in summary.by_committee) - self.assertEqual(len(summary.by_committee), 1) - self.assertEqual(summary.by_committee.get("C12345"), Decimal(10)) - - def test_add(self): - summary = cont.ContributionSummary.new( - "SMITH-JOHN-98101", - self.contribution_1, - get_committee=self.get_committee, - ) - summary.add(self.contribution_2, get_committee=self.get_committee) - summary.add(self.contribution_3, get_committee=self.get_committee) - self.assertEqual(summary.fuzzy_id, "SMITH-JOHN-98101") - self.assertEqual(summary.name, "Smith, John") - self.assertEqual(summary.zip_code, "98101") - self.assertEqual(summary.total, Decimal(80)) - self.assertEqual(len(summary.by_party), 2) - self.assertEqual(summary.by_party.get(Party.DEMOCRAT), Decimal(30)) - self.assertEqual(summary.by_party.get(Party.GREEN), Decimal(50)) - self.assertEqual(len(summary.by_committee), 3) - self.assertEqual(summary.by_committee.get("C12345"), Decimal(10)) - self.assertEqual(summary.by_committee.get("C67890"), Decimal(20)) - 
self.assertEqual(summary.by_committee.get("CABCDE"), Decimal(50)) - - def test_from_data_valid(self): - data = { - "fuzzy_id": "SMITH-JOHN-98101", - "name": "Smith, John", - "zip_code": "98101", - "total": "80", - "by_party": {Party.DEMOCRAT: "30", Party.GREEN: "50"}, - "by_committee": {"C12345": "10", "C67890": "20", "CABCDE": "50"}, - } - summary = cont.ContributionSummary.from_data(data) - self.assertEqual(summary.fuzzy_id, "SMITH-JOHN-98101") - self.assertEqual(summary.name, "Smith, John") - self.assertEqual(summary.zip_code, "98101") - self.assertEqual(summary.total, Decimal(80)) - self.assertEqual(len(summary.by_party), 2) - self.assertEqual(summary.by_party.get(Party.DEMOCRAT), Decimal(30)) - self.assertEqual(summary.by_party.get(Party.GREEN), Decimal(50)) - self.assertEqual(len(summary.by_committee), 3) - self.assertEqual(summary.by_committee.get("C12345"), Decimal(10)) - self.assertEqual(summary.by_committee.get("C67890"), Decimal(20)) - self.assertEqual(summary.by_committee.get("CABCDE"), Decimal(50)) - - def test_from_data_invalid(self): - data = { - "fuzzy_id": "SMITH-JOHN-98101", - } - with self.assertRaises(ValidationError): - cont.ContributionSummary.from_data(data) - - def test_to_data(self): - summary = cont.ContributionSummary.new( - "SMITH-JOHN-98101", - self.contribution_1, - get_committee=self.get_committee, - ) - summary.add(self.contribution_2, get_committee=self.get_committee) - summary.add(self.contribution_3, get_committee=self.get_committee) - data = summary.to_data() - self.assertEqual(data["fuzzy_id"], "SMITH-JOHN-98101") - self.assertEqual(data["name"], "Smith, John") - self.assertEqual(data["zip_code"], "98101") - self.assertEqual(data["total"], "80") - self.assertEqual(len(data["by_party"]), 2) - self.assertEqual(data["by_party"].get(Party.DEMOCRAT), "30") - self.assertEqual(data["by_party"].get(Party.GREEN), "50") - self.assertEqual(len(data["by_committee"]), 3) - self.assertEqual(data["by_committee"].get("C12345"), "10") - 
self.assertEqual(data["by_committee"].get("C67890"), "20") - self.assertEqual(data["by_committee"].get("CABCDE"), "50") - - -class ContributionsManagerTestCase(unittest.TestCase): - def setUp(self): - self.contribution_1 = cont.Contribution( - id="12345", - committee_id="C12345", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(10), - ) - self.contribution_2 = cont.Contribution( - id="12346", - committee_id="C67890", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(20), - ) - self.contribution_3 = cont.Contribution( - id="12347", - committee_id="CABCDE", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(50), - ) - self.contributions = [ - self.contribution_1, - self.contribution_2, - self.contribution_3, - ] - self.get_committee = MockGetCommittee( - [ - Committee( - id="C12345", - name="Barney for America", - party=Party.DEMOCRAT, - candidate_id="CAN12345", - ), - Committee( - id="C67890", - name="Donald for Duck", - party=Party.DEMOCRAT, - candidate_id="CAN67890", - ), - Committee( - id="CABCDE", - name="Jupiter for Pluto", - party=Party.GREEN, - candidate_id="CANABCDE", - ), - ] - ) - self.get_nickname_index = MockGetNicknameIndex( - [["Dave", "David", "Davey"], ["Matt", "Matthew"]] - ) - - def test_contributions(self): - manager = cont.ContributionsManager( - self.contributions, - get_committee=self.get_committee, - get_nickname_index=self.get_nickname_index, - ) - self.assertEqual(len(manager.contributions), 3) - - def test_from_csv_io(self): - contribution_1 = """C12345||||||IND|Smith, John|Seattle|WA|98101||||10||||||12345""" # noqa: E501 - contribution_2 = """C12345||||||COM|Smith, John|Seattle|WA|98101||||10||||||12345""" # noqa: E501 - csv_io = io.StringIO("\n".join([contribution_1, contribution_2])) - manager = cont.ContributionsManager.from_csv_io( - csv_io, - get_committee=self.get_committee, - 
get_nickname_index=self.get_nickname_index, - ) - self.assertEqual(len(manager.contributions), 1) - self.assertEqual(manager.contributions[0].id, "12345") - - def test_contribution_summaries(self): - manager = cont.ContributionsManager( - self.contributions, - get_committee=self.get_committee, - get_nickname_index=self.get_nickname_index, - ) - self.assertEqual(len(manager.contribution_summaries), 1) - self.assertEqual(manager.contribution_summaries["SMITH-JOHN-98101"].total, 80) - - def test_contribution_summaries_manager(self): - manager = cont.ContributionsManager( - self.contributions, - get_committee=self.get_committee, - get_nickname_index=self.get_nickname_index, - ) - summaries_manager = manager.contribution_summaries_manager - self.assertEqual(len(summaries_manager.contribution_summaries), 1) - - -class ContributionSummariesManagerTestCase(unittest.TestCase): - def setUp(self): - self.summary_1 = cont.ContributionSummary.new( - "SMITH-JOHN-98101", - cont.Contribution( - id="12345", - committee_id="C12345", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(10), - ), - get_committee=MockGetCommittee( - [ - Committee( - id="C12345", - name="Barney for America", - party=Party.DEMOCRAT, - candidate_id="CAN12345", - ) - ] - ), - ) - self.summary_1.add( - cont.Contribution( - id="12346", - committee_id="C67890", - name="Smith, John", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(20), - ), - get_committee=MockGetCommittee( - [ - Committee( - id="C67890", - name="Donald for Duck", - party=Party.DEMOCRAT, - candidate_id="CAN67890", - ) - ] - ), - ) - self.summary_2 = cont.ContributionSummary.new( - "PECK-1-98101", - cont.Contribution( - id="12347", - committee_id="CABCDE", - name="Peck, Dave", - city="Seattle", - state="WA", - zip_code="98101", - amount=Decimal(50), - ), - get_committee=MockGetCommittee( - [ - Committee( - id="CABCDE", - name="Jupiter for Pluto", - party=Party.GREEN, - 
candidate_id="CANABCDE", - ) - ] - ), - ) - self.summaries = [self.summary_1, self.summary_2] - self.indexed_summaries = { - "SMITH-JOHN-98101": self.summary_1, - "PECK-1-98101": self.summary_2, - } - - def test_contribution_summaries(self): - manager = cont.ContributionSummariesManager(self.indexed_summaries) - self.assertEqual(len(manager.contribution_summaries), 2) - self.assertEqual(manager.contribution_summaries["SMITH-JOHN-98101"].total, 30) - self.assertEqual(manager.contribution_summaries["PECK-1-98101"].total, 50) - - def test_from_summaries(self): - manager = cont.ContributionSummariesManager.from_summaries(self.summaries) - self.assertEqual(len(manager.contribution_summaries), 2) - - def test_from_jsonl_io(self): - json_lines = """\ -{"fuzzy_id": "SMITH-JOHN-98101", "name": "Smith, John", "zip_code": "98101", "total": "30", "by_party": {"DEMOCRAT": "30"}, "by_committee": {"C12345": "30"}} -{"fuzzy_id": "PECK-1-98101", "name": "Peck, Dave", "zip_code": "98101", "total": "50", "by_party": {"GREEN": "50"}, "by_committee": {"CABCDE": "50"}} -""" # noqa: E501 - jsonl_io = io.StringIO(json_lines) - manager = cont.ContributionSummariesManager.from_jsonl_io(jsonl_io) - self.assertEqual(len(manager.contribution_summaries), 2) - self.assertEqual(manager.contribution_summaries["SMITH-JOHN-98101"].total, 30) - self.assertEqual(manager.contribution_summaries["PECK-1-98101"].total, 50) - - def test_to_data_lines(self): - self.manager = cont.ContributionSummariesManager(self.indexed_summaries) - data_lines = list(self.manager.to_data_lines()) - self.assertEqual(len(data_lines), 2) - - def test_to_jsonl_io(self): - self.manager = cont.ContributionSummariesManager(self.indexed_summaries) - jsonl_io = io.StringIO() - self.manager.to_jsonl_io(jsonl_io) - jsonl_io.seek(0) - json_lines = jsonl_io.read() - self.assertEqual(len(json_lines.split("\n")), 3) - - def test_get_summary(self): - self.manager = cont.ContributionSummariesManager(self.indexed_summaries) - summary = 
self.manager.get_summary("SMITH-JOHN-98101") - self.assertIsNotNone(summary) - assert summary is not None - self.assertEqual(summary.total, 30) - self.assertEqual(summary.by_party.get(Party.DEMOCRAT), 30) - self.assertEqual(summary.by_committee.get("C12345"), 10) - self.assertEqual(summary.by_committee.get("C67890"), 20) - - def test_get_summary_none(self): - self.manager = cont.ContributionSummariesManager(self.indexed_summaries) - summary = self.manager.get_summary("SMITH-JOHN-98102") - self.assertIsNone(summary) diff --git a/server/data/fec_types.py b/server/data/fec_types.py new file mode 100644 index 0000000..ce6ba38 --- /dev/null +++ b/server/data/fec_types.py @@ -0,0 +1,97 @@ +class ContributionColumns: + """ + Column indices for the individual contribution master file. + + See: + https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ + """ + + COMMITTEE_ID = 0 # Filer identification number (CMTE_ID) + AMENDMENT_INDICATOR = 1 # AMNDT_IND + REPORT_TYPE = 2 # RPT_TP + PRIMARY_GENERAL_INDICATOR = 3 # TRANSACTION_PGI + IMAGE_NUMBER = 4 # IMAGE_NUM + TRANSACTION_TYPE = 5 # TRANSACTION_TP + ENTITY_TYPE = 6 # ENTITY_TP (see EntityTypeCode) + NAME = 7 # NAME (of the contributor, typically in LAST, FIRST format) + CITY = 8 # CITY + STATE = 9 # STATE + ZIP_CODE = 10 # ZIP_CODE (usually 5 or 9 digits, but there are lots of odd ones) + EMPLOYER = 11 # EMPLOYER + OCCUPATION = 12 # OCCUPATION + TRANSACTION_DATE = 13 # TRANSACTION_DT (MMDDYYYY) + TRANSACTION_AMOUNT = 14 # TRANSACTION_AMT (in dollars, NUMBER(14, 2)) + OTHER_ID = 15 # OTHER_ID (for non-individual contributions) + TRANSACTION_ID = 16 # TRAN_ID + FILE_NUMBER = 17 # FILE_NUM + MEMO_CODE = 18 # MEMO_CD + MEMO_TEXT = 19 # MEMO_TEXT + SUB_ID = 20 # SUB_ID (FEC record ID) + + +class EntityTypeCode: + CANDIDATE = "CAN" + CANDIDATE_COMMITTEE = "CCM" + COMMITTEE = "COM" + INDIVIDUAL = "IND" + ORGANIZATION = "ORG" + PAC = "PAC" + PARTY_ORGANIZATION = "PTY" + + @classmethod + def 
name_for_code(cls, code: str) -> str | None: + """Return the name for the given entity type code.""" + for attr in dir(EntityTypeCode): + if not attr.startswith("__"): + if getattr(EntityTypeCode, attr) == code: + return attr.replace("_", " ").title() + return None + + +class CommitteeColumns: + """ + Column indices for the committee master file. + + See: + https://www.fec.gov/campaign-finance-data/committee-master-file-description/ + """ + + ID = 0 # CMTE_ID + NAME = 1 # CMTE_NM + TREASURER_NAME = 2 # TRES_NM + STREET_1 = 3 # CMTE_ST1 + STREET_2 = 4 # CMTE_ST2 + CITY = 5 # CMTE_CITY + STATE = 6 # CMTE_ST + ZIP_CODE = 7 # CMTE_ZIP + DESIGNATION = 8 # CMTE_DSGN + TYPE = 9 # CMTE_TP + PARTY = 10 # CMTE_PTY_AFFILIATION + ORG_TYPE = 11 # ORG_TP + CONNECTED_ORG_NAME = 12 # CONNECTED_ORG_NM + CANDIDATE_ID = 13 # CAND_ID + + +class Party: + """ + Political party codes. + + For an (incredibly) exhaustive list, see: + https://www.fec.gov/campaign-finance-data/party-code-descriptions/ + """ + + REPUBLICAN = "REP" + DEMOCRAT = "DEM" + INDEPENDENT = "IND" + LIBERTARIAN = "LIB" + GREEN = "GRE" + UNKNOWN = "UNK" # We specifically ignore this/convert to None + + @classmethod + def name_for_code(cls, code: str) -> str | None: + """Return the name for the given party code.""" + for attr in dir(Party): + if not attr.startswith("__"): + if getattr(Party, attr) == code: + return attr.title() + return None diff --git a/server/data/models.py b/server/data/models.py new file mode 100644 index 0000000..98d77c5 --- /dev/null +++ b/server/data/models.py @@ -0,0 +1,383 @@ +import pathlib +import typing as t +from decimal import Decimal + +import sqlalchemy as sa +import sqlalchemy.orm as sao + +from server.data.fec_types import ( + CommitteeColumns, + ContributionColumns, + EntityTypeCode, + Party, +) +from server.data.manager import DataManager +from server.utils.validations import validate_extant_file + +from .nicknames import split_name + + +class BaseModel(sao.DeclarativeBase): + """Base 
class for all SQL models.""" + + @classmethod + def all_stmt(cls): + """Return a select statement that includes all records.""" + return sa.select(cls) + + @classmethod + def all(cls, session: sao.Session): + """Return a query that includes all records.""" + statement = cls.all_stmt() + return session.execute(statement).scalars() + + @classmethod + def count(cls, session: sao.Session) -> int: + """Return the number of records in the database.""" + id_attr = getattr(cls, "id", None) + if id_attr is None: + raise ValueError(f"Model {cls} has no id attribute") + maybe_result = session.execute(sa.select(sa.func.count(id_attr))).scalar() + return maybe_result or 0 + + +class ZipCode(BaseModel): + """ + A 5-digit zip code matched with its city and state. + + Note that a given zip code may be associated with multiple cities and + states, and a given city and state may be associated with multiple zip + codes. + + When inserted, cities and states are normalized to uppercase. + """ + + __tablename__ = "zip_codes" + + id: sao.Mapped[int] = sao.mapped_column(primary_key=True) + zip5: sao.Mapped[str] = sao.mapped_column(sa.String(5), nullable=False, index=True) + city: sao.Mapped[str] = sao.mapped_column(sa.String(64), nullable=False) + state: sao.Mapped[str] = sao.mapped_column(sa.String(2), nullable=False) + + # Define indexes. In particular, (zip5, city, state) should be unique. + __table_args__ = ( + sa.Index("zip5_city_state", zip5, city, state, unique=True), + sa.Index("city_state", city, state), + ) + + @classmethod + def for_city_and_state_stmt(cls, city: str, state: str): + """ + Return a select statement that returns all ZipCode records for the + given city and state. 
+ """ + return sa.select(cls).where( + sa.and_(cls.city == city.upper(), cls.state == state.upper()) + ) + + @classmethod + def for_city_and_state( + cls, session: sao.Session, city: str, state: str + ) -> t.Iterable[t.Self]: + """ + Return a query that returns all ZipCode records for the given city and + state. + """ + statement = cls.for_city_and_state_stmt(city, state) + return session.execute(statement).scalars() + + @classmethod + def for_zip_code_stmt(cls, zip_code: str): + """ + Return a select statement that returns all ZipCode records for the + given zip code. + """ + return sa.select(cls).where(cls.zip5 == zip_code[:5]) + + @classmethod + def for_zip_code(cls, session: sao.Session, zip_code: str) -> t.Iterable[t.Self]: + """Return a query that returns all ZipCode records for the given zip code.""" + statement = cls.for_zip_code_stmt(zip_code) + return session.execute(statement).scalars() + + @classmethod + def insert_stmt(cls, zip_code: str, city: str, state: str): + """Return an insert statement that inserts the given zip code.""" + return sa.insert(cls).values( + zip5=zip_code[:5], city=city.upper(), state=state.upper() + ) + + @classmethod + def insert(cls, session: sao.Session, zip_code: str, city: str, state: str) -> None: + """Insert the given zip code.""" + statement = cls.insert_stmt(zip_code, city, state) + session.execute(statement) + + +class Committee(BaseModel): + """Represents an FEC committee.""" + + __tablename__ = "committees" + + id: sao.Mapped[str] = sao.mapped_column(sa.String(18), primary_key=True) + name: sao.Mapped[str] = sao.mapped_column( + sa.String(128), nullable=False, index=True + ) + party: sao.Mapped[str] = sao.mapped_column(sa.String(3), nullable=False) + candidate_id: sao.Mapped[str] = sao.mapped_column(sa.String(18), nullable=True) + + @classmethod + def from_committee_row(cls, row: t.Sequence[str]) -> t.Self: + """Create a committee from a row of the committee master file.""" + return cls( + 
id=row[CommitteeColumns.ID].strip(), + name=row[CommitteeColumns.NAME].strip().upper(), + party=row[CommitteeColumns.PARTY].strip().upper() or Party.UNKNOWN, + candidate_id=row[CommitteeColumns.CANDIDATE_ID].strip() or None, + ) + + @classmethod + def from_csv_io( + cls, + text_io: t.TextIO, + ) -> t.Iterable[t.Self]: + """Create committees from a FEC committee master file.""" + rows = (row.strip().split("|") for row in text_io) + return (cls.from_committee_row(row) for row in rows) + + @classmethod + def from_path( + cls, + path: pathlib.Path, + ) -> t.Iterable[t.Self]: + """Create committees from a FEC committee master file on disk.""" + path = validate_extant_file(path) + with path.open() as file: + yield from cls.from_csv_io(file) + + @classmethod + def from_data_manager( + cls, + data_manager: DataManager, + year: int = 2020, + ) -> t.Iterable[t.Self]: + """Create committees from a FEC committee master file.""" + return cls.from_path(data_manager.path / "fec" / f"committees-{year}.txt") + + @classmethod + def for_name_stmt(cls, name: str): + """Return a select statement for committees matching the given criteria.""" + return sa.select(cls).where(cls.name.ilike(f"%{name.upper()}%")) + + @classmethod + def for_name( + cls, + session: sao.Session, + name: str, + ) -> t.Iterable[t.Self]: + """Return a query for committees matching the given criteria.""" + statement = cls.for_name_stmt(name) + return session.execute(statement).scalars() + + def to_data(self) -> dict[str, str]: + """Return a dictionary representation of this committee.""" + return { + "id": self.id, + "name": self.name, + "party": self.party, + "candidate_id": self.candidate_id, + } + + +class Contribution(BaseModel): + """Represents a single individual FEC contribution.""" + + __tablename__ = "contributions" + + id: sao.Mapped[str] = sao.mapped_column(sa.String(18), primary_key=True) + committee_id: sao.Mapped[str] = sao.mapped_column( + sa.String(18), sa.ForeignKey("committees.id"), nullable=False 
+ ) + committee: sao.Mapped[Committee] = sao.relationship(Committee) + last_name: sao.Mapped[str] = sao.mapped_column(sa.String(64), nullable=False) + first_name: sao.Mapped[str] = sao.mapped_column(sa.String(64), nullable=False) + city: sao.Mapped[str] = sao.mapped_column(sa.String(64), nullable=False) + state: sao.Mapped[str] = sao.mapped_column(sa.String(2), nullable=False) + zip5: sao.Mapped[str] = sao.mapped_column(sa.String(5), nullable=False) + zip_code: sao.Mapped[str] = sao.mapped_column(sa.String(9), nullable=False) + amount_cents: sao.Mapped[int] = sao.mapped_column(sa.Integer, nullable=False) + + # We need to create indexes on the columns we'll be querying on. + + __table_args__ = ( + sa.Index("last_name_zip5_first_name", last_name, zip5, first_name), + sa.Index("last_name_city_state_first_name", last_name, city, state, first_name), + ) + + @classmethod + def for_last_zip_firsts_stmt( + cls, last_name: str, zip_code: str, first_names: t.Iterable[str] + ): + """Return a select statement for contributions matching the given criteria.""" + clean_first_names = [name.upper() for name in first_names] + if len(clean_first_names) == 1: + return sa.select(cls).where( + cls.last_name == last_name.upper(), + cls.zip5 == zip_code[:5], + cls.first_name == clean_first_names[0], + ) + else: + return sa.select(cls).where( + cls.last_name == last_name.upper(), + cls.zip5 == zip_code[:5], + cls.first_name.in_(clean_first_names), + ) + + @classmethod + def for_last_zip_firsts( + cls, + session: sao.Session, + last_name: str, + zip_code: str, + first_names: t.Iterable[str], + ) -> t.Iterable[t.Self]: + """Return a query for contributions matching the given criteria.""" + statement = cls.for_last_zip_firsts_stmt(last_name, zip_code, first_names) + return session.execute(statement).scalars() + + @classmethod + def for_last_city_state_firsts_stmt( + cls, last_name: str, city: str, state: str, first_names: t.Iterable[str] + ): + """Return a select statement for contributions 
matching the given criteria.""" + clean_first_names = [name.upper() for name in first_names] + if len(clean_first_names) == 1: + return sa.select(cls).where( + cls.last_name == last_name.upper(), + cls.city == city.upper(), + cls.state == state.upper(), + cls.first_name == clean_first_names[0], + ) + else: + return sa.select(cls).where( + cls.last_name == last_name.upper(), + cls.city == city.upper(), + cls.state == state.upper(), + cls.first_name.in_(clean_first_names), + ) + + @classmethod + def for_last_city_state_firsts( + cls, + session: sao.Session, + last_name: str, + city: str, + state: str, + first_names: t.Iterable[str], + ) -> t.Iterable[t.Self]: + """Return a query for contributions matching the given criteria.""" + statement = cls.for_last_city_state_firsts_stmt( + last_name, city, state, first_names + ) + return session.execute(statement).scalars() + + @classmethod + def from_contribution_row(cls, row: t.Sequence[str]) -> t.Self | None: + """Create a contribution from a row of the contributions file.""" + sub_id = row[ContributionColumns.SUB_ID].strip() + if not sub_id: + return None + committee_id = row[ContributionColumns.COMMITTEE_ID].strip() + if not committee_id: + return None + entity_type = row[ContributionColumns.ENTITY_TYPE].strip() + if entity_type != EntityTypeCode.INDIVIDUAL: + return None + name = row[ContributionColumns.NAME].strip() + if "," not in name: + return None + last_name, first_name = split_name(name) + city = row[ContributionColumns.CITY].strip() + if not city: + return None + state = row[ContributionColumns.STATE].strip() + if not state: + return None + zip_code = row[ContributionColumns.ZIP_CODE].strip() + if len(zip_code) not in {5, 9}: + return None + amount = row[ContributionColumns.TRANSACTION_AMOUNT].strip() + try: + amount_cents = int(Decimal(amount) * 100) + except Exception: + return None + return cls( + id=sub_id, + committee_id=committee_id, + last_name=last_name, + first_name=first_name, + city=city, + 
state=state, + zip5=zip_code[:5], + zip_code=zip_code, + amount_cents=amount_cents, + ) + + @classmethod + def from_csv_io( + cls, + text_io: t.TextIO, + ) -> t.Iterable[t.Self]: + """Create contributions from a FEC individual contributions file.""" + # Turns out this is not simply a CSV with a pipe delimiter. I think it comes + # down to escaping quotes, but I'm not sure. So we'll just split on pipes. + rows = (row.strip().split("|") for row in text_io) + return ( + contribution + for row in rows + if (contribution := cls.from_contribution_row(row)) is not None + ) + + @classmethod + def from_path( + cls, + path: pathlib.Path, + ) -> t.Iterable[t.Self]: + """Create contributions from a FEC individual contributions file on disk.""" + path = validate_extant_file(path) + with path.open() as file: + yield from cls.from_csv_io(file) + + @classmethod + def from_data_manager( + cls, + data_manager: DataManager, + year: int = 2020, + ) -> t.Iterable[t.Self]: + """Create contributions from a FEC individual contributions file.""" + return cls.from_path(data_manager.path / "fec" / f"individual-{year}.txt") + + +def engine_for_data_manager(data_manager: DataManager) -> sa.Engine: + """Return an engine for the given data manager.""" + return sa.create_engine(f"sqlite:///{data_manager.path / 'fec.db'}") + + +def session_for_data_manager(data_manager: DataManager) -> sao.Session: + """Return a session for the given data manager.""" + return sao.Session(bind=engine_for_data_manager(data_manager)) + + +def default_engine() -> sa.Engine: + """Return an engine for the default data manager.""" + return engine_for_data_manager(DataManager.default()) + + +def default_session() -> sao.Session: + """Return a session for the default data manager.""" + return session_for_data_manager(DataManager.default()) + + +def create_db_tables(engine: sa.Engine) -> None: + """Create the database tables for the given engine.""" + BaseModel.metadata.create_all(engine) diff --git 
a/server/data/names/__init__.py b/server/data/names/__init__.py deleted file mode 100644 index 60c0dbb..0000000 --- a/server/data/names/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Tools for working with people's names.""" diff --git a/server/data/names/nicknames.py b/server/data/names/nicknames.py deleted file mode 100644 index 3839993..0000000 --- a/server/data/names/nicknames.py +++ /dev/null @@ -1,259 +0,0 @@ -"""Tools for working with nicknames.""" -import json -import pathlib -import typing as t - -from server.data.manager import DataManager -from server.utils.validations import validate_extant_file - - -class MessyNicknamesManager: - """ - Tools for working with a 'messy' nicknames file. - - The primary operation of this manager is to both clean and merge the names, - and to provide a mapping from each name to a unique identifier. - """ - - _messy_names: list[frozenset[str]] - """ - A list of sets of related names. A given name may appear in multiple - sets. - """ - - _names: list[frozenset[str]] | None - """A list of sets of related names. A given name will only appear in one set.""" - - def __init__(self, messy_names: t.Sequence[frozenset[str]]): - self._messy_names = list(messy_names) - self._names = None - - @classmethod - def from_messy_io(cls, io: t.TextIO) -> "MessyNicknamesManager": - """ - Create a manager from a file-like object. - - The assumed format: on each line there is a list of related names. - These are probably separated by spaces, although they may also be separated - by `/` and `,` characters. There may be any number of spaces between the - names, and there may be leading and trailing spaces. The names will always - start with a capital letter; they _may_ contain dots (`A.B.`) and - apostrophes (`O'Neil`). It's possible that a given name appears on multiple - lines. 
- """ - names: list[frozenset[str]] = [] - for line in io: - # Remove all commas, slashes, parens - line = ( - line.replace(",", " ") - .replace("/", "") - .replace("(", "") - .replace(")", "") - ) - # Break the line into a list of names -- split on any - # arbitrary number of spaces - maybe_names = line.split() - # Remove any empty strings - maybe_names = [ - stripped for name in maybe_names if (stripped := name.strip()) - ] - # Remove any strings that don't start with a capital letter - maybe_names = [name for name in maybe_names if name[0].isupper()] - # Make a set of capitalized names - names_set = {name.title() for name in maybe_names} - # Add it if it's not empty - if names_set: - names.append(frozenset(names_set)) - return cls(names) - - @classmethod - def from_path(cls, path: str | pathlib.Path) -> "MessyNicknamesManager": - """Create a manager from a path.""" - path = validate_extant_file(pathlib.Path(path)) - with path.open("rt") as input_file: - return cls.from_messy_io(input_file) - - @classmethod - def from_data_manager(cls, data_manager: DataManager) -> "MessyNicknamesManager": - """Create a manager from a data manager.""" - return cls.from_path(data_manager.path / "names" / "messy.txt") - - def _merge_names(self) -> None: - """Merge the names.""" - # Continuously merge sets that have overlapping names, until no - # more merges are possible - names = list(self.messy_names) - while True: - index = 0 - merged = False - while index < len(names): - index2 = index + 1 - while index2 < len(names): - if names[index] & names[index2]: - names[index] |= names[index2] - del names[index2] - merged = True - else: - index2 += 1 - index += 1 - if not merged: - break - self._names = names - - def _merge_names_if_needed(self) -> None: - """Merge the names if they haven't been merged yet.""" - if self._names is None: - self._merge_names() - - @property - def messy_names(self) -> t.Sequence[frozenset[str]]: - """Get the list of names.""" - return self._messy_names - - 
@property - def names(self) -> t.Sequence[frozenset[str]]: - """Get the list of merged names.""" - self._merge_names_if_needed() - assert self._names is not None - return self._names - - @property - def nicknames_manager(self) -> "NicknamesManager": - """Get the nicknames manager.""" - return NicknamesManager(self.names) - - -class IGetNicknameIndex(t.Protocol): - """A protocol for getting the index of a nickname.""" - - def get_index(self, name: str) -> int | None: - """Get the index of a nickname.""" - ... - - -class MockGetNicknameIndex(IGetNicknameIndex): - """A simple implementation of IGetNicknameIndex useful for tests.""" - - _name_to_index: dict[str, int] - - def __init__(self, names: t.Sequence[t.Iterable[str]]) -> None: - self._name_to_index = {} - for index, names_set in enumerate(names): - for name in names_set: - self._name_to_index[name] = index - - def get_index(self, name: str) -> int | None: - """Return the index for a given nickname.""" - return self._name_to_index.get(name) - - -class NicknamesManager: - """ - Tool for working with a 'clean' nicknames file. - - This is basically just the merged/indexed version of the messy nicknames - file. - """ - - _names: list[frozenset[str]] - """A list of sets of related names. A given name will only appear in one set.""" - - _name_to_index: dict[str, int] | None = None - """A mapping from each name to the (merged) index of the set it appears in.""" - - def __init__( - self, - names: t.Iterable[frozenset[str]], - ): - self._names = list(names) - self._name_to_index = None - - @classmethod - def from_jsonl_io(cls, io: t.TextIO) -> "NicknamesManager": - """ - Read from a json file and create a manager. - - The file is a json-lines file, where each line is a list of names. - No name will appear more than once in the file. 
- """ - names = (frozenset(json.loads(line)) for line in io) - return cls(names) - - @classmethod - def from_path(cls, path: str | pathlib.Path) -> "NicknamesManager": - """Create a manager from a path.""" - path = validate_extant_file(pathlib.Path(path)) - with path.open("rt") as input_file: - return cls.from_jsonl_io(input_file) - - @classmethod - def from_data_manager(cls, data_manager: DataManager) -> "NicknamesManager": - """Create a manager from a data manager.""" - return cls.from_path(data_manager.path / "names" / "nicknames.jsonl") - - def to_data_lines(self) -> t.Iterable[list[str]]: - """Convert to a json-serializable object.""" - return (list(names) for names in self.names) - - def to_jsonl_io(self, io: t.TextIO) -> None: - """Write to a json file.""" - for data_line in self.to_data_lines(): - io.write(json.dumps(data_line)) - io.write("\n") - - def to_jsonl_path(self, path: str | pathlib.Path) -> None: - """Write to a json file.""" - path = pathlib.Path(path) - with path.open("wt") as output_file: - self.to_jsonl_io(output_file) - - def to_jsonl_data_manager(self, data_manager: DataManager) -> None: - """Write to a json file.""" - self.to_jsonl_path(data_manager.path / "names" / "nicknames.jsonl") - - def _index_names(self) -> None: - """Index the merged names.""" - self._name_to_index = {} - for index, names_set in enumerate(self.names): - for name in names_set: - if name in self._name_to_index: - raise ValueError(f"Name {name} appears in multiple sets") - self._name_to_index[name] = index - - def _index_names_if_needed(self) -> None: - """Index the merged names if they haven't been indexed yet.""" - if self._name_to_index is None: - self._index_names() - - @property - def names(self) -> t.Sequence[frozenset[str]]: - """Get the list of merged names.""" - return self._names - - @property - def name_to_index(self) -> t.Mapping[str, int]: - """Get the mapping from name to index.""" - self._index_names_if_needed() - assert self._name_to_index is not None 
- return self._name_to_index - - def get_index(self, name: str) -> int | None: - """Get the index of a name.""" - return self.name_to_index.get(name.title()) - - def get_names_for_index(self, index: int) -> frozenset[str]: - """Get the names associated with an index.""" - if index < 0 or index >= len(self._names): - return frozenset() - return self.names[index] - - def get_related_names(self, name: str) -> frozenset[str]: - """ - Get the set of related names for a name. - - The set will include the name itself. - """ - index = self.get_index(name) - if index is None: - return frozenset() - return self.get_names_for_index(index) diff --git a/server/data/names/test_nicknames.py b/server/data/names/test_nicknames.py deleted file mode 100644 index f333c66..0000000 --- a/server/data/names/test_nicknames.py +++ /dev/null @@ -1,199 +0,0 @@ -# ruff: noqa: D102 -import io -import unittest - -from . import nicknames as n - - -class MessyNicknamesTestCase(unittest.TestCase): - def test_from_messy_io(self) -> None: - messy_io = io.StringIO( - """Dave David, Davey, Davie Rob\n""" - """John Jack, Johnny, Jonathan\n""" - """Bob Bobby, Rob, Robert\n""" - """\n""" - """Matt // Matthew, Matty, Mat, Rob\n""" - ) - manager = n.MessyNicknamesManager.from_messy_io(messy_io) - self.assertEqual( - manager.messy_names, - [ - frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), - frozenset(["John", "Jack", "Johnny", "Jonathan"]), - frozenset(["Bob", "Bobby", "Rob", "Robert"]), - frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), - ], - ) - - def test_messy_names(self) -> None: - manager = n.MessyNicknamesManager( - [ - frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), - frozenset(["John", "Jack", "Johnny", "Jonathan"]), - frozenset(["Bob", "Bobby", "Rob", "Robert"]), - frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), - ], - ) - self.assertEqual( - manager.messy_names, - [ - frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), - frozenset(["John", "Jack", "Johnny", 
"Jonathan"]), - frozenset(["Bob", "Bobby", "Rob", "Robert"]), - frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), - ], - ) - - def test_names(self) -> None: - """Validate that the names are merged.""" - manager = n.MessyNicknamesManager( - [ - frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), - frozenset(["John", "Jack", "Johnny", "Jonathan"]), - frozenset(["Bob", "Bobby", "Rob", "Robert"]), - frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), - ], - ) - self.assertEqual( - manager.names, - [ - frozenset( - [ - "Dave", - "David", - "Davey", - "Davie", - "Bob", - "Bobby", - "Rob", - "Robert", - "Matt", - "Matthew", - "Matty", - "Mat", - ] - ), - frozenset(["John", "Jack", "Johnny", "Jonathan"]), - ], - ) - - def test_nicknames_manager(self) -> None: - manager = n.MessyNicknamesManager( - [ - frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), - frozenset(["John", "Jack", "Johnny", "Jonathan"]), - frozenset(["Bob", "Bobby", "Rob", "Robert"]), - frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), - ], - ) - nicknames_manager = manager.nicknames_manager - self.assertEqual( - nicknames_manager.names, - [ - frozenset( - [ - "Dave", - "David", - "Davey", - "Davie", - "Bob", - "Bobby", - "Rob", - "Robert", - "Matt", - "Matthew", - "Matty", - "Mat", - ] - ), - frozenset(["John", "Jack", "Johnny", "Jonathan"]), - ], - ) - - -class NicknamesManagerTestCase(unittest.TestCase): - def test_from_jsonl_io(self) -> None: - jsonl_io = io.StringIO("""["A", "B"]\n["C", "D"]\n["E", "F"]\n""") - manager = n.NicknamesManager.from_jsonl_io(jsonl_io) - self.assertEqual( - manager.names, - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - - def test_names(self) -> None: - manager = n.NicknamesManager( - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - self.assertEqual( - manager.names, - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - - def 
test_name_to_index(self) -> None: - manager = n.NicknamesManager( - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - self.assertEqual( - manager.name_to_index, {"A": 0, "B": 0, "C": 1, "D": 1, "E": 2, "F": 2} - ) - - def test_get_index(self) -> None: - manager = n.NicknamesManager( - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - self.assertEqual(manager.get_index("A"), 0) - self.assertEqual(manager.get_index("B"), 0) - self.assertEqual(manager.get_index("C"), 1) - self.assertEqual(manager.get_index("D"), 1) - self.assertEqual(manager.get_index("E"), 2) - self.assertEqual(manager.get_index("F"), 2) - self.assertIsNone(manager.get_index("G")) - - def test_get_names_for_index(self) -> None: - manager = n.NicknamesManager( - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - self.assertEqual(manager.get_names_for_index(0), frozenset({"A", "B"})) - self.assertEqual(manager.get_names_for_index(1), frozenset({"C", "D"})) - self.assertEqual(manager.get_names_for_index(2), frozenset({"E", "F"})) - self.assertEqual(manager.get_names_for_index(3), frozenset()) - - def test_get_related_names(self) -> None: - manager = n.NicknamesManager( - [ - frozenset(["A", "B"]), - frozenset(["C", "D"]), - frozenset(["E", "F"]), - ], - ) - self.assertEqual(manager.get_related_names("A"), frozenset({"A", "B"})) - self.assertEqual(manager.get_related_names("B"), frozenset({"A", "B"})) - self.assertEqual(manager.get_related_names("C"), frozenset({"C", "D"})) - self.assertEqual(manager.get_related_names("D"), frozenset({"C", "D"})) - self.assertEqual(manager.get_related_names("E"), frozenset({"E", "F"})) - self.assertEqual(manager.get_related_names("F"), frozenset({"E", "F"})) - self.assertEqual(manager.get_related_names("G"), frozenset()) diff --git a/server/data/nicknames.py b/server/data/nicknames.py new file mode 100644 index 0000000..842826b --- /dev/null +++ 
b/server/data/nicknames.py @@ -0,0 +1,74 @@ +"""Tools for working with nicknames.""" +import pathlib +import typing as t + +from server.data.manager import DataManager +from server.utils.validations import validate_extant_file + + +def split_name(name: str) -> tuple[str, str]: + """Split a name (in `LAST, FIRST [MORE...]` form) into (last, first) names.""" + name = name.strip() + if not name: + raise ValueError("Name is empty") + + if "," not in name: + raise ValueError("Name is not comma-separated") + + last, first_more = name.split(",", 1) + first, *more = first_more.split(" ") + return last.strip().upper(), first.strip().upper() + + +class NicknamesManager: + """ + Tools for working with a 'messy' nicknames file. + + The presumed format of the file is a list of sets of related names. A given + name may appear in multiple sets. The names will always start with a capital + letter; they _may_ contain dots (`A.B.`) and apostrophes (`O'Neil`). + """ + + _related_names: tuple[frozenset[str], ...] + """A list of sets of related names. 
A given name may appear in multiple sets.""" + + _indexes_for_name: dict[str, frozenset[int]] + """A dictionary mapping names to the indexes of the sets they appear in.""" + + def __init__(self, names: t.Iterable[t.Iterable[str]]): + self._related_names = tuple( + frozenset(name.upper().strip() for name in name_set) for name_set in names + ) + mutable_indexes_for_name = {} + for i, name_set in enumerate(self._related_names): + for name in name_set: + mutable_indexes_for_name.setdefault(name, set()).add(i) + + self._indexes_for_name = { + name: frozenset(indexes) + for name, indexes in mutable_indexes_for_name.items() + } + + @classmethod + def from_nicknames(cls, text_io: t.TextIO) -> t.Self: + """Create a manager from a file-like object.""" + return cls(frozenset(line.split(",")) for line in text_io if line.strip()) + + @classmethod + def from_path(cls, path: str | pathlib.Path) -> t.Self: + """Create a manager from a path.""" + path = validate_extant_file(pathlib.Path(path)) + with path.open("rt") as input_file: + return cls.from_nicknames(input_file) + + @classmethod + def from_data_manager(cls, data_manager: DataManager) -> t.Self: + """Create a manager from a data manager.""" + return cls.from_path(data_manager.path / "names" / "raw.txt") + + def get_related_names(self, name: str) -> t.Iterable[frozenset[str]]: + """Get the sets of related names for a given name.""" + return frozenset( + self._related_names[index] + for index in self._indexes_for_name.get(name.upper().strip(), []) + ) diff --git a/server/data/usps/__init__.py b/server/data/usps/__init__.py deleted file mode 100644 index 441bbca..0000000 --- a/server/data/usps/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Tools for working with united states postal service data.""" diff --git a/server/data/usps/city_state.py b/server/data/usps/city_state.py deleted file mode 100644 index 195ee94..0000000 --- a/server/data/usps/city_state.py +++ /dev/null @@ -1,7 +0,0 @@ -from dataclasses import dataclass - - 
-@dataclass(frozen=True) -class CityState: - city: str - state: str diff --git a/server/data/usps/metros.py b/server/data/usps/metros.py deleted file mode 100644 index 3d90b55..0000000 --- a/server/data/usps/metros.py +++ /dev/null @@ -1,143 +0,0 @@ -""" -Preferred mappings from city name to city and state, if we are unable -to find alternative info for a contact. -""" - -from .city_state import CityState - -_METROS: dict[str, CityState] = { - "New York": CityState("New York", "NY"), - "Newark": CityState("Newark", "NJ"), - "Jersey City": CityState("Jersey City", "NJ"), - "Los Angeles": CityState("Los Angeles", "CA"), - "Long Beach": CityState("Long Beach", "CA"), - "Anaheim": CityState("Anaheim", "CA"), - "Chicago": CityState("Chicago", "IL"), - "Naperville": CityState("Naperville", "IL"), - "Elgin": CityState("Elgin", "IL"), - "Dallas": CityState("Dallas", "TX"), - "Fort Worth": CityState("Fort Worth", "TX"), - # "Arlington": CityState("Arlington", "TX"), - "Houston": CityState("Houston", "TX"), - "The Woodlands": CityState("The Woodlands", "TX"), - "Sugar Land": CityState("Sugar Land", "TX"), - "Washington": CityState("Washington", "DC"), - # "Arlington": CityState("Arlington", "VA"), - "Alexandria": CityState("Alexandria", "VA"), - "Philadelphia": CityState("Philadelphia", "PA"), - "Camden": CityState("Camden", "NJ"), - "Wilmington": CityState("Wilmington", "DE"), - "Atlanta": CityState("Atlanta", "GA"), - "Sandy Springs": CityState("Sandy Springs", "GA"), - "Alpharetta": CityState("Alpharetta", "GA"), - "Miami": CityState("Miami", "FL"), - "Fort Lauderdale": CityState("Fort Lauderdale", "FL"), - "West Palm Beach": CityState("West Palm Beach", "FL"), - "Pompano Beach": CityState("Pompano Beach", "FL"), - "Phoenix": CityState("Phoenix", "AZ"), - "Mesa": CityState("Mesa", "AZ"), - "Chandler": CityState("Chandler", "AZ"), - "Boston": CityState("Boston", "MA"), - "Cambridge": CityState("Cambridge", "MA"), - "Newton": CityState("Newton", "MA"), - "Riverside": 
CityState("Riverside", "CA"), - "San Bernardino": CityState("San Bernardino", "CA"), - "Ontario": CityState("Ontario", "CA"), - "San Francisco": CityState("San Francisco", "CA"), - "Oakland": CityState("Oakland", "CA"), - "Berkeley": CityState("Berkeley", "CA"), - "Detroit": CityState("Detroit", "MI"), - "Warren": CityState("Warren", "MI"), - "Dearborn": CityState("Dearborn", "MI"), - "Seattle": CityState("Seattle", "WA"), - "Tacoma": CityState("Tacoma", "WA"), - "Bellevue": CityState("Bellevue", "WA"), - "Minneapolis": CityState("Minneapolis", "MN"), - "St. Paul": CityState("St. Paul", "MN"), - "Bloomington": CityState("Bloomington", "MN"), - "Tampa": CityState("Tampa", "FL"), - "St. Petersburg": CityState("St. Petersburg", "FL"), - "Clearwater": CityState("Clearwater", "FL"), - "San Diego": CityState("San Diego", "CA"), - "Chula Vista": CityState("Chula Vista", "CA"), - "Carlsbad": CityState("Carlsbad", "CA"), - "Denver": CityState("Denver", "CO"), - "Aurora": CityState("Aurora", "CO"), - "Lakewood": CityState("Lakewood", "CO"), - "Baltimore": CityState("Baltimore", "MD"), - "Columbia": CityState("Columbia", "MD"), - "Towson": CityState("Towson", "MD"), - "St. Louis": CityState("St. 
Louis", "MO"), - "Orlando": CityState("Orlando", "FL"), - "Kissimmee": CityState("Kissimmee", "FL"), - "Sanford": CityState("Sanford", "FL"), - "Charlotte": CityState("Charlotte", "NC"), - "Concord": CityState("Concord", "NC"), - "Gastonia": CityState("Gastonia", "NC"), - "San Antonio": CityState("San Antonio", "TX"), - "New Braunfels": CityState("New Braunfels", "TX"), - "Portland": CityState("Portland", "OR"), - "Vancouver": CityState("Vancouver", "WA"), - "Hillsboro": CityState("Hillsboro", "OR"), - "Austin": CityState("Austin", "TX"), - "Round Rock": CityState("Round Rock", "TX"), - "Georgetown": CityState("Georgetown", "TX"), - "Sacramento": CityState("Sacramento", "CA"), - "Roseville": CityState("Roseville", "CA"), - "Folsom": CityState("Folsom", "CA"), - "Pittsburgh": CityState("Pittsburgh", "PA"), - "Las Vegas": CityState("Las Vegas", "NV"), - "Henderson": CityState("Henderson", "NV"), - "Paradise": CityState("Paradise", "NV"), - "Cincinnati": CityState("Cincinnati", "OH"), - # "Kansas City": CityState("Kansas City", "MO"), - "Columbus": CityState("Columbus", "OH"), - "Indianapolis": CityState("Indianapolis", "IN"), - "Carmel": CityState("Carmel", "IN"), - "Anderson": CityState("Anderson", "IN"), - "Cleveland": CityState("Cleveland", "OH"), - "Elyria": CityState("Elyria", "OH"), - "Nashville": CityState("Nashville", "TN"), - "Davidson": CityState("Davidson", "TN"), - "Murfreesboro": CityState("Murfreesboro", "TN"), - "Franklin": CityState("Franklin", "TN"), - "San Jose": CityState("San Jose", "CA"), - "Sunnyvale": CityState("Sunnyvale", "CA"), - "Santa Clara": CityState("Santa Clara", "CA"), - "Virginia Beach": CityState("Virginia Beach", "VA"), - "Norfolk": CityState("Norfolk", "VA"), - "Newport News": CityState("Newport News", "VA"), - "Jacksonville": CityState("Jacksonville", "FL"), - "Providence": CityState("Providence", "RI"), - "Warwick": CityState("Warwick", "RI"), - "Milwaukee": CityState("Milwaukee", "WI"), - "Waukesha": CityState("Waukesha", 
"WI"), - "Raleigh": CityState("Raleigh", "NC"), - "Durham": CityState("Durham", "NC"), - "Cary": CityState("Cary", "NC"), - "Oklahoma City": CityState("Oklahoma City", "OK"), - "Richmond": CityState("Richmond", "VA"), - "Memphis": CityState("Memphis", "TN"), - "Louisville": CityState("Louisville", "KY"), - "Salt Lake City": CityState("Salt Lake City", "UT"), - "New Orleans": CityState("New Orleans", "LA"), - "Baton Rouge": CityState("Baton Rouge", "LA"), - "Metairie": CityState("Metairie", "LA"), - "Hartford": CityState("Hartford", "CT"), - "East Hartford": CityState("East Hartford", "CT"), - "Middletown": CityState("Middletown", "CT"), - "Buffalo": CityState("Buffalo", "NY"), - "Cheektowaga": CityState("Cheektowaga", "NY"), - "Birmingham": CityState("Birmingham", "AL"), - "Hoover": CityState("Hoover", "AL"), -} - - -class MajorMetros: - """Simple tool to look up preferred city and state for a given city name.""" - - @classmethod - def for_city(cls, city: str) -> CityState | None: - """Return the preferred city and state for the given city name.""" - cs = _METROS.get(city.title()) - return CityState(cs.city.upper(), cs.state.upper()) if cs else None diff --git a/server/data/usps/test_metros.py b/server/data/usps/test_metros.py deleted file mode 100644 index 775c091..0000000 --- a/server/data/usps/test_metros.py +++ /dev/null @@ -1,17 +0,0 @@ -# ruff: noqa: D102 -from unittest import TestCase - -from .city_state import CityState -from .metros import MajorMetros - - -class ForCityTestCase(TestCase): - def test_seattle(self): - self.assertEqual(MajorMetros.for_city("Seattle"), CityState("SEATTLE", "WA")) - - def test_case_inesensitive(self): - self.assertEqual(MajorMetros.for_city("seattle"), CityState("SEATTLE", "WA")) - self.assertEqual(MajorMetros.for_city("SEATTLE"), CityState("SEATTLE", "WA")) - - def test_nothing(self): - self.assertIsNone(MajorMetros.for_city("Nothing")) diff --git a/server/data/usps/test_zipcode.py b/server/data/usps/test_zipcode.py deleted 
file mode 100644 index e0027e7..0000000 --- a/server/data/usps/test_zipcode.py +++ /dev/null @@ -1,65 +0,0 @@ -# ruff: noqa: D102 -import io -from unittest import TestCase - -from . import zipcode as z - -FAKE_CSV_DATA = """\ -PHYSICAL ZIP,PHYSICAL CITY,PHYSICAL STATE -12345,NEW YORK,NY -12345,NEW YORK,NY -12345,BRONX,NY -98101,SEATTLE,WA -98102,SEATTLE,WA -98103,SEATTLE,WA -98104,SEATTLE,WA -98105,SEATTLE,WA -""" - - -class ZipCodeManagerTestCase(TestCase): - def setUp(self): - self.data = io.StringIO(FAKE_CSV_DATA) - self.zip_code_manager = z.ZipCodeManager.from_csv_io(self.data) - self.new_york = z.CityState("NEW YORK", "NY") - self.bronx = z.CityState("BRONX", "NY") - self.seattle = z.CityState("SEATTLE", "WA") - - def test_init(self): - self.assertEqual(len(self.zip_code_manager.zip_codes), 8) - - def test_city_to_zip_codes(self): - self.assertEqual(len(self.zip_code_manager.city_to_zip_codes), 3) - self.assertEqual(len(self.zip_code_manager.city_to_zip_codes[self.new_york]), 1) - self.assertEqual(len(self.zip_code_manager.city_to_zip_codes[self.bronx]), 1) - self.assertEqual(len(self.zip_code_manager.city_to_zip_codes[self.seattle]), 5) - - def test_zip5_to_cities(self): - self.assertEqual(len(self.zip_code_manager.zip5_to_cities), 6) - self.assertEqual( - self.zip_code_manager.zip5_to_cities["12345"], - frozenset([self.new_york, self.bronx]), - ) - self.assertEqual( - self.zip_code_manager.zip5_to_cities["98101"], frozenset([self.seattle]) - ) - - def test_get_zip_codes(self): - self.assertEqual(len(self.zip_code_manager.get_zip_codes(self.new_york)), 1) - self.assertEqual(len(self.zip_code_manager.get_zip_codes(self.bronx)), 1) - self.assertEqual(len(self.zip_code_manager.get_zip_codes(self.seattle)), 5) - self.assertEqual(len(self.zip_code_manager.get_zip_codes("seattle")), 5) - self.assertEqual(len(self.zip_code_manager.get_zip_codes("nowhere")), 0) - - def test_get_city_states(self): - self.assertEqual( - self.zip_code_manager.get_city_states("12345"), - 
frozenset([self.new_york, self.bronx]), - ) - self.assertEqual( - self.zip_code_manager.get_city_states("98101"), frozenset([self.seattle]) - ) - - def test_get_city_state_not_found(self): - self.assertEqual(self.zip_code_manager.get_city_states("00000"), frozenset()) - self.assertEqual(self.zip_code_manager.get_city_states("99999"), frozenset()) diff --git a/server/data/usps/zipcode.py b/server/data/usps/zipcode.py deleted file mode 100644 index a5ba2cc..0000000 --- a/server/data/usps/zipcode.py +++ /dev/null @@ -1,122 +0,0 @@ -import csv -import pathlib -import typing as t -from dataclasses import dataclass - -from server.data.manager import DataManager -from server.utils.validations import validate_extant_file - -from .city_state import CityState -from .metros import MajorMetros - - -@dataclass(frozen=True) -class ZipCode: - zip5: str - city: str - state: str - - def as_cs(self) -> CityState: - """Return a CityState object with the same city and state as this ZipCode.""" - return CityState(city=self.city, state=self.state) - - -class ZipCodeManager: - """Offers methods for managing the raw USPS-supplied unique ZIP code data csv.""" - - _zip_codes: list[ZipCode] - _city_to_zip_codes: dict[CityState, frozenset[ZipCode]] | None - _zip5_to_cities: dict[str, frozenset[CityState]] | None - - def __init__(self, zip_codes: t.Sequence[ZipCode]) -> None: - self._zip_codes = list(zip_codes) - self._city_to_zip_codes = None - self._zip5_to_cities = None - - @classmethod - def from_csv_io(cls, io: t.TextIO) -> "ZipCodeManager": - """Return a ZipCodeManager with the given io stream.""" - zip_codes = [] - reader = csv.DictReader(io) - for row in reader: - zip_code = ZipCode( - zip5=row["PHYSICAL ZIP"], - city=row["PHYSICAL CITY"].upper().strip(), - state=row["PHYSICAL STATE"].upper().strip(), - ) - zip_codes.append(zip_code) - return cls(zip_codes) - - @classmethod - def from_path(cls, path: str | pathlib.Path) -> "ZipCodeManager": - """Return a ZipCodeManager with the given 
path.""" - path = validate_extant_file(pathlib.Path(path)) - with open(path) as f: - return cls.from_csv_io(f) - - @classmethod - def from_data_manager(cls, data_manager: DataManager) -> "ZipCodeManager": - """Return a ZipCodeManager with the same path as the given DataManager.""" - return cls.from_path(data_manager.path / "usps" / "zips.csv") - - def _index_cities(self) -> None: - assert self._city_to_zip_codes is None - unfrozen_city_to_zip_codes: dict[CityState, set[ZipCode]] = {} - for zip_code in self.zip_codes: - unfrozen_city_to_zip_codes.setdefault(zip_code.as_cs(), set()).add(zip_code) - self._city_to_zip_codes = { - k: frozenset(v) for k, v in unfrozen_city_to_zip_codes.items() - } - - def _index_cities_if_needed(self) -> None: - if self._city_to_zip_codes is None: - self._index_cities() - - def _index_zip5s(self) -> None: - assert self._zip5_to_cities is None - unfrozen_zip5_to_cities: dict[str, set[CityState]] = {} - for zip_code in self.zip_codes: - unfrozen_zip5_to_cities.setdefault(zip_code.zip5, set()).add( - zip_code.as_cs() - ) - self._zip5_to_cities = { - k: frozenset(v) for k, v in unfrozen_zip5_to_cities.items() - } - - def _index_zip5s_if_needed(self) -> None: - if self._zip5_to_cities is None: - self._index_zip5s() - - @property - def zip_codes(self) -> t.Sequence[ZipCode]: - """Return a list of all unique ZIP codes.""" - return self._zip_codes - - @property - def city_to_zip_codes(self) -> t.Mapping[CityState, frozenset[ZipCode]]: - """ - Return a dict mapping each city to a set of all unique ZIP - codes in that city. 
- """ - self._index_cities_if_needed() - assert self._city_to_zip_codes is not None - return self._city_to_zip_codes - - @property - def zip5_to_cities(self) -> t.Mapping[str, frozenset[CityState]]: - """Return a dict mapping each ZIP5 to the city and state it belongs to.""" - self._index_zip5s_if_needed() - assert self._zip5_to_cities is not None - return {k: frozenset(v) for k, v in self._zip5_to_cities.items()} - - def get_zip_codes(self, city: str | CityState | None) -> frozenset[ZipCode]: - """Return a set of all unique ZIP codes in the given city.""" - if isinstance(city, str): - city = MajorMetros.for_city(city) - if city is None: - return frozenset() - return self.city_to_zip_codes.get(city, frozenset()) - - def get_city_states(self, zip5: str) -> frozenset[CityState]: - """Return all cities and states for the given ZIP5.""" - return self.zip5_to_cities.get(zip5, frozenset())