Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add non-cryptographic hash function #2298

Merged
merged 23 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sqlbuiltins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ bson = "2.7.0"
tokio-util = "0.7.10"
bytes = "1.5.0"
kdl = "5.0.0-alpha.1"
siphasher = "1.0.0"
fnv = "1.0.7"
memoize = { version = "0.4.2", features = ["full"] }
53 changes: 51 additions & 2 deletions crates/sqlbuiltins/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,56 @@
#[derive(Debug, thiserror::Error)]
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;

#[derive(Clone, Debug, thiserror::Error)]
pub enum BuiltinError {
#[error("parse error: {0}")]
ParseError(String),

#[error("fundamental parsing error")]
FundamentalError,

#[error("missing value at index {0}")]
MissingValueAtIndex(usize),

#[error("expected value missing")]
MissingValue,

#[error("invalid value: {0}")]
InvalidValue(String),

#[error("columnar values not support at index {0}")]
InvalidColumnarValue(usize),

#[error("value was type {0}, expected {1}")]
IncorrectType(DataType, DataType),

#[error(transparent)]
DatafusionExtError(#[from] datafusion_ext::errors::ExtensionError),
KdlError(#[from] kdl::KdlError),

#[error("DataFusionError: {0}")]
DataFusionError(String),

#[error("ArrowError: {0}")]
ArrowError(String),
}

pub type Result<T, E = BuiltinError> = std::result::Result<T, E>;

impl From<BuiltinError> for DataFusionError {
fn from(e: BuiltinError) -> Self {
DataFusionError::Execution(e.to_string())
}
}

impl From<DataFusionError> for BuiltinError {
fn from(e: DataFusionError) -> Self {
BuiltinError::DataFusionError(e.to_string())
}
}

impl From<ArrowError> for BuiltinError {
fn from(e: ArrowError) -> Self {
BuiltinError::ArrowError(e.to_string())
}
}
26 changes: 16 additions & 10 deletions crates/sqlbuiltins/src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@ mod aggregates;
mod scalars;
mod table;

use self::scalars::df_scalars::ArrowCastFunction;
use self::scalars::kdl::{KDLMatches, KDLSelect};
use self::scalars::{postgres::*, ConnectionId, Version};
use self::table::{BuiltinTableFuncs, TableFunc};
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::logical_expr::{AggregateFunction, BuiltinScalarFunction, Expr, Signature};
use once_cell::sync::Lazy;

use scalars::df_scalars::ArrowCastFunction;
use scalars::hashing::{FnvHash, SipHash};
use scalars::kdl::{KDLMatches, KDLSelect};
use scalars::postgres::*;
use scalars::{ConnectionId, Version};
use table::{BuiltinTableFuncs, TableFunc};

use protogen::metastore::types::catalog::{
EntryMeta, EntryType, FunctionEntry, FunctionType, RuntimePreference,
};

use std::collections::HashMap;
use std::sync::Arc;

/// Builtin table returning functions available for all sessions.
static BUILTIN_TABLE_FUNCS: Lazy<BuiltinTableFuncs> = Lazy::new(BuiltinTableFuncs::new);
pub static ARROW_CAST_FUNC: Lazy<ArrowCastFunction> = Lazy::new(|| ArrowCastFunction {});
Expand Down Expand Up @@ -187,12 +190,15 @@ impl FunctionRegistry {
Arc::new(PgTableIsVisible),
Arc::new(PgEncodingToChar),
Arc::new(PgArrayToString),
// System functions
Arc::new(ConnectionId),
Arc::new(Version),
// KDL functions
Arc::new(KDLMatches),
Arc::new(KDLSelect),
// Other functions
Arc::new(ConnectionId),
Arc::new(Version),
// Hashing/Sharding
Arc::new(SipHash),
Arc::new(FnvHash),
];
let udfs = udfs
.into_iter()
Expand Down
88 changes: 88 additions & 0 deletions crates/sqlbuiltins/src/functions/scalars/hashing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;
use siphasher::sip::SipHasher24;

use super::*;

pub struct SipHash;

impl ConstBuiltinFunction for SipHash {
const NAME: &'static str = "siphash";
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
const DESCRIPTION: &'static str =
"Calculates a 64bit non-cryptographic hash (SipHash24) of the value.";
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
const EXAMPLE: &'static str = "siphash(<value>)";
const FUNCTION_TYPE: FunctionType = FunctionType::Scalar;

fn signature(&self) -> Option<Signature> {
Some(Signature::new(
// args: <FIELD>
TypeSignature::Any(1),
Volatility::Immutable,
))
}
}
impl BuiltinScalarUDF for SipHash {
fn as_expr(&self, args: Vec<Expr>) -> Expr {
let udf = ScalarUDF {
name: Self::NAME.to_string(),
signature: ConstBuiltinFunction::signature(self).unwrap(),
return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))),
fun: Arc::new(move |input| {
Ok(get_nth_scalar_value(input, 0, &|value| -> Result<
ScalarValue,
BuiltinError,
> {
let mut hasher = SipHasher24::new();
value.hash(&mut hasher);
Ok(ScalarValue::UInt64(Some(hasher.finish())))
})?)
}),
};
Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF::new(
Arc::new(udf),
args,
))
}
}

pub struct FnvHash;

impl ConstBuiltinFunction for FnvHash {
const NAME: &'static str = "fnv";
const DESCRIPTION: &'static str =
"Calculates a 64bit non-cryptographic hash (fnv1a) of the value.";
const EXAMPLE: &'static str = "fnv(<value>)";
const FUNCTION_TYPE: FunctionType = FunctionType::Scalar;

fn signature(&self) -> Option<Signature> {
Some(Signature::new(
// args: <FIELD>
TypeSignature::Any(1),
Volatility::Immutable,
))
}
}
impl BuiltinScalarUDF for FnvHash {
fn as_expr(&self, args: Vec<Expr>) -> Expr {
let udf = ScalarUDF {
name: Self::NAME.to_string(),
signature: ConstBuiltinFunction::signature(self).unwrap(),
return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))),
fun: Arc::new(move |input| {
Ok(get_nth_scalar_value(input, 0, &|value| -> Result<
ScalarValue,
BuiltinError,
> {
let mut hasher = FnvHasher::default();
value.hash(&mut hasher);
Ok(ScalarValue::UInt64(Some(hasher.finish())))
})?)
}),
};
Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF::new(
Arc::new(udf),
args,
))
}
}
Loading
Loading