diff --git a/Cargo.toml b/Cargo.toml index fbcac03f..2f5e1d27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ exclude = [ "wasm-wrappers/fdw/cal_fdw", "wasm-wrappers/fdw/calendly_fdw", + "wasm-wrappers/fdw/cfd1_fdw", "wasm-wrappers/fdw/helloworld_fdw", "wasm-wrappers/fdw/snowflake_fdw", "wasm-wrappers/fdw/paddle_fdw", diff --git a/README.md b/README.md index c6770d97..be758695 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ | [Paddle](./wasm-wrappers/fdw/paddle_fdw) | A Wasm FDW for [Paddle](https://www.paddle.com/) | ✅ | ✅ | | [Calendly](./wasm-wrappers/fdw/calendly_fdw) | A Wasm FDW for [Calendly](https://www.calendly.com/) | ✅ | ❌ | | [Cal.com](./wasm-wrappers/fdw/cal_fdw) | A Wasm FDW for [Cal.com](https://www.cal.com/) | ✅ | ❌ | +| [Cloudflare D1](./wasm-wrappers/fdw/cfd1_fdw) | A Wasm FDW for [Cloudflare D1](https://developers.cloudflare.com/d1/) | ✅ | ✅ | ### Warning diff --git a/docs/catalog/cfd1.md b/docs/catalog/cfd1.md new file mode 100644 index 00000000..c45d4cbc --- /dev/null +++ b/docs/catalog/cfd1.md @@ -0,0 +1,312 @@ +--- +source: +documentation: +author: supabase +tags: + - wasm + - official +--- + +# Cloudflare D1 + +[Cloudflare D1](https://developers.cloudflare.com/d1/) is Cloudflare's managed, serverless database with SQLite's SQL semantics, built-in disaster recovery, and Worker and HTTP API access. + +The Cloudflare D1 Wrapper is a WebAssembly(Wasm) foreign data wrapper which allows you to read data from Cloudflare D1 database for use within your Postgres database. + +!!! warning + + Restoring a logical backup of a database with a materialized view using a foreign table can fail. For this reason, either do not use foreign tables in materialized views or use them in databases with physical backups enabled. + +## Supported Data Types + +| Postgres Data Type | D1 Data Type | +| ------------------ | ------------ | +| bigint | integer | +| double precision | real | +| text | text | +| text | blob | + +The D1 API uses JSON formatted data, please refer to [D1 API docs](https://developers.cloudflare.com/api/operations/cloudflare-d1-list-databases) for more details. + +## Available Versions + +| Version | Wasm Package URL | Checksum | +| ------- | ----------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | +| 0.1.0 | `https://github.com/supabase/wrappers/releases/download/wasm_cfd1_fdw_v0.1.0/cfd1_fdw.wasm` | `tbd` | + +## Preparation + +Before you can query D1, you need to enable the Wrappers extension and store your credentials in Postgres. + +### Enable Wrappers + +Make sure the `wrappers` extension is installed on your database: + +```sql +create extension if not exists wrappers with schema extensions; +``` + +### Enable the D1 Wrapper + +Enable the Wasm foreign data wrapper: + +```sql +create foreign data wrapper wasm_wrapper + handler wasm_fdw_handler + validator wasm_fdw_validator; +``` + +### Store your credentials (optional) + +By default, Postgres stores FDW credentials inside `pg_catalog.pg_foreign_server` in plain text. Anyone with access to this table will be able to view these credentials. Wrappers is designed to work with [Vault](https://supabase.com/docs/guides/database/vault), which provides an additional level of security for storing credentials. We recommend using Vault to store your credentials. + +```sql +-- Save your D1 API token in Vault and retrieve the `key_id` +insert into vault.secrets (name, secret) +values ( + 'cfd1', + '' -- Cloudflare D1 API token +) +returning key_id; +``` + +### Connecting to D1 + +We need to provide Postgres with the credentials to access D1 and any additional options. We can do this using the `create server` command: + +=== "With Vault" + + ```sql + create server cfd1_server + foreign data wrapper wasm_wrapper + options ( + fdw_package_url 'https://github.com/supabase/wrappers/releases/download/wasm_cfd1_fdw_v0.1.0/cfd1_fdw.wasm', + fdw_package_name 'supabase:cfd1-fdw', + fdw_package_version '0.1.0', + fdw_package_checksum 'tbd', + api_url 'https://api.cloudflare.com/client/v4/accounts//d1/database', -- optional + account_id '', + database_id '', + api_token_id '' -- The Key ID from above. + ); + ``` + +=== "Without Vault" + + ```sql + create server cfd1_server + foreign data wrapper wasm_wrapper + options ( + fdw_package_url 'https://github.com/supabase/wrappers/releases/download/wasm_cfd1_fdw_v0.1.0/cfd1_fdw.wasm', + fdw_package_name 'supabase:cfd1-fdw', + fdw_package_version '0.1.0', + fdw_package_checksum 'tbd', + api_url 'https://api.cloudflare.com/client/v4/accounts//d1/database', -- optional + account_id '', + database_id '', + api_token '' + ); + ``` + +Note the `fdw_package_*` options are required, which specify the Wasm package metadata. You can get the available package version list from [above](#available-versions). + +### Create a schema + +We recommend creating a schema to hold all the foreign tables: + +```sql +create schema if not exists cfd1; +``` + +## Options + +The full list of foreign table options are below: + +- `table` - Source table name in D1, required. + + - This option can also be a subquery enclosed in parentheses, see below for examples. + - A pseudo-table name `_meta_databases` can be used to query databases. + +- `rowid_column` - Primary key column name, optional for data scan, required for data modify. + +## Entities + +The D1 Wrapper supports data reads and writes from the Cloudflare D1 API. + +### D1 Databases + +This is an object representing a D1 database. + +Ref: [D1 databases docs](https://developers.cloudflare.com/api/operations/cloudflare-d1-list-databases) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ----------- | :----: | :----: | :----: | :----: | :------: | +| database | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table cfd1.databases ( + uuid text, + name text, + version text, + num_tables bigint, + file_size bigint, + created_at text, + _attrs jsonb +) + server cfd1_server + options ( + table '_meta_databases' + ); +``` + +#### Notes + +- The `_attrs` meta column contains all database attributes in JSON format +- The table option must be `_meta_databases` +- Only column names listed above are allowed + +### D1 Tables + +This is an object representing a D1 table. + +Ref: [D1 query docs](https://developers.cloudflare.com/api/operations/cloudflare-d1-query-database) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ----------- | :----: | :----: | :----: | :----: | :------: | +| table | ✅ | ✅ | ✅ | ✅ | ❌ | + +#### Usage + +```sql +create foreign table cfd1.mytable ( + id bigint, + name text, + amount double precision, + metadata text, + _attrs jsonb +) + server cfd1_server + options ( + table 'mytable', + rowid_column 'id' + ); +``` + +#### Notes + +- The `_attrs` meta column contains all attributes in JSON format +- Can use subquery in `table` option +- Requires `rowid_column` for data modification operations +- Supports query pushdown for `where`, `order by`, and `limit` clauses +- Column names, except `_attrs`, must match between D1 and foreign table +- Data types must be compatible according to type mapping table + +## Query Pushdown Support + +This FDW supports `where`, `order by` and `limit` clause pushdown. + +## Examples + +Below are some examples on how to use D1 foreign tables. + +### Basic Example + +This example will create a "foreign table" inside your Postgres database and query its data. + +```sql +create foreign table cfd1.databases ( + uuid text, + name text, + version text, + num_tables bigint, + file_size bigint, + created_at text, + _attrs jsonb +) + server cfd1_server + options ( + table '_meta_databases' + ); + +-- query D1 databases +select * from cfd1.databases; +``` + +### Query A Table + +Let's create a source table `test_table` in D1 web console and add some testing data. + +| Column Name | Data Type | +| ----------- | --------- | +| id | integer | +| name | text | +| amount | real | +| metadata | blob | + +This example will create a "foreign table" inside your Postgres database and query its data. + +```sql +create foreign table cfd1.test_table ( + id bigint, + name text, + amount double precision, + metadata text, + _attrs jsonb +) + server cfd1_server + options ( + table 'test_table', + rowid_column 'id' + ); + +select * from cfd1.test_table; +``` + +### Table With Subquery + +The `table` option can also be a subquery enclosed in parentheses. + +```sql +create foreign table cfd1.test_table_subquery ( + id bigint, + name text, + amount double precision, + metadata text, + _attrs jsonb +) + server cfd1_server + options ( + table '(select * from test_table)' + ); + +select * from cfd1.test_table_subquery; +``` + +!!! note + + The foreign table with subquery option cannot support data modification. + +### Modify Data + +This example will modify data in a "foreign table" inside your Postgres database, note that `rowid_column` table option is required for data modify. + +```sql +-- insert new data +insert into cfd1.test_table(id, name, amount) +values (123, 'test name 123', 321.654); + +-- update existing data +update cfd1.test_table +set name = 'new name', amount = null +where id = 123; + +-- delete data +delete from cfd1.test_table where id = 123; +``` + diff --git a/docs/catalog/index.md b/docs/catalog/index.md index 134f6889..5f6ff749 100644 --- a/docs/catalog/index.md +++ b/docs/catalog/index.md @@ -7,24 +7,25 @@ hide: ## Official -| Integration | Select | Insert | Update | Delete | Truncate | Push Down | -| ----------- | :----: | :----: | :----: | :----: | :------: | :-------: | -| Airtable | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| Auth0 | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| AWS Cognito | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| BigQuery | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | -| Cal.com | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | -| Calendly | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| ClickHouse | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | -| Firebase | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| Logflare | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| Notion | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| Paddle | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | -| Redis | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| S3 | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | -| Snowflake | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | -| Stripe | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | -| SQL Server | ✅ | ❌ | ❌ | ❌ | ❌ | ✅ | +| Integration | Select | Insert | Update | Delete | Truncate | Push Down | +| ------------- | :----: | :----: | :----: | :----: | :------: | :-------: | +| Airtable | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| Auth0 | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| AWS Cognito | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| BigQuery | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | +| Cal.com | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | +| Calendly | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| ClickHouse | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | +| Cloudflare D1 | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | +| Firebase | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| Logflare | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| Notion | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| Paddle | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | +| Redis | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| S3 | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| Snowflake | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | +| Stripe | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | +| SQL Server | ✅ | ❌ | ❌ | ❌ | ❌ | ✅ | ## Community @@ -32,10 +33,11 @@ Wasm wrappers can be installed directly from GitHub or any external source. See [Developing a Wasm Wrapper](../guides/create-wasm-wrapper.md) for instructions on how to build and develop your own. -| Integration | Developer | Docs | Source | -| ----------- | :------------------------------: | :----------------------------------: | :------------------------------------------------------------------------------------: | -| Cal.com | [Supabase](https://supabase.com) | [Link](cal.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/cal_fdw) | -| Calendly | [Supabase](https://supabase.com) | [Link](calendly.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/calendly_fdw) | -| Notion | [Supabase](https://supabase.com) | [Link](notion.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/notion_fdw) | -| Paddle | [Supabase](https://supabase.com) | [Link](paddle.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/paddle_fdw) | -| Snowflake | [Supabase](https://supabase.com) | [Link](snowflake.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/snowflake_fdw) | +| Integration | Developer | Docs | Source | +| :-----------: | :------------------------------: | :------------------: | :------------------------------------------------------------------------------------: | +| Cal.com | [Supabase](https://supabase.com) | [Link](cal.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/cal_fdw) | +| Calendly | [Supabase](https://supabase.com) | [Link](calendly.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/calendly_fdw) | +| Cloudflare D1 | [Supabase](https://supabase.com) | [Link](cfd1.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/cfd1_fdw) | +| Notion | [Supabase](https://supabase.com) | [Link](notion.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/notion_fdw) | +| Paddle | [Supabase](https://supabase.com) | [Link](paddle.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/paddle_fdw) | +| Snowflake | [Supabase](https://supabase.com) | [Link](snowflake.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/snowflake_fdw) | diff --git a/docs/catalog/wasm/index.md b/docs/catalog/wasm/index.md index 022e36fb..6eecfb3b 100644 --- a/docs/catalog/wasm/index.md +++ b/docs/catalog/wasm/index.md @@ -37,6 +37,18 @@ Foreign data wrappers built with Wasm which can be used on Supabase platform. :octicons-code-24: [source](https://github.com/supabase/wrappers/tree/wasm_calendly_fdw_v0.1.0/wasm-wrappers/fdw/calendly_fdw)   :material-file-document: [docs](../calendly.md) +- :simple-webassembly:   **[Cloudflare D1](../cfd1.md)** + + ---- + + Foreign data wrapper for [Cloudflare D1](https://developers.cloudflare.com/d1/). + + Supported by [Supabase](https://www.supabase.com) + + :octicons-tag-24: [v0.1.0](https://github.com/supabase/wrappers/releases/tag/cfd1_fdw_v0.1.0)   + :octicons-code-24: [source](https://github.com/supabase/wrappers/tree/wasm_cfd1_fdw_v0.1.0/wasm-wrappers/fdw/cfd1_fdw)   + :material-file-document: [docs](../cfd1.md) + - :simple-webassembly:   **[Notion](../notion.md)** ---- diff --git a/mkdocs.yaml b/mkdocs.yaml index 70ae9206..2ff2c775 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -30,6 +30,7 @@ nav: - catalog/wasm/index.md - Cal.com: 'catalog/cal.md' - Calendly: 'catalog/calendly.md' + - Cloudflare D1: 'catalog/cfd1.md' - Notion: 'catalog/notion.md' - Paddle: 'catalog/paddle.md' - Snowflake: 'catalog/snowflake.md' diff --git a/wasm-wrappers/fdw/cfd1_fdw/Cargo.lock b/wasm-wrappers/fdw/cfd1_fdw/Cargo.lock new file mode 100644 index 00000000..8bf2e870 --- /dev/null +++ b/wasm-wrappers/fdw/cfd1_fdw/Cargo.lock @@ -0,0 +1,102 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "cfd1_fdw" +version = "0.1.0" +dependencies = [ + "serde_json", + "wit-bindgen-rt", +] + +[[package]] +name = "itoa" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "proc-macro2" +version = "1.0.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.216" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.216" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.133" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" + +[[package]] +name = "wit-bindgen-rt" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c7526379ace8709ee9ab9f2bb50f112d95581063a59ef3097d9c10153886c9" diff --git a/wasm-wrappers/fdw/cfd1_fdw/Cargo.toml b/wasm-wrappers/fdw/cfd1_fdw/Cargo.toml new file mode 100644 index 00000000..7c426fbe --- /dev/null +++ b/wasm-wrappers/fdw/cfd1_fdw/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "cfd1_fdw" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wit-bindgen-rt = "0.26.0" +serde_json = "1.0" + +[package.metadata.component] +package = "supabase:cfd1-fdw" + +[package.metadata.component.dependencies] + +[package.metadata.component.target] +path = "wit" + +[package.metadata.component.target.dependencies] +"supabase:wrappers" = { path = "../../wit" } diff --git a/wasm-wrappers/fdw/cfd1_fdw/src/lib.rs b/wasm-wrappers/fdw/cfd1_fdw/src/lib.rs new file mode 100644 index 00000000..ce78d421 --- /dev/null +++ b/wasm-wrappers/fdw/cfd1_fdw/src/lib.rs @@ -0,0 +1,484 @@ +#[allow(warnings)] +mod bindings; +use serde_json::Value as JsonValue; + +use bindings::{ + exports::supabase::wrappers::routines::Guest, + supabase::wrappers::{ + http, stats, time, + types::{Cell, Column, Context, FdwError, FdwResult, OptionsType, Row, TypeOid, Value}, + utils, + }, +}; + +#[derive(Debug, Default)] +struct Cfd1Fdw { + base_url: String, + headers: Vec<(String, String)>, + database_id: String, + table: String, + rowid_col: String, + src_rows: Vec, + src_idx: usize, +} + +static mut INSTANCE: *mut Cfd1Fdw = std::ptr::null_mut::(); +static FDW_NAME: &str = "Cfd1Fdw"; + +impl Cfd1Fdw { + fn init() { + let instance = Self::default(); + unsafe { + INSTANCE = Box::leak(Box::new(instance)); + } + } + + fn this_mut() -> &'static mut Self { + unsafe { &mut (*INSTANCE) } + } + + // convert Cloudflare API response data field to a cell + fn src_to_cell(&self, src_row: &JsonValue, tgt_col: &Column) -> Result, FdwError> { + let tgt_col_name = tgt_col.name(); + + // put all properties into '_attrs' JSON column + if &tgt_col_name == "_attrs" { + return Ok(Some(Cell::Json(src_row.to_string()))); + } + + let src = src_row + .as_object() + .and_then(|v| v.get(&tgt_col_name)) + .ok_or(format!("source column '{}' not found", tgt_col_name))?; + + // column type mapping + let cell = match tgt_col.type_oid() { + TypeOid::F64 => src.as_f64().map(Cell::F64), + TypeOid::I64 => src.as_i64().map(Cell::I64), + TypeOid::String => src.as_str().map(|v| Cell::String(v.to_owned())), + TypeOid::Json => src.as_object().map(|_| Cell::Json(src.to_string())), + _ => { + return Err(format!( + "target column '{}' type is not supported", + tgt_col_name + )); + } + }; + + Ok(cell) + } + + // combine target columns, quals, sorts and limit to a sql statement + fn deparse(&self, ctx: &Context) -> String { + let columns = ctx.get_columns(); + let quals = ctx.get_quals(); + let sorts = ctx.get_sorts(); + let limit = ctx.get_limit(); + + // make target column list + let tgts = if columns.is_empty() { + "*".to_string() + } else { + columns + .iter() + .map(|c| c.name()) + .filter(|c| c != "_attrs") + .collect::>() + .join(", ") + }; + + // make sql statement + let mut sql = if quals.is_empty() { + format!("select {} from {}", tgts, self.table) + } else { + let cond = quals + .iter() + .map(|q| { + let default_cond = format!("{} {} ?", q.field(), q.operator()); + match q.operator().as_str() { + "is" | "is not" => match q.value() { + Value::Cell(cell) => match cell { + Cell::String(s) if s == "null" => { + format!("{} {} null", q.field(), q.operator()) + } + _ => default_cond, + }, + _ => default_cond, + }, + "~~" => format!("{} like ?", q.field()), + "!~~" => format!("{} not like ?", q.field()), + _ => default_cond, + } + }) + .collect::>() + .join(" and "); + format!("select {} from {} where {}", tgts, self.table, cond) + }; + + // push down sorts + if !sorts.is_empty() { + let order_by = sorts + .iter() + .map(|sort| sort.deparse()) + .collect::>() + .join(", "); + sql.push_str(&format!(" order by {}", order_by)); + } + + // push down limits + // Note: Postgres will take limit and offset locally after reading rows + // from remote, so we calculate the real limit and only use it without + // pushing down offset. + if let Some(limit) = limit { + let real_limit = limit.offset() + limit.count(); + sql.push_str(&format!(" limit {}", real_limit)); + } + + sql + } + + // create a request instance + fn create_request(&self, ctx: &Context) -> Result { + // ref: https://developers.cloudflare.com/api/operations/cloudflare-d1-query-database + let (method, url, body) = match self.table.as_str() { + "_meta_databases" => (http::Method::Get, self.base_url.clone(), String::default()), + _ => { + let quals = ctx.get_quals(); + + // make query parameter list + let params = quals + .iter() + .filter(|q| { + // filter out qual which is 'is null' or 'is not null' + match q.operator().as_str() { + "is" | "is not" => match q.value() { + Value::Cell(cell) => { + !matches!(cell, Cell::String(s) if s == "null") + } + _ => true, + }, + _ => true, + } + }) + .map(|q| match q.value() { + Value::Cell(cell) => match cell { + Cell::F32(n) => Ok(n.to_string()), + Cell::F64(n) => Ok(n.to_string()), + Cell::I32(n) => Ok(n.to_string()), + Cell::I64(n) => Ok(n.to_string()), + Cell::String(s) => Ok(s.to_owned()), + _ => Err(format!( + "parameter type {:?} for column '{}' not supported", + cell, + q.field() + )), + }, + _ => Err("qual value type not supported".to_string()), + }) + .collect::, FdwError>>()?; + + // deparse sql query + let sql = self.deparse(ctx); + + ( + http::Method::Post, + format!("{}/{}/query", self.base_url, self.database_id), + format!(r#"{{ "params": {:?}, "sql": "{}" }}"#, params, sql), + ) + } + }; + + Ok(http::Request { + method, + url, + headers: self.headers.clone(), + body, + }) + } + + // make request to Cloudflare API, including following pagination requests + fn fetch_source_data(&mut self, req: http::Request) -> FdwResult { + self.src_rows.clear(); + self.src_idx = 0; + + // max delay times when encouter HTTP 429 - Too Many Requests response + const MAX_DELAY_TIMES: usize = 5; + let mut delay_times = 0; + + loop { + // send request + let resp = match req.method { + http::Method::Get => http::get(&req)?, + http::Method::Post => http::post(&req)?, + _ => unreachable!("invalid request method"), + }; + + // idle for 2 seconds for retry when got rate limited error + // ref: https://developers.cloudflare.com/fundamentals/api/reference/limits/ + if resp.status_code == 429 { + delay_times += 1; + if delay_times >= MAX_DELAY_TIMES { + return Err("API rate limit exceeded".to_owned()); + } + time::sleep(2000); + continue; + } + + // transform response to json + let resp_json: JsonValue = + serde_json::from_str(&resp.body).map_err(|e| e.to_string())?; + + // check for HTTP errors + http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?; + + // check for API request errors + if let Some(success) = resp_json["success"].as_bool() { + if !success { + if let Some(errors) = resp_json["errors"].as_array() { + if !errors.is_empty() { + return Err(format!("API request failed with error {:?}", errors)); + } + } + } + } + + // unify response object to array and save source rows + let resp_data = resp_json + .pointer("/result") + .and_then(|v| { + if v.is_array() { + if self.table == "_meta_databases" { + v.as_array().cloned() + } else { + v[0]["results"].as_array().cloned() + } + } else { + Some(vec![v.clone()]) + } + }) + .ok_or("cannot get query result data")?; + self.src_rows.extend(resp_data); + + stats::inc_stats(FDW_NAME, stats::Metric::BytesIn, resp.body.len() as i64); + break; + } + + Ok(()) + } + + // make modify request and send the request + fn modify_source_data(&mut self, params: &[String], sql: &str) -> FdwResult { + let req = http::Request { + method: http::Method::Post, + url: format!("{}/{}/query", self.base_url, self.database_id), + headers: self.headers.clone(), + body: format!(r#"{{ "params": {:?}, "sql": "{}" }}"#, params, sql), + }; + self.fetch_source_data(req) + } +} + +impl Guest for Cfd1Fdw { + fn host_version_requirement() -> String { + // semver ref: https://docs.rs/semver/latest/semver/enum.Op.html + "^0.1.0".to_string() + } + + fn init(ctx: &Context) -> FdwResult { + Self::init(); + let this = Self::this_mut(); + + // get foreign server options + let opts = ctx.get_options(OptionsType::Server); + let account_id = opts.require("account_id")?; + this.database_id = opts.require("database_id")?; + this.base_url = opts.require_or( + "api_url", + &format!( + "https://api.cloudflare.com/client/v4/accounts/{}/d1/database", + account_id + ), + ); + let api_token = match opts.get("api_token") { + Some(key) => key, + None => { + let token_id = opts.require("api_token_id")?; + utils::get_vault_secret(&token_id).unwrap_or_default() + } + }; + + // Cloudflare D1 API authentication + // ref: https://developers.cloudflare.com/api/operations/cloudflare-d1-list-databases + this.headers + .push(("user-agent".to_owned(), "Wrappers Cfd1 FDW".to_string())); + this.headers + .push(("content-type".to_owned(), "application/json".to_string())); + this.headers + .push(("authorization".to_owned(), format!("Bearer {}", api_token))); + + stats::inc_stats(FDW_NAME, stats::Metric::CreateTimes, 1); + + Ok(()) + } + + fn begin_scan(ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + let opts = ctx.get_options(OptionsType::Table); + this.table = opts.require("table")?; + let req = this.create_request(ctx)?; + this.fetch_source_data(req) + } + + fn iter_scan(ctx: &Context, row: &Row) -> Result, FdwError> { + let this = Self::this_mut(); + + // if all source rows are consumed + if this.src_idx >= this.src_rows.len() { + stats::inc_stats(FDW_NAME, stats::Metric::RowsIn, this.src_rows.len() as i64); + stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, this.src_rows.len() as i64); + return Ok(None); + } + + // convert source row to Postgres row + let src_row = &this.src_rows[this.src_idx]; + for tgt_col in ctx.get_columns() { + let cell = this.src_to_cell(src_row, &tgt_col)?; + row.push(cell.as_ref()); + } + + this.src_idx += 1; + + Ok(Some(0)) + } + + fn re_scan(ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + let req = this.create_request(ctx)?; + this.fetch_source_data(req) + } + + fn end_scan(_ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + this.src_idx = 0; + this.src_rows.clear(); + Ok(()) + } + + fn begin_modify(ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + let opts = ctx.get_options(OptionsType::Table); + this.table = opts.require("table")?; + this.rowid_col = opts.require("rowid_column")?; + Ok(()) + } + + fn insert(_ctx: &Context, row: &Row) -> FdwResult { + let this = Self::this_mut(); + + // make query parameter and colmn name&value list + let (params, cols): (Vec, Vec<(String, String)>) = row + .cols() + .iter() + .zip(row.cells().iter()) + .filter(|(col, cell)| *col != "_attrs" && cell.is_some()) + .map(|(col, cell)| { + let mut param = utils::cell_to_string(cell.as_ref()); + if let Some(Cell::String(_)) = cell { + // if cell is string, strip the leading and trailing quote + param = param + .as_str() + .strip_prefix("'") + .and_then(|s| s.strip_suffix("'")) + .map(|s| s.to_owned()) + .unwrap_or_default(); + } + let col_name = col.to_owned(); + let col_value = "?".to_owned(); + (param, (col_name, col_value)) + }) + .unzip(); + + // deparse sql query + let (col_names, col_values): (Vec, Vec) = cols.iter().cloned().unzip(); + let sql = format!( + "insert into {} ({}) values ({})", + this.table, + col_names.join(","), + col_values.join(",") + ); + + // send modify request + this.modify_source_data(¶ms, &sql)?; + + stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, 1); + + Ok(()) + } + + fn update(_ctx: &Context, rowid: Cell, row: &Row) -> FdwResult { + let this = Self::this_mut(); + + // make query parameter and update list + let (params, updates): (Vec>, Vec) = row + .cols() + .iter() + .zip(row.cells().iter()) + .filter(|(col, _)| *col != "_attrs") + .map(|(col, cell)| { + let mut param = utils::cell_to_string(cell.as_ref()); + if let Some(Cell::String(_)) = cell { + // if cell is string, strip the leading and trailing quote + param = param + .as_str() + .strip_prefix("'") + .and_then(|s| s.strip_suffix("'")) + .map(|s| s.to_owned()) + .unwrap_or_default(); + } + let col_name = col.to_owned(); + + // skip the param if it is setting null + if param == "null" { + (None, format!("{} = null", col_name)) + } else { + (Some(param), format!("{} = ?", col_name)) + } + }) + .unzip(); + + // filter out null params + let params = params + .iter() + .filter_map(|p| p.clone()) + .collect::>(); + + // deparse sql query + let sql = format!( + "update {} set {} where {} = {}", + this.table, + updates.join(","), + this.rowid_col, + utils::cell_to_string(Some(&rowid)), + ); + + // send modify request + this.modify_source_data(¶ms, &sql) + } + + fn delete(_ctx: &Context, rowid: Cell) -> FdwResult { + let this = Self::this_mut(); + + // make query parameter and deparse sql query + let params = vec![utils::cell_to_string(Some(&rowid))]; + let sql = format!("delete from {} where {} = ?", this.table, this.rowid_col,); + + // send modify request + this.modify_source_data(¶ms, &sql) + } + + fn end_modify(_ctx: &Context) -> FdwResult { + Ok(()) + } +} + +bindings::export!(Cfd1Fdw with_types_in bindings); diff --git a/wasm-wrappers/fdw/cfd1_fdw/wit/world.wit b/wasm-wrappers/fdw/cfd1_fdw/wit/world.wit new file mode 100644 index 00000000..547a5275 --- /dev/null +++ b/wasm-wrappers/fdw/cfd1_fdw/wit/world.wit @@ -0,0 +1,10 @@ +package supabase:cfd1-fdw@0.1.0; + +world cfd1 { + import supabase:wrappers/http@0.1.0; + import supabase:wrappers/jwt@0.1.0; + import supabase:wrappers/stats@0.1.0; + import supabase:wrappers/time@0.1.0; + import supabase:wrappers/utils@0.1.0; + export supabase:wrappers/routines@0.1.0; +} diff --git a/wrappers/dockerfiles/wasm/server.py b/wrappers/dockerfiles/wasm/server.py index f44ed8be..beeb0f1a 100644 --- a/wrappers/dockerfiles/wasm/server.py +++ b/wrappers/dockerfiles/wasm/server.py @@ -287,7 +287,39 @@ def do_POST(self): "createdOn": 1718259164549 } ''' - + elif fdw == "cfd1": + body = ''' +{ + "result": [ + { + "results": [ + { + "id": 42, + "name": "test name 2" + }, + { + "id": 123, + "name": "test name" + } + ], + "success": true, + "meta": { + "served_by": "v3-prod", + "duration": 0.1983, + "changes": 0, + "last_row_id": 0, + "changed_db": false, + "size_after": 16384, + "rows_read": 2, + "rows_written": 0 + } + } + ], + "errors": [], + "messages": [], + "success": true +} + ''' else: self.send_response(404) return diff --git a/wrappers/src/fdw/wasm_fdw/tests.rs b/wrappers/src/fdw/wasm_fdw/tests.rs index 40ea5776..a07ab7ee 100644 --- a/wrappers/src/fdw/wasm_fdw/tests.rs +++ b/wrappers/src/fdw/wasm_fdw/tests.rs @@ -240,6 +240,47 @@ mod tests { .filter_map(|r| r.get_by_name::("id").unwrap()) .collect::>(); assert_eq!(results, vec![1234567]); + + // Cloudflare D1 FDW test + c.update( + r#"CREATE SERVER cfd1_server + FOREIGN DATA WRAPPER wasm_wrapper + OPTIONS ( + fdw_package_url 'file://../../../wasm-wrappers/fdw/cfd1_fdw/target/wasm32-unknown-unknown/release/cfd1_fdw.wasm', + fdw_package_name 'supabase:cfd1-fdw', + fdw_package_version '>=0.1.0', + api_url 'http://localhost:8096/cfd1', + account_id 'aaa', + database_id 'bbb', + api_token 'ccc' + )"#, + None, + None, + ) + .unwrap(); + c.update( + r#" + CREATE FOREIGN TABLE cfd1_table ( + id bigint, + name text, + _attrs jsonb + ) + SERVER cfd1_server + OPTIONS ( + table 'test_table' + ) + "#, + None, + None, + ) + .unwrap(); + + let results = c + .select("SELECT * FROM cfd1_table order by id", None, None) + .unwrap() + .filter_map(|r| r.get_by_name::("id").unwrap()) + .collect::>(); + assert_eq!(results, vec![42, 123]); }); } }