Skip to content

Commit

Permalink
feat(frontend): support alter source pause/resume (#19636)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 authored Dec 3, 2024
1 parent 845ed68 commit 6f2d0c3
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
type = 'append-only',
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to create
skipif in-memory
sleep 5s

statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON


statement ok
create source kafka_source2 (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
source_rate_limit = 100,
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

# This should be ignored.
statement ok
SET SOURCE_RATE_LIMIT=1000;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

skipif in-memory
statement count 0
alter source kafka_source pause;

skipif in-memory
statement error
alter source kafka_source2 pause;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 0
rl_mv2 SOURCE {SOURCE} 0
rl_mv3 SOURCE {SOURCE} 0


skipif in-memory
statement count 0
alter source kafka_source resume;

# rate limit becomes None
query T
select count(*) from rw_rate_limit;
----
0

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 0 from rl_mv1;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv2;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv3;
----
t

############## Cleanup

statement ok
drop materialized view rl_mv1;

statement ok
drop materialized view rl_mv2;

statement ok
drop materialized view rl_mv3;

statement ok
drop source kafka_source;

statement ok
drop source kafka_source2;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;

statement ok
SET streaming_use_shared_source TO true;
19 changes: 18 additions & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;
use risingwave_sqlparser::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::ErrorCode::InvalidInputSyntax;
use crate::error::{ErrorCode, Result};
use crate::Binder;

Expand Down Expand Up @@ -56,6 +58,17 @@ pub async fn handle_alter_streaming_rate_limit(
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
if let Some(prev_limit) = source.rate_limit {
if rate_limit == SOURCE_RATE_LIMIT_PAUSED
|| (prev_limit != 0 && rate_limit == SOURCE_RATE_LIMIT_RESUMED)
{
return Err(InvalidInputSyntax(
"PAUSE or RESUME is invalid when the stream has pre configured ratelimit."
.to_string(),
)
.into());
}
}
session.check_privilege_for_drop_alter(schema_name, &**source)?;
(StatementType::ALTER_SOURCE, source.id)
}
Expand Down Expand Up @@ -91,7 +104,11 @@ pub async fn handle_alter_streaming_rate_limit(
let meta_client = session.env().meta_client();

let rate_limit = if rate_limit < 0 {
None
if rate_limit == SOURCE_RATE_LIMIT_PAUSED {
Some(0)
} else {
None
}
} else {
Some(rate_limit as u32)
};
Expand Down
9 changes: 6 additions & 3 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::ast::{
display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRefValue,
SetVariableValue, Value,
};
use crate::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};
use crate::tokenizer::Token;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -458,9 +459,11 @@ impl fmt::Display for AlterSourceOperation {
AlterSourceOperation::RefreshSchema => {
write!(f, "REFRESH SCHEMA")
}
AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
AlterSourceOperation::SetSourceRateLimit { rate_limit } => match *rate_limit {
SOURCE_RATE_LIMIT_PAUSED => write!(f, "PAUSE"),
SOURCE_RATE_LIMIT_RESUMED => write!(f, "RESUME"),
_ => write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit),
},
AlterSourceOperation::SwapRenameSource { target_source } => {
write!(f, "SWAP WITH {}", target_source)
}
Expand Down
2 changes: 2 additions & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ define_keywords!(
PARTITIONED,
PARTITIONS,
PASSWORD,
PAUSE,
PERCENT,
PERCENTILE_CONT,
PERCENTILE_DISC,
Expand Down Expand Up @@ -433,6 +434,7 @@ define_keywords!(
REPLACE,
RESTRICT,
RESULT,
RESUME,
RETURN,
RETURNING,
RETURNS,
Expand Down
14 changes: 13 additions & 1 deletion src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ use crate::{impl_parse_to, parser_v2};

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook";
// reserve i32::MIN for pause.
pub const SOURCE_RATE_LIMIT_PAUSED: i32 = i32::MIN;
// reserve i32::MIN + 1 for resume.
pub const SOURCE_RATE_LIMIT_RESUMED: i32 = i32::MIN + 1;

#[derive(Debug, Clone, PartialEq)]
pub enum ParserError {
Expand Down Expand Up @@ -3515,9 +3519,17 @@ impl Parser<'_> {
} else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) {
let target_source = self.parse_object_name()?;
AlterSourceOperation::SwapRenameSource { target_source }
} else if self.parse_keyword(Keyword::PAUSE) {
AlterSourceOperation::SetSourceRateLimit {
rate_limit: SOURCE_RATE_LIMIT_PAUSED,
}
} else if self.parse_keyword(Keyword::RESUME) {
AlterSourceOperation::SetSourceRateLimit {
rate_limit: SOURCE_RATE_LIMIT_RESUMED,
}
} else {
return self.expected(
"RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",
"RENAME, ADD COLUMN, OWNER TO, SET, PAUSE, RESUME, or SOURCE_RATE_LIMIT after ALTER SOURCE",
);
};

Expand Down

0 comments on commit 6f2d0c3

Please sign in to comment.