@@ -47,6 +47,7 @@ pub struct DynamicFilterExecutor<S: StateStore> {
47
47
comparator : ExprNodeType ,
48
48
range_cache : RangeCache < S > ,
49
49
right_table : StateTable < S > ,
50
+ is_right_table_writer : bool ,
50
51
actor_id : u64 ,
51
52
schema : Schema ,
52
53
metrics : Arc < StreamingMetrics > ,
@@ -63,6 +64,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
63
64
comparator : ExprNodeType ,
64
65
state_table_l : StateTable < S > ,
65
66
state_table_r : StateTable < S > ,
67
+ is_right_table_writer : bool ,
66
68
actor_id : u64 ,
67
69
metrics : Arc < StreamingMetrics > ,
68
70
) -> Self {
@@ -76,6 +78,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
76
78
comparator,
77
79
range_cache : RangeCache :: new ( state_table_l, 0 , usize:: MAX ) ,
78
80
right_table : state_table_r,
81
+ is_right_table_writer,
79
82
actor_id,
80
83
metrics,
81
84
schema,
@@ -240,6 +243,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
240
243
241
244
let mut prev_epoch_value: Option < Datum > = None ;
242
245
let mut current_epoch_value: Option < Datum > = None ;
246
+ let mut current_epoch_row = None ;
243
247
let mut epoch: u64 = 0 ;
244
248
245
249
let aligned_stream = barrier_align (
@@ -288,11 +292,10 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
288
292
Op :: UpdateInsert | Op :: Insert => {
289
293
last_is_insert = true ;
290
294
current_epoch_value = Some ( row. value_at ( 0 ) . to_owned_datum ( ) ) ;
291
- // self.right_table.insert (row.to_owned_row())? ;
295
+ current_epoch_row = Some ( row. to_owned_row ( ) ) ;
292
296
}
293
- Op :: UpdateDelete | Op :: Delete => {
297
+ _ => {
294
298
last_is_insert = false ;
295
- // self.right_table.delete(row.to_owned_row())?;
296
299
}
297
300
}
298
301
}
@@ -330,7 +333,14 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
330
333
yield Message :: Chunk ( chunk) ;
331
334
}
332
335
}
333
- self . right_table . commit ( epoch) . await ?;
336
+
337
+ if self . is_right_table_writer {
338
+ if let Some ( row) = current_epoch_row. take ( ) {
339
+ assert_eq ! ( epoch, barrier. epoch. prev) ;
340
+ self . right_table . insert ( row) ?;
341
+ self . right_table . commit ( epoch) . await ?;
342
+ }
343
+ }
334
344
335
345
self . range_cache . flush ( ) . await ?;
336
346
0 commit comments