From e95a94542ebcccd5ddd1fbb00dae546c4478f42d Mon Sep 17 00:00:00 2001 From: Thomas Hatzopoulos Date: Mon, 6 Feb 2023 13:18:05 -0600 Subject: [PATCH] Stabilizing candlestick and adding tests --- Changelog.md | 4 + docs/test_candlestick_agg.md | 90 +++ extension/src/accessors.rs | 9 +- extension/src/candlestick.rs | 857 +++++++++----------------- extension/src/lib.rs | 2 +- extension/src/stabilization_info.rs | 75 +++ tests/update/candlestick.md | 44 ++ tools/update-tester/src/testrunner.rs | 23 +- 8 files changed, 536 insertions(+), 568 deletions(-) create mode 100644 docs/test_candlestick_agg.md create mode 100644 tests/update/candlestick.md diff --git a/Changelog.md b/Changelog.md index 3dda7e7ed..3378e3ac7 100644 --- a/Changelog.md +++ b/Changelog.md @@ -12,6 +12,10 @@ This changelog should be updated as part of a PR if the work is worth noting (mo - [#660](https://github.com/timescale/timescaledb-toolkit/issues/660): Heartbeat aggregate rollup should interpolate aggregates - [#679](https://github.com/timescale/timescaledb-toolkit/issues/679): Heartbeat agg rollup producing invalid aggregates. +#### Stabilized features + +- [#650](https://github.com/timescale/timescaledb-toolkit/pull/701): Stabilize candlestick. + #### Other notable changes - [#685](https://github.com/timescale/timescaledb-toolkit/issues/685): rollup for freq_agg and topn_agg - [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Support specifying a range to `duration_in` to specify a time range to get states in for state aggregates diff --git a/docs/test_candlestick_agg.md b/docs/test_candlestick_agg.md new file mode 100644 index 000000000..b8ad93688 --- /dev/null +++ b/docs/test_candlestick_agg.md @@ -0,0 +1,90 @@ +# Candlestick Continuous Aggregation Tests + +## Setup table +```SQL,non-transactional,ignore-output +SET TIME ZONE 'UTC'; +CREATE TABLE stocks_real_time ( + time TIMESTAMPTZ NOT NULL, + symbol TEXT NOT NULL, + price DOUBLE PRECISION NULL, + day_volume INT NULL +); +SELECT create_hypertable('stocks_real_time','time'); +CREATE INDEX ix_symbol_time ON stocks_real_time (symbol, time DESC); + +CREATE TABLE company ( + symbol TEXT NOT NULL, + name TEXT NOT NULL +); +``` + +## Setup Continuous Aggs +```SQL,non-transactional,ignore-output +CREATE MATERIALIZED VIEW cs +WITH (timescaledb.continuous) AS +SELECT time_bucket('1 minute'::interval, "time") AS ts, + symbol, + candlestick_agg("time", price, day_volume) AS candlestick +FROM stocks_real_time +GROUP BY ts, symbol; +``` + +## Insert data into table +```SQL,non-transactional,ignore-output +INSERT INTO stocks_real_time("time","symbol","price","day_volume") +VALUES +('2023-01-11 17:59:57-06','AAPL',133.445,NULL), +('2023-01-11 17:59:55-06','PFE',47.38,NULL), +('2023-01-11 17:59:54-06','AMZN',95.225,NULL), +('2023-01-11 17:59:52-06','AAPL',29.82,NULL); +``` + +```SQL,non-transactional,ignore-output +INSERT INTO company("symbol","name") +VALUES +('AAPL','Apple'), +('PFE','Pfizer'), +('AMZN','Amazon'); +``` +## Query by-minute continuous aggregate over stock trade data for ohlc prices along with timestamps + +```SQL,non-transactional,ignore-output +SELECT ts, + symbol, + open_time(candlestick), + open(candlestick), + high_time(candlestick), + high(candlestick), + low_time(candlestick), + low(candlestick), + close_time(candlestick), + close(candlestick) +FROM cs; +``` + +```output + ts | symbol | open_time | open | high_time | high | low_time | low | close_time | close +------------------------+--------+------------------------+--------+------------------------+---------+------------------------+--------+------------------------+--------- + 2023-01-11 23:59:00+00 | PFE | 2023-01-11 23:59:55+00 | 47.38 | 2023-01-11 23:59:55+00 | 47.38 | 2023-01-11 23:59:55+00 | 47.38 | 2023-01-11 23:59:55+00 | 47.38 + 2023-01-11 23:59:00+00 | AAPL | 2023-01-11 23:59:52+00 | 29.82 | 2023-01-11 23:59:57+00 | 133.445 | 2023-01-11 23:59:52+00 | 29.82 | 2023-01-11 23:59:57+00 | 133.445 + 2023-01-11 23:59:00+00 | AMZN | 2023-01-11 23:59:54+00 | 95.225 | 2023-01-11 23:59:54+00 | 95.225 | 2023-01-11 23:59:54+00 | 95.225 | 2023-01-11 23:59:54+00 | 95.225 +``` + +## Roll up your by minute continuous agg into daily buckets and return the volume weighted average price for AAPL and its high price + +```SQL,non-transactional,ignore-output +SELECT + time_bucket('1 day'::interval, ts) AS daily_bucket, + symbol, + vwap(rollup(candlestick)), + high(rollup(candlestick)) +FROM cs +WHERE symbol = 'AAPL' +GROUP BY daily_bucket,symbol; +``` + +```output + daily_bucket | symbol | vwap | high +------------------------+--------+------+--------- + 2023-01-11 00:00:00+00 | AAPL | NULL | 133.445 +``` diff --git a/extension/src/accessors.rs b/extension/src/accessors.rs index 99dd4eb21..27319f7a8 100644 --- a/extension/src/accessors.rs +++ b/extension/src/accessors.rs @@ -89,7 +89,14 @@ accessor! { first_val() } accessor! { last_val() } accessor! { first_time() } accessor! { last_time() } - +accessor! { open()} +accessor! { close()} +accessor! { high()} +accessor! { low()} +accessor! { open_time()} +accessor! { high_time()} +accessor! { low_time()} +accessor! { close_time()} // The rest are more complex, with String or other challenges. Leaving alone for now. pg_type! { diff --git a/extension/src/candlestick.rs b/extension/src/candlestick.rs index 02edf34ae..6db5fb9ff 100644 --- a/extension/src/candlestick.rs +++ b/extension/src/candlestick.rs @@ -1,6 +1,10 @@ use pgx::*; use serde::{Deserialize, Serialize}; +use crate::accessors::{ + AccessorClose, AccessorCloseTime, AccessorHigh, AccessorHighTime, AccessorLow, AccessorLowTime, + AccessorOpen, AccessorOpenTime, +}; use crate::{ aggregate_utils::in_aggregate_context, flatten, @@ -11,189 +15,175 @@ use crate::{ }; use tspoint::TSPoint; -#[pg_schema] -pub mod toolkit_experimental { - use super::*; - - flat_serialize_macro::flat_serialize! { - #[derive(Serialize, Deserialize, Debug, Copy)] - enum VolKind { - unused_but_required_by_flat_serialize: u64, - Missing: 1 {}, - Transaction: 2 { vol: f64, vwap: f64 }, - } +flat_serialize_macro::flat_serialize! { + #[derive(Serialize, Deserialize, Debug, Copy)] + enum VolKind { + unused_but_required_by_flat_serialize: u64, + Missing: 1 {}, + Transaction: 2 { vol: f64, vwap: f64 }, } +} - pg_type! { - #[derive(Debug, Copy)] - struct Candlestick { - open: TSPoint, - high: TSPoint, - low: TSPoint, - close: TSPoint, - #[flat_serialize::flatten] - volume: VolKind, - } +pg_type! { + #[derive(Debug, Copy)] + struct Candlestick { + open: TSPoint, + high: TSPoint, + low: TSPoint, + close: TSPoint, + #[flat_serialize::flatten] + volume: VolKind, } +} - impl Candlestick<'_> { - pub fn new( - ts: i64, - open: f64, - high: f64, - low: f64, - close: f64, - volume: Option, - ) -> Self { - let volume = match volume { - None => VolKind::Missing {}, - Some(volume) => { - let typical = (high + low + close) / 3.0; - VolKind::Transaction { - vol: volume, - vwap: volume * typical, - } +impl Candlestick<'_> { + pub fn new(ts: i64, open: f64, high: f64, low: f64, close: f64, volume: Option) -> Self { + let volume = match volume { + None => VolKind::Missing {}, + Some(volume) => { + let typical = (high + low + close) / 3.0; + VolKind::Transaction { + vol: volume, + vwap: volume * typical, } - }; - - unsafe { - flatten!(Candlestick { - open: TSPoint { ts, val: open }, - high: TSPoint { ts, val: high }, - low: TSPoint { ts, val: low }, - close: TSPoint { ts, val: close }, - volume, - }) } - } + }; - pub fn from_tick(ts: i64, price: f64, volume: Option) -> Self { - Candlestick::new(ts, price, price, price, price, volume) + unsafe { + flatten!(Candlestick { + open: TSPoint { ts, val: open }, + high: TSPoint { ts, val: high }, + low: TSPoint { ts, val: low }, + close: TSPoint { ts, val: close }, + volume, + }) } + } - pub fn add_tick_data(&mut self, ts: i64, price: f64, volume: Option) { - if ts < self.open.ts { - self.open = TSPoint { ts, val: price }; - } - - if price > self.high.val { - self.high = TSPoint { ts, val: price }; - } - - if price < self.low.val { - self.low = TSPoint { ts, val: price }; - } - - if ts > self.close.ts { - self.close = TSPoint { ts, val: price }; - } + pub fn from_tick(ts: i64, price: f64, volume: Option) -> Self { + Candlestick::new(ts, price, price, price, price, volume) + } - if let (VolKind::Transaction { vol, vwap }, Some(volume)) = (self.volume, volume) { - self.volume = VolKind::Transaction { - vol: vol + volume, - vwap: vwap + volume * price, - }; - } else { - self.volume = VolKind::Missing {}; - }; + pub fn add_tick_data(&mut self, ts: i64, price: f64, volume: Option) { + if ts < self.open.ts { + self.open = TSPoint { ts, val: price }; } - pub fn combine(&mut self, candlestick: &Candlestick) { - if candlestick.open.ts < self.open.ts { - self.open = candlestick.open; - } - - if candlestick.high.val > self.high.val { - self.high = candlestick.high; - } + if price > self.high.val { + self.high = TSPoint { ts, val: price }; + } - if candlestick.low.val < self.low.val { - self.low = candlestick.low; - } + if price < self.low.val { + self.low = TSPoint { ts, val: price }; + } - if candlestick.close.ts > self.close.ts { - self.close = candlestick.close; - } + if ts > self.close.ts { + self.close = TSPoint { ts, val: price }; + } - if let ( - VolKind::Transaction { - vol: vol1, - vwap: vwap1, - }, - VolKind::Transaction { - vol: vol2, - vwap: vwap2, - }, - ) = (self.volume, candlestick.volume) - { - self.volume = VolKind::Transaction { - vol: vol1 + vol2, - vwap: vwap1 + vwap2, - }; - } else { - self.volume = VolKind::Missing {}; + if let (VolKind::Transaction { vol, vwap }, Some(volume)) = (self.volume, volume) { + self.volume = VolKind::Transaction { + vol: vol + volume, + vwap: vwap + volume * price, }; - } + } else { + self.volume = VolKind::Missing {}; + }; + } - pub fn open(&self) -> f64 { - self.open.val + pub fn combine(&mut self, candlestick: &Candlestick) { + if candlestick.open.ts < self.open.ts { + self.open = candlestick.open; } - pub fn high(&self) -> f64 { - self.high.val + if candlestick.high.val > self.high.val { + self.high = candlestick.high; } - pub fn low(&self) -> f64 { - self.low.val + if candlestick.low.val < self.low.val { + self.low = candlestick.low; } - pub fn close(&self) -> f64 { - self.close.val + if candlestick.close.ts > self.close.ts { + self.close = candlestick.close; } - pub fn open_time(&self) -> i64 { - self.open.ts - } + if let ( + VolKind::Transaction { + vol: vol1, + vwap: vwap1, + }, + VolKind::Transaction { + vol: vol2, + vwap: vwap2, + }, + ) = (self.volume, candlestick.volume) + { + self.volume = VolKind::Transaction { + vol: vol1 + vol2, + vwap: vwap1 + vwap2, + }; + } else { + self.volume = VolKind::Missing {}; + }; + } - pub fn high_time(&self) -> i64 { - self.high.ts - } + pub fn open(&self) -> f64 { + self.open.val + } - pub fn low_time(&self) -> i64 { - self.low.ts - } + pub fn high(&self) -> f64 { + self.high.val + } - pub fn close_time(&self) -> i64 { - self.close.ts - } + pub fn low(&self) -> f64 { + self.low.val + } - pub fn volume(&self) -> Option { - match self.volume { - VolKind::Transaction { vol, .. } => Some(vol), - VolKind::Missing {} => None, - } + pub fn close(&self) -> f64 { + self.close.val + } + + pub fn open_time(&self) -> i64 { + self.open.ts + } + + pub fn high_time(&self) -> i64 { + self.high.ts + } + + pub fn low_time(&self) -> i64 { + self.low.ts + } + + pub fn close_time(&self) -> i64 { + self.close.ts + } + + pub fn volume(&self) -> Option { + match self.volume { + VolKind::Transaction { vol, .. } => Some(vol), + VolKind::Missing {} => None, } + } - pub fn vwap(&self) -> Option { - match self.volume { - VolKind::Transaction { vol, vwap } => { - if vol > 0.0 && vwap.is_finite() { - Some(vwap / vol) - } else { - None - } + pub fn vwap(&self) -> Option { + match self.volume { + VolKind::Transaction { vol, vwap } => { + if vol > 0.0 && vwap.is_finite() { + Some(vwap / vol) + } else { + None } - VolKind::Missing {} => None, } + VolKind::Missing {} => None, } } - - ron_inout_funcs!(Candlestick); } -use toolkit_experimental::Candlestick; +ron_inout_funcs!(Candlestick); -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn candlestick( ts: Option, open: Option, @@ -215,7 +205,7 @@ pub fn candlestick( } } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn tick_data_no_vol_transition( state: Internal, ts: Option, @@ -225,7 +215,7 @@ pub fn tick_data_no_vol_transition( tick_data_transition_inner(unsafe { state.to_inner() }, ts, price, None, fcinfo).internal() } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn tick_data_transition( state: Internal, ts: Option, @@ -263,7 +253,7 @@ pub fn tick_data_transition_inner( } } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn candlestick_rollup_trans<'a>( state: Internal, value: Option>, @@ -290,7 +280,7 @@ pub fn candlestick_rollup_trans_inner<'input>( } } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn candlestick_final( state: Internal, fcinfo: pg_sys::FunctionCallInfo, @@ -313,7 +303,7 @@ pub fn candlestick_final_inner( } } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn candlestick_combine( state1: Internal, state2: Internal, @@ -340,20 +330,19 @@ pub fn candlestick_combine_inner<'input>( } } -#[pg_extern(immutable, parallel_safe, strict, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe, strict)] pub fn candlestick_serialize(state: Internal) -> bytea { let cs: &mut Candlestick = unsafe { state.get_mut().unwrap() }; let ser = &**cs; crate::do_serialize!(ser) } -#[pg_extern(immutable, parallel_safe, strict, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe, strict)] pub fn candlestick_deserialize(bytes: bytea, _internal: Internal) -> Option { candlestick_deserialize_inner(bytes).internal() } pub fn candlestick_deserialize_inner(bytes: bytea) -> Inner> { - use crate::ohlc::toolkit_experimental::CandlestickData; let de: CandlestickData = crate::do_deserialize!(bytes, CandlestickData); let cs: Candlestick = de.into(); cs.into() @@ -361,40 +350,18 @@ pub fn candlestick_deserialize_inner(bytes: bytea) -> Inner extension_sql!( "\n\ - CREATE AGGREGATE toolkit_experimental.ohlc( ts timestamptz, price DOUBLE PRECISION )\n\ - (\n\ - sfunc = toolkit_experimental.tick_data_no_vol_transition,\n\ - stype = internal,\n\ - finalfunc = toolkit_experimental.candlestick_final,\n\ - combinefunc = toolkit_experimental.candlestick_combine,\n\ - serialfunc = toolkit_experimental.candlestick_serialize,\n\ - deserialfunc = toolkit_experimental.candlestick_deserialize,\n\ - parallel = safe\n\ - );\n", - name = "ohlc", - requires = [ - tick_data_no_vol_transition, - candlestick_final, - candlestick_combine, - candlestick_serialize, - candlestick_deserialize - ], -); - -extension_sql!( - "\n\ - CREATE AGGREGATE toolkit_experimental.candlestick_agg( \n\ + CREATE AGGREGATE candlestick_agg( \n\ ts TIMESTAMPTZ,\n\ price DOUBLE PRECISION,\n\ volume DOUBLE PRECISION\n\ )\n\ (\n\ - sfunc = toolkit_experimental.tick_data_transition,\n\ + sfunc = tick_data_transition,\n\ stype = internal,\n\ - finalfunc = toolkit_experimental.candlestick_final,\n\ - combinefunc = toolkit_experimental.candlestick_combine,\n\ - serialfunc = toolkit_experimental.candlestick_serialize,\n\ - deserialfunc = toolkit_experimental.candlestick_deserialize,\n\ + finalfunc = candlestick_final,\n\ + combinefunc = candlestick_combine,\n\ + serialfunc = candlestick_serialize,\n\ + deserialfunc = candlestick_deserialize,\n\ parallel = safe\n\ );\n", name = "candlestick_agg", @@ -409,17 +376,17 @@ extension_sql!( extension_sql!( "\n\ - CREATE AGGREGATE toolkit_experimental.rollup( candlestick toolkit_experimental.Candlestick)\n\ + CREATE AGGREGATE rollup( candlestick Candlestick)\n\ (\n\ - sfunc = toolkit_experimental.candlestick_rollup_trans,\n\ + sfunc = candlestick_rollup_trans,\n\ stype = internal,\n\ - finalfunc = toolkit_experimental.candlestick_final,\n\ - combinefunc = toolkit_experimental.candlestick_combine,\n\ - serialfunc = toolkit_experimental.candlestick_serialize,\n\ - deserialfunc = toolkit_experimental.candlestick_deserialize,\n\ + finalfunc = candlestick_final,\n\ + combinefunc = candlestick_combine,\n\ + serialfunc = candlestick_serialize,\n\ + deserialfunc = candlestick_deserialize,\n\ parallel = safe\n\ );\n", - name = "ohlc_rollup", + name = "candlestick_rollup", requires = [ candlestick_rollup_trans, candlestick_final, @@ -429,47 +396,116 @@ extension_sql!( ], ); -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_open( + candlestick: Option>, + _accessor: AccessorOpen<'_>, +) -> Option { + candlestick.map(|cs| cs.open()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn open(candlestick: Option>) -> Option { candlestick.map(|cs| cs.open()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_high( + candlestick: Option>, + _accessor: AccessorHigh<'_>, +) -> Option { + candlestick.map(|cs| cs.high()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn high(candlestick: Option>) -> Option { candlestick.map(|cs| cs.high()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_low(candlestick: Option>, _accessor: AccessorLow<'_>) -> Option { + candlestick.map(|cs| cs.low()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn low(candlestick: Option>) -> Option { candlestick.map(|cs| cs.low()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_close( + candlestick: Option>, + _accessor: AccessorClose<'_>, +) -> Option { + candlestick.map(|cs| cs.close()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn close(candlestick: Option>) -> Option { candlestick.map(|cs| cs.close()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_open_time( + candlestick: Option>, + _accessor: AccessorOpenTime<'_>, +) -> Option { + candlestick.map(|cs| cs.open_time().into()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn open_time(candlestick: Option>) -> Option { candlestick.map(|cs| cs.open_time().into()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_high_time( + candlestick: Option>, + _accessor: AccessorHighTime<'_>, +) -> Option { + candlestick.map(|cs| cs.high_time().into()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn high_time(candlestick: Option>) -> Option { candlestick.map(|cs| cs.high_time().into()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_low_time( + candlestick: Option>, + _accessor: AccessorLowTime<'_>, +) -> Option { + candlestick.map(|cs| cs.low_time().into()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn low_time(candlestick: Option>) -> Option { candlestick.map(|cs| cs.low_time().into()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_close_time( + candlestick: Option>, + _accessor: AccessorCloseTime<'_>, +) -> Option { + candlestick.map(|cs| cs.close_time().into()) +} + +#[pg_extern(immutable, parallel_safe)] pub fn close_time(candlestick: Option>) -> Option { candlestick.map(|cs| cs.close_time().into()) } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn volume(candlestick: Option>) -> Option { match candlestick { None => None, @@ -477,7 +513,7 @@ pub fn volume(candlestick: Option>) -> Option { } } -#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] +#[pg_extern(immutable, parallel_safe)] pub fn vwap(candlestick: Option>) -> Option { match candlestick { None => None, @@ -488,6 +524,8 @@ pub fn vwap(candlestick: Option>) -> Option { #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { + use std::ptr; + use super::*; use pgx_macros::pg_test; @@ -505,45 +543,12 @@ mod tests { }; } - #[pg_test] - fn ohlc_single_point() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select( - "CREATE TABLE test(ts TIMESTAMPTZ, price DOUBLE PRECISION)", - None, - None, - ); - client.select( - "INSERT INTO test VALUES ('2022-08-01 00:00:00+00', 0.0)", - None, - None, - ); - - let output = select_one!( - client, - "SELECT toolkit_experimental.ohlc(ts, price)::text FROM test", - &str - ); - - let expected = "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - high:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - low:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - close:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - volume:Missing()\ - )"; - assert_eq!(expected, output.unwrap()); - }); - } - #[pg_test] fn candlestick_single_point() { Spi::execute(|client| { client.select("SET timezone TO 'UTC'", None, None); - let stmt = r#"SELECT toolkit_experimental.candlestick(ts, open, high, low, close, volume)::text + let stmt = r#"SELECT candlestick(ts, open, high, low, close, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 0.0, 0.0, 0.0, 1.0) ) AS v(ts, open, high, low, close, volume)"#; @@ -567,7 +572,7 @@ mod tests { Spi::execute(|client| { client.select("SET timezone TO 'UTC'", None, None); - let stmt = r#"SELECT toolkit_experimental.candlestick_agg(ts, price, volume)::text + let stmt = r#"SELECT candlestick_agg(ts, price, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 1.0) ) AS v(ts, price, volume)"#; @@ -587,57 +592,35 @@ mod tests { } #[pg_test] - fn ohlc_accessors() { + fn candlestick_accessors() { Spi::execute(|client| { client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - client.select( - r#"INSERT INTO test VALUES - ('2022-08-01 00:00:00+00', 0.0) - "#, - None, - None, - ); - - client.select( - "CREATE VIEW ohlc_view AS \ - SELECT toolkit_experimental.ohlc(ts, price) \ - FROM test", - None, - None, - ); - - let sanity = client - .select("SELECT COUNT(*) FROM ohlc_view", None, None) - .first() - .get_one::(); - assert!(sanity.unwrap_or(0) > 0); - - for ohlc in &["open", "high", "low", "close"] { - let (val, ts) = select_two!( - client, - format!("SELECT toolkit_experimental.{ohlc}(ohlc), toolkit_experimental.{ohlc}_time(ohlc)::text FROM ohlc_view").as_str(), - f64, - &str + + for ohlc in ["open", "high", "low", "close"] { + let stmt = format!( + r#"SELECT + {ohlc}(candlestick), + {ohlc}_time(candlestick)::text + FROM ( + SELECT candlestick(ts, open, high, low, close, volume) + FROM ( + VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 0.0, 0.0, 0.0, 1.0) + ) AS v(ts, open, high, low, close, volume) + ) AS v(candlestick)"# ); + let (val, ts) = select_two!(client, &stmt, f64, &str); assert_eq!(0.0, val.unwrap()); assert_eq!("2022-08-01 00:00:00+00", ts.unwrap()); } - }); - } - - #[pg_test] - fn candlestick_accessors() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); + // testing arrow operators for ohlc in ["open", "high", "low", "close"] { let stmt = format!( r#"SELECT - toolkit_experimental.{ohlc}(candlestick), - toolkit_experimental.{ohlc}_time(candlestick)::text + candlestick->{ohlc}(), + (candlestick->{ohlc}_time())::text FROM ( - SELECT toolkit_experimental.candlestick(ts, open, high, low, close, volume) + SELECT candlestick(ts, open, high, low, close, volume) FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 0.0, 0.0, 0.0, 1.0) ) AS v(ts, open, high, low, close, volume) @@ -649,10 +632,10 @@ mod tests { } let stmt = r#"SELECT - toolkit_experimental.volume(candlestick), - toolkit_experimental.vwap(candlestick) + volume(candlestick), + vwap(candlestick) FROM ( - SELECT toolkit_experimental.candlestick(ts, open, high, low, close, volume) + SELECT candlestick(ts, open, high, low, close, volume) FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 0.0, 0.0, 0.0, 1.0) ) AS v(ts, open, high, low, close, volume) @@ -671,10 +654,10 @@ mod tests { for ohlc in ["open", "high", "low", "close"] { let stmt = format!( r#"SELECT - toolkit_experimental.{ohlc}(candlestick), - toolkit_experimental.{ohlc}_time(candlestick)::text + {ohlc}(candlestick), + {ohlc}_time(candlestick)::text FROM ( - SELECT toolkit_experimental.candlestick_agg(ts, price, volume) + SELECT candlestick_agg(ts, price, volume) FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 1.0) ) AS v(ts, price, volume) @@ -686,10 +669,10 @@ mod tests { } let stmt = r#"SELECT - toolkit_experimental.volume(candlestick), - toolkit_experimental.vwap(candlestick) + volume(candlestick), + vwap(candlestick) FROM ( - SELECT toolkit_experimental.candlestick_agg(ts, price, volume) + SELECT candlestick_agg(ts, price, volume) FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 1.0) ) AS v(ts, price, volume) @@ -701,53 +684,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_extreme_values() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - - // timestamptz low and high val according to https://www.postgresql.org/docs/14/datatype-datetime.html - for extreme_time in &["4713-01-01 00:00:00+00 BC", "294276-12-31 23:59:59+00"] { - let stmt = format!("SELECT toolkit_experimental.ohlc(ts, price)::text FROM (VALUES ('{}'::timestamptz, 1.0)) v(ts, price)", extreme_time); - - let output = select_one!(client, &stmt, &str); - - let expected = format!( - "(\ - version:1,\ - open:(ts:\"{}\",val:1),\ - high:(ts:\"{}\",val:1),\ - low:(ts:\"{}\",val:1),\ - close:(ts:\"{}\",val:1),\ - volume:Missing()\ - )", - extreme_time, extreme_time, extreme_time, extreme_time - ); - assert_eq!(expected, output.unwrap()); - } - - for extreme_price in &[f64::MAX, f64::MIN] { - let stmt = format!("SELECT toolkit_experimental.ohlc(ts, price)::text FROM (VALUES ('2022-08-01 00:00:00+00'::timestamptz, {})) v(ts, price)", extreme_price); - - let output = select_one!(client, &stmt, &str); - - let expected = format!( - "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:{}),\ - high:(ts:\"2022-08-01 00:00:00+00\",val:{}),\ - low:(ts:\"2022-08-01 00:00:00+00\",val:{}),\ - close:(ts:\"2022-08-01 00:00:00+00\",val:{}),\ - volume:Missing()\ - )", - extreme_price, extreme_price, extreme_price, extreme_price - ); - assert_eq!(expected, output.unwrap()); - } - }); - } - #[pg_test] fn candlestick_agg_extreme_values() { Spi::execute(|client| { @@ -756,7 +692,7 @@ mod tests { // timestamptz low and high val according to https://www.postgresql.org/docs/14/datatype-datetime.html for extreme_time in &["4713-01-01 00:00:00+00 BC", "294276-12-31 23:59:59+00"] { let stmt = format!( - r#"SELECT toolkit_experimental.candlestick_agg(ts, price, volume)::text + r#"SELECT candlestick_agg(ts, price, volume)::text FROM (VALUES ('{}'::timestamptz, 1.0, 1.0)) AS v(ts, price, volume)"#, extreme_time ); @@ -779,7 +715,7 @@ mod tests { for extreme_price in &[f64::MAX, f64::MIN] { let stmt = format!( - r#"SELECT toolkit_experimental.candlestick_agg(ts, price, volume)::text + r#"SELECT candlestick_agg(ts, price, volume)::text FROM (VALUES ('2022-08-01 00:00:00+00'::timestamptz, {}, 1.0)) AS v(ts, price, volume)"#, extreme_price ); @@ -806,20 +742,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_null_inputs() { - Spi::execute(|client| { - for (x, y) in &[("NULL", "NULL"), ("NULL", "1.0"), ("now()", "NULL")] { - let output = select_one!( - client, - format!("SELECT toolkit_experimental.ohlc({x}, {y})").as_str(), - String - ); - assert_eq!(output, None); - } - }); - } - #[pg_test] fn candlestick_null_inputs() { Spi::execute(|client| { @@ -831,9 +753,7 @@ mod tests { ("now()", "1.0", "1.0", "NULL", "1.0", "1.0"), ("now()", "1.0", "1.0", "1.0", "NULL", "1.0"), ] { - let stmt = format!( - "SELECT toolkit_experimental.candlestick({t}, {o}, {h}, {l}, {c}, {v})" - ); + let stmt = format!("SELECT candlestick({t}, {o}, {h}, {l}, {c}, {v})"); let output = select_one!(client, &stmt, String); assert_eq!(output, None); } @@ -848,8 +768,7 @@ mod tests { ("NULL", "1.0", "1.0"), ("now()", "NULL", "1.0"), ] { - let stmt = - format!("SELECT toolkit_experimental.candlestick_agg({ts}, {price}, {vol})"); + let stmt = format!("SELECT candlestick_agg({ts}, {price}, {vol})"); let output = select_one!(client, &stmt, String); assert_eq!(output, None); } @@ -867,7 +786,7 @@ mod tests { let output = select_one!( client, - "SELECT toolkit_experimental.candlestick_agg(ts, price, vol)::TEXT + "SELECT candlestick_agg(ts, price, vol)::TEXT FROM (VALUES('2022-08-01 00:00:00+00'::timestamptz, 1.0, NULL::double precision)) AS v(ts, price, vol)", String ).unwrap(); @@ -881,7 +800,7 @@ mod tests { client.select("SET timezone TO 'UTC'", None, None); let stmt = r#"SELECT - toolkit_experimental.candlestick(ts, open, high, low, close, volume)::text + candlestick(ts, open, high, low, close, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 0.0, 0.0, 0.0, 1.0), ('2022-08-02 00:00:00+00'::timestamptz, 9.0, 12.0, 3.0, 6.0, 1.0) @@ -913,41 +832,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_constant() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - client.select( - r#"INSERT INTO test VALUES - ('2022-08-01 00:00:00+00', 0.0), - ('2022-08-01 06:00:00+00', 0.0), - ('2022-08-01 12:00:00+00', 0.0), - ('2022-08-01 18:00:00+00', 0.0), - ('2022-08-01 23:59:59+00', 0.0) - "#, - None, - None, - ); - - let stmt = r#"SELECT date_trunc('day', ts)::text - , toolkit_experimental.ohlc(ts, price)::text - FROM test - GROUP BY 1"#; - - let expected = "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - high:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - low:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - close:(ts:\"2022-08-01 23:59:59+00\",val:0),\ - volume:Missing()\ - )"; - let (_, output) = select_two!(client, stmt, &str, &str); - assert_eq!(expected, output.unwrap()); - }); - } - #[pg_test] fn candlestick_agg_constant() { Spi::execute(|client| { @@ -955,7 +839,7 @@ mod tests { let stmt = r#"SELECT date_trunc('day', ts)::text, - toolkit_experimental.candlestick_agg(ts, price, volume)::text + candlestick_agg(ts, price, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 1.0), ('2022-08-01 06:00:00+00'::timestamptz, 0.0, 1.0), @@ -978,41 +862,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_strictly_increasing() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - client.select( - r#"INSERT INTO test VALUES - ('2022-08-01 00:00:00+00', 1.0), - ('2022-08-01 06:00:00+00', 2.0), - ('2022-08-01 12:00:00+00', 3.0), - ('2022-08-01 18:00:00+00', 4.0), - ('2022-08-01 23:59:59+00', 5.0) - "#, - None, - None, - ); - - let stmt = "SELECT date_trunc('day', ts)::text \ - , toolkit_experimental.ohlc(ts, price)::text \ - FROM test \ - GROUP BY 1"; - - let expected = "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:1),\ - high:(ts:\"2022-08-01 23:59:59+00\",val:5),\ - low:(ts:\"2022-08-01 00:00:00+00\",val:1),\ - close:(ts:\"2022-08-01 23:59:59+00\",val:5),\ - volume:Missing()\ - )"; - let (_, output) = select_two!(client, stmt, &str, &str); - assert_eq!(expected, output.unwrap()); - }); - } - #[pg_test] fn candlestick_agg_strictly_increasing() { Spi::execute(|client| { @@ -1020,7 +869,7 @@ mod tests { let stmt = r#"SELECT date_trunc('day', ts)::text, - toolkit_experimental.candlestick_agg(ts, price, volume)::text + candlestick_agg(ts, price, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 1.0, 1.0), ('2022-08-01 06:00:00+00'::timestamptz, 2.0, 1.0), @@ -1043,41 +892,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_strictly_decreasing() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - client.select( - r#"INSERT INTO test VALUES - ('2022-08-01 00:00:00+00', 5.0), - ('2022-08-01 06:00:00+00', 4.0), - ('2022-08-01 12:00:00+00', 3.0), - ('2022-08-01 18:00:00+00', 2.0), - ('2022-08-01 23:59:59+00', 1.0) - "#, - None, - None, - ); - - let stmt = "SELECT date_trunc('day', ts)::text \ - , toolkit_experimental.ohlc(ts, price)::text \ - FROM test \ - GROUP BY 1"; - - let expected = "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:5),\ - high:(ts:\"2022-08-01 00:00:00+00\",val:5),\ - low:(ts:\"2022-08-01 23:59:59+00\",val:1),\ - close:(ts:\"2022-08-01 23:59:59+00\",val:1),\ - volume:Missing()\ - )"; - let (_, output) = select_two!(client, stmt, &str, &str); - assert_eq!(expected, output.unwrap()); - }); - } - #[pg_test] fn candlestick_agg_strictly_decreasing() { Spi::execute(|client| { @@ -1085,7 +899,7 @@ mod tests { let stmt = r#"SELECT date_trunc('day', ts)::text, - toolkit_experimental.candlestick_agg(ts, price, volume)::text + candlestick_agg(ts, price, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 5.0, 1.0), ('2022-08-01 06:00:00+00'::timestamptz, 4.0, 1.0), @@ -1108,47 +922,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_oscillating() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - client.select( - r#"INSERT INTO test VALUES - ('2022-08-01 00:00:00+00', 3.0), - ('2022-08-01 02:00:00+00', 4.0), - ('2022-08-01 04:00:00+00', 11.0), - ('2022-08-01 06:00:00+00', 5.0), - ('2022-08-01 08:00:00+00', 2.0), - ('2022-08-01 10:00:00+00', 1.0), - ('2022-08-01 12:00:00+00', 12.0), - ('2022-08-01 14:00:00+00', 9.0), - ('2022-08-01 16:00:00+00', 10.0), - ('2022-08-01 18:00:00+00', 7.0), - ('2022-08-01 20:00:00+00', 6.0), - ('2022-08-01 22:00:00+00', 8.0) - "#, - None, - None, - ); - let stmt = "SELECT date_trunc('day', ts)::text \ - , toolkit_experimental.ohlc(ts, price)::text \ - FROM test \ - GROUP BY 1"; - - let expected = "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:3),\ - high:(ts:\"2022-08-01 12:00:00+00\",val:12),\ - low:(ts:\"2022-08-01 10:00:00+00\",val:1),\ - close:(ts:\"2022-08-01 22:00:00+00\",val:8),\ - volume:Missing()\ - )"; - let (_, output) = select_two!(client, stmt, &str, &str); - assert_eq!(expected, output.unwrap()); - }); - } - #[pg_test] fn candlestick_agg_oscillating() { Spi::execute(|client| { @@ -1156,7 +929,7 @@ mod tests { let stmt = r#"SELECT date_trunc('day', ts)::text, - toolkit_experimental.candlestick_agg(ts, price, volume)::text + candlestick_agg(ts, price, volume)::text FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 3.0, 1.0), ('2022-08-01 02:00:00+00'::timestamptz, 4.0, 1.0), @@ -1186,51 +959,6 @@ mod tests { }); } - #[pg_test] - fn ohlc_rollup() { - Spi::execute(|client| { - client.select("SET timezone TO 'UTC'", None, None); - client.select("CREATE TABLE test(ts TIMESTAMPTZ, price FLOAT)", None, None); - client.select( - r#"INSERT INTO test VALUES - ('2022-08-01 00:00:00+00', 0.0), - ('2022-08-01 06:00:00+00', 1.0), - ('2022-08-01 12:00:00+00', 2.0), - ('2022-08-01 18:00:00+00', 3.0), - ('2022-08-01 23:59:59+00', 4.0), - ('2022-08-02 06:00:00+00', 5.0), - ('2022-08-02 12:00:00+00', 6.0), - ('2022-08-02 18:00:00+00', 7.0), - ('2022-08-02 23:59:59+00', 8.0) - "#, - None, - None, - ); - - let stmt = "WITH t AS (\ - SELECT date_trunc('day', ts) as date\ - , toolkit_experimental.ohlc(ts, price) \ - FROM test \ - GROUP BY 1\ - ) \ - SELECT date_trunc('month', date)::text \ - , toolkit_experimental.rollup(ohlc)::text \ - FROM t \ - GROUP BY 1"; - - let expected = "(\ - version:1,\ - open:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - high:(ts:\"2022-08-02 23:59:59+00\",val:8),\ - low:(ts:\"2022-08-01 00:00:00+00\",val:0),\ - close:(ts:\"2022-08-02 23:59:59+00\",val:8),\ - volume:Missing()\ - )"; - let (_, output) = select_two!(client, stmt, &str, &str); - assert_eq!(expected, output.unwrap()); - }); - } - #[pg_test] fn candlestick_rollup() { Spi::execute(|client| { @@ -1238,14 +966,14 @@ mod tests { let stmt = r#"WITH t AS ( SELECT - toolkit_experimental.candlestick(ts, open, high, low, close, volume) AS candlestick + candlestick(ts, open, high, low, close, volume) AS candlestick FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 4.0, 0.0, 4.0, 5.0), ('2022-08-02 00:00:00+00'::timestamptz, 5.0, 8.0, 5.0, 8.0, 4.0) ) AS v(ts, open, high, low, close, volume) ) SELECT - toolkit_experimental.rollup(candlestick)::text + rollup(candlestick)::text FROM t"#; let expected = "(\ @@ -1270,7 +998,7 @@ mod tests { let stmt = r#"WITH t AS ( SELECT date_trunc('day', ts) AS date, - toolkit_experimental.candlestick_agg(ts, price, volume) AS candlestick + candlestick_agg(ts, price, volume) AS candlestick FROM ( VALUES ('2022-08-01 00:00:00+00'::timestamptz, 0.0, 1.0), ('2022-08-01 06:00:00+00'::timestamptz, 1.0, 1.0), @@ -1286,7 +1014,7 @@ mod tests { ) SELECT date_trunc('month', date)::text, - toolkit_experimental.rollup(candlestick)::text + rollup(candlestick)::text FROM t GROUP BY 1"#; @@ -1302,4 +1030,31 @@ mod tests { assert_eq!(expected, output.unwrap()); }); } + + #[pg_test] + fn candlestick_byte_io() { + let state = tick_data_transition_inner( + None, + Some(100.into()), + Some(10.0), + Some(1.0), + ptr::null_mut(), + ); + let state = tick_data_transition_inner( + state, + Some(200.into()), + Some(1.0), + Some(2.0), + ptr::null_mut(), + ); + + let output_buffer = state.unwrap().to_pg_bytes(); + let expected = [ + 128, 1, 0, 0, 1, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 64, 100, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 64, 200, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 240, 63, 200, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 2, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 40, 64, + ]; + assert_eq!(*output_buffer, expected); + } } diff --git a/extension/src/lib.rs b/extension/src/lib.rs index 62683dcd6..2af617f51 100644 --- a/extension/src/lib.rs +++ b/extension/src/lib.rs @@ -13,6 +13,7 @@ pub mod accessors; pub mod asap; +pub mod candlestick; pub mod counter_agg; pub mod countminsketch; pub mod frequency; @@ -21,7 +22,6 @@ pub mod heartbeat_agg; pub mod hyperloglog; pub mod lttb; pub mod nmost; -pub mod ohlc; pub mod range; pub mod saturation; pub mod state_aggregate; diff --git a/extension/src/stabilization_info.rs b/extension/src/stabilization_info.rs index a0954fbfc..5259d8948 100644 --- a/extension/src/stabilization_info.rs +++ b/extension/src/stabilization_info.rs @@ -15,6 +15,60 @@ crate::functions_stabilized_at! { interpolated_average(timeweightsummary,timestamp with time zone,interval,timeweightsummary,timeweightsummary), interpolated_delta(countersummary,timestamp with time zone,interval,countersummary,countersummary), interpolated_rate(countersummary,timestamp with time zone,interval,countersummary,countersummary), + accessorclose_in(cstring), + accessorclose_out(accessorclose), + accessorclosetime_in(cstring), + accessorclosetime_out(accessorclosetime), + accessorhigh_in(cstring), + accessorhigh_out(accessorhigh), + accessorhightime_in(cstring), + accessorhightime_out(accessorhightime), + accessorlow_in(cstring), + accessorlow_out(accessorlow), + accessorlowtime_in(cstring), + accessorlowtime_out(accessorlowtime), + accessoropen_in(cstring), + accessoropen_out(accessoropen), + accessoropentime_in(cstring), + accessoropentime_out(accessoropentime), + arrow_close(candlestick,accessorclose), + arrow_close_time(candlestick,accessorclosetime), + arrow_high(candlestick,accessorhigh), + arrow_high_time(candlestick,accessorhightime), + arrow_low(candlestick,accessorlow), + arrow_low_time(candlestick,accessorlowtime), + arrow_open(candlestick,accessoropen), + arrow_open_time(candlestick,accessoropentime), + candlestick(timestamp with time zone, double precision,double precision, double precision, double precision, double precision), + candlestick_agg(timestamp with time zone,double precision,double precision), + candlestick_combine(internal,internal), + candlestick_deserialize(bytea,internal), + candlestick_final(internal), + candlestick_in(cstring), + candlestick_out(candlestick), + candlestick_rollup_trans(internal,candlestick), + candlestick_serialize(internal), + open(), + open_time(), + close(candlestick), + close(), + close_time(candlestick), + close_time(), + high(candlestick), + high(), + high_time(candlestick), + high_time(), + low(candlestick), + low(), + low_time(candlestick), + low_time(), + open(candlestick), + open_time(candlestick), + rollup(candlestick), + tick_data_no_vol_transition(internal,timestamp with time zone,double precision), + tick_data_transition(internal,timestamp with time zone,double precision,double precision), + volume(candlestick), + vwap(candlestick), } "1.12.0" => { stats1d_tf_inv_trans(internal,double precision), @@ -438,6 +492,17 @@ crate::functions_stabilized_at! { crate::types_stabilized_at! { STABLE_TYPES + "1.14.0" => { + candlestick, + accessorclose, + accessorclosetime, + accessorhigh, + accessorhightime, + accessorlow, + accessorlowtime, + accessoropen, + accessoropentime, + } "1.11.0" => { accessorfirsttime, accessorfirstval, @@ -516,6 +581,16 @@ crate::types_stabilized_at! { crate::operators_stabilized_at! { STABLE_OPERATORS + "1.14.0" => { + "->"(candlestick,accessorclose), + "->"(candlestick,accessorclosetime), + "->"(candlestick,accessorhigh), + "->"(candlestick,accessorhightime), + "->"(candlestick,accessorlow), + "->"(candlestick,accessorlowtime), + "->"(candlestick,accessoropen), + "->"(candlestick,accessoropentime), + } "1.11.0" => { "->"(countersummary,accessorfirsttime), "->"(countersummary,accessorfirstval), diff --git a/tests/update/candlestick.md b/tests/update/candlestick.md new file mode 100644 index 000000000..c86c56c5d --- /dev/null +++ b/tests/update/candlestick.md @@ -0,0 +1,44 @@ +# Candlestick Tests + +## Get candlestick values from tick data + + +```sql,creation,min-toolkit-version=1.14.0 +CREATE TABLE stocks_real_time(time TIMESTAMPTZ, symbol TEXT, price DOUBLE PRECISION,day_volume DOUBLE PRECISION); +INSERT INTO stocks_real_time VALUES + ('2023-01-11', 'AAPL', 133.445,10), + ('2023-01-11', 'PFE', 47.38,2), + ('2023-01-11', 'AMZN', 95.225,1), + ('2023-01-11', 'INTC', 29.82,NULL), + ('2023-01-11', 'MSFT', 235.5,100), + ('2023-01-11', 'TSLA', 123.085,NULL), + ('2023-01-11', 'AAPL', 133.44,20); + +CREATE MATERIALIZED VIEW candlestick AS + SELECT symbol, + candlestick_agg("time", price, day_volume) AS candlestick + FROM stocks_real_time + GROUP BY symbol; +``` + +```sql,validation,min-toolkit-version=1.14.0 +SELECT + symbol, + open(candlestick), + high(candlestick), + low(candlestick), + close(candlestick), + volume(candlestick) +FROM cs; +``` + +```output + symbol | open | high | low | close | volume +--------+---------+---------+---------+---------+-------- + PFE | 47.38 | 47.38 | 47.38 | 47.38 | 2 + AMZN | 95.225 | 95.225 | 95.225 | 95.225 | 1 + MSFT | 235.5 | 235.5 | 235.5 | 235.5 | 100 + AAPL | 133.445 | 133.445 | 133.44 | 133.445 | 30 + TSLA | 123.085 | 123.085 | 123.085 | 123.085 | NULL + INTC | 29.82 | 29.82 | 29.82 | 29.82 | NULL + ``` diff --git a/tools/update-tester/src/testrunner.rs b/tools/update-tester/src/testrunner.rs index 0900bc7fe..33bfa6a2e 100644 --- a/tools/update-tester/src/testrunner.rs +++ b/tools/update-tester/src/testrunner.rs @@ -257,22 +257,15 @@ impl TestClient { .simple_query(&drop) .unwrap_or_else(|e| panic!("could not drop db {} due to {}", test_db_name, e)); let locale_flags = { - #[cfg(target_os = "macos")] - { - "LC_COLLATE 'C' LC_CTYPE 'UTF-8'" - } - #[cfg(not(target_os = "macos"))] - { - match std::process::Command::new("locale").arg("-a").output() { - Ok(cmd) - if String::from_utf8_lossy(&cmd.stdout) - .lines() - .any(|l| l == "C.UTF-8") => - { - "LC_COLLATE 'C.UTF-8' LC_CTYPE 'C.UTF-8'" - } - _ => "LC_COLLATE 'C' LC_CTYPE 'C'", + match std::process::Command::new("locale").arg("-a").output() { + Ok(cmd) + if String::from_utf8_lossy(&cmd.stdout) + .lines() + .any(|l| l == "C.UTF-8") => + { + "LC_COLLATE 'C.UTF-8' LC_CTYPE 'C.UTF-8'" } + _ => "LC_COLLATE 'C' LC_CTYPE 'C'", } }; let create = format!(r#"CREATE DATABASE "{}" {}"#, test_db_name, locale_flags,);