diff --git a/Cargo.lock b/Cargo.lock index 09c90448c7776..85b1584f355e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10369,6 +10369,7 @@ dependencies = [ "thiserror-ext", "tikv-jemallocator", "tokio-metrics", + "tokio-postgres", "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tokio-util", "tracing", @@ -11253,6 +11254,7 @@ dependencies = [ "tempfile", "thiserror", "thiserror-ext", + "tokio-postgres", "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)", "tracing", "uuid", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 6bf2f8a491576..38e59a7da6c85 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -141,6 +141,9 @@ risedev slt './e2e_test/source/cdc/cdc.check_new_rows.slt' # drop relations risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt' +echo "--- postgres_query tvf test" +risedev slt './e2e_test/source/tvf/postgres_query.slt' + echo "--- Kill cluster" risedev ci-kill export RISINGWAVE_CI=true diff --git a/e2e_test/source/tvf/postgres_query.slt b/e2e_test/source/tvf/postgres_query.slt new file mode 100644 index 0000000000000..566ace0409a1a --- /dev/null +++ b/e2e_test/source/tvf/postgres_query.slt @@ -0,0 +1,40 @@ +system ok +PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c " +CREATE TABLE test ( + id bigint primary key, + v1 bool, + v2 smallint, + v3 integer, + v4 bigint, + v5 real, + v6 double precision, + v7 numeric, + v8 date, + v9 time, + v10 timestamp, + v11 timestamptz, + v12 text, + v13 varchar, + v14 interval, + v15 jsonb, + v16 bytea +);" + +system ok +PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c " +INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00 pst', 'text', 'varchar', '1 day', '{}', '\x01'; +" + +query II +select * from postgres_query('db', 
'5432', 'postgres', 'postgres', 'cdc_test', 'select * from test where id > 90;'); +---- +91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 +100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 \ No newline at end of file diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 8c74b93d96fad..aefa17de23700 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -93,6 +93,17 @@ message FileScanNode { repeated string file_location = 7; } +// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed. 
+message PostgresQueryNode { + repeated plan_common.ColumnDesc columns = 1; + string hostname = 2; + string port = 3; + string username = 4; + string password = 5; + string database = 6; + string query = 7; +} + message ProjectNode { repeated expr.ExprNode select_list = 1; } @@ -373,6 +384,7 @@ message PlanNode { LogRowSeqScanNode log_row_seq_scan = 37; FileScanNode file_scan = 38; IcebergScanNode iceberg_scan = 39; + PostgresQueryNode postgres_query = 40; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/proto/expr.proto b/proto/expr.proto index 808f402a77aa8..8e854729ab6c6 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -362,6 +362,8 @@ message TableFunction { JSONB_TO_RECORDSET = 17; // file scan FILE_SCAN = 19; + // postgres query + POSTGRES_QUERY = 20; // User defined table function USER_DEFINED = 100; } diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 3970f4eab4324..46c4aa7b9de6e 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -60,6 +60,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", ] } tokio-metrics = "0.3.0" +tokio-postgres = "0.7" tokio-stream = { workspace = true } tokio-util = { workspace = true } tonic = { workspace = true } diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index 780b90cf178b0..ae26fca228321 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -29,6 +29,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus}; use risingwave_storage::error::StorageError; use thiserror::Error; use thiserror_ext::Construct; +use tokio_postgres; use tonic::Status; use crate::worker_manager::worker_node_manager::FragmentId; @@ -127,6 +128,13 @@ pub enum BatchError { ParquetError, ), + #[error(transparent)] + Postgres( + #[from] + #[backtrace] + tokio_postgres::Error, + ), + // Make the ref-counted type to be a variant for easier code structuring. 
// TODO(error-handling): replace with `thiserror_ext::Arc` #[error(transparent)] diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 53dabccaf260f..c3bd373198df7 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -30,6 +30,7 @@ mod max_one_row; mod merge_sort; mod merge_sort_exchange; mod order_by; +mod postgres_query; mod project; mod project_set; mod row_seq_scan; @@ -65,6 +66,7 @@ pub use max_one_row::*; pub use merge_sort::*; pub use merge_sort_exchange::*; pub use order_by::*; +pub use postgres_query::*; pub use project::*; pub use project_set::*; use risingwave_common::array::DataChunk; @@ -244,6 +246,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { NodeBody::MaxOneRow => MaxOneRowExecutor, NodeBody::FileScan => FileScanExecutorBuilder, NodeBody::IcebergScan => IcebergScanExecutorBuilder, + NodeBody::PostgresQuery => PostgresQueryExecutorBuilder, // Follow NodeBody only used for test NodeBody::BlockExecutor => BlockExecutorBuilder, NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder, diff --git a/src/batch/src/executor/postgres_query.rs b/src/batch/src/executor/postgres_query.rs new file mode 100644 index 0000000000000..59a9e56d441bc --- /dev/null +++ b/src/batch/src/executor/postgres_query.rs @@ -0,0 +1,194 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use anyhow::Context; +use futures_async_stream::try_stream; +use futures_util::stream::StreamExt; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl}; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use thiserror_ext::AsReport; +use tokio_postgres; + +use crate::error::BatchError; +use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder}; +use crate::task::BatchTaskContext; + +/// `PostgresQuery` executor. Runs a query against a Postgres database. +pub struct PostgresQueryExecutor { + schema: Schema, + host: String, + port: String, + username: String, + password: String, + database: String, + query: String, + identity: String, +} + +impl Executor for PostgresQueryExecutor { + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> super::BoxedDataChunkStream { + self.do_execute().boxed() + } +} + +pub fn postgres_row_to_owned_row( + row: tokio_postgres::Row, + schema: &Schema, +) -> Result { + let mut datums = vec![]; + for i in 0..schema.fields.len() { + let rw_field = &schema.fields[i]; + let name = rw_field.name.as_str(); + let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?; + datums.push(datum); + } + Ok(OwnedRow::new(datums)) +} + +// TODO(kwannoel): Support more types, see postgres connector's ScalarAdapter. 
+fn postgres_cell_to_scalar_impl( + row: &tokio_postgres::Row, + data_type: &DataType, + i: usize, + name: &str, +) -> Result { + let datum = match data_type { + DataType::Boolean + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Jsonb + | DataType::Interval + | DataType::Varchar + | DataType::Bytea => { + // ScalarAdapter is also fine. But ScalarImpl is more efficient + row.try_get::<_, Option>(i)? + } + DataType::Decimal => { + // Decimal is more efficient than PgNumeric in ScalarAdapter + let val = row.try_get::<_, Option>(i)?; + val.map(ScalarImpl::from) + } + _ => { + tracing::warn!(name, ?data_type, "unsupported data type, set to null"); + None + } + }; + Ok(datum) +} + +impl PostgresQueryExecutor { + pub fn new( + schema: Schema, + host: String, + port: String, + username: String, + password: String, + database: String, + query: String, + identity: String, + ) -> Self { + Self { + schema, + host, + port, + username, + password, + database, + query, + identity, + } + } + + #[try_stream(ok = DataChunk, error = BatchError)] + async fn do_execute(self: Box) { + tracing::debug!("postgres_query_executor: started"); + let conn_str = format!( + "host={} port={} user={} password={} dbname={}", + self.host, self.port, self.username, self.password, self.database + ); + let (client, conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = conn.await { + tracing::error!( + "postgres_query_executor: connection error: {:?}", + e.as_report() + ); + } + }); + + // TODO(kwannoel): Use pagination using CURSOR. 
+ let rows = client + .query(&self.query, &[]) + .await + .context("postgres_query received error from remote server")?; + let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024); + tracing::debug!("postgres_query_executor: query executed, start deserializing rows"); + // deserialize the rows + for row in rows { + let owned_row = postgres_row_to_owned_row(row, &self.schema)?; + if let Some(chunk) = builder.append_one_row(owned_row) { + yield chunk; + } + } + if let Some(chunk) = builder.consume_all() { + yield chunk; + } + return Ok(()); + } +} + +pub struct PostgresQueryExecutorBuilder {} + +#[async_trait::async_trait] +impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder { + async fn new_boxed_executor( + source: &ExecutorBuilder<'_, C>, + _inputs: Vec, + ) -> crate::error::Result { + let postgres_query_node = try_match_expand!( + source.plan_node().get_node_body().unwrap(), + NodeBody::PostgresQuery + )?; + + Ok(Box::new(PostgresQueryExecutor::new( + Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)), + postgres_query_node.hostname.clone(), + postgres_query_node.port.clone(), + postgres_query_node.username.clone(), + postgres_query_node.password.clone(), + postgres_query_node.database.clone(), + postgres_query_node.query.clone(), + source.plan_node().get_identity().clone(), + ))) + } +} diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 8116dc3369ace..9d6fe8073bec6 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -91,6 +91,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } +tokio-postgres = "0.7" tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs index 00f2438cb35af..ddc21c6ee7ac7 100644 --- a/src/frontend/src/binder/expr/function/mod.rs +++ b/src/frontend/src/binder/expr/function/mod.rs @@ -16,6 +16,7 @@ use 
std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use anyhow::Context; use itertools::Itertools; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME}; @@ -321,6 +322,17 @@ impl Binder { self.ensure_table_function_allowed()?; return Ok(TableFunction::new_file_scan(args)?.into()); } + // `postgres_query` table function + if func_name.eq("postgres_query") { + reject_syntax!( + arg_list.variadic, + "`VARIADIC` is not allowed in table function call" + ); + self.ensure_table_function_allowed()?; + return Ok(TableFunction::new_postgres_query(args) + .context("postgres_query error")? + .into()); + } // UDTF if let Some(ref udf) = udf && udf.kind.is_table() diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 5806eea792904..5f22398cc5834 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -20,12 +20,23 @@ use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{create_parquet_stream_builder, list_s3_directory}; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; +use thiserror_ext::AsReport; use tokio::runtime::Runtime; +use tokio_postgres; +use tokio_postgres::types::Type as TokioPgType; use super::{infer_type, Expr, ExprImpl, ExprRewriter, Literal, RwResult}; use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind}; use crate::error::ErrorCode::BindError; +static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("rw-binder-ext-query") + .enable_all() + .build() + .expect("failed to build external system querying runtime") +}); + /// A table function takes a row as input and returns a table. It is also known as Set-Returning /// Function. 
/// @@ -141,14 +152,6 @@ impl TableFunction { #[cfg(not(madsim))] { - static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("rw-file-scan") - .enable_all() - .build() - .expect("failed to build file-scan runtime") - }); - let files = if eval_args[5].ends_with('/') { let files = tokio::task::block_in_place(|| { RUNTIME.block_on(async { @@ -226,6 +229,127 @@ impl TableFunction { }) } + pub fn new_postgres_query(args: Vec) -> RwResult { + let args = { + if args.len() != 6 { + return Err(BindError("postgres_query function only accepts 6 arguments: postgres_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, postgres_query varchar)".to_string()).into()); + } + let mut cast_args = Vec::with_capacity(6); + for arg in args { + let arg = arg.cast_implicit(DataType::Varchar)?; + cast_args.push(arg); + } + cast_args + }; + let evaled_args = { + let mut evaled_args: Vec = Vec::with_capacity(6); + for arg in &args { + match arg.try_fold_const() { + Some(Ok(value)) => { + let Some(scalar) = value else { + return Err(BindError( + "postgres_query function does not accept null arguments" + .to_string(), + ) + .into()); + }; + evaled_args.push(scalar.into_utf8().into()); + } + Some(Err(err)) => { + return Err(err); + } + None => { + return Err(BindError( + "postgres_query function only accepts constant arguments".to_string(), + ) + .into()); + } + } + } + evaled_args + }; + + #[cfg(madsim)] + { + return Err(crate::error::ErrorCode::BindError( + "postgres_query can't be used in the madsim mode".to_string(), + ) + .into()); + } + + #[cfg(not(madsim))] + { + let schema = tokio::task::block_in_place(|| { + RUNTIME.block_on(async { + let (client, connection) = tokio_postgres::connect( + format!( + "host={} port={} user={} password={} dbname={}", + evaled_args[0], + evaled_args[1], + evaled_args[2], + evaled_args[3], + evaled_args[4] + ) + .as_str(), + tokio_postgres::NoTls, + ) + .await?; + 
+ tokio::spawn(async move { + if let Err(e) = connection.await { + tracing::error!( + "postgres_query_executor: connection error: {:?}", + e.as_report() + ); + } + }); + + let statement = client.prepare(evaled_args[5].as_str()).await?; + + let mut rw_types = vec![]; + for column in statement.columns() { + let name = column.name().to_string(); + let data_type = match *column.type_() { + TokioPgType::BOOL => DataType::Boolean, + TokioPgType::INT2 => DataType::Int16, + TokioPgType::INT4 => DataType::Int32, + TokioPgType::INT8 => DataType::Int64, + TokioPgType::FLOAT4 => DataType::Float32, + TokioPgType::FLOAT8 => DataType::Float64, + TokioPgType::NUMERIC => DataType::Decimal, + TokioPgType::DATE => DataType::Date, + TokioPgType::TIME => DataType::Time, + TokioPgType::TIMESTAMP => DataType::Timestamp, + TokioPgType::TIMESTAMPTZ => DataType::Timestamptz, + TokioPgType::TEXT | TokioPgType::VARCHAR => DataType::Varchar, + TokioPgType::INTERVAL => DataType::Interval, + TokioPgType::JSONB => DataType::Jsonb, + TokioPgType::BYTEA => DataType::Bytea, + _ => { + return Err(crate::error::ErrorCode::BindError( + format!("unsupported column type: {}", column.type_()) + .to_string(), + ) + .into()); + } + }; + rw_types.push((name, data_type)); + } + Ok::(DataType::Struct( + StructType::new(rw_types), + )) + }) + })?; + + Ok(TableFunction { + args, + return_type: schema, + function_type: TableFunctionType::PostgresQuery, + user_defined: None, + }) + } + } + pub fn to_protobuf(&self) -> PbTableFunction { PbTableFunction { function_type: self.function_type as i32, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index 93da63cf70a42..a03bb95a0d51e 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -132,6 +132,9 @@ static TABLE_FUNCTION_CONVERT: LazyLock = LazyLock::new(|| { vec![ // Apply file scan rule first 
TableFunctionToFileScanRule::create(), + // Apply postgres query rule next + TableFunctionToPostgresQueryRule::create(), + // Apply project set rule last TableFunctionToProjectSetRule::create(), ], ApplyOrder::TopDown, @@ -146,6 +149,14 @@ static TABLE_FUNCTION_TO_FILE_SCAN: LazyLock = LazyLock::new( ) }); +static TABLE_FUNCTION_TO_POSTGRES_QUERY: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Table Function To PostgresQuery", + vec![TableFunctionToPostgresQueryRule::create()], + ApplyOrder::TopDown, + ) +}); + static VALUES_EXTRACT_PROJECT: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Values Extract Project", @@ -702,6 +713,7 @@ impl LogicalOptimizer { plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER); // Table function should be converted into `file_scan` before `project_set`. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_FILE_SCAN); + plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT); diff --git a/src/frontend/src/optimizer/plan_node/batch_postgres_query.rs b/src/frontend/src/optimizer/plan_node/batch_postgres_query.rs new file mode 100644 index 0000000000000..9c5c1b21f8550 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/batch_postgres_query.rs @@ -0,0 +1,96 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use pretty_xmlish::XmlNode; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::batch_plan::PostgresQueryNode; + +use super::batch::prelude::*; +use super::utils::{childless_record, column_names_pretty, Distill}; +use super::{ + generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, +}; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::{Distribution, Order}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BatchPostgresQuery { + pub base: PlanBase, + pub core: generic::PostgresQuery, +} + +impl BatchPostgresQuery { + pub fn new(core: generic::PostgresQuery) -> Self { + let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any()); + + Self { base, core } + } + + pub fn column_names(&self) -> Vec<&str> { + self.schema().names_str() + } + + pub fn clone_with_dist(&self) -> Self { + let base = self.base.clone_with_new_distribution(Distribution::Single); + Self { + base, + core: self.core.clone(), + } + } +} + +impl_plan_tree_node_for_leaf! 
{ BatchPostgresQuery } + +impl Distill for BatchPostgresQuery { + fn distill<'a>(&self) -> XmlNode<'a> { + let fields = vec![("columns", column_names_pretty(self.schema()))]; + childless_record("BatchPostgresQuery", fields) + } +} + +impl ToLocalBatch for BatchPostgresQuery { + fn to_local(&self) -> Result { + Ok(self.clone_with_dist().into()) + } +} + +impl ToDistributedBatch for BatchPostgresQuery { + fn to_distributed(&self) -> Result { + Ok(self.clone_with_dist().into()) + } +} + +impl ToBatchPb for BatchPostgresQuery { + fn to_batch_prost_body(&self) -> NodeBody { + NodeBody::PostgresQuery(PostgresQueryNode { + columns: self + .core + .columns() + .iter() + .map(|c| c.to_protobuf()) + .collect(), + hostname: self.core.hostname.clone(), + port: self.core.port.clone(), + username: self.core.username.clone(), + password: self.core.password.clone(), + database: self.core.database.clone(), + query: self.core.query.clone(), + }) + } +} + +impl ExprRewritable for BatchPostgresQuery {} + +impl ExprVisitable for BatchPostgresQuery {} diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 3e01dee8aa0b9..6a076025b906c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -89,6 +89,9 @@ pub use now::*; mod file_scan; pub use file_scan::*; +mod postgres_query; +pub use postgres_query::*; + pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; } diff --git a/src/frontend/src/optimizer/plan_node/generic/postgres_query.rs b/src/frontend/src/optimizer/plan_node/generic/postgres_query.rs new file mode 100644 index 0000000000000..b3bc631bcedfe --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/postgres_query.rs @@ -0,0 +1,67 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with 
the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; + +use super::GenericPlanNode; +use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct PostgresQuery { + pub schema: Schema, + pub hostname: String, + pub port: String, + pub username: String, + pub password: String, + pub database: String, + pub query: String, + + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, +} + +impl GenericPlanNode for PostgresQuery { + fn schema(&self) -> Schema { + self.schema.clone() + } + + fn stream_key(&self) -> Option> { + None + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + FunctionalDependencySet::new(self.schema.len()) + } +} + +impl PostgresQuery { + pub fn columns(&self) -> Vec { + self.schema + .fields + .iter() + .enumerate() + .map(|(i, f)| { + ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone()) + }) + .collect() + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs b/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs new file mode 100644 index 0000000000000..9082bd86a3f37 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_postgres_query.rs @@ -0,0 +1,115 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// 
you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pretty_xmlish::XmlNode; +use risingwave_common::bail; +use risingwave_common::catalog::Schema; + +use super::generic::GenericPlanRef; +use super::utils::{childless_record, Distill}; +use super::{ + generic, BatchPostgresQuery, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, + PlanRef, PredicatePushdown, ToBatch, ToStream, +}; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::column_names_pretty; +use crate::optimizer::plan_node::{ + ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext, + ToStreamContext, +}; +use crate::utils::{ColIndexMapping, Condition}; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalPostgresQuery { + pub base: PlanBase, + pub core: generic::PostgresQuery, +} + +impl LogicalPostgresQuery { + pub fn new( + ctx: OptimizerContextRef, + schema: Schema, + hostname: String, + port: String, + username: String, + password: String, + database: String, + query: String, + ) -> Self { + let core = generic::PostgresQuery { + schema, + hostname, + port, + username, + password, + database, + query, + ctx, + }; + + let base = PlanBase::new_logical_with_core(&core); + + LogicalPostgresQuery { base, core } + } +} + +impl_plan_tree_node_for_leaf! 
{LogicalPostgresQuery} +impl Distill for LogicalPostgresQuery { + fn distill<'a>(&self) -> XmlNode<'a> { + let fields = vec![("columns", column_names_pretty(self.schema()))]; + childless_record("LogicalPostgresQuery", fields) + } +} + +impl ColPrunable for LogicalPostgresQuery { + fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { + LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into() + } +} + +impl ExprRewritable for LogicalPostgresQuery {} + +impl ExprVisitable for LogicalPostgresQuery {} + +impl PredicatePushdown for LogicalPostgresQuery { + fn predicate_pushdown( + &self, + predicate: Condition, + _ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + // No pushdown. + LogicalFilter::create(self.clone().into(), predicate) + } +} + +impl ToBatch for LogicalPostgresQuery { + fn to_batch(&self) -> Result { + Ok(BatchPostgresQuery::new(self.core.clone()).into()) + } +} + +impl ToStream for LogicalPostgresQuery { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { + bail!("postgres_query function is not supported in streaming mode") + } + + fn logical_rewrite_for_stream( + &self, + _ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + bail!("postgres_query function is not supported in streaming mode") + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 0ec266cd2339d..cfcc4f0709569 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -922,9 +922,11 @@ mod stream_watermark_filter; mod batch_file_scan; mod batch_iceberg_scan; mod batch_kafka_scan; +mod batch_postgres_query; mod derive; mod logical_file_scan; mod logical_iceberg_scan; +mod logical_postgres_query; mod stream_cdc_table_scan; mod stream_share; mod stream_temporal_join; @@ -949,6 +951,7 @@ pub use batch_lookup_join::BatchLookupJoin; pub use batch_max_one_row::BatchMaxOneRow; pub use 
batch_nested_loop_join::BatchNestedLoopJoin; pub use batch_over_window::BatchOverWindow; +pub use batch_postgres_query::BatchPostgresQuery; pub use batch_project::BatchProject; pub use batch_project_set::BatchProjectSet; pub use batch_seq_scan::BatchSeqScan; @@ -984,6 +987,7 @@ pub use logical_max_one_row::LogicalMaxOneRow; pub use logical_multi_join::{LogicalMultiJoin, LogicalMultiJoinBuilder}; pub use logical_now::LogicalNow; pub use logical_over_window::LogicalOverWindow; +pub use logical_postgres_query::LogicalPostgresQuery; pub use logical_project::LogicalProject; pub use logical_project_set::LogicalProjectSet; pub use logical_recursive_union::LogicalRecursiveUnion; @@ -1095,6 +1099,7 @@ macro_rules! for_all_plan_nodes { , { Logical, CteRef } , { Logical, ChangeLog } , { Logical, FileScan } + , { Logical, PostgresQuery } , { Batch, SimpleAgg } , { Batch, HashAgg } , { Batch, SortAgg } @@ -1126,6 +1131,7 @@ macro_rules! for_all_plan_nodes { , { Batch, KafkaScan } , { Batch, IcebergScan } , { Batch, FileScan } + , { Batch, PostgresQuery } , { Stream, Project } , { Stream, Filter } , { Stream, TableScan } @@ -1207,6 +1213,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, CteRef } , { Logical, ChangeLog } , { Logical, FileScan } + , { Logical, PostgresQuery } } }; } @@ -1247,6 +1254,7 @@ macro_rules! 
for_batch_plan_nodes { , { Batch, KafkaScan } , { Batch, IcebergScan } , { Batch, FileScan } + , { Batch, PostgresQuery } } }; } diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 180dafa0c79b6..56d79bf7b408b 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -161,6 +161,7 @@ mod pull_up_correlated_predicate_agg_rule; mod source_to_iceberg_scan_rule; mod source_to_kafka_scan_rule; mod table_function_to_file_scan_rule; +mod table_function_to_postgres_query_rule; mod values_extract_project_rule; pub use batch::batch_push_limit_to_scan_rule::*; @@ -168,6 +169,7 @@ pub use pull_up_correlated_predicate_agg_rule::*; pub use source_to_iceberg_scan_rule::*; pub use source_to_kafka_scan_rule::*; pub use table_function_to_file_scan_rule::*; +pub use table_function_to_postgres_query_rule::*; pub use values_extract_project_rule::*; #[macro_export] @@ -231,6 +233,7 @@ macro_rules! for_all_rules { , { ApplyTopNTransposeRule } , { TableFunctionToProjectSetRule } , { TableFunctionToFileScanRule } + , { TableFunctionToPostgresQueryRule } , { ApplyLimitTransposeRule } , { CommonSubExprExtractRule } , { BatchProjectMergeRule } diff --git a/src/frontend/src/optimizer/rule/table_function_to_postgres_query_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_postgres_query_rule.rs new file mode 100644 index 0000000000000..6935c42cc6999 --- /dev/null +++ b/src/frontend/src/optimizer/rule/table_function_to_postgres_query_rule.rs @@ -0,0 +1,90 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::util::iter_util::ZipEqDebug; + +use super::{BoxedRule, Rule}; +use crate::expr::{Expr, TableFunctionType}; +use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::{LogicalPostgresQuery, LogicalTableFunction}; +use crate::optimizer::PlanRef; + +/// Transform a special `TableFunction` (with `POSTGRES_QUERY` table function type) into a `LogicalPostgresQuery` +pub struct TableFunctionToPostgresQueryRule {} +impl Rule for TableFunctionToPostgresQueryRule { + fn apply(&self, plan: PlanRef) -> Option { + let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?; + if logical_table_function.table_function.function_type != TableFunctionType::PostgresQuery { + return None; + } + assert!(!logical_table_function.with_ordinality); + let table_function_return_type = logical_table_function.table_function().return_type(); + + if let DataType::Struct(st) = table_function_return_type.clone() { + let fields = st + .types() + .zip_eq_debug(st.names()) + .map(|(data_type, name)| Field::with_name(data_type.clone(), name.to_string())) + .collect_vec(); + + let schema = Schema::new(fields); + + assert_eq!(logical_table_function.table_function().args.len(), 6); + let mut eval_args = vec![]; + for arg in &logical_table_function.table_function().args { + assert_eq!(arg.return_type(), DataType::Varchar); + let value = arg.try_fold_const().unwrap().unwrap(); 
+ match value { + Some(ScalarImpl::Utf8(s)) => { + eval_args.push(s.to_string()); + } + _ => { + unreachable!("must be a varchar") + } + } + } + let hostname = eval_args[0].clone(); + let port = eval_args[1].clone(); + let username = eval_args[2].clone(); + let password = eval_args[3].clone(); + let database = eval_args[4].clone(); + let query = eval_args[5].clone(); + + Some( + LogicalPostgresQuery::new( + logical_table_function.ctx(), + schema, + hostname, + port, + username, + password, + database, + query, + ) + .into(), + ) + } else { + unreachable!("TableFunction return type should be struct") + } + } +} + +impl TableFunctionToPostgresQueryRule { + pub fn create() -> BoxedRule { + Box::new(TableFunctionToPostgresQueryRule {}) + } +}