From 39094875fbbd41bff54352adc2ce2b89fa1db3bd Mon Sep 17 00:00:00 2001
From: Yasset Perez-Riverol
Date: Wed, 25 Sep 2024 17:03:29 +0100
Subject: [PATCH] refactoring parquet SC generation

---
 examples/loom2parquetchunks.py | 46 +++++++++++++++++++++++-----------
 1 file changed, 32 insertions(+), 14 deletions(-)

diff --git a/examples/loom2parquetchunks.py b/examples/loom2parquetchunks.py
index 1c1c035..711ef7a 100644
--- a/examples/loom2parquetchunks.py
+++ b/examples/loom2parquetchunks.py
@@ -5,6 +5,8 @@
 # import libraries
 import pandas as pd
 import loompy
+import pyarrow.parquet as pq
+import pyarrow as pa
 
 # define the path to the loom file
 loom_file = "GSE156793_S3_gene_count.loom"
@@ -32,8 +34,7 @@
 development_day = ds.ca["Development_day"]
 
 # make a dataframe with the sample metadata, define the columns types
-sample_df = pd.DataFrame(
-    {
+sample_df = pd.DataFrame({
     "sample_id": sample_id,
     "cell_cluster": cell_cluster,
     "assay": assay,
@@ -68,9 +69,11 @@
 # transpose dataset and convert to parquet.
 # process the data per chunks.
-chunk_size = 50000
-number_chunks = 50  # Number of chunks to process, if None, all chunks are processed
+chunk_size = 10000
+writer = None
 count = 0
+number_chunks = 10  # number of chunks to process
+
 for ix, selection, view in ds.scan(axis=1, batch_size=chunk_size):
     # retrieve the chunk
     matrix_chunk = view[:, :]
 
@@ -102,17 +105,32 @@
     # rename the index column
     df_chunk = df_chunk.rename(columns={"index": "sample_id"})
 
-    # save the chunk to parquet
-    df_chunk.to_parquet(
-        f"gene_count_chunk_{ix}.parquet",
-        index=False,
-        engine="pyarrow",
-        compression="gzip",
-    )
+    if writer is None:
+        # define the schema
+        schema = pa.schema(
+            [
+                pa.field("sample_id", pa.string()),
+                pa.field("cell_cluster_id", pa.int8()),
+                pa.field("development_day", pa.int64()),
+                pa.field("assay_id", pa.int8()),
+            ]
+            + [pa.field(gene_id, pa.float32()) for gene_id in gene_ids]
+        )
+
+        print(len(list(df_chunk.columns)))
+        print(len(schema))
+
+        # create the parquet writer
+        writer = pq.ParquetWriter("GSE156793.parquet", schema, compression="snappy")
+
+    writer.write_table(pa.Table.from_pandas(df_chunk, preserve_index=False))
     print(f"Chunk {ix} saved")
 
-    count = count + 1
-    # break the loop if the number of chunks is reached
-    if number_chunks is not None and count >= number_chunks:
+    count += 1
+    if count >= number_chunks:
         break
+
+if writer is not None:
+    writer.close()
+    print("Concatenated parquet file written to GSE156793.parquet")
 
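
A note on the write path introduced above: pq.ParquetWriter.write_table() raises a ValueError when the schema of the incoming table does not match the schema the writer was opened with, and pa.Table.from_pandas() infers Arrow types from the DataFrame dtypes (for example int64 where the schema declares int8). A minimal sketch of an explicit cast that avoids relying on dtype inference, reusing the schema and df_chunk names from the patch:

    # convert the chunk and cast it to the writer's schema in one step;
    # from_pandas accepts a target schema and converts columns to the declared types
    table = pa.Table.from_pandas(df_chunk, schema=schema, preserve_index=False)
    writer.write_table(table)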
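
Since the refactor replaces the per-chunk gene_count_chunk_{ix}.parquet files with a single GSE156793.parquet, downstream code can stream the concatenated file back in batches instead of loading it whole. A minimal sketch, assuming the output file and the metadata column names declared in the schema above:

    import pyarrow.parquet as pq

    pf = pq.ParquetFile("GSE156793.parquet")
    print(pf.metadata.num_rows)        # total cells written across all chunks
    print(pf.metadata.num_row_groups)  # row groups correspond to the chunked writes

    # read only the metadata columns, batch by batch
    for batch in pf.iter_batches(batch_size=10_000, columns=["sample_id", "development_day"]):
        df = batch.to_pandas()
        print(df.shape)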