Skip to content

Commit

Permalink
fix(stream): careful cache invalidation for TopN (#8659)
Browse files Browse the repository at this point in the history
Co-authored-by: st1page <1245835950@qq.com>
  • Loading branch information
xxchan and st1page authored Mar 21, 2023
1 parent 9abe5dc commit 3a06226
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 91 deletions.
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ else
fi
cd "${JAVA_DIR}"
"${MAVEN_PATH}" spotless:check
"${MAVEN_PATH}" spotless:check -q
"""

[tasks.check-java-fix]
Expand Down
54 changes: 54 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_8570.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# https://github.com/risingwavelabs/risingwave/issues/8570
# TopN cache invalidation issue

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t(x int);

statement ok
create materialized view t_singleton as select * from t order by x limit 100;

statement ok
create materialized view mv as select * from t_singleton order by x limit 1;

statement ok
insert into t values (1), (2), (3), (4);

statement ok
delete from t where x = 2;

statement ok
insert into t values (5);

statement ok
delete from t where x = 1;

statement ok
insert into t values (6);

statement ok
delete from t where x = 3;

# Shouldn't be 5
query I
select * from mv;
----
4

statement ok
delete from t where x = 4;

# Shouldn't panic
statement ok
insert into t values (1);

statement ok
drop materialized view mv;

statement ok
drop materialized view t_singleton;

statement ok
drop table t;
22 changes: 22 additions & 0 deletions src/common/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
// limitations under the License.

use std::borrow::Cow;
use std::fmt::Display;
use std::hash::{BuildHasher, Hasher};

use bytes::{BufMut, Bytes, BytesMut};
use itertools::Itertools;

use self::empty::EMPTY;
use crate::hash::HashCode;
use crate::types::to_text::ToText;
use crate::types::{hash_datum, DatumRef, ToDatumRef, ToOwnedDatum};
use crate::util::ordered::OrderedRowSerde;
use crate::util::value_encoding;
Expand Down Expand Up @@ -145,6 +148,25 @@ pub trait RowExt: Row {
{
assert_row(Project::new(self, indices))
}

/// Returns an adapter that renders this row as human-readable text.
///
/// Datums are written in iteration order, separated by `" | "`. A `None`
/// datum is written as `NULL`; any other datum is written using its
/// `to_text()` representation.
fn display(&self) -> impl Display + '_ {
    /// Borrowing wrapper whose `Display` impl formats the wrapped row.
    struct RowDisplay<'a, R: Row>(&'a R);

    impl<'a, R: Row> Display for RowDisplay<'a, R> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // `format_with` is lazy: the cells are only produced when the
            // returned value is itself formatted by `write!` below.
            let cells = self.0.iter().format_with(" | ", |datum, write_cell| {
                if let Some(scalar) = datum {
                    write_cell(&format_args!("{}", scalar.to_text()))
                } else {
                    write_cell(&"NULL")
                }
            });
            write!(f, "{}", cells)
        }
    }

    RowDisplay(self)
}
}

impl<R: Row> RowExt for R {}
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/managed_state/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,10 @@ mod tests {

#[tokio::test]
async fn test_managed_top_n_state_fill_cache() {
let data_types = vec![DataType::Varchar, DataType::Int64];
let state_table = {
let mut tb = create_in_memory_state_table(
&[DataType::Varchar, DataType::Int64],
&data_types,
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
)
Expand All @@ -382,7 +383,7 @@ mod tests {
let rows = vec![row1, row2, row3, row4, row5];
let ordered_rows = vec![row1_bytes, row2_bytes, row3_bytes, row4_bytes, row5_bytes];

let mut cache = TopNCache::<false>::new(1, 1);
let mut cache = TopNCache::<false>::new(1, 1, data_types);

managed_state.insert(rows[3].clone());
managed_state.insert(rows[1].clone());
Expand Down
13 changes: 7 additions & 6 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::executor::{ActorContextRef, Executor, ExecutorInfo, PkIndices, Waterm
use crate::task::AtomicU64Ref;

pub type GroupTopNExecutor<K, S, const WITH_TIES: bool> =
TopNExecutorWrapper<InnerGroupTopNExecutorNew<K, S, WITH_TIES>>;
TopNExecutorWrapper<InnerGroupTopNExecutor<K, S, WITH_TIES>>;

impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, WITH_TIES> {
#[allow(clippy::too_many_arguments)]
Expand All @@ -56,7 +56,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, W
Ok(TopNExecutorWrapper {
input,
ctx,
inner: InnerGroupTopNExecutorNew::new(
inner: InnerGroupTopNExecutor::new(
info,
storage_key,
offset_and_limit,
Expand All @@ -70,7 +70,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, W
}
}

pub struct InnerGroupTopNExecutorNew<K: HashKey, S: StateStore, const WITH_TIES: bool> {
pub struct InnerGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
info: ExecutorInfo,

/// `LIMIT XXX`. None means no limit.
Expand All @@ -94,7 +94,7 @@ pub struct InnerGroupTopNExecutorNew<K: HashKey, S: StateStore, const WITH_TIES:
cache_key_serde: CacheKeySerde,
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutorNew<K, S, WITH_TIES> {
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutor<K, S, WITH_TIES> {
#[allow(clippy::too_many_arguments)]
pub fn new(
input_info: ExecutorInfo,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<K: HashKey, const WITH_TIES: bool> DerefMut for GroupTopNCache<K, WITH_TIES

#[async_trait]
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
for InnerGroupTopNExecutorNew<K, S, WITH_TIES>
for InnerGroupTopNExecutor<K, S, WITH_TIES>
where
TopNCache<WITH_TIES>: TopNCacheTrait,
{
Expand All @@ -178,7 +178,8 @@ where
// If 'self.caches' does not already have a cache for the current group, create a new
// cache for it and insert it into `self.caches`
if !self.caches.contains(group_cache_key) {
let mut topn_cache = TopNCache::new(self.offset, self.limit);
let mut topn_cache =
TopNCache::new(self.offset, self.limit, self.schema().data_types());
self.managed_state
.init_topn_cache(Some(group_key), &mut topn_cache)
.await?;
Expand Down
14 changes: 7 additions & 7 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::task::AtomicU64Ref;
/// to keep all the data records/rows that have been seen. As long as a record
/// is no longer being in the result set, it can be deleted.
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutorNew<K, S, WITH_TIES>>;
TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
Expand All @@ -75,7 +75,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
Ok(TopNExecutorWrapper {
input,
ctx,
inner: InnerAppendOnlyGroupTopNExecutorNew::new(
inner: InnerAppendOnlyGroupTopNExecutor::new(
info,
storage_key,
offset_and_limit,
Expand All @@ -89,7 +89,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
}
}

pub struct InnerAppendOnlyGroupTopNExecutorNew<K: HashKey, S: StateStore, const WITH_TIES: bool> {
pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
info: ExecutorInfo,

/// `LIMIT XXX`. None means no limit.
Expand All @@ -114,7 +114,7 @@ pub struct InnerAppendOnlyGroupTopNExecutorNew<K: HashKey, S: StateStore, const
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
InnerAppendOnlyGroupTopNExecutorNew<K, S, WITH_TIES>
InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -153,7 +153,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
}
#[async_trait]
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
for InnerAppendOnlyGroupTopNExecutorNew<K, S, WITH_TIES>
for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
where
TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
{
Expand All @@ -164,7 +164,7 @@ where
let keys = K::build(&self.group_by, chunk.data_chunk())?;

let data_types = self.schema().data_types();
let row_deserializer = RowDeserializer::new(data_types);
let row_deserializer = RowDeserializer::new(data_types.clone());

for ((op, row_ref), group_cache_key) in chunk.rows().zip_eq_debug(keys.iter()) {
// The pk without group by
Expand All @@ -176,7 +176,7 @@ where
// If 'self.caches' does not already have a cache for the current group, create a new
// cache for it and insert it into `self.caches`
if !self.caches.contains(group_cache_key) {
let mut topn_cache = TopNCache::new(self.offset, self.limit);
let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
self.managed_state
.init_topn_cache(Some(group_key), &mut topn_cache)
.await?;
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/top_n/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_T
let cache_key_serde =
create_cache_key_serde(&storage_key, &pk_indices, &schema, &order_by, &[]);
let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
let data_types = schema.data_types();

Ok(Self {
info: ExecutorInfo {
Expand All @@ -138,7 +139,7 @@ impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_T
},
managed_state,
storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
cache: TopNCache::new(num_offset, num_limit),
cache: TopNCache::new(num_offset, num_limit, data_types),
cache_key_serde,
})
}
Expand Down
Loading

0 comments on commit 3a06226

Please sign in to comment.