Skip to content

Commit

Permalink
Rework iteration for EmbeddedIon
Browse files Browse the repository at this point in the history
  • Loading branch information
jpschorr committed Nov 16, 2024
1 parent 9400af5 commit aceac05
Showing 1 changed file with 69 additions and 116 deletions.
185 changes: 69 additions & 116 deletions extension/partiql-extension-ion/src/embedded.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use core::fmt;
use delegate::delegate;
use ion_rs::{
AnyEncoding, Element, ElementReader, IonDataSource, IonInput, IonResult, IonSlice, Reader,
Sequence,
AnyEncoding, Element, ElementReader, IonDataSource, IonInput, IonResult, IonSlice, IonType,
OwnedSequenceIterator, Reader, Sequence,
};
use ion_rs_old::IonReader;
use ion_rs_old::{IonError, IonReader};
use partiql_common::pretty::{pretty_surrounded_doc, PrettyDoc};
use partiql_value::datum::Datum;
use partiql_value::embedded_document::{
Expand All @@ -26,6 +26,22 @@ use std::slice;
use std::slice::Iter;
use std::sync::Arc;

use thiserror::Error;

/// Errors in boxed Ion.
///
/// ### Notes
/// This is marked `#[non_exhaustive]`, to reserve the right to add more variants in the future.
#[derive(Error, Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum BoxedIonError {
/// Ion Writer error.
#[error("Expected a sequence, but was `{elt}`")]
NotASequence { elt: Element },
}

pub type Result<T> = std::result::Result<T, BoxedIonError>;

pub struct ElementIterator<R: ElementReader> {
reader: R,
}
Expand All @@ -50,8 +66,10 @@ impl IonContext {
}
}

pub type IonContextPtr = Rc<RefCell<IonContext>>;

struct EmbeddedIon {
ctx: Rc<RefCell<IonContext>>,
ctx: IonContextPtr,
doc_type: RefCell<EmbeddedDocType>,
}

Expand All @@ -62,31 +80,25 @@ impl Debug for EmbeddedIon {
}

impl EmbeddedIon {
pub fn new(data: Vec<u8>, expected: EmbeddedDocStreamType) -> IonResult<Self> {
let ctx = Rc::new(RefCell::new(IonContext::new(data)?));
Ok(Self {
ctx,
doc_type: RefCell::new(EmbeddedDocType::Unexamined(expected)),
})
pub fn new(doc: EmbeddedDocType, ctx: IonContextPtr) -> Self {
let doc_type = RefCell::new(doc);
Self { ctx, doc_type }
}

pub fn new_unknown(data: Vec<u8>) -> IonResult<Self> {
Self::new(data, EmbeddedDocStreamType::Unknown)
}
pub fn new_tlv(data: Vec<u8>) -> IonResult<Self> {
Self::new(data, EmbeddedDocStreamType::SingleTLV)
pub fn parse(data: Vec<u8>, expected: EmbeddedDocStreamType) -> IonResult<Self> {
let ctx = Rc::new(RefCell::new(IonContext::new(data)?));
Ok(Self::new(EmbeddedDocType::Unexamined(expected), ctx))
}

pub fn new_stream(data: Vec<u8>) -> IonResult<Self> {
Self::new(data, EmbeddedDocStreamType::Stream)
pub fn parse_unknown(data: Vec<u8>) -> IonResult<Self> {
Self::parse(data, EmbeddedDocStreamType::Unknown)
}
pub fn parse_tlv(data: Vec<u8>) -> IonResult<Self> {
Self::parse(data, EmbeddedDocStreamType::SingleTLV)
}

pub fn child(&self, doc: EmbeddedDocType) -> EmbeddedIon {
let ctx = self.ctx.clone();
Self {
ctx,
doc_type: RefCell::new(doc),
}
pub fn parse_stream(data: Vec<u8>) -> IonResult<Self> {
Self::parse(data, EmbeddedDocStreamType::Stream)
}

#[inline]
Expand Down Expand Up @@ -127,56 +139,31 @@ impl EmbeddedIon {

match elt.try_into_sequence() {
Err(elt) => EmbeddedDocType::Value(elt),
Ok(seq) => EmbeddedDocType::Sequence(SequenceIterator::from(seq)),
Ok(seq) => EmbeddedDocType::Sequence(seq.into_iter()),
}
}
};

self.doc_type.replace(doc);
}

fn convert_to_iterable(mut self) -> Self {
fn try_into_iter(mut self) -> Result<EmbeddedIonIterator> {
self.force();

let EmbeddedIon { ctx, doc_type } = self;
let inner = doc_type.into_inner();

let inner = if let EmbeddedDocType::Value(elt) = inner {
match elt.try_into_sequence() {
Err(elt) => EmbeddedDocType::Value(elt),
Ok(seq) => EmbeddedDocType::Sequence(SequenceIterator::from(seq)),
}
} else {
inner
};

let doc_type = RefCell::new(inner);

EmbeddedIon { ctx, doc_type }
}

fn next(&mut self) -> Option<EmbeddedIon> {
let child = |elt: Option<Element>| {
elt.map(|elt| {
let doc = EmbeddedDocType::Value(elt);
let doc = self.child(doc);
doc
})
};
match self.doc_type.borrow_mut().deref_mut() {
EmbeddedDocType::Unexamined(_) => {
unreachable!("handled by `force`")
}
EmbeddedDocType::Value(elt) => {
todo!("{:?}", elt)
}
EmbeddedDocType::Stream() => {
let elt = self.ctx.borrow_mut().deref_mut().reader.next();
let elt = elt.transpose().expect("ion not error"); // TODO [EMBDOC]
child(elt)
}
EmbeddedDocType::Sequence(seq) => child(seq.next()),
let inner = match doc_type.into_inner() {
EmbeddedDocType::Unexamined(_) => unreachable!("handled by `force`"),
EmbeddedDocType::Stream() => EmbeddedIterType::Stream(),
EmbeddedDocType::Value(elt) => match elt.try_into_sequence() {
Err(elt) => return Err(BoxedIonError::NotASequence { elt }),
Ok(seq) => EmbeddedIterType::Sequence(seq.into_iter()),
},
EmbeddedDocType::Sequence(seq) => EmbeddedIterType::Sequence(seq.into_iter()),
}
.into();

Ok(EmbeddedIonIterator { ctx, inner })
}
}

Expand All @@ -192,36 +179,7 @@ enum EmbeddedDocType {
Unexamined(EmbeddedDocStreamType),
Stream(),
Value(Element),
Sequence(SequenceIterator),
}

struct SequenceIterator {
elements: VecDeque<Element>,
}

impl SequenceIterator {}

impl From<Sequence> for SequenceIterator {
fn from(sequence: Sequence) -> Self {
let elements = sequence.into_iter().collect();
Self { elements }
}
}

impl Iterator for SequenceIterator {
type Item = Element;

fn next(&mut self) -> Option<Self::Item> {
self.elements.pop_front()
}
}

impl fmt::Debug for SequenceIterator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("SequenceIterator")
.field(&self.elements)
.finish()
}
Sequence(OwnedSequenceIterator),
}

impl Datum for EmbeddedIon {
Expand Down Expand Up @@ -262,30 +220,30 @@ impl Datum for EmbeddedIon {
}
}

impl IntoIterator for EmbeddedIon {
type Item = EmbeddedIon;
type IntoIter = EmbeddedIonIterator;

fn into_iter(mut self) -> Self::IntoIter {
EmbeddedIonIterator::new(self.convert_to_iterable())
}
#[derive(Debug)]
enum EmbeddedIterType {
Stream(),
Sequence(OwnedSequenceIterator),
}

struct EmbeddedIonIterator {
ion: EmbeddedIon,
ctx: IonContextPtr,
inner: RefCell<EmbeddedIterType>,
}

impl Iterator for EmbeddedIonIterator {
type Item = EmbeddedIon;

fn next(&mut self) -> Option<Self::Item> {
self.ion.next()
}
}


impl EmbeddedIonIterator {
fn new(ion: EmbeddedIon) -> EmbeddedIonIterator {
Self { ion }
let elt = match self.inner.borrow_mut().deref_mut() {
EmbeddedIterType::Stream() => {
let elt = self.ctx.borrow_mut().deref_mut().reader.next();
let elt = elt.transpose().expect("ion not error"); // TODO [EMBDOC]
elt
}
EmbeddedIterType::Sequence(seq) => seq.next(),
};
elt.map(|elt| EmbeddedIon::new(EmbeddedDocType::Value(elt), self.ctx.clone()))
}
}

Expand All @@ -297,7 +255,7 @@ mod tests {

fn flatten_dump(doc: EmbeddedIon) {
if doc.is_sequence() {
for c in doc {
for c in doc.try_into_iter().expect("TODO [EMBDOC]") {
flatten_dump(c)
}
} else {
Expand All @@ -308,14 +266,9 @@ mod tests {
fn dump(data: Vec<u8>, expected_embedded_doc_type: EmbeddedDocStreamType) {
println!("\n===========\n");

let doc = EmbeddedIon::new(data, expected_embedded_doc_type).expect("embedded ion create");
/*
if doc.is_sequence() {
for c in doc {
println!("{:?}", c);
}
}
*/
let doc =
EmbeddedIon::parse(data, expected_embedded_doc_type).expect("embedded ion create");

flatten_dump(doc);
}

Expand Down

0 comments on commit aceac05

Please sign in to comment.