feat(frontend): support alter source pause/resume #19636

Merged · 1 commit · Dec 3, 2024
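
In short: the frontend now accepts ALTER SOURCE <name> PAUSE and ALTER SOURCE <name> RESUME, built on top of the existing SOURCE_RATE_LIMIT machinery — PAUSE sets the rate limit to 0 and RESUME removes it. The diff adds an end-to-end sqllogictest (file path not visible in this capture), the handler translation, the PAUSE and RESUME keywords, and the parser branches.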
@@ -0,0 +1,143 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
type = 'append-only',
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to be created
skipif in-memory
sleep 5s

statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON


statement ok
create source kafka_source2 (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
source_rate_limit = 100,
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

# This should be ignored.
statement ok
SET SOURCE_RATE_LIMIT=1000;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

skipif in-memory
statement count 0
alter source kafka_source pause;
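
# kafka_source2 was created with source_rate_limit = 100; PAUSE on a source
# with a pre-configured rate limit is rejected by the frontend.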

skipif in-memory
statement error
alter source kafka_source2 pause;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 0
rl_mv2 SOURCE {SOURCE} 0
rl_mv3 SOURCE {SOURCE} 0
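
# While paused, every fragment consuming kafka_source reports rate_limit = 0.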


skipif in-memory
statement count 0
alter source kafka_source resume;

# After RESUME, the rate limit becomes None, so rw_rate_limit has no rows.
query T
select count(*) from rw_rate_limit;
----
0

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 0 from rl_mv1;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv2;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv3;
----
t

############## Cleanup

statement ok
drop materialized view rl_mv1;

statement ok
drop materialized view rl_mv2;

statement ok
drop materialized view rl_mv3;

statement ok
drop source kafka_source;

statement ok
drop source kafka_source2;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;

statement ok
SET streaming_use_shared_source TO true;
19 changes: 18 additions & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
@@ -16,10 +16,12 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;
use risingwave_sqlparser::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::ErrorCode::InvalidInputSyntax;
use crate::error::{ErrorCode, Result};
use crate::Binder;

@@ -56,6 +58,17 @@ pub async fn handle_alter_streaming_rate_limit(
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
if let Some(prev_limit) = source.rate_limit {
if rate_limit == SOURCE_RATE_LIMIT_PAUSED
|| (prev_limit != 0 && rate_limit == SOURCE_RATE_LIMIT_RESUMED)
{
return Err(InvalidInputSyntax(
"PAUSE or RESUME is invalid when the stream has pre configured ratelimit."
// Review suggestion (resolved): append "Just use SET source_rate_limit = 0" to this message.
.to_string(),
)
.into());
}
}
session.check_privilege_for_drop_alter(schema_name, &**source)?;
(StatementType::ALTER_SOURCE, source.id)
}
@@ -91,7 +104,11 @@ pub async fn handle_alter_streaming_rate_limit(
let meta_client = session.env().meta_client();

let rate_limit = if rate_limit < 0 {
None
if rate_limit == SOURCE_RATE_LIMIT_PAUSED {
Some(0)
} else {
None
}
} else {
Some(rate_limit as u32)
};
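
This hunk is the crux of the new mapping: the parser-level sentinels are translated into the Option<u32> throttle that is sent to the meta client. A standalone sketch of the mapping (constants copied from parser.rs below; the function name to_throttle is hypothetical, for illustration only):

const SOURCE_RATE_LIMIT_PAUSED: i32 = i32::MIN;
const SOURCE_RATE_LIMIT_RESUMED: i32 = i32::MIN + 1;

// Translate the parser-level i32 into the throttle value applied by meta.
fn to_throttle(rate_limit: i32) -> Option<u32> {
    if rate_limit < 0 {
        if rate_limit == SOURCE_RATE_LIMIT_PAUSED {
            Some(0) // PAUSE: throttle the source to zero rows per second
        } else {
            None // RESUME (i32::MIN + 1): remove the limit entirely
        }
    } else {
        Some(rate_limit as u32) // explicit SET SOURCE_RATE_LIMIT TO n
    }
}

fn main() {
    assert_eq!(to_throttle(SOURCE_RATE_LIMIT_PAUSED), Some(0));
    assert_eq!(to_throttle(SOURCE_RATE_LIMIT_RESUMED), None);
    assert_eq!(to_throttle(100), Some(100));
}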
9 changes: 6 additions & 3 deletions src/sqlparser/src/ast/ddl.rs
@@ -25,6 +25,7 @@ use crate::ast::{
display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRefValue,
SetVariableValue, Value,
};
use crate::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};
use crate::tokenizer::Token;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -458,9 +459,11 @@ impl fmt::Display for AlterSourceOperation {
AlterSourceOperation::RefreshSchema => {
write!(f, "REFRESH SCHEMA")
}
AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
AlterSourceOperation::SetSourceRateLimit { rate_limit } => match *rate_limit {
SOURCE_RATE_LIMIT_PAUSED => write!(f, "PAUSE"),
SOURCE_RATE_LIMIT_RESUMED => write!(f, "RESUME"),
_ => write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit),
},
AlterSourceOperation::SwapRenameSource { target_source } => {
write!(f, "SWAP WITH {}", target_source)
}
2 changes: 2 additions & 0 deletions src/sqlparser/src/keywords.rs
@@ -384,6 +384,7 @@
PARTITIONED,
PARTITIONS,
PASSWORD,
PAUSE,
PERCENT,
PERCENTILE_CONT,
PERCENTILE_DISC,
@@ -433,6 +434,7 @@
REPLACE,
RESTRICT,
RESULT,
RESUME,
RETURN,
RETURNING,
RETURNS,
14 changes: 13 additions & 1 deletion src/sqlparser/src/parser.rs
@@ -39,6 +39,10 @@ use crate::{impl_parse_to, parser_v2};

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook";
// reserve i32::MIN for pause.
pub const SOURCE_RATE_LIMIT_PAUSED: i32 = i32::MIN;
// reserve i32::MIN + 1 for resume.
pub const SOURCE_RATE_LIMIT_RESUMED: i32 = i32::MIN + 1;
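// Both sentinels live in the negative range, which is otherwise unused:
// user-specified rate limits are parsed as non-negative integers, and the
// handler maps any non-negative value straight to Some(rate_limit as u32).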

#[derive(Debug, Clone, PartialEq)]
pub enum ParserError {
@@ -3515,9 +3519,17 @@ impl Parser<'_> {
} else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) {
let target_source = self.parse_object_name()?;
AlterSourceOperation::SwapRenameSource { target_source }
} else if self.parse_keyword(Keyword::PAUSE) {
AlterSourceOperation::SetSourceRateLimit {
rate_limit: SOURCE_RATE_LIMIT_PAUSED,
}
} else if self.parse_keyword(Keyword::RESUME) {
AlterSourceOperation::SetSourceRateLimit {
rate_limit: SOURCE_RATE_LIMIT_RESUMED,
}
} else {
return self.expected(
"RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",
"RENAME, ADD COLUMN, OWNER TO, SET, PAUSE, RESUME, or SOURCE_RATE_LIMIT after ALTER SOURCE",
);
};
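
Together with the Display change in ddl.rs above, PAUSE and RESUME round-trip through parse and print instead of leaking the raw sentinel. A sketch, assuming the crate exposes Parser::parse_sql returning Vec<Statement> (the entry-point signature is an assumption of this note, not shown in the diff):

use risingwave_sqlparser::parser::Parser;

fn roundtrip(sql: &str) {
    // Assumed entry point: Parser::parse_sql(&str) -> Result<Vec<Statement>, ParserError>.
    let stmts = Parser::parse_sql(sql).expect("parse failed");
    // Display maps the sentinel back to the keyword rather than printing i32::MIN.
    assert_eq!(stmts[0].to_string(), sql);
}

fn main() {
    roundtrip("ALTER SOURCE src PAUSE");
    roundtrip("ALTER SOURCE src RESUME");
}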
