Skip to content

Commit

Permalink
Merge branch 'alpha' into SET-370-Move-display-workflow-PR-to-cpg_flow
Browse files Browse the repository at this point in the history
  • Loading branch information
violetbrina authored Jan 9, 2025
2 parents 1c64e6b + 24524a7 commit 095e1d0
Show file tree
Hide file tree
Showing 19 changed files with 790 additions and 720 deletions.
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ dev-dependencies = [
"bump2version>=1.0.1",
"commitizen>=3.30.0",
"coverage>=7.6.4",
"cpg-flow",
"hail>=0.2.133",
"pdoc>=15.0.1",
"pip-audit>=2.7.3",
Expand All @@ -403,7 +402,6 @@ dev-dependencies = [
]

[tool.uv.sources]
cpg-flow = { workspace = true }

[tool.setuptools_scm]
version_scheme = "only-version"
Expand Down
11 changes: 7 additions & 4 deletions src/cpg_flow/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,20 @@ def create_multicohort() -> MultiCohort:
for cohort_id in custom_cohort_ids_unique:
# get the dictionary representation of all SGs in this cohort
# dataset_id is sequencing_group_dict['sample']['project']['name']
cohort_sg_dicts = get_cohort_sgs(cohort_id)
if len(cohort_sg_dicts) == 0:
cohort_sg_dict = get_cohort_sgs(cohort_id)
cohort_name = cohort_sg_dict.get('name', cohort_id)
cohort_sgs = cohort_sg_dict.get('sequencing_groups', [])

if len(cohort_sgs) == 0:
raise MetamistError(f'Cohort {cohort_id} has no sequencing groups')

# create a new Cohort object
cohort = multicohort.create_cohort(cohort_id)
cohort = multicohort.create_cohort(id=cohort_id, name=cohort_name)

# first populate these SGs into their Datasets
# required so that the SG objects can be referenced in the collective Datasets
# SG.dataset.prefix is meaningful, to correctly store outputs in the project location
for entry in cohort_sg_dicts:
for entry in cohort_sgs:
sg_dataset = entry['sample']['project']['name']
dataset = multicohort.create_dataset(sg_dataset)

Expand Down
19 changes: 17 additions & 2 deletions src/cpg_flow/metamist.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"""
query SGByCohortQuery($cohort_id: String!) {
cohorts(id: {eq: $cohort_id}) {
name
sequencingGroups {
id
meta
Expand Down Expand Up @@ -503,20 +504,34 @@ def parse(
return mm_seq


def get_cohort_sgs(cohort_id: str) -> dict:
    """
    Retrieve the name and sequencing group entries for a single cohort.

    Args:
        cohort_id: Metamist cohort ID (e.g. "COH123").

    Returns:
        dict with two keys:
            'name': the cohort's name as recorded in Metamist
            'sequencing_groups': list of sequencing-group dicts for the cohort

    Raises:
        MetamistError: if the query reported an error, or the response did
            not contain exactly one cohort.
    """
    entries = query(GET_SEQUENCING_GROUPS_BY_COHORT_QUERY, {'cohort_id': cohort_id})

    # Surface GraphQL errors before touching the payload: on a failed query
    # 'cohorts' may be absent, and indexing it would raise a bare KeyError
    # instead of a meaningful MetamistError.
    if entries.get('data') is None and 'errors' in entries:
        message = entries['errors'][0]['message']
        raise MetamistError(f'Error fetching cohort: {message}')

    if len(entries['cohorts']) != 1:
        raise MetamistError('We only support one cohort at a time currently')

    cohort = entries['cohorts'][0]

    # The query selects the camelCase field 'sequencingGroups' with no alias,
    # so the response is keyed by that exact name (the pre-existing code read
    # the same key). We expose it under snake_case for callers.
    return {
        'name': cohort['name'],
        'sequencing_groups': cohort['sequencingGroups'],
    }


def parse_reads( # pylint: disable=too-many-return-statements
Expand Down
11 changes: 6 additions & 5 deletions src/cpg_flow/targets/cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,24 @@ class Cohort(Target):
cohort.
"""

def __init__(self, id: str | None = None, name: str | None = None) -> None:
    """
    Args:
        id: unique cohort ID (e.g. a Metamist "COH..." ID); falls back to
            the workflow dataset name from config when not supplied.
        name: human-readable cohort name; same config fallback.
    """
    super().__init__()
    # `id` shadows the builtin, but the parameter name is part of the
    # public interface (callers pass id=...), so it is kept as-is.
    self.id = id or get_config()['workflow']['dataset']
    self.name = name or get_config()['workflow']['dataset']
    self.analysis_dataset = Dataset(name=get_config()['workflow']['dataset'])
    # Sequencing groups belonging to this cohort, keyed by SG ID.
    self._sequencing_group_by_id: dict[str, SequencingGroup] = {}

def __repr__(self):
    """Debug representation: cohort ID and sequencing-group count."""
    # Post-change behavior: identify the cohort by its unique ID, not its
    # (possibly non-unique) human-readable name.
    return f'Cohort("{self.id}", {len(self._sequencing_group_by_id)} SGs)'

@property
def target_id(self) -> str:
    """Unique target ID (the cohort ID)."""
    return self.id

def get_cohort_id(self) -> str:
    """Get the cohort ID."""
    return self.id

def write_ped_file(
self,
Expand All @@ -75,7 +76,7 @@ def write_ped_file(
),
)
if not datas:
raise ValueError(f'No pedigree data found for {self.name}')
raise ValueError(f'No pedigree data found for {self.id}')
df = pd.DataFrame(datas)

if out_path is None:
Expand Down
28 changes: 14 additions & 14 deletions src/cpg_flow/targets/multicohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self) -> None:

assert self.name, 'Ensure cohorts or dataset is defined in the config file.'

self._cohorts_by_name: dict[str, Cohort] = {}
self._cohorts_by_id: dict[str, Cohort] = {}
self._datasets_by_name: dict[str, Dataset] = {}
self.analysis_dataset = Dataset(name=get_config()['workflow']['dataset'])

Expand Down Expand Up @@ -78,7 +78,7 @@ def get_cohorts(self, only_active: bool = True) -> list['Cohort']:
Gets list of all cohorts.
Include only "active" cohorts (unless only_active is False)
"""
cohorts = list(self._cohorts_by_name.values())
cohorts = list(self._cohorts_by_id.values())
if only_active:
cohorts = [c for c in cohorts if c.active]
return cohorts
Expand All @@ -90,18 +90,18 @@ def get_cohort_ids(self, only_active: bool = True) -> list['str']:
"""
return [c.get_cohort_id() for c in self.get_cohorts(only_active)]

def get_cohort_by_name(
def get_cohort_by_id(
self,
name: str,
id: str,
only_active: bool = True,
) -> Optional['Cohort']:
"""
Get cohort by name.
Get cohort by id.
Include only "active" cohorts (unless only_active is False)
"""
cohort = self._cohorts_by_name.get(name)
cohort = self._cohorts_by_id.get(id)
if not cohort:
LOGGER.warning(f'Cohort {name} not found in the multi-cohort')
LOGGER.warning(f'Cohort {id} not found in the multi-cohort')

if not only_active: # Return cohort even if it's inactive
return cohort
Expand Down Expand Up @@ -134,16 +134,16 @@ def get_sequencing_groups(
all_sequencing_groups[sg.id] = sg
return list(all_sequencing_groups.values())

def create_cohort(self, id: str, name: str) -> 'Cohort':
    """
    Create a cohort and add it to the multi-cohort, keyed by its ID.

    Args:
        id: unique cohort ID (the registry key).
        name: human-readable cohort name.

    Returns:
        The newly created Cohort, or the existing one when a cohort with
        this ID is already registered (idempotent).
    """
    if id in self._cohorts_by_id:
        LOGGER.debug(f'Cohort {id} already exists in the multi-cohort')
        return self._cohorts_by_id[id]

    c = Cohort(id=id, name=name)
    self._cohorts_by_id[c.id] = c
    return c

def add_dataset(self, d: 'Dataset') -> 'Dataset':
Expand Down Expand Up @@ -180,7 +180,7 @@ def get_job_attrs(self) -> dict:
return {
# 'sequencing_groups': self.get_sequencing_group_ids(),
'datasets': [d.name for d in self.get_datasets()],
'cohorts': [c.name for c in self.get_cohorts()],
'cohorts': [c.id for c in self.get_cohorts()],
}

def write_ped_file(
Expand Down
4 changes: 2 additions & 2 deletions src/cpg_flow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ def cohort_prefix(self, cohort: Cohort, category: str | None = None) -> Path:
e.g. "gs://cpg-project-main/seqr_loader/COH123", or "gs://cpg-project-main-analysis/seqr_loader/COH123"
Args:
cohort (Cohort): we pull the analysis dataset and name from this Cohort
cohort (Cohort): we pull the analysis dataset and id from this Cohort
category (str | None): sub-bucket for this project
Returns:
Path
"""
return cohort.analysis_dataset.prefix(category=category) / self.name / cohort.name
return cohort.analysis_dataset.prefix(category=category) / self.name / cohort.id

def run(
self,
Expand Down
Loading

0 comments on commit 095e1d0

Please sign in to comment.