From db802c1b8729d57a04ec1fb7d1919075ba514e84 Mon Sep 17 00:00:00 2001 From: Negezor Date: Sun, 31 Mar 2024 16:02:42 +1100 Subject: [PATCH 1/2] refactor: use async_trait directly over impl to respect the convention on the project. --- klickhouse/src/migrate.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/klickhouse/src/migrate.rs b/klickhouse/src/migrate.rs index 705b1d8..10a853f 100644 --- a/klickhouse/src/migrate.rs +++ b/klickhouse/src/migrate.rs @@ -1,5 +1,4 @@ use crate::{query_parser, ClickhouseLock, FromSql}; -use async_trait::async_trait; use refinery_core::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; use refinery_core::Migration; use std::borrow::Cow; @@ -173,7 +172,7 @@ impl Row for Migration { } } -#[async_trait] +#[async_trait::async_trait] impl AsyncTransaction for Client { type Error = KlickhouseError; @@ -206,7 +205,7 @@ impl AsyncTransaction for Client { } } -#[async_trait] +#[async_trait::async_trait] impl AsyncQuery> for Client { async fn query( &mut self, @@ -248,7 +247,7 @@ impl ClusterMigration { } } -#[async_trait] +#[async_trait::async_trait] impl AsyncTransaction for ClusterMigration { type Error = KlickhouseError; @@ -276,7 +275,7 @@ impl AsyncTransaction for ClusterMigration { } } -#[async_trait] +#[async_trait::async_trait] impl AsyncQuery> for ClusterMigration { async fn query( &mut self, From d609de59c33d6c4ed820de7d854d032766cee368 Mon Sep 17 00:00:00 2001 From: Negezor Date: Sun, 31 Mar 2024 16:45:32 +1100 Subject: [PATCH 2/2] refactor: use impl future instead of async-trait in most traits --- klickhouse/src/io.rs | 23 +- klickhouse/src/types/deserialize/array.rs | 1 - klickhouse/src/types/deserialize/geo.rs | 1 - .../src/types/deserialize/low_cardinality.rs | 1 - klickhouse/src/types/deserialize/map.rs | 1 - klickhouse/src/types/deserialize/nullable.rs | 1 - klickhouse/src/types/deserialize/sized.rs | 1 - klickhouse/src/types/deserialize/string.rs | 1 - klickhouse/src/types/deserialize/tuple.rs | 1 - klickhouse/src/types/mod.rs | 481 ++++++++++-------- klickhouse/src/types/serialize/array.rs | 1 - klickhouse/src/types/serialize/geo.rs | 1 - .../src/types/serialize/low_cardinality.rs | 1 - klickhouse/src/types/serialize/map.rs | 1 - klickhouse/src/types/serialize/nullable.rs | 1 - klickhouse/src/types/serialize/sized.rs | 1 - klickhouse/src/types/serialize/string.rs | 1 - klickhouse/src/types/serialize/tuple.rs | 1 - 18 files changed, 269 insertions(+), 251 deletions(-) diff --git a/klickhouse/src/io.rs b/klickhouse/src/io.rs index 4c024c9..ec2d527 100644 --- a/klickhouse/src/io.rs +++ b/klickhouse/src/io.rs @@ -1,20 +1,20 @@ -use crate::{KlickhouseError, Result}; +use futures::Future; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use crate::{KlickhouseError, Result}; + use crate::protocol::MAX_STRING_SIZE; -#[async_trait::async_trait] pub trait ClickhouseRead: AsyncRead + Unpin + Send + Sync { - async fn read_var_uint(&mut self) -> Result; + fn read_var_uint(&mut self) -> impl Future> + Send; - async fn read_string(&mut self) -> Result>; + fn read_string(&mut self) -> impl Future>> + Send; - async fn read_utf8_string(&mut self) -> Result { - Ok(String::from_utf8(self.read_string().await?)?) + fn read_utf8_string(&mut self) -> impl Future> + Send { + async { Ok(String::from_utf8(self.read_string().await?)?) } } } -#[async_trait::async_trait] impl ClickhouseRead for T { async fn read_var_uint(&mut self) -> Result { let mut out = 0u64; @@ -50,14 +50,15 @@ impl ClickhouseRead for T { } } -#[async_trait::async_trait] pub trait ClickhouseWrite: AsyncWrite + Unpin + Send + Sync + 'static { - async fn write_var_uint(&mut self, value: u64) -> Result<()>; + fn write_var_uint(&mut self, value: u64) -> impl Future> + Send; - async fn write_string(&mut self, value: impl AsRef<[u8]> + Send) -> Result<()>; + fn write_string( + &mut self, + value: impl AsRef<[u8]> + Send, + ) -> impl Future> + Send; } -#[async_trait::async_trait] impl ClickhouseWrite for T { async fn write_var_uint(&mut self, mut value: u64) -> Result<()> { for _ in 0..9u64 { diff --git a/klickhouse/src/types/deserialize/array.rs b/klickhouse/src/types/deserialize/array.rs index 9876fe6..3400341 100644 --- a/klickhouse/src/types/deserialize/array.rs +++ b/klickhouse/src/types/deserialize/array.rs @@ -30,7 +30,6 @@ impl ArrayDeserializerGeneric for ArrayDeserializer { } } -#[async_trait::async_trait] impl Deserializer for T { async fn read_prefix( type_: &Type, diff --git a/klickhouse/src/types/deserialize/geo.rs b/klickhouse/src/types/deserialize/geo.rs index a1a4525..d5f150d 100644 --- a/klickhouse/src/types/deserialize/geo.rs +++ b/klickhouse/src/types/deserialize/geo.rs @@ -6,7 +6,6 @@ use crate::values; pub struct PointDeserializer; -#[async_trait::async_trait] impl Deserializer for PointDeserializer { async fn read_prefix( _type_: &Type, diff --git a/klickhouse/src/types/deserialize/low_cardinality.rs b/klickhouse/src/types/deserialize/low_cardinality.rs index 36fa2ca..21b57f2 100644 --- a/klickhouse/src/types/deserialize/low_cardinality.rs +++ b/klickhouse/src/types/deserialize/low_cardinality.rs @@ -8,7 +8,6 @@ use crate::types::low_cardinality::*; pub struct LowCardinalityDeserializer; -#[async_trait::async_trait] impl Deserializer for LowCardinalityDeserializer { async fn read_prefix( _type_: &Type, diff --git a/klickhouse/src/types/deserialize/map.rs b/klickhouse/src/types/deserialize/map.rs index a90408e..4cade84 100644 --- a/klickhouse/src/types/deserialize/map.rs +++ b/klickhouse/src/types/deserialize/map.rs @@ -8,7 +8,6 @@ use super::{Deserializer, DeserializerState, Type}; pub struct MapDeserializer; -#[async_trait::async_trait] impl Deserializer for MapDeserializer { async fn read_prefix( type_: &Type, diff --git a/klickhouse/src/types/deserialize/nullable.rs b/klickhouse/src/types/deserialize/nullable.rs index cf00ee9..c73fa67 100644 --- a/klickhouse/src/types/deserialize/nullable.rs +++ b/klickhouse/src/types/deserialize/nullable.rs @@ -6,7 +6,6 @@ use super::{Deserializer, DeserializerState, Type}; pub struct NullableDeserializer; -#[async_trait::async_trait] impl Deserializer for NullableDeserializer { async fn read_prefix( type_: &Type, diff --git a/klickhouse/src/types/deserialize/sized.rs b/klickhouse/src/types/deserialize/sized.rs index 0fdd293..8ecb3d4 100644 --- a/klickhouse/src/types/deserialize/sized.rs +++ b/klickhouse/src/types/deserialize/sized.rs @@ -9,7 +9,6 @@ use super::{Deserializer, DeserializerState, Type}; pub struct SizedDeserializer; -#[async_trait::async_trait] impl Deserializer for SizedDeserializer { async fn read( type_: &Type, diff --git a/klickhouse/src/types/deserialize/string.rs b/klickhouse/src/types/deserialize/string.rs index 39b2195..29ea4c6 100644 --- a/klickhouse/src/types/deserialize/string.rs +++ b/klickhouse/src/types/deserialize/string.rs @@ -7,7 +7,6 @@ use super::{Deserializer, DeserializerState, Type}; pub struct StringDeserializer; #[allow(clippy::uninit_vec)] -#[async_trait::async_trait] impl Deserializer for StringDeserializer { async fn read( type_: &Type, diff --git a/klickhouse/src/types/deserialize/tuple.rs b/klickhouse/src/types/deserialize/tuple.rs index 042ca57..a2712ce 100644 --- a/klickhouse/src/types/deserialize/tuple.rs +++ b/klickhouse/src/types/deserialize/tuple.rs @@ -4,7 +4,6 @@ use super::{Deserializer, DeserializerState, Type}; pub struct TupleDeserializer; -#[async_trait::async_trait] impl Deserializer for TupleDeserializer { async fn read_prefix( type_: &Type, diff --git a/klickhouse/src/types/mod.rs b/klickhouse/src/types/mod.rs index 2f7089a..901acc2 100644 --- a/klickhouse/src/types/mod.rs +++ b/klickhouse/src/types/mod.rs @@ -1,6 +1,7 @@ use std::{fmt::Display, str::FromStr}; pub use chrono_tz::Tz; +use futures::{Future, FutureExt}; use uuid::Uuid; mod deserialize; @@ -556,242 +557,276 @@ impl Display for Type { } impl Type { - pub(crate) async fn deserialize_prefix( - &self, - reader: &mut R, - state: &mut DeserializerState, - ) -> Result<()> { + pub(crate) fn deserialize_prefix<'a, R: ClickhouseRead>( + &'a self, + reader: &'a mut R, + state: &'a mut DeserializerState, + ) -> impl Future> + Send + '_ { use deserialize::*; - match self { - Type::Int8 - | Type::Int16 - | Type::Int32 - | Type::Int64 - | Type::Int128 - | Type::Int256 - | Type::UInt8 - | Type::UInt16 - | Type::UInt32 - | Type::UInt64 - | Type::UInt128 - | Type::UInt256 - | Type::Float32 - | Type::Float64 - | Type::Decimal32(_) - | Type::Decimal64(_) - | Type::Decimal128(_) - | Type::Decimal256(_) - | Type::Uuid - | Type::Date - | Type::DateTime(_) - | Type::DateTime64(_, _) - | Type::Ipv4 - | Type::Ipv6 - | Type::Enum8(_) - | Type::Enum16(_) => sized::SizedDeserializer::read_prefix(self, reader, state).await?, - Type::String | Type::FixedString(_) => { - string::StringDeserializer::read_prefix(self, reader, state).await? - } + async move { + match self { + Type::Int8 + | Type::Int16 + | Type::Int32 + | Type::Int64 + | Type::Int128 + | Type::Int256 + | Type::UInt8 + | Type::UInt16 + | Type::UInt32 + | Type::UInt64 + | Type::UInt128 + | Type::UInt256 + | Type::Float32 + | Type::Float64 + | Type::Decimal32(_) + | Type::Decimal64(_) + | Type::Decimal128(_) + | Type::Decimal256(_) + | Type::Uuid + | Type::Date + | Type::DateTime(_) + | Type::DateTime64(_, _) + | Type::Ipv4 + | Type::Ipv6 + | Type::Enum8(_) + | Type::Enum16(_) => { + sized::SizedDeserializer::read_prefix(self, reader, state).await? + } - Type::Array(_) => array::ArrayDeserializer::read_prefix(self, reader, state).await?, - Type::Tuple(_) => tuple::TupleDeserializer::read_prefix(self, reader, state).await?, - Type::Point => geo::PointDeserializer::read_prefix(self, reader, state).await?, - Type::Ring => geo::RingDeserializer::read_prefix(self, reader, state).await?, - Type::Polygon => geo::PolygonDeserializer::read_prefix(self, reader, state).await?, - Type::MultiPolygon => { - geo::MultiPolygonDeserializer::read_prefix(self, reader, state).await? - } - Type::Nullable(_) => { - nullable::NullableDeserializer::read_prefix(self, reader, state).await? - } - Type::Map(_, _) => map::MapDeserializer::read_prefix(self, reader, state).await?, - Type::LowCardinality(_) => { - low_cardinality::LowCardinalityDeserializer::read_prefix(self, reader, state) - .await? + Type::String | Type::FixedString(_) => { + string::StringDeserializer::read_prefix(self, reader, state).await? + } + + Type::Array(_) => { + array::ArrayDeserializer::read_prefix(self, reader, state).await? + } + Type::Tuple(_) => { + tuple::TupleDeserializer::read_prefix(self, reader, state).await? + } + Type::Point => geo::PointDeserializer::read_prefix(self, reader, state).await?, + Type::Ring => geo::RingDeserializer::read_prefix(self, reader, state).await?, + Type::Polygon => geo::PolygonDeserializer::read_prefix(self, reader, state).await?, + Type::MultiPolygon => { + geo::MultiPolygonDeserializer::read_prefix(self, reader, state).await? + } + Type::Nullable(_) => { + nullable::NullableDeserializer::read_prefix(self, reader, state).await? + } + Type::Map(_, _) => map::MapDeserializer::read_prefix(self, reader, state).await?, + Type::LowCardinality(_) => { + low_cardinality::LowCardinalityDeserializer::read_prefix(self, reader, state) + .await? + } } + Ok(()) } - Ok(()) + .boxed() } - pub(crate) async fn deserialize_column( - &self, - reader: &mut R, + pub(crate) fn deserialize_column<'a, R: ClickhouseRead>( + &'a self, + reader: &'a mut R, rows: usize, - state: &mut DeserializerState, - ) -> Result> { + state: &'a mut DeserializerState, + ) -> impl Future>> + Send + '_ { use deserialize::*; - if rows > MAX_STRING_SIZE { - return Err(KlickhouseError::ProtocolError(format!( - "deserialize response size too large. {} > {}", - rows, MAX_STRING_SIZE - ))); - } - - Ok(match self { - Type::Int8 - | Type::Int16 - | Type::Int32 - | Type::Int64 - | Type::Int128 - | Type::Int256 - | Type::UInt8 - | Type::UInt16 - | Type::UInt32 - | Type::UInt64 - | Type::UInt128 - | Type::UInt256 - | Type::Float32 - | Type::Float64 - | Type::Decimal32(_) - | Type::Decimal64(_) - | Type::Decimal128(_) - | Type::Decimal256(_) - | Type::Uuid - | Type::Date - | Type::DateTime(_) - | Type::DateTime64(_, _) - | Type::Ipv4 - | Type::Ipv6 - | Type::Enum8(_) - | Type::Enum16(_) => sized::SizedDeserializer::read(self, reader, rows, state).await?, - Type::String | Type::FixedString(_) => { - string::StringDeserializer::read(self, reader, rows, state).await? + async move { + if rows > MAX_STRING_SIZE { + return Err(KlickhouseError::ProtocolError(format!( + "deserialize response size too large. {} > {}", + rows, MAX_STRING_SIZE + ))); } - Type::Array(_) => array::ArrayDeserializer::read(self, reader, rows, state).await?, - Type::Ring => geo::RingDeserializer::read(self, reader, rows, state).await?, - Type::Polygon => geo::PolygonDeserializer::read(self, reader, rows, state).await?, - Type::MultiPolygon => { - geo::MultiPolygonDeserializer::read(self, reader, rows, state).await? - } - Type::Tuple(_) => tuple::TupleDeserializer::read(self, reader, rows, state).await?, - Type::Point => geo::PointDeserializer::read(self, reader, rows, state).await?, - Type::Nullable(_) => { - nullable::NullableDeserializer::read(self, reader, rows, state).await? - } - Type::Map(_, _) => map::MapDeserializer::read(self, reader, rows, state).await?, - Type::LowCardinality(_) => { - low_cardinality::LowCardinalityDeserializer::read(self, reader, rows, state).await? - } - }) + Ok(match self { + Type::Int8 + | Type::Int16 + | Type::Int32 + | Type::Int64 + | Type::Int128 + | Type::Int256 + | Type::UInt8 + | Type::UInt16 + | Type::UInt32 + | Type::UInt64 + | Type::UInt128 + | Type::UInt256 + | Type::Float32 + | Type::Float64 + | Type::Decimal32(_) + | Type::Decimal64(_) + | Type::Decimal128(_) + | Type::Decimal256(_) + | Type::Uuid + | Type::Date + | Type::DateTime(_) + | Type::DateTime64(_, _) + | Type::Ipv4 + | Type::Ipv6 + | Type::Enum8(_) + | Type::Enum16(_) => { + sized::SizedDeserializer::read(self, reader, rows, state).await? + } + + Type::String | Type::FixedString(_) => { + string::StringDeserializer::read(self, reader, rows, state).await? + } + + Type::Array(_) => array::ArrayDeserializer::read(self, reader, rows, state).await?, + Type::Ring => geo::RingDeserializer::read(self, reader, rows, state).await?, + Type::Polygon => geo::PolygonDeserializer::read(self, reader, rows, state).await?, + Type::MultiPolygon => { + geo::MultiPolygonDeserializer::read(self, reader, rows, state).await? + } + Type::Tuple(_) => tuple::TupleDeserializer::read(self, reader, rows, state).await?, + Type::Point => geo::PointDeserializer::read(self, reader, rows, state).await?, + Type::Nullable(_) => { + nullable::NullableDeserializer::read(self, reader, rows, state).await? + } + Type::Map(_, _) => map::MapDeserializer::read(self, reader, rows, state).await?, + Type::LowCardinality(_) => { + low_cardinality::LowCardinalityDeserializer::read(self, reader, rows, state) + .await? + } + }) + } + .boxed() } - pub(crate) async fn serialize_column( - &self, + pub(crate) fn serialize_column<'a, W: ClickhouseWrite>( + &'a self, values: Vec, - writer: &mut W, - state: &mut SerializerState, - ) -> Result<()> { + writer: &'a mut W, + state: &'a mut SerializerState, + ) -> impl Future> + Send + '_ { use serialize::*; - match self { - Type::Int8 - | Type::Int16 - | Type::Int32 - | Type::Int64 - | Type::Int128 - | Type::Int256 - | Type::UInt8 - | Type::UInt16 - | Type::UInt32 - | Type::UInt64 - | Type::UInt128 - | Type::UInt256 - | Type::Float32 - | Type::Float64 - | Type::Decimal32(_) - | Type::Decimal64(_) - | Type::Decimal128(_) - | Type::Decimal256(_) - | Type::Uuid - | Type::Date - | Type::DateTime(_) - | Type::DateTime64(_, _) - | Type::Ipv4 - | Type::Ipv6 - | Type::Enum8(_) - | Type::Enum16(_) => sized::SizedSerializer::write(self, values, writer, state).await?, - Type::String | Type::FixedString(_) => { - string::StringSerializer::write(self, values, writer, state).await? - } + async move { + match self { + Type::Int8 + | Type::Int16 + | Type::Int32 + | Type::Int64 + | Type::Int128 + | Type::Int256 + | Type::UInt8 + | Type::UInt16 + | Type::UInt32 + | Type::UInt64 + | Type::UInt128 + | Type::UInt256 + | Type::Float32 + | Type::Float64 + | Type::Decimal32(_) + | Type::Decimal64(_) + | Type::Decimal128(_) + | Type::Decimal256(_) + | Type::Uuid + | Type::Date + | Type::DateTime(_) + | Type::DateTime64(_, _) + | Type::Ipv4 + | Type::Ipv6 + | Type::Enum8(_) + | Type::Enum16(_) => { + sized::SizedSerializer::write(self, values, writer, state).await? + } - Type::Array(_) => array::ArraySerializer::write(self, values, writer, state).await?, - Type::Tuple(_) => tuple::TupleSerializer::write(self, values, writer, state).await?, - Type::Point => geo::PointSerializer::write(self, values, writer, state).await?, - Type::Ring => geo::RingSerializer::write(self, values, writer, state).await?, - Type::Polygon => geo::PolygonSerializer::write(self, values, writer, state).await?, - Type::MultiPolygon => { - geo::MultiPolygonSerializer::write(self, values, writer, state).await? - } - Type::Nullable(_) => { - nullable::NullableSerializer::write(self, values, writer, state).await? - } - Type::Map(_, _) => map::MapSerializer::write(self, values, writer, state).await?, - Type::LowCardinality(_) => { - low_cardinality::LowCardinalitySerializer::write(self, values, writer, state) - .await? + Type::String | Type::FixedString(_) => { + string::StringSerializer::write(self, values, writer, state).await? + } + + Type::Array(_) => { + array::ArraySerializer::write(self, values, writer, state).await? + } + Type::Tuple(_) => { + tuple::TupleSerializer::write(self, values, writer, state).await? + } + Type::Point => geo::PointSerializer::write(self, values, writer, state).await?, + Type::Ring => geo::RingSerializer::write(self, values, writer, state).await?, + Type::Polygon => geo::PolygonSerializer::write(self, values, writer, state).await?, + Type::MultiPolygon => { + geo::MultiPolygonSerializer::write(self, values, writer, state).await? + } + Type::Nullable(_) => { + nullable::NullableSerializer::write(self, values, writer, state).await? + } + Type::Map(_, _) => map::MapSerializer::write(self, values, writer, state).await?, + Type::LowCardinality(_) => { + low_cardinality::LowCardinalitySerializer::write(self, values, writer, state) + .await? + } } + Ok(()) } - Ok(()) + .boxed() } - pub(crate) async fn serialize_prefix( - &self, - writer: &mut W, - state: &mut SerializerState, - ) -> Result<()> { + pub(crate) fn serialize_prefix<'a, W: ClickhouseWrite>( + &'a self, + writer: &'a mut W, + state: &'a mut SerializerState, + ) -> impl Future> + Send + '_ { use serialize::*; - match self { - Type::Int8 - | Type::Int16 - | Type::Int32 - | Type::Int64 - | Type::Int128 - | Type::Int256 - | Type::UInt8 - | Type::UInt16 - | Type::UInt32 - | Type::UInt64 - | Type::UInt128 - | Type::UInt256 - | Type::Float32 - | Type::Float64 - | Type::Decimal32(_) - | Type::Decimal64(_) - | Type::Decimal128(_) - | Type::Decimal256(_) - | Type::Uuid - | Type::Date - | Type::DateTime(_) - | Type::DateTime64(_, _) - | Type::Ipv4 - | Type::Ipv6 - | Type::Enum8(_) - | Type::Enum16(_) => sized::SizedSerializer::write_prefix(self, writer, state).await?, - Type::String | Type::FixedString(_) => { - string::StringSerializer::write_prefix(self, writer, state).await? - } + async move { + match self { + Type::Int8 + | Type::Int16 + | Type::Int32 + | Type::Int64 + | Type::Int128 + | Type::Int256 + | Type::UInt8 + | Type::UInt16 + | Type::UInt32 + | Type::UInt64 + | Type::UInt128 + | Type::UInt256 + | Type::Float32 + | Type::Float64 + | Type::Decimal32(_) + | Type::Decimal64(_) + | Type::Decimal128(_) + | Type::Decimal256(_) + | Type::Uuid + | Type::Date + | Type::DateTime(_) + | Type::DateTime64(_, _) + | Type::Ipv4 + | Type::Ipv6 + | Type::Enum8(_) + | Type::Enum16(_) => { + sized::SizedSerializer::write_prefix(self, writer, state).await? + } - Type::Array(_) => array::ArraySerializer::write_prefix(self, writer, state).await?, - Type::Tuple(_) => tuple::TupleSerializer::write_prefix(self, writer, state).await?, - Type::Point => geo::PointSerializer::write_prefix(self, writer, state).await?, - Type::Ring => geo::RingSerializer::write_prefix(self, writer, state).await?, - Type::Polygon => geo::PolygonSerializer::write_prefix(self, writer, state).await?, - Type::MultiPolygon => { - geo::MultiPolygonSerializer::write_prefix(self, writer, state).await? - } - Type::Nullable(_) => { - nullable::NullableSerializer::write_prefix(self, writer, state).await? - } - Type::Map(_, _) => map::MapSerializer::write_prefix(self, writer, state).await?, - Type::LowCardinality(_) => { - low_cardinality::LowCardinalitySerializer::write_prefix(self, writer, state).await? + Type::String | Type::FixedString(_) => { + string::StringSerializer::write_prefix(self, writer, state).await? + } + + Type::Array(_) => array::ArraySerializer::write_prefix(self, writer, state).await?, + Type::Tuple(_) => tuple::TupleSerializer::write_prefix(self, writer, state).await?, + Type::Point => geo::PointSerializer::write_prefix(self, writer, state).await?, + Type::Ring => geo::RingSerializer::write_prefix(self, writer, state).await?, + Type::Polygon => geo::PolygonSerializer::write_prefix(self, writer, state).await?, + Type::MultiPolygon => { + geo::MultiPolygonSerializer::write_prefix(self, writer, state).await? + } + Type::Nullable(_) => { + nullable::NullableSerializer::write_prefix(self, writer, state).await? + } + Type::Map(_, _) => map::MapSerializer::write_prefix(self, writer, state).await?, + Type::LowCardinality(_) => { + low_cardinality::LowCardinalitySerializer::write_prefix(self, writer, state) + .await? + } } + Ok(()) } - Ok(()) + .boxed() } pub(crate) fn validate(&self) -> Result<()> { @@ -991,47 +1026,45 @@ pub struct DeserializerState {} pub struct SerializerState {} -#[async_trait::async_trait] pub trait Deserializer { - async fn read_prefix( + fn read_prefix( _type_: &Type, _reader: &mut R, _state: &mut DeserializerState, - ) -> Result<()> { - Ok(()) + ) -> impl Future> { + async { Ok(()) } } - async fn read( + fn read( type_: &Type, reader: &mut R, rows: usize, state: &mut DeserializerState, - ) -> Result>; + ) -> impl Future>>; } -#[async_trait::async_trait] pub trait Serializer { - async fn write_prefix( + fn write_prefix( _type_: &Type, _writer: &mut W, _state: &mut SerializerState, - ) -> Result<()> { - Ok(()) + ) -> impl Future> { + async { Ok(()) } } - async fn write_suffix( + fn write_suffix( _type_: &Type, _value: &[Value], _writer: &mut W, _state: &mut SerializerState, - ) -> Result<()> { - Ok(()) + ) -> impl Future> { + async { Ok(()) } } - async fn write( + fn write( type_: &Type, values: Vec, writer: &mut W, state: &mut SerializerState, - ) -> Result<()>; + ) -> impl Future>; } diff --git a/klickhouse/src/types/serialize/array.rs b/klickhouse/src/types/serialize/array.rs index ad99df5..1108a41 100644 --- a/klickhouse/src/types/serialize/array.rs +++ b/klickhouse/src/types/serialize/array.rs @@ -24,7 +24,6 @@ impl ArraySerializerGeneric for ArraySerializer { } } -#[async_trait::async_trait] impl Serializer for T { async fn write_prefix( type_: &Type, diff --git a/klickhouse/src/types/serialize/geo.rs b/klickhouse/src/types/serialize/geo.rs index 0214800..822f0a1 100644 --- a/klickhouse/src/types/serialize/geo.rs +++ b/klickhouse/src/types/serialize/geo.rs @@ -4,7 +4,6 @@ use super::{Serializer, SerializerState, Type}; pub struct PointSerializer; -#[async_trait::async_trait] impl Serializer for PointSerializer { async fn write_prefix( _type_: &Type, diff --git a/klickhouse/src/types/serialize/low_cardinality.rs b/klickhouse/src/types/serialize/low_cardinality.rs index d006c69..b5f39a9 100644 --- a/klickhouse/src/types/serialize/low_cardinality.rs +++ b/klickhouse/src/types/serialize/low_cardinality.rs @@ -9,7 +9,6 @@ use crate::types::low_cardinality::*; pub struct LowCardinalitySerializer; -#[async_trait::async_trait] impl Serializer for LowCardinalitySerializer { async fn write_prefix( _type_: &Type, diff --git a/klickhouse/src/types/serialize/map.rs b/klickhouse/src/types/serialize/map.rs index ef9957a..4e1a1d3 100644 --- a/klickhouse/src/types/serialize/map.rs +++ b/klickhouse/src/types/serialize/map.rs @@ -6,7 +6,6 @@ use super::{Serializer, SerializerState, Type}; pub struct MapSerializer; -#[async_trait::async_trait] impl Serializer for MapSerializer { async fn write_prefix( type_: &Type, diff --git a/klickhouse/src/types/serialize/nullable.rs b/klickhouse/src/types/serialize/nullable.rs index 338e18f..766e13f 100644 --- a/klickhouse/src/types/serialize/nullable.rs +++ b/klickhouse/src/types/serialize/nullable.rs @@ -5,7 +5,6 @@ use crate::{io::ClickhouseWrite, values::Value, Result}; use super::{Serializer, SerializerState, Type}; pub struct NullableSerializer; -#[async_trait::async_trait] impl Serializer for NullableSerializer { async fn write( type_: &Type, diff --git a/klickhouse/src/types/serialize/sized.rs b/klickhouse/src/types/serialize/sized.rs index 5994035..f88494f 100644 --- a/klickhouse/src/types/serialize/sized.rs +++ b/klickhouse/src/types/serialize/sized.rs @@ -11,7 +11,6 @@ fn swap_endian_256(mut input: [u8; 32]) -> [u8; 32] { input } -#[async_trait::async_trait] impl Serializer for SizedSerializer { async fn write( type_: &Type, diff --git a/klickhouse/src/types/serialize/string.rs b/klickhouse/src/types/serialize/string.rs index 74c9f78..6a3e7c1 100644 --- a/klickhouse/src/types/serialize/string.rs +++ b/klickhouse/src/types/serialize/string.rs @@ -23,7 +23,6 @@ async fn emit_bytes(type_: &Type, bytes: &[u8], writer: &mut Ok(()) } -#[async_trait::async_trait] impl Serializer for StringSerializer { async fn write( type_: &Type, diff --git a/klickhouse/src/types/serialize/tuple.rs b/klickhouse/src/types/serialize/tuple.rs index 86bb25c..d966b99 100644 --- a/klickhouse/src/types/serialize/tuple.rs +++ b/klickhouse/src/types/serialize/tuple.rs @@ -4,7 +4,6 @@ use super::{Serializer, SerializerState, Type}; pub struct TupleSerializer; -#[async_trait::async_trait] impl Serializer for TupleSerializer { async fn write_prefix( type_: &Type,