From 5ac1b0ce507e59d58ace4cfd41abcf6aca237e4b Mon Sep 17 00:00:00 2001 From: Liang <44948473+soundOfDestiny@users.noreply.github.com> Date: Tue, 22 Nov 2022 15:56:08 +0800 Subject: [PATCH 1/6] perf(dynamic filter): concurrently load from state table (#6490) * perf(dynamic filter): concurrently load from state table (close #6489) * remove `try_join_all` * refactor * use take * clippy --- .../executor/managed_state/dynamic_filter.rs | 52 +++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/src/stream/src/executor/managed_state/dynamic_filter.rs b/src/stream/src/executor/managed_state/dynamic_filter.rs index 0212fec3cb08..8e88fe0d91be 100644 --- a/src/stream/src/executor/managed_state/dynamic_filter.rs +++ b/src/stream/src/executor/managed_state/dynamic_filter.rs @@ -19,11 +19,11 @@ use std::ops::RangeBounds; use std::sync::Arc; use anyhow::anyhow; -use futures::{pin_mut, StreamExt}; +use futures::{pin_mut, stream, StreamExt}; use itertools::Itertools; use risingwave_common::buffer::Bitmap; use risingwave_common::row::{CompactedRow, Row, Row2}; -use risingwave_common::types::{ScalarImpl, VIRTUAL_NODE_SIZE}; +use risingwave_common::types::{ScalarImpl, VirtualNode, VIRTUAL_NODE_SIZE}; use risingwave_common::util::epoch::EpochPair; use risingwave_storage::StateStore; @@ -152,10 +152,26 @@ impl RangeCache { ) }); - let own_vnodes = self.vnodes.ones().collect_vec(); for pk_range in missing_ranges { - for vnode in &own_vnodes { - self.add_vnode_range(*vnode, &pk_range).await?; + let init_maps = self + .vnodes + .ones() + .map(|vnode| { + self.cache + .get_mut(&(vnode as VirtualNode)) + .map(std::mem::take) + .unwrap_or_default() + }) + .collect_vec(); + let futures = self + .vnodes + .ones() + .zip_eq(init_maps.into_iter()) + .map(|(vnode, init_map)| self.fetch_vnode_range(vnode, &pk_range, init_map)); + let results: Vec<_> = stream::iter(futures).buffer_unordered(10).collect().await; + for result in results { + let (vnode, map) = result?; + self.cache.insert(vnode, map); } } @@ -169,20 +185,21 @@ impl RangeCache { )) } - async fn add_vnode_range( - &mut self, + async fn fetch_vnode_range( + &self, vnode: usize, pk_range: &(Bound, Bound), - ) -> StreamExecutorResult<()> { + initial_map: BTreeMap, CompactedRow>, + ) -> StreamExecutorResult<(VirtualNode, BTreeMap, CompactedRow>)> { let vnode = vnode.try_into().unwrap(); - // TODO: do this concurrently over each vnode. 
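        // The removed TODO above is what this change resolves: `fetch_vnode_range`
        // borrows `&self` instead of `&mut self`, so the caller can drive one future
        // per owned vnode through `buffer_unordered(10)`. A minimal sketch of that
        // pattern, with a hypothetical `load(vnode)` standing in for the real scan:
        //
        //     let futures = vnodes.into_iter().map(|vnode| load(vnode));
        //     let results: Vec<_> = stream::iter(futures).buffer_unordered(10).collect().await;
        //     let loaded: StreamExecutorResult<Vec<_>> = results.into_iter().collect();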
let row_stream = self .state_table .iter_key_and_val_with_pk_range(pk_range, vnode) .await?; pin_mut!(row_stream); - let map = self.cache.entry(vnode).or_insert_with(BTreeMap::new); + let mut map = initial_map; + // row stream output is sorted by its pk, aka left key (and then original pk) while let Some(res) = row_stream.next().await { let (key_bytes, row) = res?; @@ -192,7 +209,7 @@ impl RangeCache { ); } - Ok(()) + Ok((vnode, map)) } /// Updates the vnodes for `RangeCache`, purging the rows of the vnodes that are no longer @@ -213,10 +230,15 @@ impl RangeCache { Self::to_row_bound(self_range.0.clone()), Self::to_row_bound(self_range.1.clone()), ); - for (vnode, (old, new)) in old_vnodes.iter().zip_eq(new_vnodes.iter()).enumerate() { - if new && !old { - self.add_vnode_range(vnode, ¤t_range).await?; - } + let newly_owned_vnodes = Bitmap::bit_saturate_subtract(&new_vnodes, &old_vnodes); + + let futures = newly_owned_vnodes + .ones() + .map(|vnode| self.fetch_vnode_range(vnode, ¤t_range, BTreeMap::new())); + let results: Vec<_> = stream::iter(futures).buffer_unordered(10).collect().await; + for result in results { + let (vnode, map) = result?; + self.cache.insert(vnode, map); } } self.vnodes = new_vnodes; From ca95541f72580fc3636a4ad1cda0e4bd36349ca7 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 22 Nov 2022 16:35:38 +0800 Subject: [PATCH 2/6] test(source): add ci e2e test for mysql-cdc (#6481) * mysql-cdc: add ci e2e test * add github token * compress debug infos * add cdc e2e test cases * fix * fix mysql * update * fix connector jar * fix * refactor ci scripts * minor Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- ci/Dockerfile | 3 +- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 27 ++++++- ci/scripts/build-other.sh | 15 ++++ ci/scripts/build.sh | 6 +- ci/scripts/e2e-source-test.sh | 25 ++++++- ci/workflows/pull-request.yml | 23 +++++- e2e_test/source/{ => basic}/datagen.slt | 0 e2e_test/source/{ => basic}/ddl.slt | 0 e2e_test/source/{ => basic}/kafka.slt | 0 .../source/{ => basic}/nexmark_endless.slt | 6 +- e2e_test/source/cdc/cdc.check.slt | 23 ++++++ e2e_test/source/cdc/cdc.load.slt | 70 +++++++++++++++++++ e2e_test/source/cdc/mysql_cdc.sql | 51 ++++++++++++++ risedev.yml | 2 +- 15 files changed, 240 insertions(+), 13 deletions(-) create mode 100755 ci/scripts/build-other.sh rename e2e_test/source/{ => basic}/datagen.slt (100%) rename e2e_test/source/{ => basic}/ddl.slt (100%) rename e2e_test/source/{ => basic}/kafka.slt (100%) rename e2e_test/source/{ => basic}/nexmark_endless.slt (74%) create mode 100644 e2e_test/source/cdc/cdc.check.slt create mode 100644 e2e_test/source/cdc/cdc.load.slt create mode 100644 e2e_test/source/cdc/mysql_cdc.sql diff --git a/ci/Dockerfile b/ci/Dockerfile index d71d2fc31210..626b21a1785d 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -6,7 +6,8 @@ ARG RUST_TOOLCHAIN RUN apt-get update -yy && \ DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl parallel \ - openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config bash openjdk-11-jdk wget unzip git tmux lld postgresql-client kafkacat -yy \ + openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config bash openjdk-11-jdk wget unzip git tmux lld postgresql-client kafkacat \ + maven -yy \ && rm -rf /var/lib/{apt,dpkg,cache,log}/ SHELL ["/bin/bash", "-c"] diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 6cc4ccefdddf..f325912336b7 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ 
-14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain) # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! # # AND ALSO docker-compose.yml # ###################################################### -export BUILD_ENV_VERSION=v20221105 +export BUILD_ENV_VERSION=v20221122 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 710c0b00ca25..7cb5504f1fe7 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -14,6 +14,20 @@ services: timeout: 5s retries: 5 + mysql: + image: mysql:8.0 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=123456 + - MYSQL_USER=mysqluser + - MYSQL_PASSWORD=mysqlpw + healthcheck: + test: [ "CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456" ] + interval: 5s + timeout: 5s + retries: 5 + zookeeper: image: confluentinc/cp-zookeeper ports: @@ -41,13 +55,20 @@ services: timeout: 10s retries: 5 + source-test-env: + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221122 + depends_on: + - mysql + volumes: + - ..:/risingwave + rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221105 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221122 volumes: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221105 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221122 depends_on: db: condition: service_healthy @@ -55,7 +76,7 @@ services: - ..:/risingwave benchmark-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221105 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221122 depends_on: - kafka volumes: diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh new file mode 100755 index 000000000000..a381986db1df --- /dev/null +++ b/ci/scripts/build-other.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# Exits as soon as any line fails. 
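+# (`-e` aborts on the first failing command, `-u` makes unset variables an error,
+# and `-o pipefail` makes a pipeline fail when any stage in it fails.)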
+set -euo pipefail + +source ci/scripts/common.env.sh + +echo "--- Build Java connector node" +# clone a released version(tag) +git clone --branch v0.0.1 --depth 1 https://"$GITHUB_TOKEN"@github.com/risingwavelabs/risingwave-connector-node.git +cd risingwave-connector-node +mvn package -Dmaven.test.skip=true +echo "--- Upload Java artifacts" +cp service/target/service-*.jar ./connector-service.jar +buildkite-agent artifact upload ./connector-service.jar diff --git a/ci/scripts/build.sh b/ci/scripts/build.sh index f786c3c0c065..735943ee1b0e 100755 --- a/ci/scripts/build.sh +++ b/ci/scripts/build.sh @@ -42,8 +42,12 @@ cargo build \ -p risingwave_compaction_test \ --features "static-link static-log-level" --profile "$profile" -echo "--- Compress RisingWave debug info" +echo "--- Compress debug info for artifacts" objcopy --compress-debug-sections=zlib-gnu target/"$target"/risingwave +objcopy --compress-debug-sections=zlib-gnu target/"$target"/sqlsmith +objcopy --compress-debug-sections=zlib-gnu target/"$target"/compaction-test +objcopy --compress-debug-sections=zlib-gnu target/"$target"/risingwave_regress_test +objcopy --compress-debug-sections=zlib-gnu target/"$target"/risedev-dev echo "--- Show link info" ldd target/"$target"/risingwave diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 2425ca4a1672..34c156eaac33 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -28,6 +28,9 @@ buildkite-agent artifact download risedev-dev-"$profile" target/debug/ mv target/debug/risingwave-"$profile" target/debug/risingwave mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev +echo "--- Download connector node jar" +buildkite-agent artifact download connector-service.jar ./ + echo "--- Prepare data" cp src/source/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc cp src/source/src/test_data/complex-schema.avsc ./avro-complex-schema.avsc @@ -44,11 +47,31 @@ echo "--- Prepare RiseDev dev cluster" cargo make pre-start-dev cargo make link-all-in-one-binaries +echo "--- e2e, ci-1cn-1fe, cdc source" +# install mysql client +apt-get -y install mysql-client +# import data to mysql +mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc.sql +# start risingwave cluster +cargo make ci-start ci-1cn-1fe +# start cdc connector node +nohup java -cp ./connector-service.jar com.risingwave.sourcenode.service.SourceServiceMain > .risingwave/log/connector-source.log 2>&1 & +sleep 1 +sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt' +# wait for cdc loading +sleep 4 +sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt' + +echo "--- Kill cluster" +pkill -f connector-service.jar +cargo make ci-kill + echo "--- e2e test w/ Rust frontend - source with kafka" cargo make clean-data cargo make ci-start ci-kafka ./scripts/source/prepare_ci_kafka.sh -sqllogictest -p 4566 -d dev './e2e_test/source/**/*.slt' +sqllogictest -p 4566 -d dev './e2e_test/source/basic/*.slt' + echo "--- Run CH-benCHmark" ./risedev slt -p 4566 -d dev ./e2e_test/ch-benchmark/ch_benchmark.slt diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index b4efe0e91b09..17e4caf6ffa1 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -43,6 +43,23 @@ steps: timeout_in_minutes: 10 retry: *auto-retry + - label: "build other components" + command: "ci/scripts/build-other.sh -t ci-dev -p ci-dev" + key: "build-other" + plugins: + - gencer/cache#v2.4.10: *cargo-cache + - seek-oss/aws-sm#v2.3.1: 
+ env: + GITHUB_TOKEN: github-token + - docker-compose#v3.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - GITHUB_TOKEN + timeout_in_minutes: 10 + retry: *auto-retry + - label: "build (deterministic simulation)" command: "ci/scripts/build-simulation.sh" key: "build-simulation" @@ -112,11 +129,13 @@ steps: - label: "end-to-end source test" command: "ci/scripts/e2e-source-test.sh -p ci-dev" - depends_on: "build" + depends_on: + - "build" + - "build-other" plugins: - gencer/cache#v2.4.10: *cargo-cache - docker-compose#v3.9.0: - run: rw-build-env + run: source-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/e2e_test/source/datagen.slt b/e2e_test/source/basic/datagen.slt similarity index 100% rename from e2e_test/source/datagen.slt rename to e2e_test/source/basic/datagen.slt diff --git a/e2e_test/source/ddl.slt b/e2e_test/source/basic/ddl.slt similarity index 100% rename from e2e_test/source/ddl.slt rename to e2e_test/source/basic/ddl.slt diff --git a/e2e_test/source/kafka.slt b/e2e_test/source/basic/kafka.slt similarity index 100% rename from e2e_test/source/kafka.slt rename to e2e_test/source/basic/kafka.slt diff --git a/e2e_test/source/nexmark_endless.slt b/e2e_test/source/basic/nexmark_endless.slt similarity index 74% rename from e2e_test/source/nexmark_endless.slt rename to e2e_test/source/basic/nexmark_endless.slt index adbb7c17e8e5..4b07999c10f2 100644 --- a/e2e_test/source/nexmark_endless.slt +++ b/e2e_test/source/basic/nexmark_endless.slt @@ -1,9 +1,9 @@ -include ../nexmark/create_sources.slt.part +include ../../nexmark/create_sources.slt.part # Note: It will take a very long time to create all of the NexMark materialized views, # so we just create Q5 and Q7 here -include ../streaming/nexmark/views/q7.slt.part -include ../streaming/nexmark/views/q5.slt.part +include ../../streaming/nexmark/views/q7.slt.part +include ../../streaming/nexmark/views/q5.slt.part sleep 15s diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt new file mode 100644 index 000000000000..529ead9dde77 --- /dev/null +++ b/e2e_test/source/cdc/cdc.check.slt @@ -0,0 +1,23 @@ +# CDC source basic test + +query I +select cnt from products_cnt; +---- +9 + +query II +select cnt from orders_cnt; +---- +3 + +query III +select cnt from shipments_cnt; +---- +3 + +query IIII +select order_id, product_id, shipment_id from enriched_orders order by order_id; +---- +10001 102 1001 +10002 105 1002 +10003 106 1003 diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt new file mode 100644 index 000000000000..59886d693050 --- /dev/null +++ b/e2e_test/source/cdc/cdc.load.slt @@ -0,0 +1,70 @@ +# CDC source basic test + +statement ok +create materialized source products ( id INT, + name STRING, + description STRING, + PRIMARY KEY (id) +) with ( + connector = 'cdc', + database.hostname = 'mysql', + database.port = '3306', + database.user = 'root', + database.password = '123456', + database.name = 'mydb', + table.name = 'products' +) row format debezium_json; + +statement ok +create materialized view products_cnt as select count(*) as cnt from products; + +# Since timestamp type cannot parse properly, use bigint for order_date instead +statement ok +create materialized source orders ( + order_id int, + order_date bigint, + customer_name string, + price decimal, + product_id int, + order_status smallint, + PRIMARY KEY (order_id) +) with ( + connector = 'cdc', + 
database.hostname = 'mysql', + database.port = '3306', + database.user = 'root', + database.password = '123456', + database.name = 'mydb', + table.name = 'orders' +) row format debezium_json; + +statement ok +create materialized view orders_cnt as select count(*) as cnt from orders; + +statement ok +create materialized source shipments ( + shipment_id INTEGER, + order_id INTEGER, + origin STRING, + destination STRING, + is_arrived smallint, + PRIMARY KEY (shipment_id) +) with ( + connector = 'cdc', + database.hostname = 'mysql', + database.port = '3306', + database.user = 'root', + database.password = '123456', + database.name = 'mydb', + table.name = 'shipments' +) row format debezium_json; + +statement ok +create materialized view shipments_cnt as select count(*) as cnt from shipments; + +# Create a mview upon above three tables +statement ok +create materialized view enriched_orders as SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived + FROM orders AS o + LEFT JOIN products AS p ON o.product_id = p.id + LEFT JOIN shipments AS s ON o.order_id = s.order_id; diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql new file mode 100644 index 000000000000..bece8b93f91a --- /dev/null +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -0,0 +1,51 @@ +DROP DATABASE IF EXISTS mydb; +CREATE DATABASE mydb; + +USE mydb; + +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512) +); + +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (default,"scooter","Small 2-wheel scooter"), + (default,"car battery","12V car battery"), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), + (default,"hammer","12oz carpenter's hammer"), + (default,"hammer","14oz carpenter's hammer"), + (default,"hammer","16oz carpenter's hammer"), + (default,"rocks","box of assorted rocks"), + (default,"jacket","water resistent black wind breaker"), + (default,"spare tire","24 inch spare tire"); + + +CREATE TABLE orders ( + order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + order_date DATETIME NOT NULL, + customer_name VARCHAR(255) NOT NULL, + price DECIMAL(10, 5) NOT NULL, + product_id INTEGER NOT NULL, + order_status BOOLEAN NOT NULL -- Whether order has been placed +) AUTO_INCREMENT = 10001; + +INSERT INTO orders +VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), + (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false), + (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false); + +CREATE TABLE shipments ( + shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + order_id INTEGER NOT NULL, + origin VARCHAR(255) NOT NULL, + destination VARCHAR(255) NOT NULL, + is_arrived BOOLEAN NOT NULL +) AUTO_INCREMENT = 1001; + +INSERT INTO shipments +VALUES (default,10001,'Beijing','Shanghai',false), + (default,10002,'Hangzhou','Shanghai',false), + (default,10003,'Shanghai','Hangzhou',false); diff --git a/risedev.yml b/risedev.yml index 783c34e0b49b..5eedc3897af0 100644 --- a/risedev.yml +++ b/risedev.yml @@ -584,7 +584,7 @@ template: enable-tiered-cache: false # RPC endpoint for source connector node - connector-source-endpoint: "127.0.0.1:61261" + connector-source-endpoint: "127.0.0.1:60061" # Minio instances used by this compute node provide-minio: "minio*" From 3b0ff6c1110e02052ff762a15347cb32b26c31fc Mon Sep 17 00:00:00 2001 From: Liang <44948473+soundOfDestiny@users.noreply.github.com> Date: Tue, 22 Nov 2022 
16:53:17 +0800 Subject: [PATCH 3/6] feat(barrier sender): support multiple senders in one actor (#6502) * feat(barrier sender): support multiple senders in one actor (close #6501) * fmt Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- src/stream/src/task/barrier_manager.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 1ad390a0940e..46da5be8d46a 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -60,7 +60,7 @@ enum BarrierState { /// barriers to and collect them from all actors, and finally report the progress. pub struct LocalBarrierManager { /// Stores all streaming job source sender. - senders: HashMap>, + senders: HashMap>>, /// Span of the current epoch. #[expect(dead_code)] @@ -101,7 +101,7 @@ impl LocalBarrierManager { /// Register sender for source actors, used to send barriers. pub fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender) { tracing::trace!(actor_id = actor_id, "register sender"); - self.senders.insert(actor_id, sender); + self.senders.entry(actor_id).or_default().push(sender); } /// Return all senders. @@ -150,10 +150,12 @@ impl LocalBarrierManager { for actor_id in to_send { match self.senders.get(&actor_id) { - Some(sender) => { - if let Err(err) = sender.send(barrier.clone()) { - // return err to trigger recovery. - bail!("failed to send barrier to actor {}: {:?}", actor_id, err) + Some(senders) => { + for sender in senders { + if let Err(err) = sender.send(barrier.clone()) { + // return err to trigger recovery. + bail!("failed to send barrier to actor {}: {:?}", actor_id, err) + } } } None => { From ebaa4550e0a866c7df4b514d469b278dc596341d Mon Sep 17 00:00:00 2001 From: August Date: Tue, 22 Nov 2022 18:07:50 +0800 Subject: [PATCH 4/6] chore: rename sink fragment type to mview (#6515) * chore: rename sink fragment type to mview * rename some other funcs Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- dashboard/proto/gen/stream_plan.ts | 7 +++++- proto/stream_plan.proto | 4 ++-- src/frontend/src/stream_fragmenter/mod.rs | 4 +++- src/meta/src/barrier/command.rs | 8 +++---- src/meta/src/manager/catalog/fragment.rs | 22 +++++++++---------- src/meta/src/model/stream.rs | 26 +++++++++++------------ src/meta/src/rpc/service/ddl_service.rs | 4 ++-- src/meta/src/stream/stream_graph.rs | 2 +- src/meta/src/stream/stream_manager.rs | 26 +++++++++++------------ src/meta/src/stream/test_fragmenter.rs | 4 ++-- 10 files changed, 57 insertions(+), 50 deletions(-) diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index 1f7810a299b3..698aadc9200c 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -137,7 +137,7 @@ export const FragmentType = { FRAGMENT_UNSPECIFIED: "FRAGMENT_UNSPECIFIED", OTHERS: "OTHERS", SOURCE: "SOURCE", - /** SINK - TODO: change it to MATERIALIZED_VIEW or other name, since we have sink type now. 
*/ + MVIEW: "MVIEW", SINK: "SINK", UNRECOGNIZED: "UNRECOGNIZED", } as const; @@ -156,6 +156,9 @@ export function fragmentTypeFromJSON(object: any): FragmentType { case "SOURCE": return FragmentType.SOURCE; case 3: + case "MVIEW": + return FragmentType.MVIEW; + case 4: case "SINK": return FragmentType.SINK; case -1: @@ -173,6 +176,8 @@ export function fragmentTypeToJSON(object: FragmentType): string { return "OTHERS"; case FragmentType.SOURCE: return "SOURCE"; + case FragmentType.MVIEW: + return "MVIEW"; case FragmentType.SINK: return "SINK"; case FragmentType.UNRECOGNIZED: diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index c805635e9128..a48e21be46ea 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -534,8 +534,8 @@ enum FragmentType { FRAGMENT_UNSPECIFIED = 0; OTHERS = 1; SOURCE = 2; - // TODO: change it to MATERIALIZED_VIEW or other name, since we have sink type now. - SINK = 3; + MVIEW = 3; + SINK = 4; } message StreamFragmentGraph { diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index f08ead38a83a..291281f847ed 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -202,7 +202,9 @@ fn build_fragment( match stream_node.get_node_body()? { NodeBody::Source(_) => current_fragment.fragment_type = FragmentType::Source, - NodeBody::Materialize(_) => current_fragment.fragment_type = FragmentType::Sink, + NodeBody::Materialize(_) => current_fragment.fragment_type = FragmentType::Mview, + + NodeBody::Sink(_) => current_fragment.fragment_type = FragmentType::Sink, // TODO: Force singleton for TopN as a workaround. We should implement two phase TopN. NodeBody::TopN(_) => current_fragment.is_singleton = true, diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index f9e03e57d78c..2c169e55e74a 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -96,7 +96,7 @@ pub enum Command { /// will be set to `Created`. 
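    /// `table_mview_map` maps each upstream table id to the actor ids of its mview
    /// (materialize) fragment; the handler below uses it to find the dispatchers
    /// that feed the newly created job.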
CreateStreamingJob { table_fragments: TableFragments, - table_sink_map: HashMap>, + table_mview_map: HashMap>, dispatchers: HashMap>, init_split_assignment: SplitAssignment, }, @@ -471,11 +471,11 @@ where Command::CreateStreamingJob { table_fragments, dispatchers, - table_sink_map, + table_mview_map, init_split_assignment, } => { - let mut dependent_table_actors = Vec::with_capacity(table_sink_map.len()); - for (table_id, actors) in table_sink_map { + let mut dependent_table_actors = Vec::with_capacity(table_mview_map.len()); + for (table_id, actors) in table_mview_map { let downstream_actors = dispatchers .iter() .filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id)) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 030f5dad65de..4af34896b2ae 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -93,7 +93,7 @@ pub struct FragmentVNodeInfo { #[derive(Default)] pub struct BuildGraphInfo { - pub table_sink_actor_ids: HashMap>, + pub table_mview_actor_ids: HashMap>, } pub type FragmentManagerRef = Arc>; @@ -309,7 +309,7 @@ where dependent_table .fragments .values_mut() - .filter(|f| f.fragment_type() == FragmentType::Sink) + .filter(|f| f.fragment_type() == FragmentType::Mview) .flat_map(|f| &mut f.actors) .for_each(|a| { a.dispatcher.retain_mut(|d| { @@ -820,12 +820,12 @@ where .collect::>>() } - pub async fn get_table_sink_actor_ids(&self, table_id: &TableId) -> MetaResult> { + pub async fn get_table_mview_actor_ids(&self, table_id: &TableId) -> MetaResult> { let map = &self.core.read().await.table_fragments; Ok(map .get(table_id) .context(format!("table_fragment not exist: id={}", table_id))? - .sink_actor_ids()) + .mview_actor_ids()) } // we will read three things at once, avoiding locking too much. @@ -837,17 +837,17 @@ where let mut info: BuildGraphInfo = Default::default(); for table_id in table_ids { - info.table_sink_actor_ids.insert( + info.table_mview_actor_ids.insert( *table_id, map.get(table_id) .context(format!("table_fragment not exist: id={}", table_id))? - .sink_actor_ids(), + .mview_actor_ids(), ); } Ok(info) } - pub async fn get_sink_vnode_bitmap_info( + pub async fn get_mview_vnode_bitmap_info( &self, table_ids: &HashSet, ) -> MetaResult)>>> { @@ -859,14 +859,14 @@ where *table_id, map.get(table_id) .context(format!("table_fragment not exist: id={}", table_id))? - .sink_vnode_bitmap_info(), + .mview_vnode_bitmap_info(), ); } Ok(info) } - pub async fn get_sink_fragment_vnode_info( + pub async fn get_mview_fragment_vnode_info( &self, table_ids: &HashSet, ) -> MetaResult> { @@ -880,8 +880,8 @@ where info.insert( *table_id, FragmentVNodeInfo { - actor_parallel_unit_maps: table_fragment.sink_actor_parallel_units(), - vnode_mapping: table_fragment.sink_vnode_mapping(), + actor_parallel_unit_maps: table_fragment.mview_actor_parallel_units(), + vnode_mapping: table_fragment.mview_vnode_mapping(), }, ); } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 2cce26ac1e1c..82db2bc15bfd 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -134,13 +134,13 @@ impl TableFragments { self.state = state; } - /// Returns sink fragment vnode mapping. - /// Note that: the real sink fragment is also stored as `TableFragments`, it's possible that - /// there's no fragment with `FragmentType::Sink` exists. - pub fn sink_vnode_mapping(&self) -> Option { + /// Returns mview fragment vnode mapping. 
+ /// Note that: the sink fragment is also stored as `TableFragments`, it's possible that + /// there's no fragment with `FragmentType::Mview` exists. + pub fn mview_vnode_mapping(&self) -> Option { self.fragments .values() - .find(|fragment| fragment.fragment_type == FragmentType::Sink as i32) + .find(|fragment| fragment.fragment_type == FragmentType::Mview as i32) .and_then(|fragment| fragment.vnode_mapping.clone()) } @@ -185,9 +185,9 @@ impl TableFragments { Self::filter_actor_ids(self, FragmentType::Source) } - /// Returns sink actor ids. - pub fn sink_actor_ids(&self) -> Vec { - Self::filter_actor_ids(self, FragmentType::Sink) + /// Returns mview actor ids. + pub fn mview_actor_ids(&self) -> Vec { + Self::filter_actor_ids(self, FragmentType::Mview) } fn contains_chain(stream_node: &StreamNode) -> bool { @@ -398,11 +398,11 @@ impl TableFragments { } } - /// Returns sink actor vnode bitmap infos. - pub fn sink_vnode_bitmap_info(&self) -> Vec<(ActorId, Option)> { + /// Returns mview actor vnode bitmap infos. + pub fn mview_vnode_bitmap_info(&self) -> Vec<(ActorId, Option)> { self.fragments .values() - .filter(|fragment| fragment.fragment_type == FragmentType::Sink as i32) + .filter(|fragment| fragment.fragment_type == FragmentType::Mview as i32) .flat_map(|fragment| { fragment .actors @@ -412,8 +412,8 @@ impl TableFragments { .collect_vec() } - pub fn sink_actor_parallel_units(&self) -> BTreeMap { - let sink_actor_ids = self.sink_actor_ids(); + pub fn mview_actor_parallel_units(&self) -> BTreeMap { + let sink_actor_ids = self.mview_actor_ids(); sink_actor_ids .iter() .map(|actor_id| { diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index 3212e52f3765..3118b8f8db86 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -476,11 +476,11 @@ where streaming_job_name: stream_job.name(), streaming_definition: stream_job.mview_definition(), table_properties: stream_job.properties(), - table_sink_map: self + table_mview_map: self .fragment_manager .get_build_graph_info(&dependent_table_ids) .await? - .table_sink_actor_ids, + .table_mview_actor_ids, dependent_table_ids, ..Default::default() }; diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 4f78766872af..7691d1585930 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -998,7 +998,7 @@ impl ActorGraphBuilder { } else if let Some(upstream_table_id) = upstream_table_id { // set fragment parallelism to the parallelism of its dependent table. let upstream_actors = ctx - .table_sink_map + .table_mview_map .get(&upstream_table_id) .expect("upstream actor should exist"); upstream_actors.len() as u32 diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a49a77b66f5e..50b6134ab2f2 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -54,7 +54,7 @@ pub struct CreateStreamingJobContext { /// Upstream mview actor ids grouped by worker node id. pub upstream_worker_actors: HashMap>, /// Upstream mview actor ids grouped by table id. - pub table_sink_map: HashMap>, + pub table_mview_map: HashMap>, /// Dependent table ids pub dependent_table_ids: HashSet, /// Table id offset get from meta id generator. Used to calculate global unique table id. 
@@ -263,12 +263,12 @@ where let upstream_fragment_vnode_info = &self .fragment_manager - .get_sink_fragment_vnode_info(dependent_table_ids) + .get_mview_fragment_vnode_info(dependent_table_ids) .await?; let upstream_vnode_bitmap_info = &mut self .fragment_manager - .get_sink_vnode_bitmap_info(dependent_table_ids) + .get_mview_vnode_bitmap_info(dependent_table_ids) .await?; let tables_worker_actors = &self @@ -387,7 +387,7 @@ where CreateStreamingJobContext { dispatchers, upstream_worker_actors, - table_sink_map, + table_mview_map, dependent_table_ids, table_properties, chain_fragment_upstream_table_map, @@ -686,7 +686,7 @@ where .barrier_scheduler .run_command(Command::CreateStreamingJob { table_fragments, - table_sink_map: table_sink_map.clone(), + table_mview_map: table_mview_map.clone(), dispatchers: dispatchers.clone(), init_split_assignment: split_assignment, }) @@ -1060,7 +1060,7 @@ mod tests { 0, Fragment { fragment_id: 0, - fragment_type: FragmentType::Sink as i32, + fragment_type: FragmentType::Mview as i32, distribution_type: FragmentDistributionType::Hash as i32, actors: actors.clone(), ..Default::default() @@ -1102,15 +1102,15 @@ mod tests { ); } - let sink_actor_ids = services + let mview_actor_ids = services .fragment_manager - .get_table_sink_actor_ids(&table_id) + .get_table_mview_actor_ids(&table_id) .await?; let actor_ids = services .fragment_manager .get_table_actor_ids(&HashSet::from([table_id])) .await?; - assert_eq!(sink_actor_ids, (0..=3).collect::>()); + assert_eq!(mview_actor_ids, (0..=3).collect::>()); assert_eq!(actor_ids, (0..=3).collect::>()); // test drop materialized_view @@ -1145,7 +1145,7 @@ mod tests { 0, Fragment { fragment_id: 0, - fragment_type: FragmentType::Sink as i32, + fragment_type: FragmentType::Mview as i32, distribution_type: FragmentDistributionType::Hash as i32, actors: actors.clone(), ..Default::default() @@ -1191,9 +1191,9 @@ mod tests { ); } - let sink_actor_ids = services + let mview_actor_ids = services .fragment_manager - .get_table_sink_actor_ids(&table_id) + .get_table_mview_actor_ids(&table_id) .await .unwrap(); let actor_ids = services @@ -1201,7 +1201,7 @@ mod tests { .get_table_actor_ids(&HashSet::from([table_id])) .await .unwrap(); - assert_eq!(sink_actor_ids, (0..=3).collect::>()); + assert_eq!(mview_actor_ids, (0..=3).collect::>()); assert_eq!(actor_ids, (0..=3).collect::>()); let notify = Arc::new(Notify::new()); let notify1 = notify.clone(); diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index c73b49849d3c..6ee5608fd535 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -347,7 +347,7 @@ fn make_stream_fragments() -> Vec { fragments.push(StreamFragment { fragment_id: 0, node: Some(mview_node), - fragment_type: FragmentType::Sink as i32, + fragment_type: FragmentType::Mview as i32, is_singleton: true, table_ids_cnt: 0, upstream_table_ids: vec![], @@ -409,7 +409,7 @@ async fn test_fragmenter() -> MetaResult<()> { let table_fragments = TableFragments::new(TableId::default(), graph); let actors = table_fragments.actors(); let source_actor_ids = table_fragments.source_actor_ids(); - let sink_actor_ids = table_fragments.sink_actor_ids(); + let sink_actor_ids = table_fragments.mview_actor_ids(); let internal_table_ids = ctx.internal_table_ids(); assert_eq!(actors.len(), 9); assert_eq!(source_actor_ids, vec![6, 7, 8, 9]); From 39723fac5d8c242c95264aedb192bde5b966e965 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 22 Nov 2022 19:02:23 
+0800 Subject: [PATCH 5/6] fix(ci): fix main pipeline (#6516) * fix main ci * fix main-cron ci * remove unused arguments Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- ci/workflows/main-cron.yml | 22 ++++++++++++++++++++-- ci/workflows/main.yml | 22 ++++++++++++++++++++-- ci/workflows/pull-request.yml | 3 +-- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 50a9f5424ca1..222112ba30a5 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -32,6 +32,22 @@ steps: timeout_in_minutes: 20 retry: *auto-retry + - label: "build other components" + command: "ci/scripts/build-other.sh" + key: "build-other" + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + GITHUB_TOKEN: github-token + - docker-compose#v3.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - GITHUB_TOKEN + timeout_in_minutes: 10 + retry: *auto-retry + - label: "build (deterministic simulation)" command: "ci/scripts/build-simulation.sh" key: "build-simulation" @@ -73,11 +89,13 @@ steps: - label: "end-to-end source test (release)" command: "ci/scripts/e2e-source-test.sh -p ci-release" - depends_on: "build" + depends_on: + - "build" + - "build-other" plugins: - gencer/cache#v2.4.10: *cargo-cache - docker-compose#v3.9.0: - run: rw-build-env + run: source-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/ci/workflows/main.yml b/ci/workflows/main.yml index 35333120b7e8..5f4968feb587 100644 --- a/ci/workflows/main.yml +++ b/ci/workflows/main.yml @@ -44,6 +44,22 @@ steps: timeout_in_minutes: 20 retry: *auto-retry + - label: "build other components" + command: "ci/scripts/build-other.sh" + key: "build-other" + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + GITHUB_TOKEN: github-token + - docker-compose#v3.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - GITHUB_TOKEN + timeout_in_minutes: 10 + retry: *auto-retry + - label: "docslt" command: "ci/scripts/docslt.sh" key: "docslt" @@ -157,11 +173,13 @@ steps: - label: "end-to-end source test (release mode)" command: "ci/scripts/e2e-source-test.sh -p ci-release" - depends_on: "build-release" + depends_on: + - "build-release" + - "build-other" plugins: - gencer/cache#v2.4.10: *cargo-cache - docker-compose#v3.9.0: - run: rw-build-env + run: source-test-env config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 17e4caf6ffa1..1a3429750e16 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -44,10 +44,9 @@ steps: retry: *auto-retry - label: "build other components" - command: "ci/scripts/build-other.sh -t ci-dev -p ci-dev" + command: "ci/scripts/build-other.sh" key: "build-other" plugins: - - gencer/cache#v2.4.10: *cargo-cache - seek-oss/aws-sm#v2.3.1: env: GITHUB_TOKEN: github-token From d725b67cf83855e09040fb88523f859770013783 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 22 Nov 2022 19:48:00 +0800 Subject: [PATCH 6/6] feat(storage): support sled state store as a memory state store with higher capacity (#6451) * feat(storage): support sled state store for verify * impl RangeKvStateStore * support sled state store * support fuse semantic for iter * make clippy happy * flush at sync and disable sled debug log * 
support specifying sled state store * filter out false positive key * fix clippy and unit test --- Cargo.lock | 36 +++ src/storage/Cargo.toml | 1 + src/storage/hummock_sdk/src/key.rs | 34 +- src/storage/src/error.rs | 7 + src/storage/src/memory.rs | 499 +++++++++++++++++++++++------ src/storage/src/store_impl.rs | 35 ++ src/utils/runtime/src/lib.rs | 3 +- 7 files changed, 517 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 097b33f8d2b9..b02e80ac5493 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2088,6 +2088,16 @@ dependencies = [ "syn", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.2.0" @@ -2258,6 +2268,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.6" @@ -5788,6 +5807,7 @@ dependencies = [ "risingwave_tracing", "scopeguard", "serde", + "sled", "smallvec", "spin 0.9.4", "sync-point", @@ -6387,6 +6407,22 @@ dependencies = [ "autocfg", ] +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", +] + [[package]] name = "sluice" version = "0.5.5" diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index c3140c4a996f..bd61a66dc541 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -58,6 +58,7 @@ scopeguard = "1" # "static_libcpp", # ], optional = true } serde = { version = "1", features = ["derive"] } +sled = "0.34.7" smallvec = "1" spin = "0.9" sync-point = { path = "../utils/sync-point" } diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index ce7cf8d9bfb2..c79d21a7e4de 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -433,6 +433,12 @@ impl<'a> UserKey<&'a [u8]> { } } +impl> UserKey { + pub fn as_ref(&self) -> UserKey<&[u8]> { + UserKey::new(self.table_id, TableKey(self.table_key.as_ref())) + } +} + impl UserKey> { pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self { let table_id = buf.get_u32(); @@ -448,10 +454,6 @@ impl UserKey> { self.table_key.0.extend_from_slice(other.table_key.as_ref()); } - pub fn as_ref(&self) -> UserKey<&[u8]> { - UserKey::new(self.table_id, TableKey(self.table_key.as_slice())) - } - /// Use this method to override an old `UserKey>` with a `UserKey<&[u8]>` to own the /// table key without reallocating a new `UserKey` object. pub fn set(&mut self, other: UserKey<&[u8]>) { @@ -513,6 +515,15 @@ impl> FullKey { buf } + pub fn encode_reverse_epoch(&self) -> Vec { + let mut buf = Vec::with_capacity( + TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN, + ); + self.user_key.encode_into(&mut buf); + buf.put_u64(u64::MAX - self.epoch); + buf + } + pub fn is_empty(&self) -> bool { self.user_key.is_empty() } @@ -535,6 +546,17 @@ impl<'a> FullKey<&'a [u8]> { } } + /// Construct a [`FullKey`] from a byte slice. 
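+    /// The epoch is written as `u64::MAX - epoch` by `encode_reverse_epoch` above, so
+    /// a byte-ordered store such as sled yields newer versions of a user key before
+    /// older ones; decoding applies the same transform to recover the original epoch.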
+ pub fn decode_reverse_epoch(slice: &'a [u8]) -> Self { + let epoch_pos = slice.len() - EPOCH_LEN; + let epoch = (&slice[epoch_pos..]).get_u64(); + + Self { + user_key: UserKey::decode(&slice[..epoch_pos]), + epoch: u64::MAX - epoch, + } + } + pub fn to_vec(self) -> FullKey> { FullKey { user_key: self.user_key.to_vec(), @@ -543,14 +565,16 @@ impl<'a> FullKey<&'a [u8]> { } } -impl FullKey> { +impl> FullKey { pub fn to_ref(&self) -> FullKey<&[u8]> { FullKey { user_key: self.user_key.as_ref(), epoch: self.epoch, } } +} +impl FullKey> { /// Use this method to override an old `FullKey>` with a `FullKey<&[u8]>` to own the /// table key without reallocating a new `FullKey` object. pub fn set(&mut self, other: FullKey<&[u8]>) { diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index e57bef7fbec8..f6ab67885a58 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -31,6 +31,13 @@ pub enum StorageError { #[error("Deserialize row error {0}.")] DeserializeRow(ValueEncodingError), + + #[error("Sled error: {0}")] + Sled( + #[backtrace] + #[from] + sled::Error, + ), } pub type StorageResult = std::result::Result; diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 93b2076e9008..c2090ff7d9a1 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -14,7 +14,6 @@ use std::collections::BTreeMap; use std::future::Future; -use std::iter::Fuse; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::{Arc, LazyLock}; @@ -22,7 +21,7 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use parking_lot::RwLock; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::key::{FullKey, FullKeyRange, TableKey, UserKey}; +use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; use crate::error::StorageResult; @@ -33,6 +32,253 @@ use crate::{ define_state_store_write_associated_type, StateStore, StateStoreIter, }; +pub type BytesFullKey = FullKey; +pub type BytesFullKeyRange = (Bound, Bound); + +#[allow(clippy::type_complexity)] +pub trait RangeKv: Clone + Send + Sync + 'static { + fn range( + &self, + range: BytesFullKeyRange, + limit: Option, + ) -> StorageResult>, Option)>>; + + fn ingest_batch( + &self, + kv_pairs: impl Iterator)>, + ) -> StorageResult<()>; + + fn flush(&self) -> StorageResult<()>; +} + +pub type BTreeMapRangeKv = Arc>>>; + +impl RangeKv for BTreeMapRangeKv { + fn range( + &self, + range: BytesFullKeyRange, + limit: Option, + ) -> StorageResult>, Option)>> { + let limit = limit.unwrap_or(usize::MAX); + Ok(self + .read() + .range(range) + .take(limit) + .map(|(key, value)| (key.to_ref().to_vec(), value.clone())) + .collect()) + } + + fn ingest_batch( + &self, + kv_pairs: impl Iterator)>, + ) -> StorageResult<()> { + let mut inner = self.write(); + for (key, value) in kv_pairs { + inner.insert(key, value); + } + Ok(()) + } + + fn flush(&self) -> StorageResult<()> { + Ok(()) + } +} + +pub mod sled { + use std::fs::create_dir_all; + use std::ops::RangeBounds; + + use bytes::Bytes; + use risingwave_hummock_sdk::key::FullKey; + + use crate::error::StorageResult; + use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore}; + + #[derive(Clone)] + pub struct SledRangeKv { + inner: sled::Db, + } + + impl SledRangeKv { + pub fn new(path: impl AsRef) -> Self { + SledRangeKv { + inner: sled::open(path).expect("open"), + } + } + + pub fn new_temp() -> Self { + 
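+            // Every throwaway database lives under ./.risingwave/sled in the current
+            // working directory; each call gets its own randomized TempDir.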
+            create_dir_all("./.risingwave/sled").expect("should create");
+            let path = tempfile::TempDir::new_in("./.risingwave/sled")
+                .expect("find temp dir")
+                .into_path();
+            Self::new(path)
+        }
+    }
+
+    const EMPTY: u8 = 1;
+    const NON_EMPTY: u8 = 0;
+
+    impl RangeKv for SledRangeKv {
+        fn range(
+            &self,
+            range: BytesFullKeyRange,
+            limit: Option<usize>,
+        ) -> StorageResult<Vec<(FullKey<Vec<u8>>, Option<Bytes>)>> {
+            let (left, right) = range;
+            let full_key_ref_bound = (
+                left.as_ref().map(FullKey::to_ref),
+                right.as_ref().map(FullKey::to_ref),
+            );
+            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
+            let right_encoded = right
+                .as_ref()
+                .map(|key| key.to_ref().encode_reverse_epoch());
+            let limit = limit.unwrap_or(usize::MAX);
+            let mut ret = vec![];
+            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
+                let (key, value) = result?;
+                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).to_vec();
+                if !full_key_ref_bound.contains(&full_key.to_ref()) {
+                    continue;
+                }
+                let value = match value.as_ref() {
+                    [EMPTY] => None,
+                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
+                    _ => unreachable!("malformed value: {:?}", value),
+                };
+                ret.push((full_key, value))
+            }
+            Ok(ret)
+        }
+
+        fn ingest_batch(
+            &self,
+            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
+        ) -> StorageResult<()> {
+            let mut batch = sled::Batch::default();
+            for (key, value) in kv_pairs {
+                let encoded_key = key.encode_reverse_epoch();
+                let key = sled::IVec::from(encoded_key);
+                let mut buffer =
+                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
+                if let Some(value) = value {
+                    buffer.push(NON_EMPTY);
+                    buffer.extend_from_slice(value.as_ref());
+                } else {
+                    buffer.push(EMPTY);
+                }
+                let value = sled::IVec::from(buffer);
+                batch.insert(key, value);
+            }
+            self.inner.apply_batch(batch)?;
+            Ok(())
+        }
+
+        fn flush(&self) -> StorageResult<()> {
+            Ok(self.inner.flush().map(|_| {})?)
+ } + } + + pub type SledStateStore = RangeKvStateStore; + + impl SledStateStore { + pub fn new(path: impl AsRef) -> Self { + RangeKvStateStore { + inner: SledRangeKv::new(path), + } + } + + pub fn new_temp() -> Self { + RangeKvStateStore { + inner: SledRangeKv::new_temp(), + } + } + } + + #[cfg(test)] + mod test { + use std::ops::{Bound, RangeBounds}; + + use bytes::Bytes; + use risingwave_common::catalog::TableId; + use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; + + use crate::memory::sled::SledRangeKv; + use crate::memory::RangeKv; + + #[test] + fn test_filter_variable_key_length_false_positive() { + let table_id = TableId { table_id: 233 }; + let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]); + let excluded_short_table_key = [0, 1, 0, 0]; + let included_long_table_key = [0, 1, 0, 0, 1, 2]; + let left_table_key = [0, 1, 0, 0, 1]; + let right_table_key = [0, 1, 1, 1]; + + let to_full_key = |table_key: &[u8]| FullKey { + user_key: UserKey { + table_id, + table_key: TableKey(Bytes::from(table_key.to_vec())), + }, + epoch, + }; + + let left_full_key = to_full_key(&left_table_key[..]); + let right_full_key = to_full_key(&right_table_key[..]); + let included_long_full_key = to_full_key(&included_long_table_key[..]); + let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]); + + assert!(( + Bound::Included(left_full_key.to_ref()), + Bound::Included(right_full_key.to_ref()) + ) + .contains(&included_long_full_key.to_ref())); + assert!(!( + Bound::Included(left_full_key.to_ref()), + Bound::Included(right_full_key.to_ref()) + ) + .contains(&excluded_short_full_key.to_ref())); + + let left_encoded = left_full_key.encode_reverse_epoch(); + let right_encoded = right_full_key.encode_reverse_epoch(); + + assert!(( + Bound::Included(left_encoded.clone()), + Bound::Included(right_encoded.clone()) + ) + .contains(&included_long_full_key.encode_reverse_epoch())); + assert!(( + Bound::Included(left_encoded), + Bound::Included(right_encoded) + ) + .contains(&excluded_short_full_key.encode_reverse_epoch())); + + let sled_range_kv = SledRangeKv::new_temp(); + sled_range_kv + .ingest_batch( + vec![ + (included_long_full_key.clone(), None), + (excluded_short_full_key, None), + ] + .into_iter(), + ) + .unwrap(); + let kvs = sled_range_kv + .range( + ( + Bound::Included(left_full_key), + Bound::Included(right_full_key), + ), + None, + ) + .unwrap(); + assert_eq!(1, kvs.len()); + assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref()); + assert!(kvs[0].1.is_none()); + } + } +} + mod batched_iter { use itertools::Itertools; @@ -44,14 +290,14 @@ mod batched_iter { /// /// Therefore, it's not guaranteed that we're iterating over a consistent snapshot of the map. /// Users should handle MVCC by themselves. - pub struct Iter { - inner: Arc>>, - range: (Bound, Bound), - current: std::vec::IntoIter<(K, V)>, + pub struct Iter { + inner: R, + range: BytesFullKeyRange, + current: std::vec::IntoIter<(FullKey>, Option)>, } - impl Iter { - pub fn new(inner: Arc>>, range: (Bound, Bound)) -> Self { + impl Iter { + pub fn new(inner: R, range: BytesFullKeyRange) -> Self { Self { inner, range, @@ -60,45 +306,46 @@ mod batched_iter { } } - impl Iter - where - K: Ord + Clone, - V: Clone, - { + impl Iter { const BATCH_SIZE: usize = 256; /// Get the next batch of records and fill the `current` buffer. 
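        /// At most `BATCH_SIZE` entries are pulled per call, and `range.0` is then
        /// advanced past the last key fetched, so the next refill resumes where this
        /// one stopped instead of rescanning from the original start bound.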
- fn refill(&mut self) { + fn refill(&mut self) -> StorageResult<()> { assert!(self.current.is_empty()); - let batch: Vec<(K, V)> = self + let batch = self .inner - .read() - .range((self.range.0.as_ref(), self.range.1.as_ref())) - .take(Self::BATCH_SIZE) - .map(|(k, v)| (K::clone(k), V::clone(v))) + .range( + (self.range.0.clone(), self.range.1.clone()), + Some(Self::BATCH_SIZE), + )? + .into_iter() .collect_vec(); if let Some((last_key, _)) = batch.last() { - self.range.0 = Bound::Excluded(K::clone(last_key)); + let full_key = FullKey::new( + last_key.user_key.table_id, + TableKey(Bytes::from(last_key.user_key.table_key.0.clone())), + last_key.epoch, + ); + self.range.0 = Bound::Excluded(full_key); } self.current = batch.into_iter(); + Ok(()) } } - impl Iterator for Iter - where - K: Ord + Clone, - V: Clone, - { - type Item = (K, V); - - fn next(&mut self) -> Option { + impl Iter { + #[allow(clippy::type_complexity)] + pub fn next(&mut self) -> StorageResult>, Option)>> { match self.current.next() { - Some(r) => Some(r), + Some((key, value)) => Ok(Some((key.to_ref().to_vec(), value))), None => { - self.refill(); - self.current.next() + self.refill()?; + Ok(self + .current + .next() + .map(|(key, value)| (key.to_ref().to_vec(), value))) } } } @@ -107,20 +354,41 @@ mod batched_iter { #[cfg(test)] mod tests { use rand::Rng; + use risingwave_hummock_sdk::key::FullKey; use super::*; + use crate::memory::sled::SledRangeKv; #[test] - fn test_iter_chaos() { + fn test_btreemap_iter_chaos() { + let map = Arc::new(RwLock::new(BTreeMap::new())); + test_iter_chaos_inner(map, 1000); + } + + #[cfg(not(madsim))] + #[test] + fn test_sled_iter_chaos() { + let map = SledRangeKv::new_temp(); + test_iter_chaos_inner(map, 100); + } + + fn test_iter_chaos_inner(map: impl RangeKv, count: usize) { let key_range = 1..=10000; - let map: BTreeMap> = key_range - .clone() - .map(|k| (k, k.to_string().into())) - .collect(); - let map = Arc::new(RwLock::new(map)); + let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec()); + let num_to_full_key = + |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0); + #[allow(clippy::mutable_key_type)] + map.ingest_batch(key_range.clone().map(|k| { + let key = num_to_full_key(k); + let b = key.user_key.table_key.0.clone(); + + (key, Some(b)) + })) + .unwrap(); let rand_bound = || { let key = rand::thread_rng().gen_range(key_range.clone()); + let key = num_to_full_key(key); match rand::thread_rng().gen_range(1..=5) { 1 | 2 => Bound::Included(key), 3 | 4 => Bound::Excluded(key), @@ -128,7 +396,7 @@ mod batched_iter { } }; - for _ in 0..1000 { + for _ in 0..count { let range = loop { let range = (rand_bound(), rand_bound()); let (start, end) = (range.start_bound(), range.end_bound()); @@ -148,33 +416,37 @@ mod batched_iter { } }; - let v1 = Iter::new(map.clone(), range).collect_vec(); - let v2 = map - .read() - .range(range) - .map(|(&k, v)| (k, v.clone())) - .collect_vec(); + let v1 = { + let mut v = vec![]; + let mut iter = Iter::new(map.clone(), range.clone()); + while let Some((key, value)) = iter.next().unwrap() { + v.push((key, value)); + } + v + }; + let v2 = map.range(range, None).unwrap(); - // Items iterated from the batched iterator should be the same as normal iterator. + // Items iterated from the batched iterator should be the same as normaliterator. 
assert_eq!(v1, v2); } } } } +pub type MemoryStateStore = RangeKvStateStore; + /// An in-memory state store /// /// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to value. It /// never does GC, so the memory usage will be high. Therefore, in-memory state store should never /// be used in production. #[derive(Clone, Default)] -pub struct MemoryStateStore { +pub struct RangeKvStateStore { /// Stores (key, epoch) -> user value. - #[allow(clippy::type_complexity)] - inner: Arc>, Option>>>, + inner: R, } -fn to_full_key_range(table_id: TableId, table_key_range: R) -> FullKeyRange +fn to_full_key_range(table_id: TableId, table_key_range: R) -> BytesFullKeyRange where R: RangeBounds + Send, B: AsRef<[u8]>, @@ -182,28 +454,36 @@ where let start = match table_key_range.start_bound() { Included(k) => Included(FullKey::new( table_id, - TableKey(k.as_ref().to_vec()), + TableKey(Bytes::from(k.as_ref().to_vec())), HummockEpoch::MAX, )), - Excluded(k) => Excluded(FullKey::new(table_id, TableKey(k.as_ref().to_vec()), 0)), + Excluded(k) => Excluded(FullKey::new( + table_id, + TableKey(Bytes::from(k.as_ref().to_vec())), + 0, + )), Unbounded => Included(FullKey::new( table_id, - TableKey(b"".to_vec()), + TableKey(Bytes::from(b"".to_vec())), HummockEpoch::MAX, )), }; let end = match table_key_range.end_bound() { - Included(k) => Included(FullKey::new(table_id, TableKey(k.as_ref().to_vec()), 0)), + Included(k) => Included(FullKey::new( + table_id, + TableKey(Bytes::from(k.as_ref().to_vec())), + 0, + )), Excluded(k) => Excluded(FullKey::new( table_id, - TableKey(k.as_ref().to_vec()), + TableKey(Bytes::from(k.as_ref().to_vec())), HummockEpoch::MAX, )), Unbounded => { if let Some(next_table_id) = table_id.table_id().checked_add(1) { Excluded(FullKey::new( next_table_id.into(), - TableKey(b"".to_vec()), + TableKey(Bytes::from(b"".to_vec())), HummockEpoch::MAX, )) } else { @@ -223,7 +503,9 @@ impl MemoryStateStore { static STORE: LazyLock = LazyLock::new(MemoryStateStore::new); STORE.clone() } +} +impl RangeKvStateStore { fn scan( &self, key_range: (Bound>, Bound>), @@ -235,10 +517,11 @@ impl MemoryStateStore { if limit == Some(0) { return Ok(vec![]); } - let inner = self.inner.read(); - let mut last_user_key = None; - for (key, value) in inner.range(to_full_key_range(table_id, key_range)) { + for (key, value) in self + .inner + .range(to_full_key_range(table_id, key_range), None)? 
+ { if key.epoch > epoch { continue; } @@ -256,8 +539,8 @@ impl MemoryStateStore { } } -impl StateStoreRead for MemoryStateStore { - type Iter = MemoryStateStoreIter; +impl StateStoreRead for RangeKvStateStore { + type Iter = RangeKvStateStoreIter; define_state_store_read_associated_type!(); @@ -287,7 +570,7 @@ impl StateStoreRead for MemoryStateStore { read_options: ReadOptions, ) -> Self::IterFuture<'_> { async move { - Ok(MemoryStateStoreIter::new( + Ok(RangeKvStateStoreIter::new( batched_iter::Iter::new( self.inner.clone(), to_full_key_range(read_options.table_id, key_range), @@ -298,7 +581,7 @@ impl StateStoreRead for MemoryStateStore { } } -impl StateStoreWrite for MemoryStateStore { +impl StateStoreWrite for RangeKvStateStore { define_state_store_write_associated_type!(); fn ingest_batch( @@ -309,23 +592,23 @@ impl StateStoreWrite for MemoryStateStore { ) -> Self::IngestBatchFuture<'_> { async move { let epoch = write_options.epoch; - let mut inner = self.inner.write(); - let mut size: usize = 0; - for (key, value) in kv_pairs { - size += key.len() + value.size(); - inner.insert( - FullKey::new(write_options.table_id, TableKey(key.to_vec()), epoch), - value.user_value, - ); - } + let mut size = 0; + self.inner + .ingest_batch(kv_pairs.into_iter().map(|(key, value)| { + size += key.len() + value.size(); + ( + FullKey::new(write_options.table_id, TableKey(key), epoch), + value.user_value, + ) + }))?; Ok(size) } } } -impl LocalStateStore for MemoryStateStore {} +impl LocalStateStore for RangeKvStateStore {} -impl StateStore for MemoryStateStore { +impl StateStore for RangeKvStateStore { type Local = Self; type NewLocalFuture<'a> = impl Future + Send + 'a; @@ -341,6 +624,7 @@ impl StateStore for MemoryStateStore { fn sync(&self, _epoch: u64) -> Self::SyncFuture<'_> { async move { + self.inner.flush()?; // memory backend doesn't need to push to S3, so this is a no-op Ok(SyncResult { ..Default::default() @@ -359,57 +643,88 @@ impl StateStore for MemoryStateStore { } } -pub struct MemoryStateStoreIter { - inner: Fuse>, Option>>, +pub struct RangeKvStateStoreIter { + inner: batched_iter::Iter, epoch: HummockEpoch, last_key: Option>>, + + /// For supporting semantic of `Fuse` + stopped: bool, } -impl MemoryStateStoreIter { - pub fn new( - inner: batched_iter::Iter>, Option>, - epoch: HummockEpoch, - ) -> Self { +impl RangeKvStateStoreIter { + pub fn new(inner: batched_iter::Iter, epoch: HummockEpoch) -> Self { Self { - inner: inner.fuse(), + inner, epoch, last_key: None, + stopped: false, } } } -impl StateStoreIter for MemoryStateStoreIter { +impl StateStoreIter for RangeKvStateStoreIter { type Item = (FullKey>, Bytes); type NextFuture<'a> = impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { - for (key, value) in self.inner.by_ref() { - if key.epoch > self.epoch { - continue; - } - if Some(&key.user_key) != self.last_key.as_ref() { - self.last_key = Some(key.user_key.clone()); - if let Some(value) = value { - return Ok(Some((key, value))); + if self.stopped { + Ok(None) + } else { + let ret = self.next_inner(); + match &ret { + Err(_) | Ok(None) => { + self.stopped = true; } + _ => {} } + + ret } - Ok(None) } } } +impl RangeKvStateStoreIter { + fn next_inner(&mut self) -> StorageResult>, Bytes)>> { + while let Some((key, value)) = self.inner.next()? 
{ + if key.epoch > self.epoch { + continue; + } + if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) { + self.last_key = Some(key.user_key.clone()); + if let Some(value) = value { + return Ok(Some((key, value))); + } + } + } + Ok(None) + } +} + #[cfg(test)] mod tests { use super::*; + use crate::memory::sled::SledStateStore; #[tokio::test] - async fn test_snapshot_isolation() { + async fn test_snapshot_isolation_memory() { let state_store = MemoryStateStore::new(); + test_snapshot_isolation_inner(state_store).await; + } + + #[cfg(not(madsim))] + #[tokio::test] + async fn test_snapshot_isolation_sled() { + let state_store = SledStateStore::new_temp(); + test_snapshot_isolation_inner(state_store).await; + } + + async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore) { state_store .ingest_batch( vec![ diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2e2d1e559fda..2f875caa80aa 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -30,6 +30,7 @@ use crate::hummock::{ HummockStorage, HummockStorageV1, MemoryLimiter, SstableIdManagerRef, SstableStore, TieredCache, TieredCacheMetricsBuilder, }; +use crate::memory::sled::SledStateStore; use crate::memory::MemoryStateStore; use crate::monitor::{MonitoredStateStore as Monitored, ObjectStoreMetrics, StateStoreMetrics}; use crate::StateStore; @@ -37,6 +38,7 @@ use crate::StateStore; pub type HummockStorageType = impl StateStore + AsHummockTrait; pub type HummockStorageV1Type = impl StateStore + AsHummockTrait; pub type MemoryStateStoreType = impl StateStore + AsHummockTrait; +pub type SledStateStoreType = impl StateStore + AsHummockTrait; /// The type erased [`StateStore`]. #[derive(Clone, EnumAsInner)] @@ -56,6 +58,7 @@ pub enum StateStoreImpl { /// store misses some critical implementation to ensure the correctness of persisting streaming /// state. (e.g., no read_epoch support, no async checkpoint) MemoryStateStore(Monitored), + SledStateStore(Monitored), } fn may_dynamic_dispatch( @@ -97,6 +100,10 @@ impl StateStoreImpl { Self::HummockStateStoreV1(may_dynamic_dispatch(state_store).monitored(state_store_metrics)) } + pub fn sled(state_store: SledStateStore, state_store_metrics: Arc) -> Self { + Self::SledStateStore(may_dynamic_dispatch(state_store).monitored(state_store_metrics)) + } + pub fn shared_in_memory_store(state_store_metrics: Arc) -> Self { Self::in_memory(MemoryStateStore::shared(), state_store_metrics) } @@ -149,6 +156,7 @@ impl Debug for StateStoreImpl { StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"), StateStoreImpl::HummockStateStoreV1(_) => write!(f, "HummockStateStoreV1"), StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"), + StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"), } } } @@ -173,6 +181,20 @@ macro_rules! dispatch_state_store { } } + StateStoreImpl::SledStateStore($store) => { + // WARNING: don't change this. Enabling memory backend will cause monomorphization + // explosion and thus slow compile time in release mode. 
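+            // In release builds `$body` is never expanded for this arm; reaching it
+            // just panics via `unimplemented!`, which avoids monomorphizing the whole
+            // executor stack for the sled backend.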
+ #[cfg(debug_assertions)] + { + $body + } + #[cfg(not(debug_assertions))] + { + let _store = $store; + unimplemented!("sled state store should never be used in release mode"); + } + } + StateStoreImpl::HummockStateStore($store) => $body, StateStoreImpl::HummockStateStoreV1($store) => $body, @@ -280,6 +302,12 @@ impl StateStoreImpl { StateStoreImpl::shared_in_memory_store(state_store_stats.clone()) } + sled if sled.starts_with("sled://") => { + tracing::warn!("sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."); + let path = sled.strip_prefix("sled://").unwrap(); + StateStoreImpl::sled(SledStateStore::new(path), state_store_stats.clone()) + } + other => unimplemented!("{} state store is not supported", other), }; @@ -360,6 +388,13 @@ impl AsHummockTrait for MemoryStateStore { None } } + +impl AsHummockTrait for SledStateStore { + fn as_hummock_trait(&self) -> Option<&dyn HummockTrait> { + None + } +} + #[cfg(debug_assertions)] pub mod boxed_state_store { use std::future::Future; diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs index d192be690198..09af6321c343 100644 --- a/src/utils/runtime/src/lib.rs +++ b/src/utils/runtime/src/lib.rs @@ -108,7 +108,8 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .with_target("tonic", Level::WARN) .with_target("isahc", Level::WARN) .with_target("console_subscriber", Level::WARN) - .with_target("reqwest", Level::WARN); + .with_target("reqwest", Level::WARN) + .with_target("sled", Level::INFO); // Configure RisingWave's own crates to log at TRACE level, uncomment the following line if // needed.
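With the pieces above in place, a sled-backed store goes through the same
`StateStoreImpl` URL dispatch as the other backends. A minimal sketch of direct use,
assuming only the crate-internal API introduced in this patch (module path inferred
from the `use crate::memory::sled::SledStateStore` import above):

    use risingwave_storage::memory::sled::SledStateStore;

    // Opens a throwaway sled database under ./.risingwave/sled; a persistent one
    // would instead be selected at startup via a `sled://<path>` state store URL.
    let store = SledStateStore::new_temp();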