Skip to content

Commit

Permalink
feat(query): introduce arrow-udf-js (#14799)
Browse files Browse the repository at this point in the history
* feat: support code string block

* feat: support udf interpreter create

* feat: support udf interpreter create

* feat: update

* feat: upgrade arrow to version 50

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

---------

Co-authored-by: Bohu <overred.shuttler@gmail.com>
  • Loading branch information
sundy-li and BohuTANG authored Mar 4, 2024
1 parent 6a7f531 commit 6777c17
Show file tree
Hide file tree
Showing 56 changed files with 969 additions and 122 deletions.
42 changes: 41 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ async-trait = { version = "0.1.77", package = "async-trait-fn" }
bincode = { version = "2.0.0-rc.3", features = ["serde", "std", "alloc"] }
borsh = { version = "1.2.1", features = ["derive"] }
bytes = "1.5.0"
hashbrown = { version = "0.14.3", default-features = false }
byteorder = "1.4.3"
chrono = { version = "0.4.31", features = ["serde"] }
chrono-tz = { version = "0.8", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub use user_auth::PasswordHashMethod;
pub use user_defined_file_format::UserDefinedFileFormat;
pub use user_defined_function::LambdaUDF;
pub use user_defined_function::UDFDefinition;
pub use user_defined_function::UDFScript;
pub use user_defined_function::UDFServer;
pub use user_defined_function::UdfName;
pub use user_defined_function::UserDefinedFunction;
Expand Down
56 changes: 56 additions & 0 deletions src/meta/app/src/principal/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,21 @@ pub struct UDFServer {
pub return_type: DataType,
}

/// A user-defined function whose body is carried inline as script source.
///
/// `Display for UDFDefinition` renders this variant as
/// `(<arg_types>) RETURNS <return_type> LANGUAGE <language> RUNTIME_VERSION = <runtime_version> HANDLER = <handler> AS $$<code>$$`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UDFScript {
/// Script source text; printed verbatim between the `$$` delimiters.
pub code: String,
/// Value printed after `HANDLER =`.
/// NOTE(review): presumably the entry-point name inside `code` — confirm.
pub handler: String,
/// Value printed after `LANGUAGE`.
pub language: String,
/// Declared argument types, printed comma-separated in order.
pub arg_types: Vec<DataType>,
/// Declared return type, printed after `RETURNS`.
pub return_type: DataType,
/// Value printed after `RUNTIME_VERSION =`.
pub runtime_version: String,
}

/// The body of a user-defined function; exactly one of the variants below.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UDFDefinition {
/// Definition stored as a [`LambdaUDF`].
LambdaUDF(LambdaUDF),
/// Definition stored as a [`UDFServer`].
UDFServer(UDFServer),
/// Definition stored as a [`UDFScript`] (inline script source).
UDFScript(UDFScript),
}

#[derive(Clone, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -108,6 +119,31 @@ impl UserDefinedFunction {
created_on: Utc::now(),
}
}

/// Builds a `UserDefinedFunction` whose definition is a `UDFScript`.
///
/// Every `&str` argument is copied into an owned `String`; `arg_types` and
/// `return_type` are moved into the script unchanged. `created_on` is set to
/// the current UTC time at the moment of the call.
pub fn create_udf_script(
    name: &str,
    code: &str,
    handler: &str,
    language: &str,
    arg_types: Vec<DataType>,
    return_type: DataType,
    runtime_version: &str,
    description: &str,
) -> Self {
    // Assemble the script payload first so the outer struct literal stays flat.
    let script = UDFScript {
        code: code.to_owned(),
        handler: handler.to_owned(),
        language: language.to_owned(),
        arg_types,
        return_type,
        runtime_version: runtime_version.to_owned(),
    };

    Self {
        name: name.to_owned(),
        description: description.to_owned(),
        definition: UDFDefinition::UDFScript(script),
        created_on: Utc::now(),
    }
}
}

impl Display for UDFDefinition {
Expand Down Expand Up @@ -144,6 +180,26 @@ impl Display for UDFDefinition {
") RETURNS {return_type} LANGUAGE {language} HANDLER = {handler} ADDRESS = {address}"
)?;
}

UDFDefinition::UDFScript(UDFScript {
code,
arg_types,
return_type,
handler,
language,
runtime_version,
}) => {
for (i, item) in arg_types.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{item}")?;
}
write!(
f,
") RETURNS {return_type} LANGUAGE {language} RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$"
)?;
}
}
Ok(())
}
Expand Down
64 changes: 64 additions & 0 deletions src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,64 @@ impl FromToProto for mt::UDFServer {
}
}

impl FromToProto for mt::UDFScript {
type PB = pb::UdfScript;
fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}
fn from_pb(p: pb::UdfScript) -> Result<Self, Incompatible> {
reader_check_msg(p.ver, p.min_reader_ver)?;

let mut arg_types = Vec::with_capacity(p.arg_types.len());
for arg_type in p.arg_types {
let arg_type = DataType::from(&TableDataType::from_pb(arg_type)?);
arg_types.push(arg_type);
}
let return_type = DataType::from(&TableDataType::from_pb(p.return_type.ok_or_else(
|| Incompatible {
reason: "UDFScript.return_type can not be None".to_string(),
},
)?)?);

Ok(mt::UDFScript {
code: p.code,
arg_types,
return_type,
handler: p.handler,
language: p.language,
runtime_version: p.runtime_version,
})
}

fn to_pb(&self) -> Result<pb::UdfScript, Incompatible> {
let mut arg_types = Vec::with_capacity(self.arg_types.len());
for arg_type in self.arg_types.iter() {
let arg_type = infer_schema_type(arg_type)
.map_err(|e| Incompatible {
reason: format!("Convert DataType to TableDataType failed: {}", e.message()),
})?
.to_pb()?;
arg_types.push(arg_type);
}
let return_type = infer_schema_type(&self.return_type)
.map_err(|e| Incompatible {
reason: format!("Convert DataType to TableDataType failed: {}", e.message()),
})?
.to_pb()?;

Ok(pb::UdfScript {
ver: VER,
min_reader_ver: MIN_READER_VER,
code: self.code.clone(),
handler: self.handler.clone(),
language: self.language.clone(),
arg_types,
return_type: Some(return_type),
runtime_version: self.runtime_version.clone(),
})
}
}

impl FromToProto for mt::UserDefinedFunction {
type PB = pb::UserDefinedFunction;
fn get_pb_ver(p: &Self::PB) -> u64 {
Expand All @@ -120,6 +178,9 @@ impl FromToProto for mt::UserDefinedFunction {
Some(pb::user_defined_function::Definition::UdfServer(udf_server)) => {
mt::UDFDefinition::UDFServer(mt::UDFServer::from_pb(udf_server)?)
}
Some(pb::user_defined_function::Definition::UdfScript(udf_script)) => {
mt::UDFDefinition::UDFScript(mt::UDFScript::from_pb(udf_script)?)
}
None => {
return Err(Incompatible {
reason: "UserDefinedFunction.definition cannot be None".to_string(),
Expand All @@ -146,6 +207,9 @@ impl FromToProto for mt::UserDefinedFunction {
mt::UDFDefinition::UDFServer(udf_server) => {
pb::user_defined_function::Definition::UdfServer(udf_server.to_pb()?)
}
mt::UDFDefinition::UDFScript(udf_script) => {
pb::user_defined_function::Definition::UdfScript(udf_script.to_pb()?)
}
};

Ok(pb::UserDefinedFunction {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(77, "2024-01-22: Remove: allow_anonymous in S3 Config", ),
(78, "2024-01-29: Refactor: GrantEntry::UserPrivilegeType and ShareGrantEntry::ShareGrantObjectPrivilege use from_bits_truncate deserialize", ),
(79, "2024-01-31: Add: udf.proto/UserDefinedFunction add created_on field", ),
(80, "2024-02-01: Add: Add: datatype.proto/DataType Geometry type")
(80, "2024-02-01: Add: Add: datatype.proto/DataType Geometry type"),
(81, "2024-03-04: Add: Add: udf.udf_script")
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ mod v076_role_ownership_info;
mod v077_s3_remove_allow_anonymous;
mod v078_grantentry;
mod v079_udf_created_on;
mod v080_geometry_datatype;
mod v081_udf_script;
92 changes: 92 additions & 0 deletions src/meta/proto-conv/tests/it/v081_udf_script.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_meta_app::principal::LambdaUDF;
use databend_common_meta_app::principal::UDFDefinition;
use databend_common_meta_app::principal::UDFServer;
use databend_common_meta_app::principal::UserDefinedFunction;
use minitrace::func_name;

use crate::common;

// These bytes are built when a new version is introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_pb_from_to()`
// Round-trips a `UserDefinedFunction` holding a `UDFServer` definition
// (language "python") through `common::test_pb_from_to`, then hands the frozen
// `bytes` to `common::test_load_old` together with version 81 and the same
// expected value.
//
// NOTE(review): although this file is `v081_udf_script`, this case builds a
// `UDFDefinition::UDFServer`; no `UDFScript` payload is decoded here. Consider
// adding a fixture for the new variant.
#[test]
fn test_decode_v81_udf_python() -> anyhow::Result<()> {
// Frozen serialized message; see the banner above — do not regenerate.
let bytes = vec![
10, 8, 112, 108, 117, 115, 95, 105, 110, 116, 18, 21, 84, 104, 105, 115, 32, 105, 115, 32,
97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 34, 107, 10, 21, 104, 116,
116, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 111, 115, 116, 58, 56, 56, 56, 56, 18,
11, 112, 108, 117, 115, 95, 105, 110, 116, 95, 112, 121, 26, 6, 112, 121, 116, 104, 111,
110, 34, 17, 154, 2, 8, 58, 0, 160, 6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 34, 17, 154,
2, 8, 58, 0, 160, 6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 42, 17, 154, 2, 8, 66, 0, 160,
6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 160, 6, 81, 168, 6, 24, 42, 23, 50, 48, 50, 51,
45, 49, 50, 45, 49, 53, 32, 48, 49, 58, 50, 54, 58, 48, 57, 32, 85, 84, 67, 160, 6, 81,
168, 6, 24,
];

// Expected value as a closure so each helper call below gets a fresh copy.
let want = || UserDefinedFunction {
name: "plus_int".to_string(),
description: "This is a description".to_string(),
definition: UDFDefinition::UDFServer(UDFServer {
address: "http://localhost:8888".to_string(),
handler: "plus_int_py".to_string(),
language: "python".to_string(),
arg_types: vec![
DataType::Number(NumberDataType::Int32),
DataType::Number(NumberDataType::Int32),
],
return_type: DataType::Number(NumberDataType::Int64),
}),
// 1702603569 s after the Unix epoch is 2023-12-15 01:26:09 UTC, the
// timestamp text embedded near the end of `bytes`.
created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
};

common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 81, want())
}

// Round-trips a `UserDefinedFunction` holding a `LambdaUDF` definition through
// `common::test_pb_from_to`, then hands the frozen `bytes` to
// `common::test_load_old` together with version 81 and the same expected value.
#[test]
fn test_decode_v81_udf_sql() -> anyhow::Result<()> {
// Frozen serialized message; see the banner above — do not regenerate.
let bytes = vec![
10, 10, 105, 115, 110, 111, 116, 101, 109, 112, 116, 121, 18, 21, 84, 104, 105, 115, 32,
105, 115, 32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 26, 34, 10, 1,
112, 18, 23, 40, 112, 41, 32, 45, 62, 32, 40, 78, 79, 84, 32, 105, 115, 95, 110, 117, 108,
108, 40, 112, 41, 41, 160, 6, 81, 168, 6, 24, 42, 23, 49, 57, 55, 53, 45, 48, 53, 45, 50,
53, 32, 49, 54, 58, 51, 57, 58, 52, 52, 32, 85, 84, 67, 160, 6, 81, 168, 6, 24,
];
// Expected value as a closure so each helper call below gets a fresh copy.
let want = || UserDefinedFunction {
name: "isnotempty".to_string(),
description: "This is a description".to_string(),
definition: UDFDefinition::LambdaUDF(LambdaUDF {
parameters: vec!["p".to_string()],
definition: "(p) -> (NOT is_null(p))".to_string(),
}),
// 170267984 s after the Unix epoch is 1975-05-25 16:39:44 UTC, which is the
// timestamp text embedded in `bytes`; do not "correct" this value without
// also replacing the fixture.
created_on: DateTime::<Utc>::from_timestamp(170267984, 0).unwrap(),
};

common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 81, want())
}
14 changes: 14 additions & 0 deletions src/meta/protos/proto/udf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ message UDFServer {
DataType return_type = 5;
}

// A UDF whose body is inline script source; the Rust side converts this
// message to and from `UDFScript` in the meta-app `principal` module.
message UDFScript {
// Message version and the minimum reader version allowed to decode it.
uint64 ver = 100;
uint64 min_reader_ver = 101;

// The script source text.
string code = 1;
// The handler name given in the UDF definition.
string handler = 2;
// The script language name.
string language = 3;
// Argument types, in declaration order.
repeated DataType arg_types = 4;
// Return type; the Rust decoder rejects a message where this is unset.
DataType return_type = 5;
// The runtime version given in the UDF definition.
string runtime_version = 6;
}


message UserDefinedFunction {
uint64 ver = 100;
uint64 min_reader_ver = 101;
Expand All @@ -46,6 +59,7 @@ message UserDefinedFunction {
oneof definition {
LambdaUDF lambda_udf = 3;
UDFServer udf_server = 4;
UDFScript udf_script = 6;
}
// The time udf created.
optional string created_on = 5;
Expand Down
Loading

0 comments on commit 6777c17

Please sign in to comment.