diff --git a/README.md b/README.md index 1a3e389..061836e 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ datasets but can be generalized to store any 2-dimensional experimental data. To get started, install the package from [PyPI](https://pypi.org/project/cellarr/) -```bash +```sh pip install cellarr ## to include optional dependencies @@ -123,6 +123,52 @@ print(dataset) Check out the [documentation](https://biocpy.github.io/cellarr/tutorial.html) for more details. +### Building on HPC environments with `slurm` + +To simplify building TileDB files on HPC environments that use `slurm`, there are a few steps you need to follow. + +- Step 1: Construct a manifest file +A minimal manifest file (json) must contain the following fields +- `"files"`: A list of file path to the input `h5ad` objects. +- `"python_env"`: A set of commands to activate the Python environment containing this package and its dependencies. + +Here’s an example of the manifest file: + +```py +manifest = { + "files": your/list/of/files, + "python_env": """ +ml Miniforge3 +conda activate cellarr + +python --version +which python + """, + "matrix_options": [ + { + "matrix_name": "non_zero_cells", + "dtype": "uint32" + }, + { + "matrix_name": "pseudo_bulk_log_normed", + "dtype": "float32" + } + ], +} + +import json +json.dump(manifest, open("your/path/to/manifest.json", "w")) +``` + +For more options, check out the [README](./src/cellarr/slurm/README.md). + +- Step 2: Submit the job +Once your manifest file is ready, you can submit the necessary jobs using the `cellarr_build` CLI. Run the following command: + +```sh +cellarr_build --input-manifest your/path/to/manifest.json --output-dir your/path/to/output --memory-per-job 8 --cpus-per-task 2 +``` + ### Query a `CellArrDataset` Users have the option to reuse the `dataset` object returned when building the dataset or by creating a `CellArrDataset` object by initializing it to the path where the files were created. diff --git a/setup.cfg b/setup.cfg index a0ef370..ce6ef12 100644 --- a/setup.cfg +++ b/setup.cfg @@ -80,8 +80,8 @@ testing = [options.entry_points] # Add here console scripts like: -# console_scripts = -# script_name = cellarr.module:function +console_scripts = + cellarr_build = cellarr.slurm.build_cellarr_steps:main # For example: # console_scripts = # fibonacci = cellarr.skeleton:run diff --git a/src/cellarr/buildutils_tiledb_frame.py b/src/cellarr/buildutils_tiledb_frame.py index ae071b3..f69c2eb 100644 --- a/src/cellarr/buildutils_tiledb_frame.py +++ b/src/cellarr/buildutils_tiledb_frame.py @@ -76,7 +76,7 @@ def create_tiledb_frame_from_column_names( ) -def create_tiledb_frame_from_dataframe(tiledb_uri_path: str, frame: List[str], column_types=dict): +def create_tiledb_frame_from_dataframe(tiledb_uri_path: str, frame: List[str], column_types: dict = None): """Create a TileDB file with the provided attributes to persistent storage. 
This will materialize the array directory and all diff --git a/src/cellarr/slurm/README.md b/src/cellarr/slurm/README.md new file mode 100644 index 0000000..8c2f264 --- /dev/null +++ b/src/cellarr/slurm/README.md @@ -0,0 +1,65 @@ + +# manifest json + +```json +{ + "files": [ + "/path/to/dataset1.h5ad", + "/path/to/dataset2.h5ad" + ], + "matrix_options": [ + { + "matrix_name": "counts", + "dtype": "uint32" + }, + { + "matrix_name": "normalized", + "dtype": "float32" + } + ], + "gene_options": { + "feature_column": "index" + }, + "sample_options": { + "metadata": { + "sample_1": { + "condition": "control", + "batch": "1" + }, + "sample_2": { + "condition": "treatment", + "batch": "1" + } + } + }, + "cell_options": { + "column_types": { + "cell_type": "ascii", + "quality_score": "float32" + }, + }, + "python_env": """ +. /system/gredit/clientos/etc/profile + +ml Miniforge3 +conda activate biocpy_miniforge + +~/.conda/envs/biocpy_miniforge/bin/python --version +which python +python --version + """, +} +``` + + +Run + +```sh + +python build_cellarr_steps.py \ + --input-manifest manifest.json \ + --output-dir /path/to/output \ + --memory-per-job 64 \ + --cpus-per-task 4 + +``` diff --git a/src/cellarr/slurm/__init__.py b/src/cellarr/slurm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cellarr/slurm/build_cellarr_steps.py b/src/cellarr/slurm/build_cellarr_steps.py new file mode 100644 index 0000000..4f2d848 --- /dev/null +++ b/src/cellarr/slurm/build_cellarr_steps.py @@ -0,0 +1,319 @@ +import argparse +import json +import subprocess +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import numpy as np + +from cellarr.buildutils_tiledb_array import create_tiledb_array + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +class SlurmBuilder: + """SLURM-based builder for CellArrDataset.""" + + def __init__( + self, + output_dir: str, + log_dir: str, + temp_dir: str, + memory_gb: int = 64, + time_hours: int = 24, + cpus_per_task: int = 4, + ): + """Initialize the SLURM builder. + + Args: + output_dir: + Path to final output directory. + + log_dir: + Path to store SLURM logs. + + temp_dir: + Path for temporary files. + + memory_gb: + Memory per job in GB. + + time_hours: + Time limit per job in hours. + + cpus_per_task: + CPUs per task. 
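+
+        Example:
+            A minimal sketch of constructing the builder; the directory
+            names below are placeholders, not defaults::
+
+                builder = SlurmBuilder(
+                    output_dir="final",
+                    log_dir="logs",
+                    temp_dir="temp",
+                    memory_gb=8,
+                    cpus_per_task=2,
+                )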
+ """ + self.output_dir = Path(output_dir) + self.log_dir = Path(log_dir) + self.temp_dir = Path(temp_dir) + self.memory_gb = memory_gb + self.time_hours = time_hours + self.cpus_per_task = cpus_per_task + + def create_slurm_script( + self, + job_name: str, + python_script: str, + args: Dict, + dependencies: Optional[str] = None, + python_env: str = "", + ) -> str: + """Create a SLURM job submission script.""" + script = f"""#!/bin/bash +#SBATCH --job-name={job_name} +#SBATCH --output={self.log_dir}/{job_name}_%j.out +#SBATCH --error={self.log_dir}/{job_name}_%j.err +#SBATCH --time={self.time_hours}:00:00 +#SBATCH --mem={self.memory_gb}G +#SBATCH --cpus-per-task={self.cpus_per_task} +""" + if dependencies: + script += f"#SBATCH --dependency={dependencies}\n" + + script += f""" +{python_env} + +python {python_script} '{json.dumps(args)}' +""" + script_path = self.log_dir / f"{job_name}_submit.sh" + with open(script_path, "w") as f: + f.write(script) + return script_path + + def create_array_script( + self, + job_name: str, + python_script: str, + args: Dict, + n_tasks: int, + dependencies: Optional[str] = None, + python_env: str = "", + ) -> str: + """Create a SLURM array job submission script.""" + script = f"""#!/bin/bash +#SBATCH --job-name={job_name} +#SBATCH --output={self.log_dir}/{job_name}_%A_%a.out +#SBATCH --error={self.log_dir}/{job_name}_%A_%a.err +#SBATCH --time={self.time_hours}:00:00 +#SBATCH --mem={self.memory_gb}G +#SBATCH --cpus-per-task={self.cpus_per_task} +#SBATCH --array=0-{n_tasks-1} +""" + if dependencies: + script += f"#SBATCH --dependency={dependencies}\n" + + script += f""" +{python_env} + +python {python_script} '{json.dumps(args)}' +""" + script_path = self.log_dir / f"{job_name}_array.sh" + with open(script_path, "w") as f: + f.write(script) + return script_path + + def submit_job(self, script_path: str) -> str: + """Submit a SLURM job and return job ID.""" + result = subprocess.run(["sbatch", script_path], capture_output=True, text=True) + return result.stdout.strip().split()[-1] + + def submit_gene_annotation_job(self, files: List[str], gene_options: Dict, python_env: str) -> str: + """Submit gene annotation processing job.""" + args = { + "files": files, + "output_dir": str(self.output_dir), + "gene_options": gene_options, + "temp_dir": str(self.temp_dir / "gene_annotation"), + } + + parent_dir = str(Path(__file__).parent) + + script_path = self.create_slurm_script( + job_name="cellarr_gene_annot", + python_script=f"{parent_dir}/process_gene_annotation.py", + args=args, + python_env=python_env, + ) + return self.submit_job(script_path) + + def submit_sample_metadata_job( + self, files: List[str], sample_options: Dict, dependency: str, python_env: str + ) -> str: + """Submit sample metadata processing job.""" + args = { + "files": files, + "output_dir": str(self.output_dir), + "sample_options": sample_options, + "temp_dir": str(self.temp_dir / "sample_metadata"), + } + + parent_dir = str(Path(__file__).parent) + + script_path = self.create_slurm_script( + job_name="cellarr_sample_meta", + python_script=f"{parent_dir}/process_sample_metadata.py", + args=args, + python_env=python_env, + # dependencies=f"afterok:{dependency}", + ) + return self.submit_job(script_path) + + def submit_cell_metadata_job(self, files: List[str], cell_options: Dict, dependency: str, python_env: str) -> str: + """Submit cell metadata processing job.""" + args = { + "files": files, + "output_dir": str(self.output_dir), + "cell_options": cell_options, + "temp_dir": str(self.temp_dir / 
"cell_metadata"), + } + + parent_dir = str(Path(__file__).parent) + + script_path = self.create_slurm_script( + job_name="cellarr_cell_meta", + python_script=f"{parent_dir}/process_cell_metadata.py", + args=args, + python_env=python_env, + # dependencies=f"afterok:{dependency}", + ) + return self.submit_job(script_path) + + def submit_matrix_processing( + self, files: List[str], matrix_options: Dict, dependency: str, python_env: str + ) -> Tuple[str, str]: + """Submit matrix processing as SLURM array job.""" + + # Create matrix TileDB array + matrix_uri = str(self.output_dir / "assays" / matrix_options["matrix_name"]) + create_tiledb_array( + matrix_uri, + matrix_attr_name=matrix_options.get("matrix_attr_name", "data"), + matrix_dim_dtype=np.dtype(matrix_options.get("dtype", "float32")), + ) + + # Prepare array job arguments + array_args = { + "output_dir": str(self.output_dir), + "temp_dir": str(self.temp_dir / f"matrix_{matrix_options['matrix_name']}"), + "matrix_options": matrix_options, + "gene_annotation_file": str(self.temp_dir / "gene_annotation/gene_set.json"), + "files": files, + } + + parent_dir = str(Path(__file__).parent) + + # Submit array job + array_script = self.create_array_script( + job_name=f"matrix_{matrix_options['matrix_name']}", + python_script=f"{parent_dir}/process_matrix.py", + args=array_args, + n_tasks=len(files), + dependencies=f"afterok:{dependency}", + python_env=python_env, + ) + array_job_id = self.submit_job(array_script) + + # Submit finalization job + final_args = { + "output_dir": str(self.output_dir), + "temp_dir": str(self.temp_dir / f"matrix_{matrix_options['matrix_name']}"), + "matrix_options": matrix_options, + "files": files, + } + + final_script = self.create_slurm_script( + job_name=f"finalize_matrix_{matrix_options['matrix_name']}", + python_script=f"{parent_dir}/finalize_matrix.py", + args=final_args, + dependencies=f"afterok:{array_job_id}", + python_env=python_env, + ) + final_job_id = self.submit_job(final_script) + + return array_job_id, final_job_id + + def submit_final_assembly(self, matrix_names: List[str], dependencies: List[str], python_env: str) -> str: + """Submit final assembly job.""" + args = {"input_dir": str(self.output_dir), "output_dir": str(self.output_dir), "matrix_names": matrix_names} + + parent_dir = str(Path(__file__).parent) + + script_path = self.create_slurm_script( + job_name="cellarr_final_assembly", + python_script=f"{parent_dir}/final_assembly.py", + args=args, + dependencies=f"afterok:{','.join(dependencies)}", + python_env=python_env, + ) + return self.submit_job(script_path) + + +def main(): + parser = argparse.ArgumentParser(description="Build CellArrDataset using SLURM steps") + parser.add_argument("--input-manifest", required=True, help="Path to JSON manifest file") + parser.add_argument("--output-dir", required=True, help="Base output directory") + parser.add_argument("--memory-per-job", type=int, default=64, help="Memory in GB per job") + parser.add_argument("--time-per-job", type=int, default=24, help="Time in hours per job") + parser.add_argument("--cpus-per-task", type=int, default=4, help="CPUs per task") + args = parser.parse_args() + + # Create directories + base_dir = Path(args.output_dir) + log_dir = base_dir / "logs" + temp_dir = base_dir / "temp" + final_dir = base_dir / "final" + assays_dir = base_dir / "final/assays" + + for d in [log_dir, temp_dir, final_dir, assays_dir]: + d.mkdir(parents=True, exist_ok=True) + + # Read manifest + with open(args.input_manifest) as f: + manifest = 
json.load(f) + + # Initialize builder + builder = SlurmBuilder( + output_dir=str(final_dir), + log_dir=str(log_dir), + temp_dir=str(temp_dir), + memory_gb=args.memory_per_job, + time_hours=args.time_per_job, + cpus_per_task=args.cpus_per_task, + ) + + # Submit jobs + gene_job_id = builder.submit_gene_annotation_job( + manifest["files"], manifest.get("gene_options", {}), manifest["python_env"] + ) + + sample_job_id = builder.submit_sample_metadata_job( + manifest["files"], manifest.get("sample_options", {}), gene_job_id, manifest["python_env"] + ) + + cell_job_id = builder.submit_cell_metadata_job( + manifest["files"], manifest.get("cell_options", {}), sample_job_id, manifest["python_env"] + ) + + # Process matrices + matrix_options = manifest.get("matrix_options", [{"matrix_name": "counts"}]) + if not isinstance(matrix_options, list): + matrix_options = [matrix_options] + + matrix_job_ids = [] + for matrix_opt in matrix_options: + _, final_id = builder.submit_matrix_processing( + manifest["files"], matrix_opt, f"{cell_job_id},{gene_job_id}", manifest["python_env"] + ) + matrix_job_ids.append(final_id) + + # Submit final assembly + builder.submit_final_assembly( + [opt["matrix_name"] for opt in matrix_options], matrix_job_ids, manifest["python_env"] + ) + + +if __name__ == "__main__": + main() diff --git a/src/cellarr/slurm/final_assembly.py b/src/cellarr/slurm/final_assembly.py new file mode 100644 index 0000000..7b0235c --- /dev/null +++ b/src/cellarr/slurm/final_assembly.py @@ -0,0 +1,29 @@ +import json +import sys + +from cellarr import CellArrDataset + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def final_assembly(args_json: str): + args = json.loads(args_json) + + # Perform any final optimizations or validations + dataset = CellArrDataset(dataset_path=args["output_dir"], assay_uri=args["matrix_names"]) + + # Save final metadata + metadata = {"shape": dataset.shape, "matrices": args["matrix_names"]} + + with open(f"{args['output_dir']}/metadata.json", "w") as f: + json.dump(metadata, f) + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python final_assembly.py ''") + sys.exit(1) + + final_assembly(sys.argv[1]) diff --git a/src/cellarr/slurm/finalize_matrix.py b/src/cellarr/slurm/finalize_matrix.py new file mode 100644 index 0000000..50de443 --- /dev/null +++ b/src/cellarr/slurm/finalize_matrix.py @@ -0,0 +1,45 @@ +import json +import sys +from pathlib import Path + +from cellarr.buildutils_tiledb_array import optimize_tiledb_array + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def finalize_matrix(args_json: str): + """Finalize the matrix after all array jobs complete.""" + args = json.loads(args_json) + + # Verify all tasks completed + completed_dir = Path(args["temp_dir"]) / "completed" + expected_tasks = len(args["files"]) + completed_tasks = len(list(completed_dir.glob("task_*.json"))) + + if completed_tasks != expected_tasks: + raise RuntimeError(f"Expected {expected_tasks} tasks but only {completed_tasks} completed") + + # Optimize the TileDB array + matrix_uri = f"{args['output_dir']}/assays/{args['matrix_options']['matrix_name']}" + optimize_tiledb_array(matrix_uri) + + # Save completion metadata + with open(f"{args['temp_dir']}/matrix_metadata.json", "w") as f: + json.dump( + { + "matrix_name": args["matrix_options"]["matrix_name"], + "files_processed": len(args["files"]), + "uri": matrix_uri, + }, + f, + ) + + +if __name__ == "__main__": + if 
len(sys.argv) != 2: + print("Usage: python finalize_matrix.py ''") + sys.exit(1) + + finalize_matrix(sys.argv[1]) diff --git a/src/cellarr/slurm/process_cell_metadata.py b/src/cellarr/slurm/process_cell_metadata.py new file mode 100644 index 0000000..b50495e --- /dev/null +++ b/src/cellarr/slurm/process_cell_metadata.py @@ -0,0 +1,68 @@ +import json +import sys + +import pandas as pd + +from cellarr import utils_anndata as uad +from cellarr.buildutils_tiledb_frame import ( + create_tiledb_frame_from_dataframe, +) + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def process_cell_metadata(args_json: str): + """Process and create cell metadata store. + + Creates cell metadata including: + - Sample mapping + - Cell indices within samples + - Original cell annotations from input files + """ + args = json.loads(args_json) + + # Extract cell metadata with specific column subset if provided + cell_meta_columns = args.get("cell_options", {}).get("column_types", {}) + files_cache = uad.extract_anndata_info( + args["files"], obs_subset_columns=list(cell_meta_columns.keys()) if cell_meta_columns else None + ) + + # Get cell counts + cell_counts = uad.scan_for_cellcounts(files_cache) + + # Create sample mapping for each cell + sample_per_cell = [] + cell_index_in_sample = [] + sample_names = [f"sample_{idx+1}" for idx in range(len(args["files"]))] + + for idx, count in enumerate(cell_counts): + sample_per_cell.extend([sample_names[idx]] * count) + cell_index_in_sample.extend(range(count)) + + # Create base cell metadata + cell_metadata = pd.DataFrame( + {"cellarr_sample": sample_per_cell, "cellarr_cell_index_in_sample": cell_index_in_sample} + ) + + # Add original cell annotations from input files + if cell_meta_columns: + original_meta = uad.scan_for_cellmetadata(files_cache) + if not original_meta.empty: + # Ensure index alignment + original_meta.reset_index(drop=True, inplace=True) + cell_metadata = pd.concat([cell_metadata, original_meta], axis=1) + + # Create TileDB store + create_tiledb_frame_from_dataframe( + f"{args['output_dir']}/cell_metadata", cell_metadata, column_types=cell_meta_columns + ) + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python process_cell_metadata.py ''") + sys.exit(1) + + process_cell_metadata(sys.argv[1]) diff --git a/src/cellarr/slurm/process_gene_annotation.py b/src/cellarr/slurm/process_gene_annotation.py new file mode 100644 index 0000000..d44c810 --- /dev/null +++ b/src/cellarr/slurm/process_gene_annotation.py @@ -0,0 +1,45 @@ +import json +import sys +from pathlib import Path + +import pandas as pd + +from cellarr import utils_anndata as uad +from cellarr.buildutils_tiledb_frame import create_tiledb_frame_from_dataframe + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def process_gene_annotation(args_json: str): + args = json.loads(args_json) + + # Extract gene information from all files + files_cache = uad.extract_anndata_info( + args["files"], var_feature_column=args.get("gene_options", {}).get("feature_column", "index") + ) + + # Scan for features + gene_set = uad.scan_for_features(files_cache) + gene_set = sorted(gene_set) + + # Create gene annotation dataframe + gene_annotation = pd.DataFrame({"cellarr_gene_index": gene_set}, index=gene_set) + gene_annotation.reset_index(drop=True, inplace=True) + + # Create TileDB store + create_tiledb_frame_from_dataframe(f"{args['output_dir']}/gene_annotation", gene_annotation) + + # Save gene set 
for later use + Path(args["temp_dir"]).mkdir(parents=True, exist_ok=True) + with open(f"{args['temp_dir']}/gene_set.json", "w") as f: + json.dump(gene_set, f) + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python process_gene_annotation.py ''") + sys.exit(1) + + process_gene_annotation(sys.argv[1]) diff --git a/src/cellarr/slurm/process_matrix.py b/src/cellarr/slurm/process_matrix.py new file mode 100644 index 0000000..bb1e459 --- /dev/null +++ b/src/cellarr/slurm/process_matrix.py @@ -0,0 +1,58 @@ +import json +import os +import sys +from pathlib import Path + +import tiledb + +from cellarr import utils_anndata as uad +from cellarr.buildutils_tiledb_array import write_csr_matrix_to_tiledb + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def process_matrix_file(args_json: str): + """Process a single file for matrix creation.""" + args = json.loads(args_json) + + # Get SLURM array task ID + task_id = int(os.environ.get("SLURM_ARRAY_TASK_ID", 0)) + + # Load gene set mapping + with open(args["gene_annotation_file"]) as f: + gene_set = json.load(f) + gene_map = {gene: idx for idx, gene in enumerate(gene_set)} + + # Get file and offset for this task + file_info = args["files"][task_id] + input_file = file_info + + # get sample offset + sample_uri = tiledb.open(f"{args['output_dir']}/sample_metadata", "r") + sample_row = sample_uri.df[task_id] + row_offset = sample_row["cellarr_sample_start_index"] + + # Process the file + matrix = uad.remap_anndata(input_file, gene_map, layer_matrix_name=args["matrix_options"]["matrix_name"]) + + # Write to TileDB + matrix_uri = f"{args['output_dir']}/assays/{args['matrix_options']['matrix_name']}" + write_csr_matrix_to_tiledb(matrix_uri, matrix[args["matrix_options"]["matrix_name"]], row_offset=row_offset) + + # Save task completion marker + Path(args["temp_dir"]).mkdir(parents=True, exist_ok=True) + Path(args["temp_dir"] + "/completed").mkdir(parents=True, exist_ok=True) + task_marker = Path(args["temp_dir"]) / "completed" / f"task_{task_id}.json" + task_marker.parent.mkdir(exist_ok=True) + with open(task_marker, "w") as f: + json.dump({"file": input_file, "cells_processed": matrix[args["matrix_options"]["matrix_name"]].shape[0]}, f) + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python process_matrix.py ''") + sys.exit(1) + + process_matrix_file(sys.argv[1]) diff --git a/src/cellarr/slurm/process_matrix_all.py b/src/cellarr/slurm/process_matrix_all.py new file mode 100644 index 0000000..1f7274e --- /dev/null +++ b/src/cellarr/slurm/process_matrix_all.py @@ -0,0 +1,40 @@ +import json + +import numpy as np + +from cellarr import utils_anndata as uad +from cellarr.buildutils_tiledb_array import create_tiledb_array, write_csr_matrix_to_tiledb + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def process_matrix(args_json: str): + args = json.loads(args_json) + + # Load gene set + with open(args["gene_annotation_file"]) as f: + gene_set = json.load(f) + + # Create gene set mapping + gene_map = {gene: idx for idx, gene in enumerate(gene_set)} + + # Create TileDB array + create_tiledb_array( + f"{args['output_dir']}/assays/{args['matrix_options']['matrix_name']}", + matrix_dim_dtype=np.dtype(args["matrix_options"].get("dtype", "float32")), + ) + + # Process each file + offset = 0 + for file in args["files"]: + matrix = uad.remap_anndata(file, gene_map, layer_matrix_name=args["matrix_options"]["matrix_name"]) + + 
write_csr_matrix_to_tiledb( + f"{args['output_dir']}/assays/{args['matrix_options']['matrix_name']}", + matrix[args["matrix_options"]["matrix_name"]], + row_offset=offset, + ) + + offset += matrix[args["matrix_options"]["matrix_name"]].shape[0] diff --git a/src/cellarr/slurm/process_sample_metadata.py b/src/cellarr/slurm/process_sample_metadata.py new file mode 100644 index 0000000..7059d1b --- /dev/null +++ b/src/cellarr/slurm/process_sample_metadata.py @@ -0,0 +1,78 @@ +import json +import sys +from pathlib import Path + +import pandas as pd + +from cellarr import utils_anndata as uad +from cellarr.buildutils_tiledb_frame import create_tiledb_frame_from_dataframe + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def process_sample_metadata(args_json: str): + """Process and create sample metadata store. + + Creates sample metadata including: + - Basic sample information + - Cell counts per sample + - Original gene sets + - Sample index information + """ + args = json.loads(args_json) + + # Extract information from files + files_cache = uad.extract_anndata_info(args["files"], var_feature_column=args.get("feature_column", "index")) + + # Get cell counts for each sample + cell_counts = uad.scan_for_cellcounts(files_cache) + + # Create basic sample metadata + sample_names = [f"sample_{idx+1}" for idx in range(len(args["files"]))] + sample_metadata = pd.DataFrame( + {"cellarr_sample": sample_names, "cellarr_cell_counts": cell_counts, "cellarr_filename": args["files"]} + ) + + # Add sample indices for efficient slicing + counter = sample_metadata["cellarr_cell_counts"].shift(1) + counter.iloc[0] = 0 + sample_metadata["cellarr_sample_start_index"] = counter.cumsum().astype(int) + + # Calculate end indices + ends = sample_metadata["cellarr_sample_start_index"].shift(-1) + ends.iloc[-1] = int(sample_metadata["cellarr_cell_counts"].sum()) + ends = ends - 1 + sample_metadata["cellarr_sample_end_index"] = ends.astype(int) + + # Add original gene sets for each sample + gene_sets = uad.scan_for_features(files_cache, unique=False) + sample_metadata["cellarr_original_gene_set"] = [",".join(genes) for genes in gene_sets] + + # Add any custom metadata if provided in options + custom_metadata = args.get("sample_options", {}).get("metadata", {}) + for sample, metadata in custom_metadata.items(): + for key, value in metadata.items(): + if key not in sample_metadata.columns: + sample_metadata[key] = None + sample_idx = sample_metadata.index[sample_metadata["cellarr_sample"] == sample][0] + sample_metadata.at[sample_idx, key] = value + + # Create TileDB store + create_tiledb_frame_from_dataframe(f"{args['output_dir']}/sample_metadata", sample_metadata) + + # Save metadata for subsequent steps + metadata = {"num_samples": len(sample_names), "total_cells": int(sample_metadata["cellarr_cell_counts"].sum())} + + Path(args["temp_dir"]).mkdir(parents=True, exist_ok=True) + with open(f"{args['temp_dir']}/sample_metadata.json", "w") as f: + json.dump(metadata, f) + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python process_sample_metadata.py ''") + sys.exit(1) + + process_sample_metadata(sys.argv[1])
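+
+# Worked example of the index bookkeeping above (illustrative values only):
+#   cell_counts                = [100, 250, 50]
+#   cellarr_sample_start_index = [0, 100, 350]
+#   cellarr_sample_end_index   = [99, 349, 399]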