Skip to content

Commit

Permalink
Break out checking column names
Browse files Browse the repository at this point in the history
  • Loading branch information
stefpiatek committed Dec 22, 2023
1 parent 9d099a4 commit c559b98
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions cli/src/pixl_cli/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,7 @@ def messages_from_parquet(
"procedure_date",
"procedure_occurrence_id",
]
logger.debug(
f"Extracting messages from {dir_path}. Expecting columns to include "
f"{expected_col_names}"
)

for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = (
f"parquet files are expected to have at least {expected_col_names} as "
f"column names"
)
raise ValueError(msg)
_raise_if_column_names_not_found(cohort_data, expected_col_names)

(
mrn_col_name,
Expand All @@ -106,7 +95,6 @@ def messages_from_parquet(
messages = []

for _, row in cohort_data.iterrows():
# Create new dict to initialise message
message = Message(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
Expand Down Expand Up @@ -140,3 +128,18 @@ def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFram
# joining data together
people_procedures = people.merge(procedure, on="person_id")
return people_procedures.merge(accessions, on="procedure_occurrence_id")


def _raise_if_column_names_not_found(
    cohort_data: pd.DataFrame, expected_col_names: list[str]
) -> None:
    """
    Check that the cohort dataframe contains every expected column name.

    :param cohort_data: dataframe whose column labels are checked
    :param expected_col_names: column names that must all be present in ``cohort_data``
    :raises ValueError: if one or more expected column names are absent; the message lists
        the expected names followed by the ones that were not found
    """
    logger.debug(
        f"Checking merged parquet files. Expecting columns to include {expected_col_names}"
    )
    # Build the lookup once instead of materialising a list of columns per expected name.
    present_col_names = set(cohort_data.columns)
    # Collect every missing name (in the caller's order) so the error is actionable.
    missing_col_names = [col for col in expected_col_names if col not in present_col_names]
    if missing_col_names:
        msg = (
            f"parquet files are expected to have at least {expected_col_names} as "
            f"column names, but {missing_col_names} not found"
        )
        raise ValueError(msg)

0 comments on commit c559b98

Please sign in to comment.