From 79dd589d324e096ad5a5f796b14bb6afc8366d02 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 8 Jun 2024 22:55:54 +0800 Subject: [PATCH 01/18] bump some dependencies Signed-off-by: xxchan --- Cargo.lock | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ccd821d4caa..c21e4a45bae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5431,9 +5431,9 @@ dependencies = [ [[package]] name = "futures-async-stream" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "379790776b0d953337df4ab7ecc51936c66ea112484cad7912907b1d34253ebf" +checksum = "6cce57e88ba9fe4953f476112b2c8e315a2da07725a14dc091ac3e5b6e4cca72" dependencies = [ "futures-async-stream-macro", "futures-core", @@ -5442,9 +5442,9 @@ dependencies = [ [[package]] name = "futures-async-stream-macro" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df2c13d48c8cb8a3ec093ede6f0f4482f327d7bb781120c5fb483ef0f17e758" +checksum = "5ac45ed0bddbd110eb68862768a194f88700f5b91c39931d2f432fab67a16d08" dependencies = [ "proc-macro2", "quote", @@ -7918,6 +7918,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -14661,13 +14667,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -14683,10 +14690,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ + "num-conv", "time-core", ] From d9286f69c9db8302c479a3d2a6dee2039e56c50c Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 8 Jun 2024 22:57:20 +0800 Subject: [PATCH 02/18] remove stablized features --- src/batch/src/lib.rs | 1 - src/cmd_all/src/lib.rs | 2 -- src/common/metrics/src/lib.rs | 1 - src/common/src/lib.rs | 2 -- src/compute/src/lib.rs | 1 - src/connector/benches/nexmark_integration.rs | 2 -- src/connector/codec/src/lib.rs | 1 - src/connector/src/lib.rs | 1 - src/expr/core/src/lib.rs | 1 - src/expr/impl/src/lib.rs | 2 -- src/frontend/src/lib.rs | 1 - src/jni_core/src/lib.rs | 1 - src/jni_core/src/macros.rs | 1 - src/meta/service/src/lib.rs | 1 - src/meta/src/lib.rs | 1 - src/object_store/src/lib.rs | 1 - src/storage/backup/src/lib.rs | 1 - src/storage/benches/bench_block_iter.rs | 1 - src/storage/benches/bench_table_watermarks.rs | 2 -- src/storage/hummock_trace/src/lib.rs | 1 - src/storage/src/lib.rs | 2 -- src/stream/src/lib.rs | 1 - src/tests/simulation/src/lib.rs | 1 - src/tests/simulation/src/main.rs | 1 - src/tests/simulation/tests/integration_tests/main.rs | 1 - src/tests/sqlsmith/src/lib.rs | 1 - src/utils/pgwire/src/lib.rs | 1 - 27 files changed, 33 deletions(-) diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 3a29e2a90b27..8de827f3cbf7 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -28,7 +28,6 @@ #![feature(allocator_api)] #![feature(impl_trait_in_assoc_type)] #![feature(assert_matches)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(map_try_insert)] diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index a872d7de12b3..a5e45f2a8328 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - mod common; mod standalone; diff --git a/src/common/metrics/src/lib.rs b/src/common/metrics/src/lib.rs index 574e684c6703..59879be89dd8 100644 --- a/src/common/metrics/src/lib.rs +++ b/src/common/metrics/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(type_alias_impl_trait)] #![feature(impl_trait_in_assoc_type)] use std::ops::Deref; diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 1cbb2d837aa7..abcfb09dd251 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -26,7 +26,6 @@ #![feature(lint_reasons)] #![feature(coroutines)] #![feature(map_try_insert)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(let_chains)] #![feature(portable_simd)] @@ -35,7 +34,6 @@ #![allow(incomplete_features)] #![feature(iterator_try_collect)] #![feature(iter_order_by)] -#![feature(exclusive_range_pattern)] #![feature(binary_heap_into_iter_sorted)] #![feature(impl_trait_in_assoc_type)] #![feature(map_entry_replace)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 87f052470ea7..763f7c0f3670 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -18,7 +18,6 @@ #![feature(let_chains)] #![feature(lint_reasons)] #![feature(impl_trait_in_assoc_type)] -#![feature(lazy_cell)] #![cfg_attr(coverage, feature(coverage_attribute))] #[macro_use] diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index 172931562efe..f59ae08a9ec1 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -18,8 +18,6 @@ //! `ByteStreamSourceParserImpl::create` based on the given configuration, rather //! than depending on a specific internal implementation. -#![feature(lazy_cell)] - use std::sync::LazyLock; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 67198c78516a..55e9e6b0e2fd 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -22,7 +22,6 @@ #![feature(box_patterns)] #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 8c0ade401cb4..35614aebe192 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -20,7 +20,6 @@ #![feature(box_patterns)] #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/expr/core/src/lib.rs b/src/expr/core/src/lib.rs index b250b8ce901f..d45d4ca11f80 100644 --- a/src/expr/core/src/lib.rs +++ b/src/expr/core/src/lib.rs @@ -15,7 +15,6 @@ #![feature(let_chains)] #![feature(lint_reasons)] #![feature(iterator_try_collect)] -#![feature(lazy_cell)] #![feature(coroutines)] #![feature(never_type)] #![feature(error_generic_member_access)] diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index 56bdbe3b8110..e5c69c2660ee 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -25,8 +25,6 @@ #![feature(assert_matches)] #![feature(lint_reasons)] #![feature(iterator_try_collect)] -#![feature(exclusive_range_pattern)] -#![feature(lazy_cell)] #![feature(coroutines)] #![feature(test)] #![feature(iter_array_chunks)] diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5f0c03061c0a..c92fa46ed2f0 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -24,7 +24,6 @@ #![feature(assert_matches)] #![feature(lint_reasons)] #![feature(box_patterns)] -#![feature(lazy_cell)] #![feature(macro_metavar_expr)] #![feature(min_specialization)] #![feature(extend_one)] diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 18d1807948d2..08dd2501657c 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(error_generic_member_access)] -#![feature(lazy_cell)] #![feature(once_cell_try)] #![feature(type_alias_impl_trait)] #![feature(try_blocks)] diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index 982ccda06ecf..2e7d095e0bd4 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -375,7 +375,6 @@ macro_rules! to_jvalue { /// Generate the jni signature of a given function /// ``` -/// #![feature(lazy_cell)] /// use risingwave_jni_core::gen_jni_sig; /// assert_eq!(gen_jni_sig!(boolean f(int, short, byte[])), "(IS[B)Z"); /// assert_eq!( diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 80a83349f2cc..9ab248802772 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -14,7 +14,6 @@ #![feature(lint_reasons)] #![feature(let_chains)] -#![feature(lazy_cell)] #![feature(impl_trait_in_assoc_type)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 71c99a7e065b..811b3b152d06 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -20,7 +20,6 @@ #![feature(extract_if)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] #![feature(assert_matches)] diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index 811d482587ac..d9e768b7f029 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(lazy_cell)] #![feature(lint_reasons)] #![feature(error_generic_member_access)] #![feature(let_chains)] diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 6b569277c54d..dbb150030194 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -21,7 +21,6 @@ #![feature(map_try_insert)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/storage/benches/bench_block_iter.rs b/src/storage/benches/bench_block_iter.rs index f58499e07282..a0ea7cfd844d 100644 --- a/src/storage/benches/bench_block_iter.rs +++ b/src/storage/benches/bench_block_iter.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] use std::sync::LazyLock; use bytes::{BufMut, Bytes, BytesMut}; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index f1fdf66c2f89..3c7e470d7000 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::{Arc, LazyLock}; diff --git a/src/storage/hummock_trace/src/lib.rs b/src/storage/hummock_trace/src/lib.rs index 64417832206e..48b0a71010a7 100644 --- a/src/storage/hummock_trace/src/lib.rs +++ b/src/storage/hummock_trace/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(cursor_remaining)] #![feature(trait_alias)] #![feature(coroutines)] diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 21c0c7f49ae4..c41f2eca4746 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -31,12 +31,10 @@ #![feature(is_sorted)] #![feature(btree_extract_if)] #![feature(exact_size_is_empty)] -#![feature(lazy_cell)] #![cfg_attr(coverage, feature(coverage_attribute))] #![recursion_limit = "256"] #![feature(error_generic_member_access)] #![feature(let_chains)] -#![feature(exclusive_range_pattern)] #![feature(impl_trait_in_assoc_type)] #![feature(maybe_uninit_uninit_array)] #![feature(maybe_uninit_array_assume_init)] diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a..6805854346e6 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -29,7 +29,6 @@ #![feature(map_try_insert)] #![feature(never_type)] #![feature(btreemap_alloc)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(btree_extract_if)] #![feature(iter_order_by)] diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs index ae2614e094a6..aa6303b8e2f6 100644 --- a/src/tests/simulation/src/lib.rs +++ b/src/tests/simulation/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(try_blocks)] #![feature(register_tool)] diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 31cc48804464..e45c4a3285ac 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -13,7 +13,6 @@ // limitations under the License. #![cfg_attr(not(madsim), allow(dead_code))] -#![feature(lazy_cell)] use std::path::PathBuf; diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 475793a88b70..89c87ce69ecc 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -18,7 +18,6 @@ //! for the rationale behind this approach. #![feature(stmt_expr_attributes)] -#![feature(lazy_cell)] #![feature(extract_if)] mod backfill_tests; diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index 0c215c9146a4..2eb9c59d1bf3 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -14,7 +14,6 @@ #![feature(let_chains)] #![feature(if_let_guard)] -#![feature(lazy_cell)] #![feature(box_patterns)] #![feature(register_tool)] #![register_tool(rw)] diff --git a/src/utils/pgwire/src/lib.rs b/src/utils/pgwire/src/lib.rs index 96d439b8b94f..c7df6fc76a69 100644 --- a/src/utils/pgwire/src/lib.rs +++ b/src/utils/pgwire/src/lib.rs @@ -16,7 +16,6 @@ #![feature(trait_alias)] #![feature(iterator_try_collect)] #![feature(trusted_len)] -#![feature(lazy_cell)] #![feature(buf_read_has_data_left)] #![feature(round_char_boundary)] #![expect(clippy::doc_markdown, reason = "FIXME: later")] From c0bafd9e2cb3db42b4126d617e52996b7cd255a7 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 8 Jun 2024 22:57:32 +0800 Subject: [PATCH 03/18] update lint config Signed-off-by: xxchan --- Cargo.toml | 11 ++++++++--- clippy.toml | 3 +++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 457e260cb3b1..05188a57374f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -230,11 +230,16 @@ rw_iter_util = { path = "src/utils/iter_util" } [workspace.lints.rust] # `forbid` will also prevent the misuse of `#[allow(unused)]` unused_must_use = "forbid" -future_incompatible = "warn" -nonstandard_style = "warn" -rust_2018_idioms = "warn" +future_incompatible = { level = "warn", priority = -1 } +nonstandard_style = { level = "warn", priority = -1 } +rust_2018_idioms = { level = "warn", priority = -1 } # Backward compatibility is not important for an application. async_fn_in_trait = "allow" +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(madsim)', + 'cfg(coverage)', + 'cfg(dashboard_built)' +] } [workspace.lints.clippy] uninlined_format_args = "allow" diff --git a/clippy.toml b/clippy.toml index 551de0eb6c47..21b972376b0e 100644 --- a/clippy.toml +++ b/clippy.toml @@ -39,3 +39,6 @@ doc-valid-idents = [ avoid-breaking-exported-api = false upper-case-acronyms-aggressive = true too-many-arguments-threshold = 10 +ignore-interior-mutability = [ + "risingwave_frontend::expr::ExprImpl" # XXX: Where does ExprImpl have interior mutability? +] From dd218e3b679f77eb4129f7192e124cde4c00559a Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 8 Jun 2024 22:59:55 +0800 Subject: [PATCH 04/18] fix coroutine Signed-off-by: xxchan --- .../src/sink/formatter/append_only.rs | 29 +-- .../src/sink/formatter/debezium_json.rs | 167 +++++++++--------- src/connector/src/sink/formatter/upsert.rs | 37 ++-- 3 files changed, 121 insertions(+), 112 deletions(-) diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs index 8a4b7feda6a4..c0c47fa37e17 100644 --- a/src/connector/src/sink/formatter/append_only.rs +++ b/src/connector/src/sink/formatter/append_only.rs @@ -40,19 +40,22 @@ impl SinkFormatter for AppendOnlyFormatter impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let event_key_object = match &self.key_encoder { - Some(key_encoder) => Some(tri!(key_encoder.encode(row))), - None => None, - }; - let event_object = Some(tri!(self.val_encoder.encode(row))); + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let event_key_object = match &self.key_encoder { + Some(key_encoder) => Some(tri!(key_encoder.encode(row))), + None => None, + }; + let event_object = Some(tri!(self.val_encoder.encode(row))); - yield Ok((event_key_object, event_object)) - } - }) + yield Ok((event_key_object, event_object)) + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index 6fff15058bd6..68d21fd0a3f4 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -98,100 +98,103 @@ impl SinkFormatter for DebeziumJsonFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - let DebeziumJsonFormatter { - schema, - pk_indices, - db_name, - sink_from_name, - opts, - key_encoder, - val_encoder, - } = self; - let ts_ms = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let source_field = json!({ - // todo: still some missing fields in source field - // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events - "db": db_name, - "table": sink_from_name, - "ts_ms": ts_ms, - }); - - let mut update_cache: Option> = None; - - for (op, row) in chunk.rows() { - let event_key_object: Option = Some(json!({ - "schema": json!({ - "type": "struct", - "fields": fields_pk_to_json(&schema.fields, pk_indices), - "optional": false, - "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), - }), - "payload": tri!(key_encoder.encode(row)), - })); - let event_object: Option = match op { - Op::Insert => Some(json!({ - "schema": schema_to_json(schema, db_name, sink_from_name), - "payload": { - "before": null, - "after": tri!(val_encoder.encode(row)), - "op": "c", - "ts_ms": ts_ms, - "source": source_field, - } - })), - Op::Delete => { - let value_obj = Some(json!({ + std::iter::from_coroutine( + #[coroutine] + || { + let DebeziumJsonFormatter { + schema, + pk_indices, + db_name, + sink_from_name, + opts, + key_encoder, + val_encoder, + } = self; + let ts_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let source_field = json!({ + // todo: still some missing fields in source field + // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events + "db": db_name, + "table": sink_from_name, + "ts_ms": ts_ms, + }); + + let mut update_cache: Option> = None; + + for (op, row) in chunk.rows() { + let event_key_object: Option = Some(json!({ + "schema": json!({ + "type": "struct", + "fields": fields_pk_to_json(&schema.fields, pk_indices), + "optional": false, + "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), + }), + "payload": tri!(key_encoder.encode(row)), + })); + let event_object: Option = match op { + Op::Insert => Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": tri!(val_encoder.encode(row)), - "after": null, - "op": "d", + "before": null, + "after": tri!(val_encoder.encode(row)), + "op": "c", "ts_ms": ts_ms, "source": source_field, } - })); - yield Ok((event_key_object.clone(), value_obj)); - - if opts.gen_tombstone { - // Tomestone event - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events - yield Ok((event_key_object, None)); - } - - continue; - } - Op::UpdateDelete => { - update_cache = Some(tri!(val_encoder.encode(row))); - continue; - } - Op::UpdateInsert => { - if let Some(before) = update_cache.take() { - Some(json!({ + })), + Op::Delete => { + let value_obj = Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": before, - "after": tri!(val_encoder.encode(row)), - "op": "u", + "before": tri!(val_encoder.encode(row)), + "after": null, + "op": "d", "ts_ms": ts_ms, "source": source_field, } - })) - } else { - warn!( - "not found UpdateDelete in prev row, skipping, row index {:?}", - row.index() - ); + })); + yield Ok((event_key_object.clone(), value_obj)); + + if opts.gen_tombstone { + // Tomestone event + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events + yield Ok((event_key_object, None)); + } + continue; } - } - }; - yield Ok((event_key_object, event_object)); - } - }) + Op::UpdateDelete => { + update_cache = Some(tri!(val_encoder.encode(row))); + continue; + } + Op::UpdateInsert => { + if let Some(before) = update_cache.take() { + Some(json!({ + "schema": schema_to_json(schema, db_name, sink_from_name), + "payload": { + "before": before, + "after": tri!(val_encoder.encode(row)), + "op": "u", + "ts_ms": ts_ms, + "source": source_field, + } + })) + } else { + warn!( + "not found UpdateDelete in prev row, skipping, row index {:?}", + row.index() + ); + continue; + } + } + }; + yield Ok((event_key_object, event_object)); + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs index 612c22aaaf86..7e586a7917ea 100644 --- a/src/connector/src/sink/formatter/upsert.rs +++ b/src/connector/src/sink/formatter/upsert.rs @@ -40,22 +40,25 @@ impl SinkFormatter for UpsertFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - let event_key_object = Some(tri!(self.key_encoder.encode(row))); - - let event_object = match op { - Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), - // Empty value with a key - Op::Delete => None, - Op::UpdateDelete => { - // upsert semantic does not require update delete event - continue; - } - }; - - yield Ok((event_key_object, event_object)) - } - }) + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + + let event_object = match op { + Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), + // Empty value with a key + Op::Delete => None, + Op::UpdateDelete => { + // upsert semantic does not require update delete event + continue; + } + }; + + yield Ok((event_key_object, event_object)) + } + }, + ) } } From 9fd49844fd5215d86cd34caa43ce023b2b15b5fa Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 8 Jun 2024 23:53:12 +0800 Subject: [PATCH 05/18] bump toolchain Signed-off-by: xxchan --- ci/rust-toolchain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 92ef4a5ff27b..2902e3301ea8 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-03-12" +channel = "nightly-2024-06-08" From 34902d54bad5e379115ebb17bb343212deea5e54 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 00:02:53 +0800 Subject: [PATCH 06/18] fix Signed-off-by: xxchan --- Cargo.toml | 2 +- ci/scripts/build-other.sh | 2 +- lints/rust-toolchain | 2 +- .../sink/iceberg/prometheus/monitored_partition_writer.rs | 1 - .../src/sink/iceberg/prometheus/monitored_write_writer.rs | 1 - src/object_store/src/object/mod.rs | 5 +++++ src/storage/hummock_sdk/src/lib.rs | 1 - 7 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 05188a57374f..66dda8e7a088 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,7 +238,7 @@ async_fn_in_trait = "allow" unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(madsim)', 'cfg(coverage)', - 'cfg(dashboard_built)' + 'cfg(dashboard_built)', ] } [workspace.lints.clippy] diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh index 65c50462f97a..d59e8ea876cc 100755 --- a/ci/scripts/build-other.sh +++ b/ci/scripts/build-other.sh @@ -5,9 +5,9 @@ set -euo pipefail source ci/scripts/common.sh - echo "--- Build Rust UDF" cd e2e_test/udf/wasm +rustup target add wasm32-wasi cargo build --release cd ../../.. diff --git a/lints/rust-toolchain b/lints/rust-toolchain index e79f69d40f86..1df393ec52bf 100644 --- a/lints/rust-toolchain +++ b/lints/rust-toolchain @@ -1,5 +1,5 @@ # See `README.md` before bumping the version. [toolchain] -channel = "nightly-2024-03-21" +channel = "nightly-2024-06-08" components = ["llvm-tools-preview", "rustc-dev"] diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs index c2134d197455..c0bb5e097323 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs @@ -27,7 +27,6 @@ pub struct MonitoredFanoutPartitionedWriterBuilder { } impl MonitoredFanoutPartitionedWriterBuilder { - #[expect(dead_code)] pub fn new( inner: FanoutPartitionedWriterBuilder, partition_num: LabelGuardedIntGauge<2>, diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs index be9f35aae2a5..0f2877490fb2 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs @@ -28,7 +28,6 @@ pub struct MonitoredWriteWriterBuilder { impl MonitoredWriteWriterBuilder { /// Create writer context. - #[expect(dead_code)] pub fn new( inner: B, write_qps: LabelGuardedIntCounter<2>, diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index c701a438253a..c5a32eedfba1 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![expect( + unexpected_cfgs, + reason = "feature(hdfs-backend) is banned https://github.com/risingwavelabs/risingwave/pull/7875" +)] + pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 97e1a334dcf9..06c9a358883f 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -22,7 +22,6 @@ #![feature(is_sorted)] #![feature(let_chains)] #![feature(btree_cursors)] -#![feature(lazy_cell)] mod key_cmp; use std::cmp::Ordering; From 7f285bc7a2e47e1fea50340b150555ebff8732d6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 00:13:04 +0800 Subject: [PATCH 07/18] fix opaque type Signed-off-by: xxchan --- src/connector/src/sink/kafka.rs | 17 ++++++---- src/connector/src/sink/kinesis.rs | 54 +++++++++++++++++-------------- src/connector/src/sink/pulsar.rs | 21 +++++++----- src/jni_core/src/lib.rs | 29 +++++++++-------- src/storage/src/store_impl.rs | 32 +++++++++++++----- 5 files changed, 94 insertions(+), 59 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 617f427ae71f..bb85c92ab893 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -403,7 +403,16 @@ struct KafkaPayloadWriter<'a> { config: &'a KafkaConfig, } -pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; +mod opaque_type { + use super::*; + pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; + + pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture { + future.map(KafkaPayloadWriter::<'static>::map_future_result) + } +} +use opaque_type::map_delivery_future; +pub use opaque_type::KafkaSinkDeliveryFuture; pub struct KafkaSinkWriter { formatter: SinkFormatterImpl, @@ -482,7 +491,7 @@ impl<'w> KafkaPayloadWriter<'w> { Ok(delivery_future) => { if self .add_future - .add_future_may_await(Self::map_delivery_future(delivery_future)) + .add_future_may_await(map_delivery_future(delivery_future)) .await? { tracing::warn!( @@ -567,10 +576,6 @@ impl<'w> KafkaPayloadWriter<'w> { Err(_) => Err(KafkaError::Canceled.into()), } } - - fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture { - future.map(KafkaPayloadWriter::<'static>::map_future_result) - } } impl<'a> FormattedSink for KafkaPayloadWriter<'a> { diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 771d3c8a6f91..3b5d49da4603 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -201,8 +201,36 @@ impl KinesisSinkWriter { } } -pub type KinesisSinkPayloadWriterDeliveryFuture = - impl TryFuture + Unpin + Send + 'static; +mod opaque_type { + use super::*; + pub type KinesisSinkPayloadWriterDeliveryFuture = + impl TryFuture + Unpin + Send + 'static; + + impl KinesisSinkPayloadWriter { + pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { + async move { + let builder = self.builder.expect("should not be None"); + let context_fmt = format!( + "failed to put record to {}", + builder + .get_stream_name() + .as_ref() + .expect("should have set stream name") + ); + Retry::spawn( + ExponentialBackoff::from_millis(100).map(jitter).take(3), + || builder.clone().send(), + ) + .await + .with_context(|| context_fmt.clone()) + .map_err(SinkError::Kinesis)?; + Ok(()) + } + .boxed() + } + } +} +pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture; impl KinesisSinkPayloadWriter { fn put_record(&mut self, key: String, payload: Vec) { @@ -216,28 +244,6 @@ impl KinesisSinkPayloadWriter { ), ); } - - fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { - async move { - let builder = self.builder.expect("should not be None"); - let context_fmt = format!( - "failed to put record to {}", - builder - .get_stream_name() - .as_ref() - .expect("should have set stream name") - ); - Retry::spawn( - ExponentialBackoff::from_millis(100).map(jitter).take(3), - || builder.clone().send(), - ) - .await - .with_context(|| context_fmt.clone()) - .map_err(SinkError::Kinesis)?; - Ok(()) - } - .boxed() - } } impl FormattedSink for KinesisSinkPayloadWriter { diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 3f016ad94946..a92d5b16f85e 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -235,15 +235,20 @@ struct PulsarPayloadWriter<'w> { add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>, } -pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; - -fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { - future.map(|result| { - result - .map(|_| ()) - .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) - }) +mod opaque_type { + use super::*; + pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; + + pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { + future.map(|result| { + result + .map(|_| ()) + .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) + }) + } } +use opaque_type::may_delivery_future; +pub use opaque_type::PulsarDeliveryFuture; impl PulsarSinkWriter { pub async fn new( diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 08dd2501657c..419f4ffd21cb 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -242,25 +242,28 @@ struct JavaClassMethodCache { utc: OnceLock, } -// TODO: may only return a RowRef -pub type StreamChunkRowIterator<'a> = impl Iterator + 'a; +mod opaque_type { + use super::*; + // TODO: may only return a RowRef + pub type StreamChunkRowIterator<'a> = impl Iterator + 'a; + + impl<'a> JavaBindingIteratorInner<'a> { + pub(super) fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> { + JavaBindingIteratorInner::StreamChunk( + chunk + .rows() + .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())), + ) + } + } +} +pub use opaque_type::StreamChunkRowIterator; pub type HummockJavaBindingIterator = BoxStream<'static, anyhow::Result<(Bytes, OwnedRow)>>; - pub enum JavaBindingIteratorInner<'a> { Hummock(HummockJavaBindingIterator), StreamChunk(StreamChunkRowIterator<'a>), } -impl<'a> JavaBindingIteratorInner<'a> { - fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> { - JavaBindingIteratorInner::StreamChunk( - chunk - .rows() - .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())), - ) - } -} - enum RowExtra { Op(Op), Key(Bytes), diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2512f680b536..2eae898d2779 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -42,9 +42,27 @@ use crate::monitor::{ use crate::opts::StorageOpts; use crate::StateStore; -pub type HummockStorageType = impl StateStore + AsHummock; -pub type MemoryStateStoreType = impl StateStore + AsHummock; -pub type SledStateStoreType = impl StateStore + AsHummock; +mod opaque_type { + use super::*; + + pub type HummockStorageType = impl StateStore + AsHummock; + pub type MemoryStateStoreType = impl StateStore + AsHummock; + pub type SledStateStoreType = impl StateStore + AsHummock; + + pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType { + may_dynamic_dispatch(state_store) + } + + pub fn hummock(state_store: HummockStorage) -> HummockStorageType { + may_dynamic_dispatch(may_verify(state_store)) + } + + pub fn sled(state_store: SledStateStore) -> SledStateStoreType { + may_dynamic_dispatch(state_store) + } +} +use opaque_type::{hummock, in_memory, sled}; +pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType}; /// The type erased [`StateStore`]. #[derive(Clone, EnumAsInner)] @@ -114,7 +132,7 @@ impl StateStoreImpl { storage_metrics: Arc, ) -> Self { // The specific type of MemoryStateStoreType in deducted here. - Self::MemoryStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics)) + Self::MemoryStateStore(in_memory(state_store).monitored(storage_metrics)) } pub fn hummock( @@ -122,16 +140,14 @@ impl StateStoreImpl { storage_metrics: Arc, ) -> Self { // The specific type of HummockStateStoreType in deducted here. - Self::HummockStateStore( - may_dynamic_dispatch(may_verify(state_store)).monitored(storage_metrics), - ) + Self::HummockStateStore(hummock(state_store).monitored(storage_metrics)) } pub fn sled( state_store: SledStateStore, storage_metrics: Arc, ) -> Self { - Self::SledStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics)) + Self::SledStateStore(sled(state_store).monitored(storage_metrics)) } pub fn shared_in_memory_store(storage_metrics: Arc) -> Self { From 5a1d4b138a1ab8717664e464c07921c4e9ca4122 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 00:54:50 +0800 Subject: [PATCH 08/18] bump higher Signed-off-by: xxchan --- ci/rust-toolchain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 2902e3301ea8..58a7b4e9a4ce 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-06-08" +channel = "nightly-2024-06-20" From 1b95f154a36aff29bc3e639397a95770a53c71aa Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 01:27:05 +0800 Subject: [PATCH 09/18] downgrade due to TAIT https://github.com/rust-lang/rust/pull/113169 Signed-off-by: xxchan --- ci/rust-toolchain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 58a7b4e9a4ce..1344210e6b76 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-06-20" +channel = "nightly-2024-06-12" From 39bded2500e7b1ab682239543b530d6dd7064275 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 31 Jul 2024 20:16:03 +0800 Subject: [PATCH 10/18] update for new code Signed-off-by: xxchan --- src/batch/src/executor/log_row_seq_scan.rs | 29 ++++++++-------- src/common/secret/src/lib.rs | 2 -- src/license/src/lib.rs | 2 -- src/storage/src/store.rs | 33 ++++++++++--------- .../src/executor/nested_loop_temporal_join.rs | 1 - 5 files changed, 34 insertions(+), 33 deletions(-) diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 5783c788bb5e..3614673ee941 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -206,21 +206,24 @@ impl LogRowSeqScanExecutor { .batch_iter_log_with_pk_bounds(old_epoch, new_epoch) .await? .flat_map(|r| { - futures::stream::iter(std::iter::from_coroutine(move || { - match r { - Ok(change_log_row) => { - fn with_op(op: Op, row: impl Row) -> impl Row { - row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + futures::stream::iter(std::iter::from_coroutine( + #[coroutine] + move || { + match r { + Ok(change_log_row) => { + fn with_op(op: Op, row: impl Row) -> impl Row { + row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + } + for (op, row) in change_log_row.into_op_value_iter() { + yield Ok(with_op(op, row)); + } } - for (op, row) in change_log_row.into_op_value_iter() { - yield Ok(with_op(op, row)); + Err(e) => { + yield Err(e); } - } - Err(e) => { - yield Err(e); - } - }; - })) + }; + }, + )) }); pin_mut!(iter); diff --git a/src/common/secret/src/lib.rs b/src/common/secret/src/lib.rs index 8ac065e5ea18..17319a296734 100644 --- a/src/common/secret/src/lib.rs +++ b/src/common/secret/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - type SecretId = u32; mod secret_manager; diff --git a/src/license/src/lib.rs b/src/license/src/lib.rs index 0e641be9789b..7f2b25d8f3fb 100644 --- a/src/license/src/lib.rs +++ b/src/license/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - mod feature; mod manager; diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index fbff01658944..53643c47b541 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -193,21 +193,24 @@ impl ChangeLogValue { } pub fn into_op_value_iter(self) -> impl Iterator { - std::iter::from_coroutine(move || match self { - Self::Insert(row) => { - yield (Op::Insert, row); - } - Self::Delete(row) => { - yield (Op::Delete, row); - } - Self::Update { - old_value, - new_value, - } => { - yield (Op::UpdateDelete, old_value); - yield (Op::UpdateInsert, new_value); - } - }) + std::iter::from_coroutine( + #[coroutine] + move || match self { + Self::Insert(row) => { + yield (Op::Insert, row); + } + Self::Delete(row) => { + yield (Op::Delete, row); + } + Self::Update { + old_value, + new_value, + } => { + yield (Op::UpdateDelete, old_value); + yield (Op::UpdateInsert, new_value); + } + }, + ) } } diff --git a/src/stream/src/executor/nested_loop_temporal_join.rs b/src/stream/src/executor/nested_loop_temporal_join.rs index 0888d8981fc8..1cbeba9560a9 100644 --- a/src/stream/src/executor/nested_loop_temporal_join.rs +++ b/src/stream/src/executor/nested_loop_temporal_join.rs @@ -99,7 +99,6 @@ async fn phase1_handle_chunk( impl NestedLoopTemporalJoinExecutor { #[allow(clippy::too_many_arguments)] - #[expect(dead_code)] pub fn new( ctx: ActorContextRef, info: ExecutorInfo, From 71a060926930b2d3fc9a009e079ce602a4ba5f19 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 31 Jul 2024 22:43:51 +0800 Subject: [PATCH 11/18] fix dylint format_args: https://github.com/rust-lang/rust-clippy/pull/12567/ Signed-off-by: xxchan --- lints/Cargo.lock | 8 +++---- lints/Cargo.toml | 2 +- lints/README.md | 9 +++++-- lints/rust-toolchain | 2 +- lints/src/format_error.rs | 14 ++++++++--- lints/src/lib.rs | 13 ++++++---- lints/src/utils/format_args_collector.rs | 30 +++++++++++++----------- 7 files changed, 49 insertions(+), 29 deletions(-) diff --git a/lints/Cargo.lock b/lints/Cargo.lock index 3a9bd8384c0d..650509dca25b 100644 --- a/lints/Cargo.lock +++ b/lints/Cargo.lock @@ -162,8 +162,8 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clippy_config" -version = "0.1.79" -source = "git+https://github.com/rust-lang/rust-clippy?rev=fca4e16ffb8c07186ee23becd44cd5c9fb51896c#fca4e16ffb8c07186ee23becd44cd5c9fb51896c" +version = "0.1.80" +source = "git+https://github.com/rust-lang/rust-clippy?rev=35f54fd439432f56c749fbb34f1326e34ed1fc42#35f54fd439432f56c749fbb34f1326e34ed1fc42" dependencies = [ "rustc-semver", "serde", @@ -172,8 +172,8 @@ dependencies = [ [[package]] name = "clippy_utils" -version = "0.1.79" -source = "git+https://github.com/rust-lang/rust-clippy?rev=fca4e16ffb8c07186ee23becd44cd5c9fb51896c#fca4e16ffb8c07186ee23becd44cd5c9fb51896c" +version = "0.1.80" +source = "git+https://github.com/rust-lang/rust-clippy?rev=35f54fd439432f56c749fbb34f1326e34ed1fc42#35f54fd439432f56c749fbb34f1326e34ed1fc42" dependencies = [ "arrayvec", "clippy_config", diff --git a/lints/Cargo.toml b/lints/Cargo.toml index 74fc49c3fd08..23fd63617dcd 100644 --- a/lints/Cargo.toml +++ b/lints/Cargo.toml @@ -14,7 +14,7 @@ path = "ui/format_error.rs" # See `README.md` before bumping the version. # Remember to update the version in `ci/Dockerfile` as well. [dependencies] -clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "fca4e16ffb8c07186ee23becd44cd5c9fb51896c" } +clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "35f54fd439432f56c749fbb34f1326e34ed1fc42" } dylint_linting = "3.1.0" itertools = "0.12" diff --git a/lints/README.md b/lints/README.md index 5007474227ab..3ab55f0bbfe7 100644 --- a/lints/README.md +++ b/lints/README.md @@ -30,8 +30,13 @@ Duplicate `.vscode/settings.json.example` to `.vscode/settings.json` to enable r ## Bump toolchain -The version of the toolchain is specified in `rust-toolchain` file under current directory. -It does not have to be exactly the same as the one used to build RisingWave, but it should be close enough to avoid compile errors. +The version of the toolchain is specified in `rust-toolchain` file under current directory. It will be used to build the lints, and also be used by `dylint` to compile RisingWave, instead of the root-level `rust-toolchain`. + +So the chosen toolchain needs to +1. be close enough to the root-level `rust-toolchain` to make RisingWave compile. It does not have to be exactly the same version though. +2. be close enough to the dependency `clippy_utils`'s corresponding `rust-toolchain` in the Clippy's repo. + +(Note: `clippy_utils` depends on rustc's internal unstable API. When rustc has breaking changes, the `rust` repo's Clippy will be updated. And then it's [synced back to the Clippy repo bi-weekly](https://doc.rust-lang.org/clippy/development/infrastructure/sync.html#syncing-changes-between-clippy-and-rust-langrust). So ideally we can use `clippy_utils` in the rust repo corresponding to our root-level nightly version, but that repo is too large. Perhaps we can also consider copy the code out to workaround this problem.) The information below can be helpful in finding the appropriate version to bump to. diff --git a/lints/rust-toolchain b/lints/rust-toolchain index 1df393ec52bf..0c3f355be78e 100644 --- a/lints/rust-toolchain +++ b/lints/rust-toolchain @@ -1,5 +1,5 @@ # See `README.md` before bumping the version. [toolchain] -channel = "nightly-2024-06-08" +channel = "nightly-2024-06-12" components = ["llvm-tools-preview", "rustc-dev"] diff --git a/lints/src/format_error.rs b/lints/src/format_error.rs index 9b6d9be5b8c4..402adc4aa5af 100644 --- a/lints/src/format_error.rs +++ b/lints/src/format_error.rs @@ -14,7 +14,7 @@ use clippy_utils::diagnostics::span_lint_and_help; use clippy_utils::macros::{ - find_format_arg_expr, find_format_args, is_format_macro, macro_backtrace, + find_format_arg_expr, is_format_macro, macro_backtrace, FormatArgsStorage, }; use clippy_utils::ty::{implements_trait, match_type}; use clippy_utils::{ @@ -56,7 +56,15 @@ declare_tool_lint! { } #[derive(Default)] -pub struct FormatError; +pub struct FormatError { + format_args: FormatArgsStorage, +} + +impl FormatError { + pub fn new(format_args: FormatArgsStorage) -> Self { + Self { format_args } + } +} impl_lint_pass!(FormatError => [FORMAT_ERROR]); @@ -90,7 +98,7 @@ impl<'tcx> LateLintPass<'tcx> for FormatError { for macro_call in macro_backtrace(expr.span) { if is_format_macro(cx, macro_call.def_id) - && let Some(format_args) = find_format_args(cx, expr, macro_call.expn) + && let Some(format_args) = self.format_args.get(cx, expr, macro_call.expn) { for piece in &format_args.template { if let FormatArgsPiece::Placeholder(placeholder) = piece diff --git a/lints/src/lib.rs b/lints/src/lib.rs index df77538d3cf1..6928bcd028a8 100644 --- a/lints/src/lib.rs +++ b/lints/src/lib.rs @@ -14,7 +14,6 @@ #![feature(rustc_private)] #![feature(let_chains)] -#![feature(lazy_cell)] #![warn(unused_extern_crates)] extern crate rustc_ast; @@ -36,13 +35,19 @@ pub fn register_lints(_sess: &rustc_session::Session, lint_store: &mut rustc_lin // -- Begin lint registration -- // Preparation steps. - lint_store.register_early_pass(|| { - Box::::default() + let format_args_storage = clippy_utils::macros::FormatArgsStorage::default(); + let format_args = format_args_storage.clone(); + lint_store.register_early_pass(move || { + Box::new(utils::format_args_collector::FormatArgsCollector::new( + format_args.clone(), + )) }); // Actual lints. lint_store.register_lints(&[format_error::FORMAT_ERROR]); - lint_store.register_late_pass(|_| Box::::default()); + let format_args = format_args_storage.clone(); + lint_store + .register_late_pass(move |_| Box::new(format_error::FormatError::new(format_args.clone()))); // -- End lint registration -- diff --git a/lints/src/utils/format_args_collector.rs b/lints/src/utils/format_args_collector.rs index 7524169666d9..9f35faf11665 100644 --- a/lints/src/utils/format_args_collector.rs +++ b/lints/src/utils/format_args_collector.rs @@ -12,15 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Copied from `https://github.com/rust-lang/rust-clippy/blob/8b0bf6423dfaf5545014db85fcba7bc745beed4c/clippy_lints/src/utils/format_args_collector.rs` -//! -//! Init `AST_FORMAT_ARGS` before running the late pass, so that we can call `find_format_args`. +//! Copied from `https://github.com/Alexendoo/rust-clippy/blob/c187bff864234e869dabcb41d2336639e29e2511/clippy_lints/src/utils/format_args_collector.rs` use std::iter::once; use std::mem; -use std::rc::Rc; -use clippy_utils::macros::AST_FORMAT_ARGS; +use clippy_utils::macros::FormatArgsStorage; use clippy_utils::source::snippet_opt; use itertools::Itertools; use rustc_ast::{Crate, Expr, ExprKind, FormatArgs}; @@ -30,11 +27,19 @@ use rustc_lint::{EarlyContext, EarlyLintPass}; use rustc_session::impl_lint_pass; use rustc_span::{hygiene, Span}; -/// Collects [`rustc_ast::FormatArgs`] so that future late passes can call -/// [`clippy_utils::macros::find_format_args`] -#[derive(Default)] +/// Populates [`FormatArgsStorage`] with AST [`FormatArgs`] nodes pub struct FormatArgsCollector { - format_args: FxHashMap>, + format_args: FxHashMap, + storage: FormatArgsStorage, +} + +impl FormatArgsCollector { + pub fn new(storage: FormatArgsStorage) -> Self { + Self { + format_args: FxHashMap::default(), + storage, + } + } } impl_lint_pass!(FormatArgsCollector => []); @@ -47,15 +52,12 @@ impl EarlyLintPass for FormatArgsCollector { } self.format_args - .insert(expr.span.with_parent(None), Rc::new((**args).clone())); + .insert(expr.span.with_parent(None), (**args).clone()); } } fn check_crate_post(&mut self, _: &EarlyContext<'_>, _: &Crate) { - AST_FORMAT_ARGS.with(|ast_format_args| { - let result = ast_format_args.set(mem::take(&mut self.format_args)); - debug_assert!(result.is_ok()); - }); + self.storage.set(mem::take(&mut self.format_args)); } } From 341b04a5e1454d5b2d7e4bc96e0726a327b774bc Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 31 Jul 2024 22:55:43 +0800 Subject: [PATCH 12/18] fix code Signed-off-by: xxchan --- src/connector/src/parser/avro/parser.rs | 2 +- src/object_store/src/object/s3.rs | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index dde74a999ac9..ac93ab3e6980 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -87,7 +87,7 @@ impl AvroAccessBuilder { /// ## Confluent schema registry /// /// - In Kafka ([Confluent schema registry wire format](https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format)): - /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. + /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult> { // parse payload to avro value // if use confluent schema, get writer schema from confluent schema registry diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 0ef12f3da3a3..001eb8128a5b 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -116,10 +116,7 @@ impl S3StreamingUploader { /// Reference: const MIN_PART_SIZE: usize = 5 * 1024 * 1024; const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024; - let part_size = config - .upload_part_size - .min(MAX_PART_SIZE) - .max(MIN_PART_SIZE); + let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE); Self { client, From 10221be639a72c3af220e2f95e9355df6bdacad5 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 31 Jul 2024 23:02:20 +0800 Subject: [PATCH 13/18] try downgrade Signed-off-by: xxchan --- ci/rust-toolchain | 2 +- lints/Cargo.lock | 2 -- lints/Cargo.toml | 2 +- lints/rust-toolchain | 2 +- .../src/sink/iceberg/prometheus/monitored_partition_writer.rs | 1 + .../src/sink/iceberg/prometheus/monitored_write_writer.rs | 1 + src/stream/src/executor/nested_loop_temporal_join.rs | 1 + 7 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 1344210e6b76..6bc57a2a65d8 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-06-12" +channel = "nightly-2024-06-06" diff --git a/lints/Cargo.lock b/lints/Cargo.lock index 650509dca25b..e3b748e6da67 100644 --- a/lints/Cargo.lock +++ b/lints/Cargo.lock @@ -163,7 +163,6 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clippy_config" version = "0.1.80" -source = "git+https://github.com/rust-lang/rust-clippy?rev=35f54fd439432f56c749fbb34f1326e34ed1fc42#35f54fd439432f56c749fbb34f1326e34ed1fc42" dependencies = [ "rustc-semver", "serde", @@ -173,7 +172,6 @@ dependencies = [ [[package]] name = "clippy_utils" version = "0.1.80" -source = "git+https://github.com/rust-lang/rust-clippy?rev=35f54fd439432f56c749fbb34f1326e34ed1fc42#35f54fd439432f56c749fbb34f1326e34ed1fc42" dependencies = [ "arrayvec", "clippy_config", diff --git a/lints/Cargo.toml b/lints/Cargo.toml index 23fd63617dcd..43ece1f6fc5b 100644 --- a/lints/Cargo.toml +++ b/lints/Cargo.toml @@ -14,7 +14,7 @@ path = "ui/format_error.rs" # See `README.md` before bumping the version. # Remember to update the version in `ci/Dockerfile` as well. [dependencies] -clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "35f54fd439432f56c749fbb34f1326e34ed1fc42" } +clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "5e2a7c6adebdb0478ee6d5b67ab4ee94153b2997" } dylint_linting = "3.1.0" itertools = "0.12" diff --git a/lints/rust-toolchain b/lints/rust-toolchain index 0c3f355be78e..a146af66cd63 100644 --- a/lints/rust-toolchain +++ b/lints/rust-toolchain @@ -1,5 +1,5 @@ # See `README.md` before bumping the version. [toolchain] -channel = "nightly-2024-06-12" +channel = "nightly-2024-06-06" components = ["llvm-tools-preview", "rustc-dev"] diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs index 463b1f3c9dbd..d85d712c41ac 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs @@ -27,6 +27,7 @@ pub struct MonitoredFanoutPartitionedWriterBuilder { } impl MonitoredFanoutPartitionedWriterBuilder { + #[expect(dead_code)] pub fn new( inner: FanoutPartitionedWriterBuilder, partition_num: LabelGuardedIntGauge<2>, diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs index aebb5939ff14..dc44434e5d9c 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs @@ -28,6 +28,7 @@ pub struct MonitoredWriteWriterBuilder { impl MonitoredWriteWriterBuilder { /// Create writer context. + #[expect(dead_code)] pub fn new( inner: B, write_qps: LabelGuardedIntCounter<2>, diff --git a/src/stream/src/executor/nested_loop_temporal_join.rs b/src/stream/src/executor/nested_loop_temporal_join.rs index 1cbeba9560a9..0888d8981fc8 100644 --- a/src/stream/src/executor/nested_loop_temporal_join.rs +++ b/src/stream/src/executor/nested_loop_temporal_join.rs @@ -99,6 +99,7 @@ async fn phase1_handle_chunk( impl NestedLoopTemporalJoinExecutor { #[allow(clippy::too_many_arguments)] + #[expect(dead_code)] pub fn new( ctx: ActorContextRef, info: ExecutorInfo, From 90d6df798b075d38304ba8740259ec45e1a7835d Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 31 Jul 2024 23:28:04 +0800 Subject: [PATCH 14/18] bump ci image Signed-off-by: xxchan --- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 329bf01b49c5..6602509824e0 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240729 +export BUILD_ENV_VERSION=v20240731 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 89287926edc7..83cb000566d4 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql - db @@ -107,12 +107,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -123,7 +123,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: db: condition: service_healthy From 0fa13465226c7898f0adee19a5e46bfe5ac4538e Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 31 Jul 2024 23:34:33 +0800 Subject: [PATCH 15/18] fix Signed-off-by: xxchan --- src/expr/impl/src/window_function/buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expr/impl/src/window_function/buffer.rs b/src/expr/impl/src/window_function/buffer.rs index bd1c10d162b2..57217dda6fd4 100644 --- a/src/expr/impl/src/window_function/buffer.rs +++ b/src/expr/impl/src/window_function/buffer.rs @@ -970,7 +970,7 @@ mod tests { let key = |key: i64| -> StateKey { StateKey { - order_key: memcmp_encoding::encode_value(&Some(ScalarImpl::from(key)), order_type) + order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type) .unwrap(), pk: OwnedRow::empty().into(), } From 6592396364c75a29e3295ab1b188a0bfbe2fc4a5 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 1 Aug 2024 08:55:14 +0800 Subject: [PATCH 16/18] fix ambiguous outer attributes Signed-off-by: xxchan --- .../simulation/tests/integration_tests/scale/no_shuffle.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs index 30d9e0c73fec..e87659d4f54d 100644 --- a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs +++ b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs @@ -81,7 +81,9 @@ async fn test_delta_join() -> Result<()> { .assert_result_eq(result); #[allow(unused_assignments)] - test_times += 1; + { + test_times += 1; + } }; } From 7a0a4a96509162d9f32d6dbfd9232d5dc1013fd3 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 1 Aug 2024 09:01:41 +0800 Subject: [PATCH 17/18] fix cargo script --- e2e_test/source_inline/pubsub/prepare-data.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/source_inline/pubsub/prepare-data.rs b/e2e_test/source_inline/pubsub/prepare-data.rs index a792c6cbb261..e084b4691942 100755 --- a/e2e_test/source_inline/pubsub/prepare-data.rs +++ b/e2e_test/source_inline/pubsub/prepare-data.rs @@ -1,5 +1,5 @@ #!/usr/bin/env -S cargo -Zscript -```cargo +---cargo [dependencies] anyhow = "1" google-cloud-googleapis = { version = "0.13", features = ["pubsub"] } @@ -13,7 +13,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -``` +--- use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::client::{Client, ClientConfig}; From 3389da72af9db9f72107b08bb8b624f158198496 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 2 Aug 2024 11:59:32 +0800 Subject: [PATCH 18/18] nit Signed-off-by: xxchan --- lints/src/utils/format_args_collector.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lints/src/utils/format_args_collector.rs b/lints/src/utils/format_args_collector.rs index 9f35faf11665..a3b144a6a8ae 100644 --- a/lints/src/utils/format_args_collector.rs +++ b/lints/src/utils/format_args_collector.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Copied from `https://github.com/Alexendoo/rust-clippy/blob/c187bff864234e869dabcb41d2336639e29e2511/clippy_lints/src/utils/format_args_collector.rs` +//! Copied from `https://github.com/rust-lang/rust-clippy/blob/993d8ae2a7b26ac779fde923b2ce9ce35d7143a8/clippy_lints/src/utils/format_args_collector.rs` use std::iter::once; use std::mem;