Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailNazarov committed Oct 1, 2024
1 parent cbe569c commit e7d5016
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 154 deletions.
4 changes: 2 additions & 2 deletions example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logs();
let connection_string = env::var("YDB_CONNECTION_STRING").unwrap_or_else(|_| "grpc://localhost:2136?database=/local".to_string());

let pool = YdbPoolOptions::new()
let pool = YdbPoolOptions::new()
.connect(&connection_string).await?;
let row: (i32,) = sqlx::query_as("SELECT 1+1").fetch_one(&pool).await?;
assert_eq!(row.0, 2);
Expand All @@ -19,7 +19,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let test_user_info = UserInfo {
id: 4u64,
id: 7u64,
name: "test".to_string(),
age: 32u8,
description: None
Expand Down
2 changes: 1 addition & 1 deletion ydb-sqlx/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ impl YdbConnection {
timeout(options.connection_timeout, client.wait()).await??;


Ok(YdbConnection { client, transaction: None })
Ok(YdbConnection { client, transaction: None, log_settings: options.log_settings.clone() })
}
}
277 changes: 144 additions & 133 deletions ydb-sqlx/src/connection/executor.rs
Original file line number Diff line number Diff line change
@@ -1,131 +1,60 @@
use std::collections::HashMap;

use futures::future::BoxFuture;
use futures::pin_mut;
use futures::Stream;
use futures::TryStreamExt;
use futures_core::stream::BoxStream;

use futures::StreamExt;
use itertools::Either;
use rustring_builder::StringBuilder;
use sqlx_core::executor::Execute;
use sqlx_core::executor::Executor;

use sqlx_core::logger::QueryLogger;
use sqlx_core::try_stream;
use sqlx_core::Error;
use ydb::Query;
use ydb::YdbOrCustomerError;

use crate::error::err_ydb_or_customer_to_sqlx;

use crate::error::err_ydb_to_sqlx;
use crate::query::build_query;
use crate::query::ParsedQuery;
use crate::statement::YdbStatement;
use crate::typeinfo::YdbTypeInfo;
use crate::{database::Ydb, query::YdbQueryResult, row::YdbRow};
use sqlx_core::describe::Describe;

use super::YdbConnection;

fn build_query<'q, E: 'q>(mut query: E) -> Result<Query, Error>
where
E: Execute<'q, Ydb>,
{
let mut sb = StringBuilder::new();
let mut params = HashMap::new();

if let Some(arguments) = query.take_arguments()
.map_err(|e| sqlx_core::error::Error::AnyDriverError(e))? {
for arg in arguments.into_iter() {
arg.declare(&mut sb);
arg.add_to_params(&mut params);
}
sb.append_line("");
}

sb.append(query.sql());
impl YdbConnection {

let sql = sb.to_string();
//info!("{}", sql);
pub(crate) async fn run<'e, 'c: 'e, 'q: 'e>( &'c mut self,
query: ParsedQuery
) -> Result<impl Stream<Item = Result<Either<YdbQueryResult, YdbRow>, Error>> + 'e, Error> {

let mut query = Query::new(sql);
if !params.is_empty() {
//info!("PARAMS: {:?}", params);
query = query.with_params(params);
}

Ok(query)
}

// impl YdbConnection{
// pub(crate) async fn execute_scheme<Q: Into<String>>(&mut self, query: Q) -> Result<(), Error> {

// self.client.table_client().retry_execute_scheme_query(query).await.map_err(|e| err_ydb_to_sqlx(e))?;

// Ok(())
// }
// }

impl<'c> Executor<'c> for &'c mut YdbConnection {
type Database = Ydb;

fn execute<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<YdbQueryResult, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>, {

Box::pin(async move{
//debug!("{}",query.sql());
let query = build_query(query)?;
let _result = if let Some(tr) = &mut self.transaction {

tr.query(query.clone()).await.map_err(|e| err_ydb_to_sqlx(e))?

}else{
self.client
.table_client()
.retry_transaction(|t| async {
let mut t = t;
let result = t.query(query.clone()).await?;
t.commit().await?;
Ok(result)
})
.await.map_err(|e| err_ydb_or_customer_to_sqlx(e))?
};

Ok(YdbQueryResult{
rows_affected: 0 //todo!
})
})

}
let mut logger = QueryLogger::new(query.sql(), self.log_settings.clone());
let query: ydb::Query = query.clone().into();


fn fetch_many<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<'e, Result<sqlx_core::Either<YdbQueryResult, YdbRow>, Error>>
where
'c: 'e,
E: Execute<'q, Ydb>,
{

let result = Box::pin(async move {
let query = build_query(query)?;

if let Some(tr) = &mut self.transaction {

let result = tr.query(query.clone()).await.map_err(|e| err_ydb_to_sqlx(e))?;
Ok(Some(result.into_results()))
} else {

self.client
.table_client()
.retry_transaction(|t| async {
let mut t = t;
let result = t.query(query.clone()).await?;
t.commit().await?;
Ok(Some(result.into_results()))
})
.await
.map_err(|e| err_ydb_or_customer_to_sqlx(e))
let result = tr.query(query).await.map_err(|e| err_ydb_to_sqlx(e))?;
let results = result.into_results();
//todo: get rows and call increase_rows_affected
//logger.increase_rows_affected(rows_affected);
Ok(Some(results))
} else {
self.client
.table_client()
.retry_transaction(|t| async {
let mut t = t;
let result = t.query(query.clone()).await?;
t.commit().await?;
//todo: get rows and call increase_rows_affected
//logger.increase_rows_affected(rows_affected);
Ok(Some(result.into_results()))
})
.await
.map_err(|e| err_ydb_or_customer_to_sqlx(e))
}
});
let stream = futures::stream::once(result)
Expand All @@ -149,12 +78,74 @@ impl<'c> Executor<'c> for &'c mut YdbConnection {
Err(e) => Err(e),
})
.chain(err);


futures::stream::iter(rows)
})
.flatten();

Box::pin(stream)
Ok(Box::pin(stream))
}
}



impl<'c> Executor<'c> for &'c mut YdbConnection {
type Database = Ydb;

// fn execute<'e, 'q: 'e, E: 'q>(
// self,
// query: E,
// ) -> BoxFuture<'e, Result<YdbQueryResult, Error>>
// where
// 'c: 'e,
// E: Execute<'q, Self::Database>, {

// Box::pin(async move{
// //debug!("{}",query.sql());
// let query = build_query(query)?;
// let _result = if let Some(tr) = &mut self.transaction {

// tr.query(query.clone()).await.map_err(|e| err_ydb_to_sqlx(e))?

// }else{
// self.client
// .table_client()
// .retry_transaction(|t| async {
// let mut t = t;
// let result = t.query(query.clone()).await?;
// t.commit().await?;
// Ok(result)
// })
// .await.map_err(|e| err_ydb_or_customer_to_sqlx(e))?
// };

// Ok(YdbQueryResult{
// rows_affected: 0 //todo!
// })
// })

// }

fn fetch_many<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<'e, Result<sqlx_core::Either<YdbQueryResult, YdbRow>, Error>>
where
'c: 'e,
E: Execute<'q, Ydb>,
{

Box::pin(try_stream! {
let query = build_query(query)?;
let s = self.run(query).await?;
pin_mut!(s);

while let Some(v) = s.try_next().await? {
r#yield!(v);
}
Ok(())
})
}

fn fetch_optional<'e, 'q: 'e, E: 'q>(
Expand All @@ -165,38 +156,58 @@ impl<'c> Executor<'c> for &'c mut YdbConnection {
'c: 'e,
E: Execute<'q, Ydb>,
{

Box::pin(async move {
let query = build_query(query)?;
if let Some(tr) = &mut self.transaction {
let result = tr.query(query.clone()).await
.map_err(|e| err_ydb_to_sqlx(e))?;

if let Some(row) = result.into_only_row().ok() {
let row = YdbRow::from(row)?;
Ok(Some(row))
} else {
Ok(None)
}
}else{
self.client
.table_client()
.retry_transaction(|t| async {
//YdbRow::from(row)
let mut t = t;
let result = t.query(query.clone()).await?;
t.commit().await?;
if let Some(row) = result.into_only_row().ok() {
let row = YdbRow::from(row).map_err(|e| YdbOrCustomerError::from_err(e))?;
Ok(Some(row))
} else {
Ok(None)
}
})
.await
.map_err(|e| err_ydb_or_customer_to_sqlx(e))
let s = self.run(query).await?;
pin_mut!(s);

// With deferred constraints we need to check all responses as we
// could get a OK response (with uncommitted data), only to get an
// error response after (when the deferred constraint is actually
// checked).
let mut ret = None;
while let Some(result) = s.try_next().await? {
match result {
Either::Right(r) if ret.is_none() => ret = Some(r),
_ => {}
}
}

Ok(ret)
})

// Box::pin(async move {
// let query = build_query(query)?;
// if let Some(tr) = &mut self.transaction {
// let result = tr.query(query.clone()).await
// .map_err(|e| err_ydb_to_sqlx(e))?;

// if let Some(row) = result.into_only_row().ok() {
// let row = YdbRow::from(row)?;
// Ok(Some(row))
// } else {
// Ok(None)
// }
// }else{
// self.client
// .table_client()
// .retry_transaction(|t| async {
// //YdbRow::from(row)
// let mut t = t;
// let result = t.query(query.clone()).await?;
// t.commit().await?;
// if let Some(row) = result.into_only_row().ok() {
// let row = YdbRow::from(row).map_err(|e| YdbOrCustomerError::from_err(e))?;
// Ok(Some(row))
// } else {
// Ok(None)
// }
// })
// .await
// .map_err(|e| err_ydb_or_customer_to_sqlx(e))
// }

// })
}

fn prepare_with<'e, 'q: 'e>(
Expand Down
Loading

0 comments on commit e7d5016

Please sign in to comment.