Skip to content

Commit

Permalink
perf: cache mappers, avoid allocations, codons are arrays (#172)
Browse files Browse the repository at this point in the history
* fix: use array instead of Vec for Codon representation

* don't allocate until necessary

* reduce usage of lazy_static, use consts instead where possible, copy instead of reference lookup tables

* cache calls to alignment::Mapper::new

* rely on seqrepo-rs with cloneable Error

* fmt

* implement data_version and schema_version for mock Provider in tests

* update seqrepo to v0.10.2

* shorter Arc::new error wrapping

* clippy lints

* clippy lints

* do not run doctests for sequence.rs generators

* workaround ignore in doctest by using stringify, see rust-lang/rust/issues/87586
  • Loading branch information
tedil authored Jun 10, 2024
1 parent 5c13324 commit 34acaf4
Show file tree
Hide file tree
Showing 13 changed files with 872 additions and 759 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ postgres = { version = "0.19", features = ["with-chrono-0_4"] }
quick_cache = "0.5"
regex = "1.7"
rustc-hash = "1.1"
seqrepo = { version = "0.10", features = ["cached"] }
seqrepo = { version = "0.10.2", features = ["cached"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
indexmap = { version = "2", features = ["serde"] }
biocommons-bioutils = "0.1.0"
ahash = "0.8.11"
cached = "0.50.0"

[dev-dependencies]
anyhow = "1.0"
Expand Down
10 changes: 3 additions & 7 deletions src/data/cdot/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,13 +766,9 @@ impl TxProvider {
.as_ref()
.expect("cannot happen; genes without map_location are not imported")
.clone(),
descr: gene
.description
.as_ref()
.map(Clone::clone)
.unwrap_or_default(),
summary: gene.summary.as_ref().map(Clone::clone).unwrap_or_default(),
aliases: gene.aliases.as_ref().map(Clone::clone).unwrap_or_default(),
descr: gene.description.clone().unwrap_or_default(),
summary: gene.summary.clone().unwrap_or_default(),
aliases: gene.aliases.clone().unwrap_or_default(),
added: NaiveDateTime::default(),
})
}
Expand Down
5 changes: 3 additions & 2 deletions src/data/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Error type definition.
use std::sync::Arc;
use thiserror::Error;

/// Error type for data.
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
pub enum Error {
#[error("UTA Postgres access error")]
UtaPostgresError(#[from] postgres::Error),
UtaPostgresError(#[from] Arc<postgres::Error>),
#[error("sequence operation failed")]
SequenceOperationFailed(#[from] crate::sequences::Error),
#[error("problem with seqrepo access")]
Expand Down
146 changes: 80 additions & 66 deletions src/data/uta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use indexmap::IndexMap;
use postgres::{Client, NoTls, Row};
use quick_cache::sync::Cache;
use std::fmt::Debug;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

use crate::sequences::{seq_md5, TranslationTable};
use biocommons_bioutils::assemblies::{Assembly, ASSEMBLY_INFOS};
Expand Down Expand Up @@ -42,15 +42,15 @@ impl TryFrom<Row> for GeneInfoRecord {
type Error = Error;

fn try_from(row: Row) -> Result<Self, Self::Error> {
let aliases: String = row.try_get("aliases")?;
let aliases: String = row.try_get("aliases").map_err(Arc::new)?;
let aliases = aliases.split(',').map(|s| s.to_owned()).collect::<Vec<_>>();
Ok(Self {
hgnc: row.try_get("hgnc")?,
maploc: row.try_get("maploc")?,
descr: row.try_get("descr")?,
summary: row.try_get("summary")?,
hgnc: row.try_get("hgnc").map_err(Arc::new)?,
maploc: row.try_get("maploc").map_err(Arc::new)?,
descr: row.try_get("descr").map_err(Arc::new)?,
summary: row.try_get("summary").map_err(Arc::new)?,
aliases,
added: row.try_get("added")?,
added: row.try_get("added").map_err(Arc::new)?,
})
}
}
Expand All @@ -60,8 +60,8 @@ impl TryFrom<Row> for TxSimilarityRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
tx_ac1: row.try_get("tx_ac1")?,
tx_ac2: row.try_get("tx_ac2")?,
tx_ac1: row.try_get("tx_ac1").map_err(Arc::new)?,
tx_ac2: row.try_get("tx_ac2").map_err(Arc::new)?,
hgnc_eq: row.try_get("hgnc_eq").unwrap_or(false),
cds_eq: row.try_get("cds_eq").unwrap_or(false),
es_fp_eq: row.try_get("es_fp_eq").unwrap_or(false),
Expand All @@ -76,24 +76,24 @@ impl TryFrom<Row> for TxExonsRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
hgnc: row.try_get("hgnc")?,
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
alt_strand: row.try_get("alt_strand")?,
ord: row.try_get("ord")?,
tx_start_i: row.try_get("tx_start_i")?,
tx_end_i: row.try_get("tx_end_i")?,
alt_start_i: row.try_get("alt_start_i")?,
alt_end_i: row.try_get("alt_end_i")?,
cigar: row.try_get("cigar")?,
tx_aseq: row.try_get("tx_aseq")?,
alt_aseq: row.try_get("alt_aseq")?,
tx_exon_set_id: row.try_get("tx_exon_set_id")?,
alt_exon_set_id: row.try_get("alt_exon_set_id")?,
tx_exon_id: row.try_get("tx_exon_id")?,
alt_exon_id: row.try_get("alt_exon_id")?,
exon_aln_id: row.try_get("exon_aln_id")?,
hgnc: row.try_get("hgnc").map_err(Arc::new)?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
alt_strand: row.try_get("alt_strand").map_err(Arc::new)?,
ord: row.try_get("ord").map_err(Arc::new)?,
tx_start_i: row.try_get("tx_start_i").map_err(Arc::new)?,
tx_end_i: row.try_get("tx_end_i").map_err(Arc::new)?,
alt_start_i: row.try_get("alt_start_i").map_err(Arc::new)?,
alt_end_i: row.try_get("alt_end_i").map_err(Arc::new)?,
cigar: row.try_get("cigar").map_err(Arc::new)?,
tx_aseq: row.try_get("tx_aseq").map_err(Arc::new)?,
alt_aseq: row.try_get("alt_aseq").map_err(Arc::new)?,
tx_exon_set_id: row.try_get("tx_exon_set_id").map_err(Arc::new)?,
alt_exon_set_id: row.try_get("alt_exon_set_id").map_err(Arc::new)?,
tx_exon_id: row.try_get("tx_exon_id").map_err(Arc::new)?,
alt_exon_id: row.try_get("alt_exon_id").map_err(Arc::new)?,
exon_aln_id: row.try_get("exon_aln_id").map_err(Arc::new)?,
})
}
}
Expand All @@ -103,12 +103,12 @@ impl TryFrom<Row> for TxForRegionRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_strand: row.try_get("alt_strand")?,
alt_aln_method: row.try_get("alt_aln_method")?,
start_i: row.try_get("start_i")?,
end_i: row.try_get("end_i")?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_strand: row.try_get("alt_strand").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
start_i: row.try_get("start_i").map_err(Arc::new)?,
end_i: row.try_get("end_i").map_err(Arc::new)?,
})
}
}
Expand All @@ -126,15 +126,15 @@ impl TryFrom<Row> for TxIdentityInfo {
type Error = Error;

fn try_from(row: Row) -> Result<Self, Self::Error> {
let hgnc = row.try_get("hgnc")?;
let hgnc = row.try_get("hgnc").map_err(Arc::new)?;
let is_selenoprotein = SELENOPROTEIN_SYMBOLS.contains(&hgnc);
Ok(Self {
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
cds_start_i: row.try_get("cds_start_i")?,
cds_end_i: row.try_get("cds_end_i")?,
lengths: row.try_get("lengths")?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
cds_start_i: row.try_get("cds_start_i").map_err(Arc::new)?,
cds_end_i: row.try_get("cds_end_i").map_err(Arc::new)?,
lengths: row.try_get("lengths").map_err(Arc::new)?,
hgnc: hgnc.to_string(),
// UTA database does not support selenoproteins (yet).
translation_table: if is_selenoprotein {
Expand All @@ -151,12 +151,12 @@ impl TryFrom<Row> for TxInfoRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
hgnc: row.try_get("hgnc")?,
cds_start_i: row.try_get("cds_start_i")?,
cds_end_i: row.try_get("cds_end_i")?,
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
hgnc: row.try_get("hgnc").map_err(Arc::new)?,
cds_start_i: row.try_get("cds_start_i").map_err(Arc::new)?,
cds_end_i: row.try_get("cds_end_i").map_err(Arc::new)?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
})
}
}
Expand All @@ -166,9 +166,9 @@ impl TryFrom<Row> for TxMappingOptionsRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
})
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ impl Debug for Provider {
impl Provider {
pub fn with_config(config: &Config) -> Result<Self, Error> {
let config = config.clone();
let conn = Mutex::new(Client::connect(&config.db_url, NoTls)?);
let conn = Mutex::new(Client::connect(&config.db_url, NoTls).map_err(Arc::new)?);
let schema_version = Self::fetch_schema_version(
&mut conn.lock().expect("cannot obtain connection lock"),
&config.db_schema,
Expand All @@ -246,7 +246,7 @@ impl Provider {

fn fetch_schema_version(conn: &mut Client, db_schema: &str) -> Result<String, Error> {
let sql = format!("select key, value from {db_schema}.meta where key = 'schema_version'");
let row = conn.query_one(&sql, &[])?;
let row = conn.query_one(&sql, &[]).map_err(Arc::new)?;
Ok(row.get("value"))
}
}
Expand Down Expand Up @@ -282,7 +282,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&hgnc])?
.query_one(&sql, &[&hgnc])
.map_err(Arc::new)?
.try_into()?;

self.caches
Expand All @@ -305,11 +306,12 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac])?)
.query(&sql, &[&tx_ac])
.map_err(Arc::new)?)
.into_iter()
.next()
{
let result = Some(row.try_get("pro_ac")?);
let result = Some(row.try_get("pro_ac").map_err(Arc::new)?);
self.caches
.get_pro_ac_for_tx_ac
.insert(tx_ac.to_string(), None);
Expand Down Expand Up @@ -337,8 +339,10 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&ac])?
.try_get("seq_id")?;
.query_one(&sql, &[&ac])
.map_err(Arc::new)?
.try_get("seq_id")
.map_err(Arc::new)?;

let sql = format!(
"SELECT seq FROM {}.seq WHERE seq_id = $1",
Expand All @@ -348,8 +352,10 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&seq_id])?
.try_get("seq")?;
.query_one(&sql, &[&seq_id])
.map_err(Arc::new)?
.try_get("seq")
.map_err(Arc::new)?;

let begin = begin.unwrap_or_default();
let end = end
Expand All @@ -373,7 +379,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&md5])?
.query(&sql, &[&md5])
.map_err(Arc::new)?
{
result.push(row.get(0));
}
Expand Down Expand Up @@ -402,7 +409,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac])?
.query(&sql, &[&tx_ac])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down Expand Up @@ -439,7 +447,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])?
.query(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down Expand Up @@ -473,7 +482,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&gene])?
.query(&sql, &[&gene])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down Expand Up @@ -517,7 +527,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&alt_ac, &start_i, &end_i])?
.query(&sql, &[&alt_ac, &start_i, &end_i])
.map_err(Arc::new)?
{
let record: TxForRegionRecord = row.try_into()?;
// NB: The original Python code did not use alt_aln_method in the query either.
Expand Down Expand Up @@ -547,7 +558,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&tx_ac])?
.query_one(&sql, &[&tx_ac])
.map_err(Arc::new)?
.try_into()?;

self.caches
Expand Down Expand Up @@ -583,7 +595,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])?
.query_one(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])
.map_err(Arc::new)?
.try_into()?;

self.caches.get_tx_info.insert(key, result.clone());
Expand All @@ -607,7 +620,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac])?
.query(&sql, &[&tx_ac])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down
Loading

0 comments on commit 34acaf4

Please sign in to comment.