Skip to content

Commit

Permalink
Merge pull request #78 from supabase/feat/add-numeric-bq-fdw
Browse files Browse the repository at this point in the history
feat: add numeric type support and timeout option for bq fdw
  • Loading branch information
burmecia authored Apr 4, 2023
2 parents 0467ab4 + e7acf17 commit 0cd81d1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 6 deletions.
17 changes: 17 additions & 0 deletions docs/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

BigQuery FDW supports both data read and modify.

### Supported Data Types

| Postgres Type | BigQuery Type |
| ------------------ | --------------- |
| boolean | BOOL |
| bigint | INT64 |
| double precision | FLOAT64 |
| numeric | NUMERIC |
| text | STRING |
| varchar | STRING |
| date | DATE |
| timestamp | DATETIME |
| timestamp | TIMESTAMP |

### Wrapper
To get started with the BigQuery wrapper, create a foreign data wrapper specifying `handler` and `validator` as below.

Expand Down Expand Up @@ -105,7 +119,10 @@ The full list of foreign table options are below:
table '(select * except(props), to_json_string(props) as props from `my_project.my_dataset.my_table`)'
```

**Note**: When using subquery in this option, full qualitified table name must be used.

- `location` - Source table location, optional. Default is 'US'.
- `timeout` - Query request timeout in milliseconds, optional. Default is '30000' (30 seconds).
- `rowid_column` - Primary key column name, optional for data scan, required for data modify

#### Examples
Expand Down
9 changes: 8 additions & 1 deletion supabase-wrappers/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::FdwRoutine;
use pgx::prelude::{Date, Timestamp};
use pgx::{
pg_sys::{self, Datum, Oid},
AllocatedByRust, FromDatum, IntoDatum, JsonB, PgBuiltInOids, PgOid,
AllocatedByRust, AnyNumeric, FromDatum, IntoDatum, JsonB, PgBuiltInOids, PgOid,
};
use std::collections::HashMap;
use std::fmt;
Expand Down Expand Up @@ -37,6 +37,7 @@ pub enum Cell {
I32(i32),
F64(f64),
I64(i64),
Numeric(AnyNumeric),
String(String),
Date(Date),
Timestamp(Timestamp),
Expand All @@ -53,6 +54,7 @@ impl Clone for Cell {
Cell::I32(v) => Cell::I32(*v),
Cell::F64(v) => Cell::F64(*v),
Cell::I64(v) => Cell::I64(*v),
Cell::Numeric(v) => Cell::Numeric(v.clone()),
Cell::String(v) => Cell::String(v.clone()),
Cell::Date(v) => Cell::Date(v.clone()),
Cell::Timestamp(v) => Cell::Timestamp(v.clone()),
Expand All @@ -71,6 +73,7 @@ impl fmt::Display for Cell {
Cell::I32(v) => write!(f, "{}", v),
Cell::F64(v) => write!(f, "{}", v),
Cell::I64(v) => write!(f, "{}", v),
Cell::Numeric(v) => write!(f, "{:?}", v),
Cell::String(v) => write!(f, "'{}'", v),
Cell::Date(v) => write!(f, "{:?}", v),
Cell::Timestamp(v) => write!(f, "{:?}", v),
Expand All @@ -89,6 +92,7 @@ impl IntoDatum for Cell {
Cell::I32(v) => v.into_datum(),
Cell::F64(v) => v.into_datum(),
Cell::I64(v) => v.into_datum(),
Cell::Numeric(v) => v.into_datum(),
Cell::String(v) => v.into_datum(),
Cell::Date(v) => v.into_datum(),
Cell::Timestamp(v) => v.into_datum(),
Expand Down Expand Up @@ -132,6 +136,9 @@ impl FromDatum for Cell {
PgOid::BuiltIn(PgBuiltInOids::INT8OID) => {
Some(Cell::I64(i64::from_datum(datum, false).unwrap()))
}
PgOid::BuiltIn(PgBuiltInOids::NUMERICOID) => {
Some(Cell::Numeric(AnyNumeric::from_datum(datum, false).unwrap()))
}
PgOid::BuiltIn(PgBuiltInOids::TEXTOID) => {
Some(Cell::String(String::from_datum(datum, false).unwrap()))
}
Expand Down
7 changes: 6 additions & 1 deletion wrappers/dockerfiles/bigquery/data.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ projects:
mode: REQUIRED
- name: name
type: STRING
mode: required
mode: REQUIRED
- name: num
type: NUMERIC
mode: REQUIRED
data:
- id: 1
name: foo
num: 0.123
- id: 2
name: bar
num: 1234.56789
1 change: 1 addition & 0 deletions wrappers/src/fdw/bigquery_fdw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This is a foreign data wrapper for [BigQuery](https://cloud.google.com/bigquery)

| Version | Date | Notes |
| ------- | ---------- | ---------------------------------------------------- |
| 0.1.3 | 2023-04-03 | Added support for `NUMERIC` type |
| 0.1.2 | 2023-03-15 | Added subquery support for `table` option |
| 0.1.1 | 2023-02-15 | Upgrade bq client lib to v0.16.5, code improvement |
| 0.1.0 | 2022-11-30 | Initial version |
30 changes: 27 additions & 3 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use gcp_bigquery_client::{
Client,
};
use pgx::prelude::PgSqlErrorCode;
use pgx::prelude::{Date, Timestamp};
use pgx::prelude::{AnyNumeric, Date, Timestamp};
use serde_json::json;
use std::collections::HashMap;
use time::{format_description::well_known::Iso8601, OffsetDateTime, PrimitiveDateTime};
Expand Down Expand Up @@ -42,6 +42,10 @@ fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> Option<Cell> {
.get_f64_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(Cell::F64),
FieldType::Numeric => rs
.get_f64_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(|v| Cell::Numeric(AnyNumeric::try_from(v).unwrap())),
FieldType::String => rs
.get_string_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
Expand Down Expand Up @@ -83,7 +87,7 @@ fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> Option<Cell> {
}

#[wrappers_fdw(
version = "0.1.2",
version = "0.1.3",
author = "Supabase",
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/bigquery_fdw"
)]
Expand Down Expand Up @@ -283,15 +287,34 @@ impl ForeignDataWrapper for BigQueryFdw {
.map(|t| t.to_owned())
.unwrap_or_else(|| "US".to_string());

let mut timeout: i32 = 30_000;
if let Some(timeout_str) = options.get("timeout") {
match timeout_str.parse::<i32>() {
Ok(t) => timeout = t,
Err(_) => report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("invalid timeout value: {}", timeout_str),
),
}
}

if let Some(client) = &self.client {
let sql = self.deparse(quals, columns, sorts, limit);
let mut req = QueryRequest::new(sql);
req.location = Some(location);
req.timeout_ms = Some(timeout);

// execute query on BigQuery
match self.rt.block_on(client.job().query(&self.project_id, req)) {
Ok(rs) => {
self.scan_result = Some(rs);
if rs.query_response().job_complete == Some(false) {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("query timeout {}ms expired", timeout),
);
} else {
self.scan_result = Some(rs);
}
}
Err(err) => {
self.scan_result = None;
Expand Down Expand Up @@ -395,6 +418,7 @@ impl ForeignDataWrapper for BigQueryFdw {
Cell::I64(v) => row_json[col_name] = json!(v),
Cell::F32(v) => row_json[col_name] = json!(v),
Cell::F64(v) => row_json[col_name] = json!(v),
Cell::Numeric(v) => row_json[col_name] = json!(v),
Cell::String(v) => row_json[col_name] = json!(v),
Cell::Date(v) => row_json[col_name] = json!(v),
Cell::Timestamp(v) => row_json[col_name] = json!(v),
Expand Down
10 changes: 9 additions & 1 deletion wrappers/src/fdw/bigquery_fdw/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ mod tests {
r#"
CREATE FOREIGN TABLE test_table (
id bigint,
name text
name text,
num numeric
)
SERVER my_bigquery_server
OPTIONS (
Expand Down Expand Up @@ -89,6 +90,13 @@ mod tests {

assert_eq!(results, vec!["FOO", "BAR"]);

let results = c
.select("SELECT num::text FROM test_table ORDER BY num", None, None)
.filter_map(|r| r.by_name("num").ok().and_then(|v| v.value::<&str>()))
.collect::<Vec<_>>();

assert_eq!(results, vec!["0.123", "1234.56789"]);

// DISABLED: error: [FIXME]
// insert failed: Request error (error: error decoding response body: missing field `status` at line 1 column 436)

Expand Down

0 comments on commit 0cd81d1

Please sign in to comment.