Skip to content

Commit

Permalink
Add EmbeddedIon
Browse files Browse the repository at this point in the history
  • Loading branch information
jpschorr committed Nov 16, 2024
1 parent 0430c49 commit 9400af5
Show file tree
Hide file tree
Showing 17 changed files with 637 additions and 101 deletions.
4 changes: 3 additions & 1 deletion extension/partiql-extension-ion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ unicase = "2.7"
rust_decimal = { version = "1.36.0", default-features = false, features = ["std"] }
rust_decimal_macros = "1.36"
ion-rs_old = { version = "0.18", package = "ion-rs" }
ion-rs = { version = "1.0.0-rc.7", features = ["experimental"] }
#ion-rs = { version = "1.0.0-rc.8", features = ["experimental"] }
ion-rs = { version = "1.0.0-rc.8", features = ["experimental"], path = "../../../ion-rust" }
time = { version = "0.3", features = ["macros"] }
once_cell = "1"
regex = "1.10"
thiserror = "1.0"
delegate = "0.13"
peekmore = "1.3"

typetag = "0.2"

Expand Down
333 changes: 333 additions & 0 deletions extension/partiql-extension-ion/src/embedded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
use core::fmt;
use delegate::delegate;
use ion_rs::{
AnyEncoding, Element, ElementReader, IonDataSource, IonInput, IonResult, IonSlice, Reader,
Sequence,
};
use ion_rs_old::IonReader;
use partiql_common::pretty::{pretty_surrounded_doc, PrettyDoc};
use partiql_value::datum::Datum;
use partiql_value::embedded_document::{
DynEmbeddedTypeTag, EmbeddedDocResult, EmbeddedDocValueIntoIterator, EmbeddedDocument,
EmbeddedDocumentType,
};
use partiql_value::{EmbeddedDoc, Value, ValueIntoIterator};
use peekmore::{PeekMore, PeekMoreIterator};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::iter::Peekable;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::slice;
use std::slice::Iter;
use std::sync::Arc;

pub struct ElementIterator<R: ElementReader> {
reader: R,
}

impl<R: ElementReader> Iterator for ElementIterator<R> {
type Item = IonResult<Element>;

fn next(&mut self) -> Option<Self::Item> {
self.reader.read_next_element().transpose()
}
}

struct IonContext {
reader: PeekMoreIterator<ElementIterator<Reader<AnyEncoding, Vec<u8>>>>,
}

impl IonContext {
pub fn new(data: Vec<u8>) -> IonResult<Self> {
let reader = Reader::new(AnyEncoding, data)?;
let reader = ElementIterator { reader }.peekmore();
Ok(Self { reader })
}
}

struct EmbeddedIon {
ctx: Rc<RefCell<IonContext>>,
doc_type: RefCell<EmbeddedDocType>,
}

impl Debug for EmbeddedIon {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("EmbeddedIon").field(&self.doc_type).finish()
}
}

impl EmbeddedIon {
pub fn new(data: Vec<u8>, expected: EmbeddedDocStreamType) -> IonResult<Self> {
let ctx = Rc::new(RefCell::new(IonContext::new(data)?));
Ok(Self {
ctx,
doc_type: RefCell::new(EmbeddedDocType::Unexamined(expected)),
})
}

pub fn new_unknown(data: Vec<u8>) -> IonResult<Self> {
Self::new(data, EmbeddedDocStreamType::Unknown)
}
pub fn new_tlv(data: Vec<u8>) -> IonResult<Self> {
Self::new(data, EmbeddedDocStreamType::SingleTLV)
}

pub fn new_stream(data: Vec<u8>) -> IonResult<Self> {
Self::new(data, EmbeddedDocStreamType::Stream)
}

pub fn child(&self, doc: EmbeddedDocType) -> EmbeddedIon {
let ctx = self.ctx.clone();
Self {
ctx,
doc_type: RefCell::new(doc),
}
}

#[inline]
fn force(&self) {
let stream_type =
if let EmbeddedDocType::Unexamined(expected) = self.doc_type.borrow().deref() {
match *expected {
EmbeddedDocStreamType::Unknown => {
let reader = &mut self.ctx.borrow_mut().reader;
if reader.peek_nth(1).is_some() {
EmbeddedDocStreamType::Stream
} else {
EmbeddedDocStreamType::SingleTLV
}
}
other => other,
}
} else {
return;
};

self.init_reader(stream_type);
}

fn init_reader(&self, expected: EmbeddedDocStreamType) {
let reader = &mut self.ctx.borrow_mut().reader;
let doc = match expected {
EmbeddedDocStreamType::Unknown => {
unreachable!("handled by `force`")
}
EmbeddedDocStreamType::Stream => EmbeddedDocType::Stream(),
EmbeddedDocStreamType::SingleTLV => {
let elt = reader.next().expect("ion value"); // TODO [EMBDOC]
let elt = elt.expect("ion element"); // TODO [EMBDOC]
if reader.peek().is_some() {
// TODO error on stream instead of TLV?
}

match elt.try_into_sequence() {
Err(elt) => EmbeddedDocType::Value(elt),
Ok(seq) => EmbeddedDocType::Sequence(SequenceIterator::from(seq)),
}
}
};

self.doc_type.replace(doc);
}

fn convert_to_iterable(mut self) -> Self {
self.force();

let EmbeddedIon { ctx, doc_type } = self;
let inner = doc_type.into_inner();

let inner = if let EmbeddedDocType::Value(elt) = inner {
match elt.try_into_sequence() {
Err(elt) => EmbeddedDocType::Value(elt),
Ok(seq) => EmbeddedDocType::Sequence(SequenceIterator::from(seq)),
}
} else {
inner
};

let doc_type = RefCell::new(inner);

EmbeddedIon { ctx, doc_type }
}

fn next(&mut self) -> Option<EmbeddedIon> {
let child = |elt: Option<Element>| {
elt.map(|elt| {
let doc = EmbeddedDocType::Value(elt);
let doc = self.child(doc);
doc
})
};
match self.doc_type.borrow_mut().deref_mut() {
EmbeddedDocType::Unexamined(_) => {
unreachable!("handled by `force`")
}
EmbeddedDocType::Value(elt) => {
todo!("{:?}", elt)
}
EmbeddedDocType::Stream() => {
let elt = self.ctx.borrow_mut().deref_mut().reader.next();
let elt = elt.transpose().expect("ion not error"); // TODO [EMBDOC]
child(elt)
}
EmbeddedDocType::Sequence(seq) => child(seq.next()),
}
}
}

#[derive(Debug, Copy, Clone)]
enum EmbeddedDocStreamType {
Unknown,
Stream,
SingleTLV,
}

#[derive(Debug)]
enum EmbeddedDocType {
Unexamined(EmbeddedDocStreamType),
Stream(),
Value(Element),
Sequence(SequenceIterator),
}

struct SequenceIterator {
elements: VecDeque<Element>,
}

impl SequenceIterator {}

impl From<Sequence> for SequenceIterator {
fn from(sequence: Sequence) -> Self {
let elements = sequence.into_iter().collect();
Self { elements }
}
}

impl Iterator for SequenceIterator {
type Item = Element;

fn next(&mut self) -> Option<Self::Item> {
self.elements.pop_front()
}
}

impl fmt::Debug for SequenceIterator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("SequenceIterator")
.field(&self.elements)
.finish()
}
}

impl Datum for EmbeddedIon {
fn is_null(&self) -> bool {
self.force();
match self.doc_type.borrow().deref() {
EmbeddedDocType::Unexamined(_) => {
unreachable!("already forced")
}
EmbeddedDocType::Value(elt) => elt.is_null(),
EmbeddedDocType::Stream() => false,
EmbeddedDocType::Sequence(_) => false,
}
}

fn is_sequence(&self) -> bool {
self.force();
match self.doc_type.borrow().deref() {
EmbeddedDocType::Unexamined(_) => {
unreachable!("already forced")
}
EmbeddedDocType::Value(elt) => elt.as_sequence().is_some(),
EmbeddedDocType::Stream() => true,
EmbeddedDocType::Sequence(_) => true,
}
}

fn is_ordered(&self) -> bool {
self.force();
match self.doc_type.borrow().deref() {
EmbeddedDocType::Unexamined(_) => {
unreachable!("already forced")
}
EmbeddedDocType::Value(_) => false,
EmbeddedDocType::Stream() => false, // TODO [EMBDOC] is a top-level stream ordered?
EmbeddedDocType::Sequence(_) => true,
}
}
}

impl IntoIterator for EmbeddedIon {
type Item = EmbeddedIon;
type IntoIter = EmbeddedIonIterator;

fn into_iter(mut self) -> Self::IntoIter {
EmbeddedIonIterator::new(self.convert_to_iterable())
}
}
struct EmbeddedIonIterator {
ion: EmbeddedIon,
}

impl Iterator for EmbeddedIonIterator {
type Item = EmbeddedIon;

fn next(&mut self) -> Option<Self::Item> {
self.ion.next()
}
}


impl EmbeddedIonIterator {
fn new(ion: EmbeddedIon) -> EmbeddedIonIterator {
Self { ion }
}
}

#[cfg(test)]
mod tests {
use super::*;
use ion_rs::{ElementReader, WriteAsIon};
use std::io::Read;

fn flatten_dump(doc: EmbeddedIon) {
if doc.is_sequence() {
for c in doc {
flatten_dump(c)
}
} else {
println!("{:?}", doc);
}
}

fn dump(data: Vec<u8>, expected_embedded_doc_type: EmbeddedDocStreamType) {
println!("\n===========\n");

let doc = EmbeddedIon::new(data, expected_embedded_doc_type).expect("embedded ion create");
/*
if doc.is_sequence() {
for c in doc {
println!("{:?}", c);
}
}
*/
flatten_dump(doc);
}

#[test]
fn simple() {
let one_elt: Vec<u8> =
"[0, {a: 1, b:2, c: [], d: foo::(SYMBOL 3 2 1 {})}, [1,2,3,4]]".into();
let stream: Vec<u8> = "0 {a: 1, b:2, c: [], d: foo::(SYMBOL 3 2 1 {})} [1,2,3,4]".into();

dump(one_elt.clone(), EmbeddedDocStreamType::SingleTLV);
dump(one_elt, EmbeddedDocStreamType::Unknown);
dump(stream.clone(), EmbeddedDocStreamType::Stream);
dump(stream, EmbeddedDocStreamType::Unknown);
}
}
37 changes: 3 additions & 34 deletions extension/partiql-extension-ion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,13 @@ use serde::{Deserialize, Serialize};

mod common;
pub mod decode;
//pub mod embedded;

pub mod embedded;
pub mod encode;

pub use common::Encoding;

pub mod embedded {
use partiql_common::embedded_document::{EmbeddedDocument, EmbeddedDocumentType};
use std::fmt::{Debug, Formatter};
use std::hash::Hasher;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Default, Copy, Clone)]
pub struct EmbeddedIonType {}
impl EmbeddedDocumentType for EmbeddedIonType {
type Doc = EmbeddedIon;

fn construct(&self, bytes: &[u8]) -> Self::Doc {
EmbeddedIon::Unparsed(bytes.into())
}
}

#[derive(Hash, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum EmbeddedIon {
Unparsed(Vec<u8>),
}

impl Debug for EmbeddedIon {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

#[cfg_attr(feature = "serde", typetag::serde)]
impl EmbeddedDocument for EmbeddedIon {}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 9400af5

Please sign in to comment.