From a96eefa7d73536cc0f3252e1112b5c484177556a Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Thu, 9 Mar 2023 22:08:37 +0800 Subject: [PATCH 1/7] feat(connector): unified csv parser --- Cargo.lock | 14 +- src/connector/Cargo.toml | 2 +- src/connector/src/macros.rs | 2 +- src/connector/src/parser/csv_parser.rs | 385 +++++------------- .../src/source/filesystem/s3/source/reader.rs | 5 +- 5 files changed, 116 insertions(+), 292 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50f2b26fdf6f3..98a0e4d16e19b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1759,6 +1759,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + [[package]] name = "csv-core" version = "0.1.10" @@ -5994,7 +6006,7 @@ dependencies = [ "byteorder", "bytes", "chrono", - "csv-core", + "csv", "duration-str", "enum-as-inner", "futures", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 3643f351d5559..72f25cfa3e0b5 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -26,7 +26,7 @@ bincode = "1" byteorder = "1" bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } -csv-core = "0.1.10" +csv = "1.2" duration-str = "0.5.0" enum-as-inner = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index ee0a4989c4c5a..5e95a2d48c10f 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -164,7 +164,7 @@ macro_rules! impl_common_parser_logic { ($parser_name:ty) => { impl $parser_name { #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)] - async fn into_chunk_stream(self, data_stream: $crate::source::BoxSourceStream) { + async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) { #[for_await] for batch in data_stream { let batch = batch?; diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 1f73c7d4508fc..8cf96c4227164 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -12,48 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap;
 use std::str::FromStr;
 
 use anyhow::anyhow;
 use futures_async_stream::try_stream;
-use risingwave_common::error::ErrorCode::InternalError;
+use itertools::Itertools;
+use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
 use risingwave_common::error::{Result, RwError};
-use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
+use risingwave_common::types::{Datum, Decimal, ScalarImpl};
 use risingwave_expr::vector_op::cast::{
     str_to_date, str_to_timestamp, str_with_time_zone_to_timestamptz,
 };
-
-use crate::parser::{
-    BoxSourceWithStateStream, ByteStreamSourceParser, SourceColumnDesc, SourceStreamChunkBuilder,
-    SourceStreamChunkRowWriter, StreamChunkWithState, WriteGuard,
-};
-use crate::source::{BoxSourceStream, SourceContextRef, SplitId};
-
+use simd_json::{BorrowedValue, ValueAccess};
+
+use crate::common::UpsertMessage;
+use crate::impl_common_parser_logic;
+use crate::parser::common::simd_json_parse_value;
+use crate::parser::util::at_least_one_ok;
+use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
+use crate::source::{DataType, SourceColumnDesc, SourceContextRef};
+impl_common_parser_logic!(CsvParser);
 
 macro_rules! to_rust_type {
     ($v:ident, $t:ty) => {
         $v.parse::<$t>()
             .map_err(|_| anyhow!("failed parse {} from {}", stringify!($t), $v))?
     };
 }
-
 #[derive(Debug, Clone)]
 pub struct CsvParserConfig {
     pub delimiter: u8,
     pub has_header: bool,
 }
 
+/// Parser for CSV format
 #[derive(Debug)]
 pub struct CsvParser {
     rw_columns: Vec<SourceColumnDesc>,
-    next_row_is_header: bool,
-    csv_reader: csv_core::Reader,
-    // buffers for parse
-    output: Vec<u8>,
-    output_cursor: usize,
-    ends: Vec<usize>,
-    ends_cursor: usize,
     source_ctx: SourceContextRef,
+    headers: Option<Vec<String>>,
+    delimiter: u8,
 }
 
 impl CsvParser {
@@ -69,296 +66,108 @@ impl CsvParser {
         Ok(Self {
             rw_columns,
-            next_row_is_header: has_header,
-            csv_reader: csv_core::ReaderBuilder::new().delimiter(delimiter).build(),
-            output: vec![0],
-            output_cursor: 0,
-            ends: vec![0],
-            ends_cursor: 1,
+            delimiter,
+            headers: if has_header { Some(Vec::new()) } else { None },
             source_ctx,
         })
     }
 
-    fn reset_cursor(&mut self) {
-        self.output_cursor = 0;
-        self.ends_cursor = 1;
+    fn read_row(&self, buf: &[u8]) -> Result<Vec<String>> {
+        let mut reader_builder = csv::ReaderBuilder::default();
+        reader_builder.delimiter(self.delimiter).has_headers(false);
+        let record = reader_builder
+            .from_reader(buf)
+            .records()
+            .next()
+            .transpose()
+            .map_err(|err| RwError::from(ProtocolError(err.to_string())))?;
+        Ok(record
+            .map(|record| record.iter().map(|field| field.to_string()).collect())
+            .unwrap_or_default())
     }
 
-    pub fn parse_columns_to_strings(&mut self, chunk: &mut &[u8]) -> Result<Option<Vec<String>>> {
-        loop {
-            let (result, n_input, n_output, n_ends) = self.csv_reader.read_record(
-                chunk,
-                &mut self.output[self.output_cursor..],
-                &mut self.ends[self.ends_cursor..],
-            );
-            self.output_cursor += n_output;
-            *chunk = &(*chunk)[n_input..];
-            self.ends_cursor += n_ends;
-            match result {
-                // input empty, here means the `chunk` passed to this method
-                // doesn't contain a whole record, need more bytes
-                csv_core::ReadRecordResult::InputEmpty => break Ok(None),
-                // the output buffer is not enough
-                csv_core::ReadRecordResult::OutputFull => {
-                    let length = self.output.len();
-                    self.output.resize(length * 2, 0);
-                }
-                // the ends buffer is not enough
-                csv_core::ReadRecordResult::OutputEndsFull => {
-                    let length = self.ends.len();
-                    self.ends.resize(length * 2, 0);
-                }
-                // Success cases
-                csv_core::ReadRecordResult::Record | csv_core::ReadRecordResult::End => {
-                    // skip the header
-                    if self.next_row_is_header {
-                        self.next_row_is_header = false;
-                        self.reset_cursor();
-                        continue;
-                    }
-                    let ends_cursor = self.ends_cursor;
-                    // caller provides an empty chunk, and there is no data
-                    // in inner buffer
-                    if ends_cursor <= 1 {
-                        break Ok(None);
-                    }
-                    self.reset_cursor();
-
-                    let string_columns = (1..ends_cursor)
-                        .map(|culomn| {
-                            String::from_utf8(
-                                self.output[self.ends[culomn - 1]..self.ends[culomn]].to_owned(),
-                            )
-                            .map_err(|e| {
-                                RwError::from(InternalError(format!(
-                                    "Parse csv column {} error: invalid UTF-8 ({})",
-                                    culomn, e,
-                                )))
-                            })
-                        })
-                        .collect::<Result<Vec<String>>>()?;
-                    break Ok(Some(string_columns));
-                }
+    #[inline]
+    fn parse_string(dtype: &DataType, v: String) -> Result<Datum> {
+        let v = match dtype {
+            // mysql use tinyint to represent boolean
+            DataType::Boolean => ScalarImpl::Bool(to_rust_type!(v, i16) != 0),
+            DataType::Int16 => ScalarImpl::Int16(to_rust_type!(v, i16)),
+            DataType::Int32 => ScalarImpl::Int32(to_rust_type!(v, i32)),
+            DataType::Int64 => ScalarImpl::Int64(to_rust_type!(v, i64)),
+            DataType::Float32 => ScalarImpl::Float32(to_rust_type!(v, f32).into()),
+            DataType::Float64 => ScalarImpl::Float64(to_rust_type!(v, f64).into()),
+            // FIXME: decimal should have more precision than f64
+            DataType::Decimal => Decimal::from_str(v.as_str())
+                .map_err(|_| anyhow!("parse decimal from string err {}", v))?
+                .into(),
+            DataType::Varchar => v.into(),
+            DataType::Date => str_to_date(v.as_str())?.into(),
+            DataType::Time => str_to_date(v.as_str())?.into(),
+            DataType::Timestamp => str_to_timestamp(v.as_str())?.into(),
+            DataType::Timestamptz => str_with_time_zone_to_timestamptz(v.as_str())?.into(),
+            _ => {
+                return Err(RwError::from(InternalError(format!(
+                    "CSV data source not support type {}",
+                    dtype
+                ))))
             }
-        }
+        };
+        Ok(Some(v))
     }
 
-    fn try_parse_single_record(
+    #[allow(clippy::unused_async)]
+    pub async fn parse_inner(
         &mut self,
-        payload: &mut &[u8],
+        payload: &[u8],
         mut writer: SourceStreamChunkRowWriter<'_>,
-    ) -> Result<Option<WriteGuard>> {
-        let columns_string = match self.parse_columns_to_strings(payload)? {
-            None => return Ok(None),
-            Some(strings) => strings,
-        };
-        writer
-            .insert(move |desc| {
-                // column_id is 1-based
-                let column_id = desc.column_id.get_id() - 1;
-                let column_type = &desc.data_type;
-                let v = match columns_string.get(column_id as usize) {
-                    Some(v) => v.to_owned(),
-                    None => return Ok(None),
-                };
-                parse_string(column_type, v)
-            })
-            .map(Some)
-    }
-
-    #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
-    async fn into_stream(mut self, data_stream: BoxSourceStream) {
-        // the remain length of the last seen message
-        let mut remain_len = 0;
-        // current offset
-        let mut offset = 0;
-        // split id of current data stream
-        let mut split_id = None;
-        #[for_await]
-        for batch in data_stream {
-            let batch = batch?;
-
-            let mut builder =
-                SourceStreamChunkBuilder::with_capacity(self.rw_columns.clone(), batch.len() * 2);
-            let mut split_offset_mapping: HashMap<SplitId, String> = HashMap::new();
-
-            for msg in batch {
-                if let Some(content) = msg.payload {
-                    if split_id.is_none() {
-                        split_id = Some(msg.split_id.clone());
-                    }
-
-                    offset = msg.offset.parse().unwrap();
-                    let mut buff = content.as_ref();
-
-                    remain_len = buff.len();
-                    loop {
-                        match self.try_parse_single_record(&mut buff, builder.row_writer()) {
-                            Err(e) => {
-                                tracing::warn!(
-                                    "message parsing failed {}, skipping",
-                                    e.to_string()
-                                );
-                                continue;
-                            }
-                            Ok(None) => {
-                                break;
-                            }
-                            Ok(Some(_)) => {
-                                let consumed = remain_len - buff.len();
-                                offset += consumed;
-                                remain_len = buff.len();
-                            }
-                        }
-                    }
-                    split_offset_mapping.insert(msg.split_id, offset.to_string());
-                }
+    ) -> Result<WriteGuard> {
+        let mut fields = self.read_row(payload)?;
+        if let Some(headers) = &mut self.headers {
+            if headers.is_empty() {
+                *headers = fields;
+                tracing::warn!("set headers {:?} {:?}", headers, payload);
+                // Here we want a row, but got nothing. So it's an error for the `parse_inner` but
+                // has no bad impact on the system.
+                return Err(RwError::from(ProtocolError("This messsage indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
             }
-            yield StreamChunkWithState {
-                chunk: builder.finish(),
-                split_offset_mapping: Some(split_offset_mapping),
-            };
-        }
-        // The file may be missing the last terminator,
-        // so we need to pass an empty payload to inform the parser.
-        if remain_len > 0 {
-            let mut builder = SourceStreamChunkBuilder::with_capacity(self.rw_columns.clone(), 1);
-            let mut split_offset_mapping: HashMap<SplitId, String> = HashMap::new();
-            let empty = vec![];
-            match self.try_parse_single_record(&mut empty.as_ref(), builder.row_writer()) {
-                Err(e) => {
-                    tracing::warn!("message parsing failed {}, skipping", e.to_string());
+            writer.insert(|desc| {
+                if let Some(i) = headers.iter().position(|name| name == &desc.name) {
+                    Self::parse_string(
+                        &desc.data_type,
+                        fields.get_mut(i).map(std::mem::take).unwrap_or_default(),
+                    )
+                } else {
+                    Ok(None)
                 }
-                Ok(Some(_)) => {
-                    split_offset_mapping
-                        .insert(split_id.unwrap(), (offset + remain_len).to_string());
-                    yield StreamChunkWithState {
-                        chunk: builder.finish(),
-                        split_offset_mapping: Some(split_offset_mapping),
-                    };
+            })
+        } else {
+            fields.reverse();
+            writer.insert(|desc| {
+                if let Some(value) = fields.pop() {
+                    Self::parse_string(&desc.data_type, value)
+                } else {
+                    Ok(None)
                 }
-                _ => {}
-            }
+            })
         }
     }
 }
 
-impl ByteStreamSourceParser for CsvParser {
-    fn into_stream(self, msg_stream: BoxSourceStream) -> BoxSourceWithStateStream {
-        self.into_stream(msg_stream)
-    }
-}
-
-#[inline]
-fn parse_string(dtype: &DataType, v: String) -> Result<Datum> {
-    let v = match dtype {
-        // mysql use tinyint to represent boolean
-        DataType::Boolean => ScalarImpl::Bool(to_rust_type!(v, i16) != 0),
-        DataType::Int16 => ScalarImpl::Int16(to_rust_type!(v, i16)),
-        DataType::Int32 => ScalarImpl::Int32(to_rust_type!(v, i32)),
-        DataType::Int64 => ScalarImpl::Int64(to_rust_type!(v, i64)),
-        DataType::Float32 => ScalarImpl::Float32(to_rust_type!(v, f32).into()),
-        DataType::Float64 => ScalarImpl::Float64(to_rust_type!(v, f64).into()),
-        // FIXME: decimal should have more precision than f64
-        DataType::Decimal => Decimal::from_str(v.as_str())
-            .map_err(|_| anyhow!("parse decimal from string err {}", v))?
-            .into(),
-        DataType::Varchar => v.into(),
-        DataType::Date => str_to_date(v.as_str())?.into(),
-        DataType::Time => str_to_date(v.as_str())?.into(),
-        DataType::Timestamp => str_to_timestamp(v.as_str())?.into(),
-        DataType::Timestamptz => str_with_time_zone_to_timestamptz(v.as_str())?.into(),
-        _ => {
-            return Err(RwError::from(InternalError(format!(
-                "CSV data source not support type {}",
-                dtype
-            ))))
-        }
-    };
-    Ok(Some(v))
-}
-
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
-    use futures_async_stream::for_await;
-
-    use crate::source::{SourceMessage, SourceMeta};
-
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
-    async fn prepare_data(data: Vec<u8>) {
-        let mid = data.len() / 2;
-        let part1 = data[..mid].to_vec();
-        let part2 = data[mid..].to_vec();
-        let id = "split1".into();
-        let part1_len = part1.len();
-        let msg1 = SourceMessage {
-            payload: Some(part1.into()),
-            offset: 0.to_string(),
-            split_id: Arc::clone(&id),
-            meta: SourceMeta::Empty,
-        };
-        let msg2 = SourceMessage {
-            payload: Some(part2.into()),
-            offset: part1_len.to_string(),
-            split_id: Arc::clone(&id),
-            meta: SourceMeta::Empty,
-        };
-
-        yield vec![msg1, msg2];
-    }
     use super::*;
 
-    #[ignore]
-    #[tokio::test]
-    async fn test_csv_parser_without_last_line_break() {
-        let descs = vec![
-            SourceColumnDesc::simple("name", DataType::Varchar, 1.into()),
-            SourceColumnDesc::simple("age", DataType::Int32, 2.into()),
-        ];
-
-        let config = CsvParserConfig {
-            delimiter: b',',
-            has_header: true,
-        };
-        let parser = CsvParser::new(descs, config, Default::default()).unwrap();
-        let data = b"
-name,age
-pite,20
-alex,10";
-        let data_stream = prepare_data(data.to_vec());
-        let msg_stream = parser.into_stream(data_stream);
-        #[for_await]
-        for msg in msg_stream {
-            println!("{:?}", msg);
-        }
-    }
-
-    #[ignore]
-    #[tokio::test]
-    async fn test_csv_parser_with_last_line_break() {
-        let descs = vec![
-            SourceColumnDesc::simple("name", DataType::Varchar, 1.into()),
-            SourceColumnDesc::simple("age", DataType::Int32, 2.into()),
-        ];
-
-        let config = CsvParserConfig {
-            delimiter: b',',
-            has_header: true,
-        };
-        let parser = CsvParser::new(descs, config, Default::default()).unwrap();
-        let data = b"
-name,age
-pite,20
-alex,10
-";
-        println!("data len: {}", data.len());
-        let data_stream = prepare_data(data.to_vec());
-        let msg_stream = parser.into_stream(data_stream);
-        #[for_await]
-        for msg in msg_stream {
-            println!("{:?}", msg);
-        }
+    #[test]
+    fn test_parse_row() {
+        let parser = CsvParser::new(
+            Vec::new(),
+            CsvParserConfig {
+                delimiter: b',',
+                has_header: true,
+            },
+            Default::default(),
+        )
+        .unwrap();
+        let row = parser.read_row(b"a,b,c").unwrap();
+        println!("{:?}", row);
     }
 }
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index f4ef57efdb90f..26694ed929901 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -198,7 +198,10 @@ impl S3FileReader {
         let parser =
             ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)?;
 
-        let msg_stream = if matches!(parser, ByteStreamSourceParserImpl::Json(_)) {
+        let msg_stream = if matches!(
+            parser,
+            ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
+        ) {
             NdByteStreamWrapper::new(parser).into_stream(Box::pin(data_stream))
         } else {
             parser.into_stream(Box::pin(data_stream))

From 571df2421611219158573e894ba5ec0a027af67a Mon Sep 17 00:00:00 2001
From: idx0-dev
<124041366+idx0-dev@users.noreply.github.com> Date: Fri, 10 Mar 2023 11:28:16 +0800 Subject: [PATCH 2/7] add test --- ci/scripts/s3-source-test.sh | 19 +- ci/workflows/main-cron.yml | 8 +- ci/workflows/pull-request.yml | 264 +------------------------ e2e_test/s3/run_csv.py | 155 +++++++++++++++ src/connector/src/parser/csv_parser.rs | 205 +++++++++++++++++-- 5 files changed, 376 insertions(+), 275 deletions(-) create mode 100644 e2e_test/s3/run_csv.py diff --git a/ci/scripts/s3-source-test.sh b/ci/scripts/s3-source-test.sh index 58c2cbd93863f..f047cec057454 100755 --- a/ci/scripts/s3-source-test.sh +++ b/ci/scripts/s3-source-test.sh @@ -20,6 +20,23 @@ while getopts 'p:' opt; do done shift $((OPTIND -1)) + +while getopts 's:' opt; do + case ${opt} in + p ) + script=$OPTARG + ;; + \? ) + echo "Invalid Option: -$OPTARG" 1>&2 + exit 1 + ;; + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + ;; + esac +done +shift $((OPTIND -1)) + echo "--- Download artifacts" mkdir -p target/debug buildkite-agent artifact download risingwave-"$profile" target/debug/ @@ -44,7 +61,7 @@ cargo make ci-start ci-1cn-1fe echo "--- Run test" python3 -m pip install minio psycopg2-binary -python3 e2e_test/s3/run.py +python3 e2e_test/s3/$script.py echo "--- Kill cluster" cargo make ci-kill diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 766320c4fd34f..41c7ef7a4cbc8 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -184,8 +184,8 @@ steps: timeout_in_minutes: 5 retry: *auto-retry - - label: "S3 source check on AWS" - command: "ci/scripts/s3-source-test.sh -p ci-release" + - label: "S3 source check on AWS (json parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s run" depends_on: build plugins: - seek-oss/aws-sm#v2.3.1: @@ -200,8 +200,8 @@ steps: timeout_in_minutes: 20 retry: *auto-retry - - label: "S3 source check on lyvecloud.seagate.com" - command: "ci/scripts/s3-source-test.sh -p ci-release" + - label: "S3 source check on lyvecloud.seagate.com (json parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s run" depends_on: build plugins: - seek-oss/aws-sm#v2.3.1: diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index ccb6d7d6999f1..6ed3d08c51ff9 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -43,270 +43,18 @@ steps: timeout_in_minutes: 15 retry: *auto-retry - - label: "build other components" - command: "ci/scripts/build-other.sh" - key: "build-other" + - label: "S3 source check on AWS (csv parser)" + command: "ci/scripts/s3-source-test.sh -p ci-dev -s run_csv" + depends_on: build plugins: - seek-oss/aws-sm#v2.3.1: env: - GITHUB_TOKEN: github-token + S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - docker-compose#v4.9.0: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true environment: - - GITHUB_TOKEN - timeout_in_minutes: 10 - retry: *auto-retry - - - label: "build (deterministic simulation)" - command: "ci/scripts/build-simulation.sh" - key: "build-simulation" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - timeout_in_minutes: 15 - retry: *auto-retry - - - label: "docslt" - command: "ci/scripts/docslt.sh" - key: "docslt" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - timeout_in_minutes: 10 - retry: *auto-retry - - - label: 
"end-to-end test" - command: "ci/scripts/e2e-test.sh -p ci-dev" - depends_on: - - "build" - - "docslt" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 12 - retry: *auto-retry - - - label: "end-to-end test (parallel)" - command: "ci/scripts/e2e-test-parallel.sh -p ci-dev" - depends_on: - - "build" - - "docslt" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 12 - retry: *auto-retry - - - label: "end-to-end test (parallel, in-memory)" - command: "ci/scripts/e2e-test-parallel-in-memory.sh -p ci-dev" - depends_on: "build" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 12 - retry: *auto-retry - - - label: "end-to-end source test" - command: "ci/scripts/e2e-source-test.sh -p ci-dev" - depends_on: - - "build" - - "build-other" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: source-test-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 18 - retry: *auto-retry - - - label: "end-to-end sink test" - command: "ci/scripts/e2e-sink-test.sh -p ci-dev" - depends_on: - - "build" - - "build-other" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: sink-test-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 5 - retry: *auto-retry - - - label: "end-to-end iceberg sink test" - command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-dev" - depends_on: - - "build" - - "build-other" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 5 - retry: *auto-retry - - - label: "e2e java-binding test" - command: "ci/scripts/java-binding-test.sh -p ci-dev" - depends_on: "build" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 5 - retry: *auto-retry - - - label: "regress test" - command: "ci/scripts/regress-test.sh -p ci-dev" - depends_on: "build" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: regress-test-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 5 - retry: *auto-retry - - - label: "unit test" - command: "ci/scripts/pr-unit-test.sh" - plugins: - - ./ci/plugins/swapfile - - gencer/cache#v2.4.10: *cargo-cache - - seek-oss/aws-sm#v2.3.1: - env: - CODECOV_TOKEN: my-codecov-token - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - environment: - - CODECOV_TOKEN - timeout_in_minutes: 12 - retry: *auto-retry - - - label: "fuzz test" - command: "ci/scripts/pr-fuzz-test.sh -p ci-dev" - depends_on: - - "build" - - "build-simulation" - plugins: - - ./ci/plugins/swapfile - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: 
rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 - retry: *auto-retry - - - label: "check" - command: "ci/scripts/check.sh" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml + - S3_SOURCE_TEST_CONF timeout_in_minutes: 20 - retry: *auto-retry - - - label: "unit test (deterministic simulation)" - command: "ci/scripts/deterministic-unit-test.sh" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - timeout_in_minutes: 10 - retry: *auto-retry - - - label: "scaling test (deterministic simulation)" - command: "TEST_NUM=5 ci/scripts/deterministic-scale-test.sh" - depends_on: "build-simulation" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - timeout_in_minutes: 10 - retry: *auto-retry - - - label: "end-to-end test (deterministic simulation)" - command: "TEST_NUM=16 timeout 14m ci/scripts/deterministic-e2e-test.sh" - depends_on: "build-simulation" - plugins: - - seek-oss/aws-sm#v2.3.1: - env: - GITHUB_TOKEN: github-token - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - environment: - - GITHUB_TOKEN - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 - retry: *auto-retry - - - label: "recovery test (deterministic simulation)" - command: "TEST_NUM=8 KILL_RATE=0.5 timeout 14m ci/scripts/deterministic-recovery-test.sh" - depends_on: "build-simulation" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - # - seek-oss/aws-sm#v2.3.1: - # env: - # BUILDKITE_ANALYTICS_TOKEN: buildkite-build-analytics-deterministic-token - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - # - test-collector#v1.0.0: - # files: "*-junit.xml" - # format: "junit" - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 - retry: *auto-retry - - - label: "misc check" - command: "ci/scripts/misc-check.sh" - plugins: - - gencer/cache#v2.4.10: *cargo-cache - - docker-compose#v4.9.0: - run: rw-build-env - config: ci/docker-compose.yml - - shellcheck#v1.2.0: - files: ./**/*.sh - timeout_in_minutes: 5 - retry: *auto-retry + retry: *auto-retry \ No newline at end of file diff --git a/e2e_test/s3/run_csv.py b/e2e_test/s3/run_csv.py new file mode 100644 index 0000000000000..c5412c1e57d07 --- /dev/null +++ b/e2e_test/s3/run_csv.py @@ -0,0 +1,155 @@ +import os +import string +import json +import string +from time import sleep +from minio import Minio +import psycopg2 +import random + + +def do_test(config, N, n, prefix): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + cur.execute(f'''CREATE TABLE s3_test_csv_without_headers( + a int, + b int, + c int, + ) WITH ( + connector = 's3', + match_pattern = '{prefix}_data_without_headers.csv', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) ROW FORMAT CSV WITHOUT HEADER DELIMITED BY ',';''') + + cur.execute(f'''CREATE TABLE 
s3_test_csv_with_headers( + a int, + b int, + c int, + ) WITH ( + connector = 's3', + match_pattern = '{prefix}_data_with_headers.csv', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) ROW FORMAT CSV DELIMITED BY ',';''') + + total_row = int(N * n) + sleep(60) + while True: + sleep(60) + cur.execute('select count(*) from s3_test_csv_with_headers') + result_with_headers = cur.fetchone() + cur.execute('select count(*) from s3_test_csv_without_headers') + result_without_headers = cur.fetchone() + if result_with_headers[0] == total_row and result_without_headers[0] == total_row: + break + print( + f"Now got {result_with_headers[0]} rows in table, {total_row} expected, wait 60s") + + cur.execute( + 'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_with_headers') + result_with_headers = cur.fetchone() + + cur.execute( + 'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_without_headers') + s3_test_csv_without_headers = cur.fetchone() + + print(result_with_headers, s3_test_csv_without_headers, + int(((N - 1) * N / 2) * n), int(N*n / 2)) + + assert s3_test_csv_without_headers[0] == total_row + assert s3_test_csv_without_headers[1] == int(((N - 1) * N / 2) * n) + assert s3_test_csv_without_headers[2] == int(N*n / 2) + assert s3_test_csv_without_headers[3] == 0 + + assert result_with_headers[0] == total_row + assert result_with_headers[1] == 0 + assert result_with_headers[2] == int(N*n / 2) + assert result_with_headers[3] == int(((N - 1) * N / 2) * n) + + cur.execute('drop table s3_test_csv_with_headers') + cur.execute('drop table s3_test_csv_without_headers') + + cur.close() + conn.close() + + +if __name__ == "__main__": + config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) + run_id = str(random.randint(1000, 9999)) + N = 10000 + # do_test(config, N, 0, run_id) + items = [",".join([str(j), str(j % 2), str(-1 if j % 2 else 1)]) + for j in range(N) + ] + + data = "\n".join(items) + "\n" + n = 10 + with open("data_without_headers.csv", "w") as f: + for _ in range(10): + f.write(data) + os.fsync(f.fileno()) + + with open("data_with_headers.csv", "w") as f: + f.write("c,b,a\n") + for _ in range(10): + f.write(data) + os.fsync(f.fileno()) + + client = Minio( + config["S3_ENDPOINT"], + access_key=config["S3_ACCESS_KEY"], + secret_key=config["S3_SECRET_KEY"], + secure=True + ) + + try: + client.fput_object( + config["S3_BUCKET"], + f"{run_id}_data_without_headers.csv", + f"data_without_headers.csv" + + ) + client.fput_object( + config["S3_BUCKET"], + f"{run_id}_data_with_headers.csv", + f"data_with_headers.csv" + ) + print( + f"Uploaded {run_id}_data_with_headers.csv & {run_id}_data_with_headers.csv to S3") + os.remove(f"data_with_headers.csv") + os.remove(f"data_without_headers.csv") + except Exception as e: + print(f"Error uploading test files") + + return_code = 0 + try: + do_test(config, N, n, run_id) + except Exception as e: + print("Test failed", e) + return_code = 1 + + # Clean up + for i in range(20): + try: + client.remove_object( + config["S3_BUCKET"], f"{run_id}_data_with_headers.csv") + client.remove_object( + config["S3_BUCKET"], f"{run_id}_data_without_headers.csv") + except Exception as e: + print(f"Error removing testing files {e}") + + exit(return_code) diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 
8cf96c4227164..52a00cf974abd 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -16,19 +16,14 @@ use std::str::FromStr; use anyhow::anyhow; use futures_async_stream::try_stream; -use itertools::Itertools; use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::{Datum, Decimal, ScalarImpl}; use risingwave_expr::vector_op::cast::{ str_to_date, str_to_timestamp, str_with_time_zone_to_timestamptz, }; -use simd_json::{BorrowedValue, ValueAccess}; -use crate::common::UpsertMessage; use crate::impl_common_parser_logic; -use crate::parser::common::simd_json_parse_value; -use crate::parser::util::at_least_one_ok; use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; use crate::source::{DataType, SourceColumnDesc, SourceContextRef}; impl_common_parser_logic!(CsvParser); @@ -125,10 +120,9 @@ impl CsvParser { if let Some(headers) = &mut self.headers { if headers.is_empty() { *headers = fields; - tracing::warn!("set headers {:?} {:?}", headers, payload); // Here we want a row, but got nothing. So it's an error for the `parse_inner` but // has no bad impact on the system. - return Err(RwError::from(ProtocolError("This messsage indicates a header, no row will be inserted. However, internal parser state was updated.".to_string()))); + return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string()))); } writer.insert(|desc| { if let Some(i) = headers.iter().position(|name| name == &desc.name) { @@ -155,10 +149,125 @@ impl CsvParser { #[cfg(test)] mod tests { + use risingwave_common::array::Op; + use risingwave_common::row::Row; + use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; + use super::*; - #[test] - fn test_parse_row() { - let parser = CsvParser::new( + use crate::parser::SourceStreamChunkBuilder; + #[tokio::test] + async fn test_csv_without_headers() { + let data = [ + r#"1,a,2"#, + r#""15541","a,1,1,",4"#, + r#"0,"""0",0"#, + r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#, + ]; + let descs = vec![ + SourceColumnDesc::simple("a", DataType::Int32, 0.into()), + SourceColumnDesc::simple("b", DataType::Varchar, 1.into()), + SourceColumnDesc::simple("c", DataType::Int32, 2.into()), + ]; + let mut parser = CsvParser::new( + Vec::new(), + CsvParserConfig { + delimiter: b',', + has_header: false, + }, + Default::default(), + ) + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + for item in data { + parser + .parse_inner(item.as_bytes(), builder.row_writer()) + .await + .unwrap(); + } + let chunk = builder.finish(); + let mut rows = chunk.rows(); + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(1))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(2))) + ); + } + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(15541))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a,1,1,".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(4))) + ); + } + + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + 
row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("\"0".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + } + + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("0".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + } + } + #[tokio::test] + async fn test_csv_with_headers() { + let data = [ + r#"c,b,a"#, + r#"1,a,2"#, + r#""15541","a,1,1,",4"#, + r#"0,"""0",0"#, + r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#, + ]; + let descs = vec![ + SourceColumnDesc::simple("a", DataType::Int32, 0.into()), + SourceColumnDesc::simple("b", DataType::Varchar, 1.into()), + SourceColumnDesc::simple("c", DataType::Int32, 2.into()), + ]; + let mut parser = CsvParser::new( Vec::new(), CsvParserConfig { delimiter: b',', @@ -167,7 +276,79 @@ mod tests { Default::default(), ) .unwrap(); - let row = parser.read_row(b"a,b,c").unwrap(); - println!("{:?}", row); + let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + for item in data { + let _ = parser + .parse_inner(item.as_bytes(), builder.row_writer()) + .await; + } + let chunk = builder.finish(); + let mut rows = chunk.rows(); + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(1))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(2))) + ); + } + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(15541))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a,1,1,".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(4))) + ); + } + + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("\"0".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + } + + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("0".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + } } } From 550d1625bdcaad5c596e9facbcacdf4ee26bb893 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 10 Mar 2023 14:51:23 +0800 Subject: [PATCH 3/7] update script --- ci/scripts/s3-source-test.sh | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/ci/scripts/s3-source-test.sh b/ci/scripts/s3-source-test.sh index f047cec057454..4d482568cad26 100755 --- a/ci/scripts/s3-source-test.sh +++ b/ci/scripts/s3-source-test.sh @@ -4,26 +4,12 @@ set -euo pipefail source ci/scripts/common.env.sh -while getopts 'p:' opt; do +while getopts 'p:s:' opt; do case ${opt} in p ) profile=$OPTARG ;; - \? 
) - echo "Invalid Option: -$OPTARG" 1>&2 - exit 1 - ;; - : ) - echo "Invalid option: $OPTARG requires an argument" 1>&2 - ;; - esac -done -shift $((OPTIND -1)) - - -while getopts 's:' opt; do - case ${opt} in - p ) + s ) script=$OPTARG ;; \? ) @@ -37,6 +23,8 @@ while getopts 's:' opt; do done shift $((OPTIND -1)) + + echo "--- Download artifacts" mkdir -p target/debug buildkite-agent artifact download risingwave-"$profile" target/debug/ From b0a064d439f64659b9fc77ba27b443479dc0c43e Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 10 Mar 2023 15:16:17 +0800 Subject: [PATCH 4/7] move to main cron --- ci/workflows/main-cron.yml | 15 ++ ci/workflows/pull-request.yml | 264 +++++++++++++++++++++++++++++++++- 2 files changed, 273 insertions(+), 6 deletions(-) diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 41c7ef7a4cbc8..3b7c92dd76005 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -215,3 +215,18 @@ steps: - S3_SOURCE_TEST_CONF timeout_in_minutes: 20 retry: *auto-retry + - label: "S3 source check on AWS (csv parser)" + command: "ci/scripts/s3-source-test.sh -p ci-dev -s run_csv" + depends_on: build + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - S3_SOURCE_TEST_CONF + timeout_in_minutes: 20 + retry: *auto-retry \ No newline at end of file diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 6ed3d08c51ff9..ccb6d7d6999f1 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -43,18 +43,270 @@ steps: timeout_in_minutes: 15 retry: *auto-retry - - label: "S3 source check on AWS (csv parser)" - command: "ci/scripts/s3-source-test.sh -p ci-dev -s run_csv" - depends_on: build + - label: "build other components" + command: "ci/scripts/build-other.sh" + key: "build-other" plugins: - seek-oss/aws-sm#v2.3.1: env: - S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + GITHUB_TOKEN: github-token - docker-compose#v4.9.0: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true environment: - - S3_SOURCE_TEST_CONF + - GITHUB_TOKEN + timeout_in_minutes: 10 + retry: *auto-retry + + - label: "build (deterministic simulation)" + command: "ci/scripts/build-simulation.sh" + key: "build-simulation" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + timeout_in_minutes: 15 + retry: *auto-retry + + - label: "docslt" + command: "ci/scripts/docslt.sh" + key: "docslt" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + timeout_in_minutes: 10 + retry: *auto-retry + + - label: "end-to-end test" + command: "ci/scripts/e2e-test.sh -p ci-dev" + depends_on: + - "build" + - "docslt" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 12 + retry: *auto-retry + + - label: "end-to-end test (parallel)" + command: "ci/scripts/e2e-test-parallel.sh -p ci-dev" + depends_on: + - "build" + - "docslt" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + 
mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 12 + retry: *auto-retry + + - label: "end-to-end test (parallel, in-memory)" + command: "ci/scripts/e2e-test-parallel-in-memory.sh -p ci-dev" + depends_on: "build" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 12 + retry: *auto-retry + + - label: "end-to-end source test" + command: "ci/scripts/e2e-source-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: source-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 18 + retry: *auto-retry + + - label: "end-to-end sink test" + command: "ci/scripts/e2e-sink-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 5 + retry: *auto-retry + + - label: "end-to-end iceberg sink test" + command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 5 + retry: *auto-retry + + - label: "e2e java-binding test" + command: "ci/scripts/java-binding-test.sh -p ci-dev" + depends_on: "build" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 5 + retry: *auto-retry + + - label: "regress test" + command: "ci/scripts/regress-test.sh -p ci-dev" + depends_on: "build" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: regress-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 5 + retry: *auto-retry + + - label: "unit test" + command: "ci/scripts/pr-unit-test.sh" + plugins: + - ./ci/plugins/swapfile + - gencer/cache#v2.4.10: *cargo-cache + - seek-oss/aws-sm#v2.3.1: + env: + CODECOV_TOKEN: my-codecov-token + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + environment: + - CODECOV_TOKEN + timeout_in_minutes: 12 + retry: *auto-retry + + - label: "fuzz test" + command: "ci/scripts/pr-fuzz-test.sh -p ci-dev" + depends_on: + - "build" + - "build-simulation" + plugins: + - ./ci/plugins/swapfile + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 15 + retry: *auto-retry + + - label: "check" + command: "ci/scripts/check.sh" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml timeout_in_minutes: 20 - retry: *auto-retry \ No newline at end of file + retry: *auto-retry + + - label: "unit test (deterministic simulation)" + command: "ci/scripts/deterministic-unit-test.sh" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + 
run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + timeout_in_minutes: 10 + retry: *auto-retry + + - label: "scaling test (deterministic simulation)" + command: "TEST_NUM=5 ci/scripts/deterministic-scale-test.sh" + depends_on: "build-simulation" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + timeout_in_minutes: 10 + retry: *auto-retry + + - label: "end-to-end test (deterministic simulation)" + command: "TEST_NUM=16 timeout 14m ci/scripts/deterministic-e2e-test.sh" + depends_on: "build-simulation" + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + GITHUB_TOKEN: github-token + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - GITHUB_TOKEN + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 15 + retry: *auto-retry + + - label: "recovery test (deterministic simulation)" + command: "TEST_NUM=8 KILL_RATE=0.5 timeout 14m ci/scripts/deterministic-recovery-test.sh" + depends_on: "build-simulation" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + # - seek-oss/aws-sm#v2.3.1: + # env: + # BUILDKITE_ANALYTICS_TOKEN: buildkite-build-analytics-deterministic-token + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + # - test-collector#v1.0.0: + # files: "*-junit.xml" + # format: "junit" + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 15 + retry: *auto-retry + + - label: "misc check" + command: "ci/scripts/misc-check.sh" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + - shellcheck#v1.2.0: + files: ./**/*.sh + timeout_in_minutes: 5 + retry: *auto-retry From c9ab6ada59f6ef7320422b79465b9a3745498e16 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 10 Mar 2023 17:47:03 +0800 Subject: [PATCH 5/7] move to main cron --- ci/workflows/main-cron.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 3b7c92dd76005..fc982d7af0bba 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -216,7 +216,7 @@ steps: timeout_in_minutes: 20 retry: *auto-retry - label: "S3 source check on AWS (csv parser)" - command: "ci/scripts/s3-source-test.sh -p ci-dev -s run_csv" + command: "ci/scripts/s3-source-test.sh -p ci-release -s run_csv" depends_on: build plugins: - seek-oss/aws-sm#v2.3.1: From 1cfb61117dd09cd0cd470749e6e4367eb3ff05da Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 10 Mar 2023 17:58:59 +0800 Subject: [PATCH 6/7] fix clippy --- src/connector/src/macros.rs | 1 + src/connector/src/parser/mod.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 5e95a2d48c10f..f7e012c45b64c 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -163,6 +163,7 @@ macro_rules! impl_connector_properties { macro_rules! 
impl_common_parser_logic { ($parser_name:ty) => { impl $parser_name { + #[allow(clippy::unused_mut)] #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)] async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) { #[for_await] diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index fafcad7b8f4c3..b995d511e9277 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -33,7 +33,6 @@ pub use self::csv_parser::CsvParserConfig; use crate::parser::maxwell::MaxwellParser; use crate::source::{ BoxSourceStream, BoxSourceWithStateStream, SourceColumnDesc, SourceContextRef, SourceFormat, - StreamChunkWithState, }; mod avro; From 7d0b37750a98e753dc20197624b0555f20081ea0 Mon Sep 17 00:00:00 2001 From: idx0-dev <124041366+idx0-dev@users.noreply.github.com> Date: Fri, 10 Mar 2023 18:05:05 +0800 Subject: [PATCH 7/7] fix clippy --- src/connector/src/macros.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index f7e012c45b64c..32344714d83f8 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -163,7 +163,7 @@ macro_rules! impl_connector_properties { macro_rules! impl_common_parser_logic { ($parser_name:ty) => { impl $parser_name { - #[allow(clippy::unused_mut)] + #[allow(unused_mut)] #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)] async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) { #[for_await]