Skip to content

Commit

Permalink
fix(js-connectors): process hanging on napi threadsafe functions (#4142)
Browse files Browse the repository at this point in the history
* fix(js-connectors): avoid hanging JS process on ThreadsafeFunctions passed to napi.rs

* chore: remove "process.exit(0)" from smoke test

* chore: add back minor things to test.ts

* chore: remove buggy decorator in smoke tests
  • Loading branch information
jkomyno authored Aug 16, 2023
1 parent 5f5f05f commit be8d0a6
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 48 deletions.
38 changes: 11 additions & 27 deletions query-engine/js-connectors/js/smoke-test-js/src/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ export async function smokeTest(db: Connector, prismaSchemaRelativePath: string)
await engine.disconnect('trace')
console.log('[nodejs] re-disconnected')

// Close the database connection. This is required to prevent the process from hanging.
// Close the database connection.
console.log('[nodejs] closing database connection...')
await db.close()
console.log('[nodejs] closed database connection')

process.exit(0)
// process.exit(0)
}

class SmokeTest {
Expand All @@ -54,8 +54,11 @@ class SmokeTest {
await this.testFindManyTypeTestPostgres()
}

@withFlavor({ only: ['mysql'] })
private async testFindManyTypeTestMySQL() {
if (this.flavour !== 'mysql') {
return
}

const resultSet = await this.engine.query(`
{
"action": "findMany",
Expand Down Expand Up @@ -92,8 +95,11 @@ class SmokeTest {
return resultSet
}

@withFlavor({ only: ['postgres'] })
private async testFindManyTypeTestPostgres() {
if (this.flavour !== 'postgres') {
return
}

const resultSet = await this.engine.query(`
{
"action": "findMany",
Expand Down Expand Up @@ -275,25 +281,3 @@ class SmokeTest {
console.log('[nodejs] resultDeleteMany', JSON.stringify(JSON.parse(resultDeleteMany), null, 2))
}
}

type WithFlavorInput
= { only: Array<Flavor>, exclude?: never }
| { exclude: Array<Flavor>, only?: never }

function withFlavor({ only, exclude }: WithFlavorInput) {
return function decorator(originalMethod: () => any, _ctx: ClassMethodDecoratorContext<SmokeTest, () => unknown>) {
return function replacement(this: SmokeTest) {
if ((exclude || []).includes(this.flavour)) {
console.log(`[nodejs::exclude] Skipping test "${originalMethod.name}" with flavour: ${this.flavour}`)
return
}

if ((only || []).length > 0 && !(only || []).includes(this.flavour)) {
console.log(`[nodejs::only] Skipping test "${originalMethod.name}" with flavour: ${this.flavour}`)
return
}

return originalMethod.call(this)
}
}
}
2 changes: 1 addition & 1 deletion query-engine/js-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
mod error;
mod proxy;
mod queryable;
pub use queryable::JsQueryable;
pub use queryable::{from_napi, JsQueryable};
29 changes: 20 additions & 9 deletions query-engine/js-connectors/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::{Arc, Condvar, Mutex};
use crate::error::*;
use napi::bindgen_prelude::{FromNapiValue, Promise as JsPromise, ToNapiValue};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction};
use napi::{JsObject, JsString};
use napi::{Env, JsObject, JsString};
use napi_derive::napi;
use quaint::connector::ResultSet as QuaintResultSet;
use quaint::Value as QuaintValue;
Expand All @@ -18,7 +18,6 @@ use chrono::{NaiveDate, NaiveTime};
/// Proxy is a struct wrapping a javascript object that exhibits basic primitives for
/// querying and executing SQL (i.e. a client connector). The Proxy uses NAPI ThreadSafeFunction to
/// invoke the code within the node runtime that implements the client connector.
#[derive(Clone)]
pub struct Proxy {
/// Execute a query given as SQL, interpolating the given parameters.
query_raw: ThreadsafeFunction<Query, ErrorStrategy::Fatal>,
Expand Down Expand Up @@ -48,21 +47,33 @@ pub struct Proxy {
}

/// Reify creates a Rust proxy to access the JS driver passed in as a parameter.
pub fn reify(js_connector: JsObject) -> napi::Result<Proxy> {
let query_raw = js_connector.get_named_property("queryRaw")?;
let execute_raw = js_connector.get_named_property("executeRaw")?;
let version = js_connector.get_named_property("version")?;
let close: ThreadsafeFunction<(), ErrorStrategy::Fatal> = js_connector.get_named_property("close")?;
let is_healthy = js_connector.get_named_property("isHealthy")?;
pub fn reify(napi_env: &Env, js_connector: JsObject) -> napi::Result<Proxy> {
let mut query_raw =
js_connector.get_named_property::<ThreadsafeFunction<Query, ErrorStrategy::Fatal>>("queryRaw")?;
let mut execute_raw =
js_connector.get_named_property::<ThreadsafeFunction<Query, ErrorStrategy::Fatal>>("executeRaw")?;
let mut version = js_connector.get_named_property::<ThreadsafeFunction<(), ErrorStrategy::Fatal>>("version")?;
let mut close = js_connector.get_named_property::<ThreadsafeFunction<(), ErrorStrategy::Fatal>>("close")?;
let mut is_healthy =
js_connector.get_named_property::<ThreadsafeFunction<(), ErrorStrategy::Fatal>>("isHealthy")?;

// Note: calling `unref` on every ThreadsafeFunction is necessary to avoid hanging the JS event loop.
query_raw.unref(napi_env)?;
execute_raw.unref(napi_env)?;
version.unref(napi_env)?;
close.unref(napi_env)?;
is_healthy.unref(napi_env)?;

let flavour: JsString = js_connector.get_named_property("flavour")?;
let flavour: String = flavour.into_utf8()?.as_str()?.to_owned();

let driver = Proxy {
query_raw,
execute_raw,
version,
close,
is_healthy,
flavour: flavour.into_utf8()?.as_str()?.to_owned(),
flavour,
};
Ok(driver)
}
Expand Down
15 changes: 5 additions & 10 deletions query-engine/js-connectors/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
proxy::{self, Proxy, Query},
};
use async_trait::async_trait;
use napi::JsObject;
use napi::{Env, JsObject};
use psl::datamodel_connector::Flavour;
use quaint::{
connector::IsolationLevel,
Expand All @@ -27,7 +27,6 @@ use tracing::{info_span, Instrument};
/// into a `quaint::connector::result_set::ResultSet`. A quaint `ResultSet` is basically a vector
/// of `quaint::Value` but said type is a tagged enum, with non-unit variants that cannot be converted to javascript as is.
///
#[derive(Clone)]
pub struct JsQueryable {
pub(crate) proxy: Proxy,
pub(crate) flavour: Flavour,
Expand Down Expand Up @@ -193,12 +192,8 @@ impl JsQueryable {

impl TransactionCapable for JsQueryable {}

impl From<JsObject> for JsQueryable {
fn from(driver: JsObject) -> Self {
let driver = proxy::reify(driver).unwrap();
Self {
flavour: driver.flavour.parse().unwrap(),
proxy: driver,
}
}
pub fn from_napi(napi_env: &Env, driver: JsObject) -> JsQueryable {
let driver = proxy::reify(napi_env, driver).unwrap();
let flavour = driver.flavour.parse().unwrap();
JsQueryable::new(driver, flavour)
}
2 changes: 1 addition & 1 deletion query-engine/query-engine-node-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl QueryEngine {

#[cfg(feature = "js-connectors")]
if let Some(driver) = maybe_driver {
let queryable = js_connectors::JsQueryable::from(driver);
let queryable = js_connectors::from_napi(&napi_env, driver);
match sql_connector::register_js_connector(provider_name, Arc::new(queryable)) {
Ok(_) => tracing::info!("Registered js connector for {provider_name}"),
Err(err) => tracing::error!("Failed to registered js connector for {provider_name}. {err}"),
Expand Down

0 comments on commit be8d0a6

Please sign in to comment.