Skip to content

Commit

Permalink
Merge pull request #5167 from zhang2014/parallel_load
Browse files Browse the repository at this point in the history
feat(format): implement format trait
  • Loading branch information
BohuTANG authored May 8, 2022
2 parents 35003a9 + 7b46cad commit 2bd575e
Show file tree
Hide file tree
Showing 20 changed files with 1,290 additions and 60 deletions.
33 changes: 31 additions & 2 deletions common/datavalues/src/types/deserializations/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,37 @@ impl TypeDeserializer for ArrayDeserializer {
}
let mut values = Vec::with_capacity(idx);
for _ in 0..idx {
let value = self.inner.pop_data_value().unwrap();
values.push(value);
values.push(self.inner.pop_data_value()?);
}
values.reverse();
self.builder.append_value(ArrayValue::new(values));
Ok(())
}

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
format: &FormatSettings,
) -> Result<()> {
reader.must_ignore_byte(b'[')?;
let mut idx = 0;
loop {
let _ = reader.ignore_white_spaces()?;
if let Ok(res) = reader.ignore_byte(b']') {
if res {
break;
}
}
if idx != 0 {
let _ = reader.must_ignore_byte(b',')?;
}
let _ = reader.ignore_white_spaces()?;
self.inner.de_text_csv(reader, format)?;
idx += 1;
}
let mut values = Vec::with_capacity(idx);
for _ in 0..idx {
values.push(self.inner.pop_data_value()?);
}
values.reverse();
self.builder.append_value(ArrayValue::new(values));
Expand Down
16 changes: 8 additions & 8 deletions common/datavalues/src/types/deserializations/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl TypeDeserializer for BooleanDeserializer {
Ok(())
}

fn de_json(&mut self, value: &serde_json::Value, _format: &FormatSettings) -> Result<()> {
match value {
serde_json::Value::Bool(v) => self.builder.append_value(*v),
_ => return Err(ErrorCode::BadBytes("Incorrect boolean value")),
}
Ok(())
}

fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> {
if reader.eq_ignore_ascii_case(b"true") {
self.builder.append_value(true);
Expand Down Expand Up @@ -77,14 +85,6 @@ impl TypeDeserializer for BooleanDeserializer {
Ok(())
}

fn de_json(&mut self, value: &serde_json::Value, _format: &FormatSettings) -> Result<()> {
match value {
serde_json::Value::Bool(v) => self.builder.append_value(*v),
_ => return Err(ErrorCode::BadBytes("Incorrect boolean value")),
}
Ok(())
}

fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> {
self.builder.append_value(value.as_bool()?);
Ok(())
Expand Down
25 changes: 23 additions & 2 deletions common/datavalues/src/types/deserializations/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ where
}
}

fn de_null(&mut self, _format: &FormatSettings) -> bool {
false
}

fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> {
let mut reader = BufferReader::new(reader);
let v: T = if !T::FLOATING {
Expand Down Expand Up @@ -98,8 +102,25 @@ where
Ok(())
}

fn de_null(&mut self, _format: &FormatSettings) -> bool {
false
fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
_settings: &FormatSettings,
) -> Result<()> {
let maybe_quote = reader.ignore(|f| f == b'\'' || f == b'"')?;

let v: T = if !T::FLOATING {
reader.read_int_text()
} else {
reader.read_float_text()
}?;

if maybe_quote {
reader.must_ignore(|f| f == b'\'' || f == b'"')?;
}

self.builder.append_value(v);
Ok(())
}

fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> {
Expand Down
153 changes: 144 additions & 9 deletions common/datavalues/src/types/deserializations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,33 +79,168 @@ impl TypeDeserializer for StringDeserializer {
}
}

fn de_text_quoted<R: BufferRead>(
fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> {
self.builder.append_value(reader);
Ok(())
}

fn de_text<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_quoted_text(&mut self.buffer, b'\'')?;
reader.read_escaped_string_text(&mut self.buffer)?;
self.builder.append_value(self.buffer.as_slice());
Ok(())
}

fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> {
self.builder.append_value(reader);
Ok(())
}

fn de_text<R: BufferRead>(
fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_escaped_string_text(&mut self.buffer)?;
reader.read_quoted_text(&mut self.buffer, b'\'')?;
self.builder.append_value(self.buffer.as_slice());
Ok(())
}

fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
settings: &FormatSettings,
) -> Result<()> {
let mut read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
return Err(ErrorCode::BadBytes("Read string after eof."));
}

let maybe_quote = read_buffer[0];
if maybe_quote == b'\'' || maybe_quote == b'"' {
let mut index = 1;
let mut bytes = 0;

loop {
let begin = index;
while index < read_buffer.len() {
if read_buffer[index] == maybe_quote {
self.builder
.values_mut()
.extend_from_slice(&read_buffer[begin..index]);
self.builder.add_offset(bytes + index - begin);
reader.consume(index + 1);
return Ok(());
}

index += 1;
}

bytes += index - begin;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[begin..]);
reader.consume(index - begin);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break;
}
}

Err(ErrorCode::BadBytes(format!(
"Not found '{}' before eof in parse string.",
maybe_quote as char
)))
} else {
// Unquoted case. Look for field_delimiter or record_delimiter.
let mut field_delimiter = b',';

if !settings.field_delimiter.is_empty() {
field_delimiter = settings.field_delimiter[0];
}

if settings.record_delimiter.is_empty()
|| settings.record_delimiter[0] == b'\r'
|| settings.record_delimiter[0] == b'\n'
{
let mut index = 0;
let mut bytes = 0;

'outer1: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == b'\r'
|| read_buffer[index] == b'\n'
{
break 'outer1;
}
index += 1;
}

bytes += index;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
reader.consume(index);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break 'outer1;
}
}

self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
self.builder.add_offset(bytes + index);
reader.consume(index);
} else {
let record_delimiter = settings.record_delimiter[0];

let mut index = 0;
let mut bytes = 0;

'outer2: loop {
while index < read_buffer.len() {
if read_buffer[index] == field_delimiter
|| read_buffer[index] == record_delimiter
{
break 'outer2;
}
index += 1;
}

bytes += index;
self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
reader.consume(index);

index = 0;
read_buffer = reader.fill_buf()?;

if read_buffer.is_empty() {
break 'outer2;
}
}

self.builder
.values_mut()
.extend_from_slice(&read_buffer[..index]);
self.builder.add_offset(bytes + index);
reader.consume(index);
}

Ok(())
}
}

fn append_data_value(&mut self, value: DataValue, _format: &FormatSettings) -> Result<()> {
self.builder.append_data_value(value)
}
Expand Down
22 changes: 18 additions & 4 deletions common/datavalues/src/types/deserializations/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ impl TypeDeserializer for VariantDeserializer {
Ok(())
}

fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> {
let val = serde_json::from_slice(reader)?;
self.builder.append_value(val);
Ok(())
}

fn de_text<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
Expand All @@ -87,19 +93,27 @@ impl TypeDeserializer for VariantDeserializer {
Ok(())
}

fn de_whole_text(&mut self, reader: &[u8], _format: &FormatSettings) -> Result<()> {
let val = serde_json::from_slice(reader)?;
fn de_text_quoted<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_quoted_text(&mut self.buffer, b'\'')?;

let val = serde_json::from_slice(self.buffer.as_slice())?;

self.builder.append_value(val);
Ok(())
}

fn de_text_quoted<R: BufferRead>(
fn de_text_csv<R: BufferRead>(
&mut self,
reader: &mut CheckpointReader<R>,
_format: &FormatSettings,
) -> Result<()> {
self.buffer.clear();
reader.read_quoted_text(&mut self.buffer, b'\'')?;
reader.read_quoted_text(&mut self.buffer, b'"')?;

let val = serde_json::from_slice(self.buffer.as_slice())?;
self.builder.append_value(val);
Expand Down
2 changes: 2 additions & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ build_exceptions! {
// Network error codes.
NetworkRequestError(1073),

UnknownFormat(1074),

// Tenant error codes.
TenantIsEmpty(1101),
IndexOutOfBounds(1102),
Expand Down
27 changes: 27 additions & 0 deletions common/io/src/buffer/buffer_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub trait BufferReadExt: BufferRead {
fn ignore_bytes(&mut self, bs: &[u8]) -> Result<bool>;
fn ignore_insensitive_bytes(&mut self, bs: &[u8]) -> Result<bool>;
fn ignore_white_spaces(&mut self) -> Result<bool>;
fn ignore_white_spaces_and_byte(&mut self, b: u8) -> Result<bool>;
fn until(&mut self, delim: u8, buf: &mut Vec<u8>) -> Result<usize>;

fn keep_read(&mut self, buf: &mut Vec<u8>, f: impl Fn(u8) -> bool) -> Result<usize>;
Expand Down Expand Up @@ -55,6 +56,11 @@ pub trait BufferReadExt: BufferRead {
Ok(())
}

fn eof(&mut self) -> Result<bool> {
let buffer = self.fill_buf()?;
Ok(buffer.is_empty())
}

fn must_eof(&mut self) -> Result<()> {
let buffer = self.fill_buf()?;
if !buffer.is_empty() {
Expand All @@ -78,6 +84,16 @@ pub trait BufferReadExt: BufferRead {

fn must_ignore_byte(&mut self, b: u8) -> Result<()> {
if !self.ignore_byte(b)? {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
format!("Expected to have char {}.", b as char),
));
}
Ok(())
}

fn must_ignore_white_spaces_and_byte(&mut self, b: u8) -> Result<()> {
if !self.ignore_white_spaces_and_byte(b)? {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
format!("Expected to have char {}", b as char),
Expand Down Expand Up @@ -172,6 +188,17 @@ where R: BufferRead
Ok(cnt > 0)
}

fn ignore_white_spaces_and_byte(&mut self, b: u8) -> Result<bool> {
self.ignores(|c: u8| c == b' ')?;

if self.ignore_byte(b)? {
self.ignores(|c: u8| c == b' ')?;
return Ok(true);
}

Ok(false)
}

fn until(&mut self, delim: u8, buf: &mut Vec<u8>) -> Result<usize> {
self.read_until(delim, buf)
}
Expand Down
Loading

0 comments on commit 2bd575e

Please sign in to comment.