Skip to content

Commit

Permalink
Merge pull request pgcentralfoundation#13 from supabase/ankrgyl/airtable
Browse files Browse the repository at this point in the history
Ankrgyl/airtable
  • Loading branch information
burmecia authored Nov 26, 2022
2 parents 886a936 + 2c38dad commit f672be4
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 3 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Wrappers is also a collection of FDWs built by [Supabase](https://www.supabase.c
- [BigQuery](./wrappers/src/fdw/bigquery_fdw): A FDW for [BigQuery](https://cloud.google.com/bigquery) which only supports async data scan at this moment.
- [Clickhouse](./wrappers/src/fdw/clickhouse_fdw): A FDW for [ClickHouse](https://clickhouse.com/) which supports both async data scan and modify.
- [Stripe](./wrappers/src/fdw/stripe_fdw): A FDW for [Stripe](https://stripe.com/) API.
- [Airtable](./wrappers/src/fdw/airtable_fdw): A FDW for [Airtable](https://airtable.com/) API.

## Features

Expand Down
1 change: 1 addition & 0 deletions wrappers/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ clickhouse_fdw = ["clickhouse-rs", "chrono", "time"]
stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"]
firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "time"]

# TODO: audit dependencies
airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url"]

[dependencies]
pgx = "=0.5.6"
cfg-if = "1.0"
Expand All @@ -31,7 +34,7 @@ supabase-wrappers = { path = "../supabase-wrappers" }
clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", branch = "async-await", features = ["tls"], optional = true }
chrono = { version = "0.4", optional = true }

# for bigquery_fdw, firebase_fdw and etc.
# for bigquery_fdw, firebase_fdw, airtable_fdw and etc.
gcp-bigquery-client = { version = "0.16.0", optional = true }
time = { version = "0.3.15", features = ["parsing"], optional = true }
serde = { version = "1", optional = true }
Expand All @@ -48,6 +51,9 @@ reqwest-retry = { version = "0.1.5", optional = true }
yup-oauth2 = { version = "8.0.0", optional = true }
regex = { version = "1", optional = true }

# for airtable_fdw
url = { version = "2.3", optional = true }

[dev-dependencies]
pgx-tests = "=0.5.6"

Expand Down
67 changes: 67 additions & 0 deletions wrappers/src/fdw/airtable_fdw/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Airtable Foreign Data Wrapper

This is a foreign data wrapper for [Airtable](https://airtable.com/). It is developed using [Wrappers](https://github.com/supabase/wrappers).

## Basic usage

These steps outline how to use the Airtable FDW:

1. Clone this repo

```bash
git clone https://github.com/supabase/wrappers.git
```

2. Run it using pgx with feature:

```bash
cd wrappers/wrappers
cargo pgx run --features airtable_fdw
```

3. Create the extension, foreign data wrapper and related objects:

```sql
-- create extension
drop extension if exists wrappers cascade;
create extension wrappers;

-- create foreign data wrapper and enable 'AirtableFdw'
drop foreign data wrapper if exists airtable_wrapper cascade;
create foreign data wrapper airtable_wrapper
handler wrappers_handler
validator wrappers_validator
options (
wrapper 'AirtableFdw'
);

-- create a wrappers Airtable server and specify connection info
drop server if exists my_airtable_server cascade;
create server my_airtable_server
foreign data wrapper airtable_wrapper
options (
api_url 'https://api.airtable.com/v0', -- Airtable API base URL, optional
api_key 'at_test_key' -- Airtable API Key, required
);

-- create an example foreign table
drop foreign table if exists t1;
create foreign table t1(
id text, -- The builtin "id" field in Airtable
name text, -- The fields in your Airtable table. Airtable is case insensitive so capitalization does not matter.
status text,
)
server my_airtable_server
options (
base_id 'at_base_id' -- Airtable Base ID, required
table 'My Table Name' -- Airtable Table Name (or ID), required
);
```

4. Run some queries to check if it is working:

On Postgres:

```sql
select * from t1;
```
179 changes: 179 additions & 0 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use pgx::log::PgSqlErrorCode;
use reqwest::{self, header};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::collections::HashMap;
use url::Url;

use supabase_wrappers::{
create_async_runtime, log_warning, report_error, require_option, wrappers_meta,
ForeignDataWrapper, Limit, Qual, Row, Runtime, Sort,
};

use super::result::AirtableResponse;

#[wrappers_meta(
version = "0.1.0",
author = "Ankur Goyal",
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/airtable_fdw"
)]
pub(crate) struct AirtableFdw {
rt: Runtime,
base_url: String,
client: Option<ClientWithMiddleware>,
scan_result: Option<Vec<Row>>,
}

impl AirtableFdw {
pub fn new(options: &HashMap<String, String>) -> Self {
let base_url = options
.get("api_url")
.map(|t| t.to_owned())
.unwrap_or("https://api.airtable.com/v0/app4PDEzNrArJdQ5k".to_string())
.trim_end_matches('/')
.to_owned();

let client = require_option("api_key", options).map(|api_key| {
let mut headers = header::HeaderMap::new();
let value = format!("Bearer {}", api_key);
let mut auth_value = header::HeaderValue::from_str(&value).unwrap();
auth_value.set_sensitive(true);
headers.insert(header::AUTHORIZATION, auth_value);
let client = reqwest::Client::builder()
.default_headers(headers)
.build()
.unwrap();
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(client)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
client
});

AirtableFdw {
rt: create_async_runtime(),
base_url,
client,
scan_result: None,
}
}

#[inline]
fn build_url(&self, base_id: &str, table_name: &str) -> String {
format!("{}/{}/{}", &self.base_url, base_id, table_name)
}

#[inline]
fn set_limit_offset(
&self,
url: &str,
page_size: Option<usize>,
offset: Option<&str>,
) -> Result<String, url::ParseError> {
let mut params = Vec::new();
if let Some(page_size) = page_size {
params.push(("pageSize", format!("{}", page_size)));
}
if let Some(offset) = offset {
params.push(("offset", offset.to_string()));
}

Url::parse_with_params(url, &params).map(|x| x.into())
}

// convert response body text to rows
fn parse_resp(&self, resp_body: &str, columns: &Vec<String>) -> (Vec<Row>, Option<String>) {
let response: AirtableResponse = serde_json::from_str(resp_body).unwrap();
let mut result = Vec::new();

for record in response.records.iter() {
result.push(record.to_row(columns));
}

(result, response.offset)
}
}

macro_rules! report_fetch_error {
($url:ident, $err:ident) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("fetch {} failed: {}", $url, $err),
)
};
}

// TODO Add support for INSERT, UPDATE, DELETE
impl ForeignDataWrapper for AirtableFdw {
fn begin_scan(
&mut self,
_quals: &Vec<Qual>, // TODO: Propagate filters
columns: &Vec<String>,
_sorts: &Vec<Sort>, // TODO: Propagate sort
_limit: &Option<Limit>, // TODO: maxRecords
options: &HashMap<String, String>,
) {
// TODO: Support specifying other options (view)
let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| {
require_option("table", options).map(|table| self.build_url(&base_id, &table))
}) {
url
} else {
return;
};

let mut rows = Vec::new();
if let Some(client) = &self.client {
let mut offset: Option<String> = None;

loop {
// Fetch all of the rows upfront. Arguably, this could be done in batches (and invoked each
// time iter_scan() runs out of rows) to pipeline the I/O, but we'd have to manage more
// state so starting with the simpler solution.
let url = match self.set_limit_offset(&url, None, offset.as_deref()) {
Ok(url) => url,
Err(err) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("internal error: {}", err),
);
return;
}
};

match self.rt.block_on(client.get(&url).send()) {
Ok(resp) => match resp.error_for_status() {
Ok(resp) => {
let body = self.rt.block_on(resp.text()).unwrap();
let (new_rows, new_offset) = self.parse_resp(&body, columns);
rows.extend(new_rows.into_iter());

if let Some(new_offset) = new_offset {
offset = Some(new_offset);
} else {
break;
}
}
Err(err) => report_fetch_error!(url, err),
},
Err(err) => report_fetch_error!(url, err),
}
}
}

self.scan_result = Some(rows);
}

fn iter_scan(&mut self) -> Option<Row> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return result.drain(0..1).last();
}
}
None
}

fn end_scan(&mut self) {
self.scan_result.take();
}
}
4 changes: 4 additions & 0 deletions wrappers/src/fdw/airtable_fdw/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod airtable_fdw;
mod result;

pub(crate) use airtable_fdw::AirtableFdw;
Loading

0 comments on commit f672be4

Please sign in to comment.