-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(batch): support basic postgres tvf #18811
Changes from all commits
df14470
b56f281
ccc90fb
22ec6b3
514e031
b8f8aea
7e19074
26a7a11
4abe65c
b1a286d
e2e20f0
2b5946e
6d09259
c2b444b
172943b
d526c2a
111a7a5
89d3ca1
2afb409
032230a
b532620
b5e76a5
4bc2797
9885b3a
735bc75
5b66d01
c254e71
650ecd9
1413bd4
66464d9
4674982
fe10528
ad46398
4a58d05
8ea47ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,40 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
system ok | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be better to use $RISEDEV_ envvars, instead of hard-coded ones. Then you can run it easier locally There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Current e2e-source-test does not have a postgres service managed by risedev. We can refactor it after #18099. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we already have risedev managed pg #16662 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm, we already have a dedicated risedev profile, so I don't think it will affect other stuff.. risingwave/ci/scripts/e2e-source-test.sh Lines 38 to 41 in c80cbf3
Lines 952 to 979 in c80cbf3
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just mean to write the new tests introduced in the new style, without changing existing tests. If you mean migrating all tests to the style later after that PR, it also sounds good |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CREATE TABLE test ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
id bigint primary key, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v1 bool, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v2 smallint, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v3 integer, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v4 bigint, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v5 real, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v6 double precision, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v7 numeric, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v8 date, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v9 time, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v10 timestamp, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v11 timestamptz, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v12 text, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v13 varchar, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v14 interval, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v15 jsonb, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
v16 bytea | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
);" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
system ok | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00 pst', 'text', 'varchar', '1 day', '{}', '\x01'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
query II | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select * from postgres_query('db', '5432', 'postgres', 'postgres', 'cdc_test', 'select * from test where id > 90;'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---- | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus}; | |
use risingwave_storage::error::StorageError; | ||
use thiserror::Error; | ||
use thiserror_ext::Construct; | ||
use tokio_postgres; | ||
use tonic::Status; | ||
|
||
use crate::worker_manager::worker_node_manager::FragmentId; | ||
|
@@ -127,6 +128,13 @@ pub enum BatchError { | |
ParquetError, | ||
), | ||
|
||
#[error(transparent)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if the error messages from Postgres can be clearly distinguished from ours, given that we're using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As we introduce more external system queries and directly interact with them in the Just FYI, in #15086, we made There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think this a fair point. There's quite a few tvf to be supported. And we should preemptively introduce an ExtSystemError to deal with it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realize there's more external system errors like iceberg, parquet from before this PR, and not just postgres error, which was introduced in this PR.
|
||
Postgres( | ||
#[from] | ||
#[backtrace] | ||
tokio_postgres::Error, | ||
), | ||
|
||
// Make the ref-counted type to be a variant for easier code structuring. | ||
// TODO(error-handling): replace with `thiserror_ext::Arc` | ||
#[error(transparent)] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use anyhow::Context; | ||
use futures_async_stream::try_stream; | ||
use futures_util::stream::StreamExt; | ||
use risingwave_common::catalog::{Field, Schema}; | ||
use risingwave_common::row::OwnedRow; | ||
use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl}; | ||
use risingwave_common::util::chunk_coalesce::DataChunkBuilder; | ||
use risingwave_pb::batch_plan::plan_node::NodeBody; | ||
use thiserror_ext::AsReport; | ||
use tokio_postgres; | ||
|
||
use crate::error::BatchError; | ||
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder}; | ||
use crate::task::BatchTaskContext; | ||
|
||
/// `PostgresQuery` executor. Runs a query against a Postgres database. | ||
pub struct PostgresQueryExecutor { | ||
schema: Schema, | ||
host: String, | ||
port: String, | ||
username: String, | ||
password: String, | ||
database: String, | ||
query: String, | ||
identity: String, | ||
} | ||
|
||
impl Executor for PostgresQueryExecutor { | ||
fn schema(&self) -> &risingwave_common::catalog::Schema { | ||
&self.schema | ||
} | ||
|
||
fn identity(&self) -> &str { | ||
&self.identity | ||
} | ||
|
||
fn execute(self: Box<Self>) -> super::BoxedDataChunkStream { | ||
self.do_execute().boxed() | ||
} | ||
} | ||
|
||
pub fn postgres_row_to_owned_row( | ||
row: tokio_postgres::Row, | ||
schema: &Schema, | ||
) -> Result<OwnedRow, BatchError> { | ||
let mut datums = vec![]; | ||
for i in 0..schema.fields.len() { | ||
let rw_field = &schema.fields[i]; | ||
let name = rw_field.name.as_str(); | ||
let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?; | ||
datums.push(datum); | ||
} | ||
Ok(OwnedRow::new(datums)) | ||
} | ||
|
||
// TODO(kwannoel): Support more types, see postgres connector's ScalarAdapter. | ||
fn postgres_cell_to_scalar_impl( | ||
row: &tokio_postgres::Row, | ||
data_type: &DataType, | ||
i: usize, | ||
name: &str, | ||
) -> Result<Datum, BatchError> { | ||
let datum = match data_type { | ||
DataType::Boolean | ||
| DataType::Int16 | ||
| DataType::Int32 | ||
| DataType::Int64 | ||
| DataType::Float32 | ||
| DataType::Float64 | ||
| DataType::Date | ||
| DataType::Time | ||
| DataType::Timestamp | ||
| DataType::Timestamptz | ||
| DataType::Jsonb | ||
| DataType::Interval | ||
| DataType::Varchar | ||
| DataType::Bytea => { | ||
// ScalarAdapter is also fine. But ScalarImpl is more efficient | ||
row.try_get::<_, Option<ScalarImpl>>(i)? | ||
kwannoel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
DataType::Decimal => { | ||
// Decimal is more efficient than PgNumeric in ScalarAdapter | ||
let val = row.try_get::<_, Option<Decimal>>(i)?; | ||
val.map(ScalarImpl::from) | ||
} | ||
_ => { | ||
kwannoel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
tracing::warn!(name, ?data_type, "unsupported data type, set to null"); | ||
None | ||
} | ||
}; | ||
Ok(datum) | ||
} | ||
|
||
impl PostgresQueryExecutor { | ||
pub fn new( | ||
schema: Schema, | ||
host: String, | ||
port: String, | ||
username: String, | ||
password: String, | ||
database: String, | ||
query: String, | ||
identity: String, | ||
) -> Self { | ||
Self { | ||
schema, | ||
host, | ||
port, | ||
username, | ||
password, | ||
database, | ||
query, | ||
identity, | ||
} | ||
} | ||
|
||
#[try_stream(ok = DataChunk, error = BatchError)] | ||
async fn do_execute(self: Box<Self>) { | ||
tracing::debug!("postgres_query_executor: started"); | ||
let conn_str = format!( | ||
"host={} port={} user={} password={} dbname={}", | ||
self.host, self.port, self.username, self.password, self.database | ||
); | ||
let (client, conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?; | ||
|
||
tokio::spawn(async move { | ||
if let Err(e) = conn.await { | ||
tracing::error!( | ||
"postgres_query_executor: connection error: {:?}", | ||
e.as_report() | ||
); | ||
} | ||
}); | ||
|
||
// TODO(kwannoel): Use pagination using CURSOR. | ||
let rows = client | ||
.query(&self.query, &[]) | ||
.await | ||
.context("postgres_query received error from remote server")?; | ||
let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024); | ||
tracing::debug!("postgres_query_executor: query executed, start deserializing rows"); | ||
// deserialize the rows | ||
for row in rows { | ||
let owned_row = postgres_row_to_owned_row(row, &self.schema)?; | ||
if let Some(chunk) = builder.append_one_row(owned_row) { | ||
yield chunk; | ||
} | ||
} | ||
if let Some(chunk) = builder.consume_all() { | ||
yield chunk; | ||
} | ||
return Ok(()); | ||
} | ||
} | ||
|
||
pub struct PostgresQueryExecutorBuilder {} | ||
|
||
#[async_trait::async_trait] | ||
impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder { | ||
async fn new_boxed_executor<C: BatchTaskContext>( | ||
source: &ExecutorBuilder<'_, C>, | ||
_inputs: Vec<BoxedExecutor>, | ||
) -> crate::error::Result<BoxedExecutor> { | ||
let postgres_query_node = try_match_expand!( | ||
source.plan_node().get_node_body().unwrap(), | ||
NodeBody::PostgresQuery | ||
)?; | ||
|
||
Ok(Box::new(PostgresQueryExecutor::new( | ||
Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)), | ||
postgres_query_node.hostname.clone(), | ||
postgres_query_node.port.clone(), | ||
postgres_query_node.username.clone(), | ||
postgres_query_node.password.clone(), | ||
postgres_query_node.database.clone(), | ||
postgres_query_node.query.clone(), | ||
source.plan_node().get_identity().clone(), | ||
))) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to put in
e2e_test/source_inline
, and no need to modify CI scriptThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently inline source test does not have a postgres service managed by risedev. We can refactor it after #18099.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#18858