Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): decouple starrocks commit from risingwave commit #16816

Merged
merged 20 commits into from
Jun 5, 2024

Conversation

ly9chee
Copy link
Contributor

@ly9chee ly9chee commented May 19, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

The current StarRocks sink implementation using StreamLoad API writes data to StarRocks. It works well when the upstream does not change frequently. However, if the upstream changes frequently, many versions of the table being written to are generated, which will impact the query performance and bring significant pressure to the BE Compactor.

Implementation details

This PR attempts to solve this problem by leveraging the Starrocks Stream Load Transaction Interface. The idea is inspired by RFC Support sink coordinator and RFC Decouple iceberg sink commit from risingwave checkpoint. Despite these two RFCs being designed for data lakes, but after doing some research, I found OLAP System like StarRocks can apply the same pattern as described in these RFCs as well, i.e., Changing the current StreamLoad to the Stream Load Transaction Interface.

If you run a load job by using a program, the Stream Load transaction interface allows you to merge multiple mini-batches of data on demand and then send them all at once within one transaction by calling the /api/transaction/commit operation. As such, fewer data versions need to be loaded, and load performance is improved.

As the doc says, after a transaction is created, we can write multiple mini-batches to the transaction through /api/transaction/load interface. StarRocks will take care of merging these batches into a larger one when the /api/transaction/commit is called. So in SinkWriter implementation, we can write data to the transaction at every barrier, but only commit the transaction when a checkpoint barrier is received, which is delayed by commit_checkpoint_interval. In fact, the commit is requested after the coordinator collects the metadata from each SinkWriter.

Assume the barrier interval is 1 second and the checkpoint frequency is 1. The current sink implementation will commit data to StarRocks at every barrier, thus producing a version every second. But this PR will commit data to StarRocks only when the user-specified commit_checkpoint_interval is reached. If we set commit_checkpoint_interval to 10, a version is created roughly every 10 seconds.

Minor refactors

  1. URL encoding is applied to the username and password during the construction of mysql_conn_uri to allow special characters to be included.
  2. StarrocksCommon.partial_update option has been moved up to StarrocksConfig, which is a sink-only option.

Simple benchmark on my local environment

Sinking 5 million rows to StarRocks

Current implementation, parallelism=4
image
image
image

This PR, parallelism=4, commit_checkpoint_interval=10
image
image
image

We can see that the Version has reduced from 352 to 45

Notes

  1. Compare to the current sink implementation, there is some slight overhead within a commit_checkpoint_interval period due to the begin, prepare and commit transaction meta requests being made.
  2. The duration of commit_checkpoint_interval should not exceed FE's config parameter stream_load_default_timeout_second; otherwise, the transaction will timeout and the sink will fail repeatedly.
  3. This PR should be compatible with the current implementation, because the current implementation is just a special case of setting commit_checkpoint_interval to 1, which is the default.
  4. The transaction interface was introduced in StarRocks 2.4, but the current StarRocks sink seems only work with StarRocks version >= 2.5 due to the usage of information_schema.tables_config, which was introduced in version 2.5.
  5. Even if we can reduce the versions by setting commit_checkpoint_interval to a higher value, we sacrifice the freshness. BTW, we can further reduce the versions by lowering the sink parallelism as well.

This PR's code is base on #16777, because we all used DecoupleCheckpointLogSinkerOf.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@StrikeW StrikeW requested review from fuyufjh, ZENOTME and xxhZs May 20, 2024 02:40
Co-authored-by: TennyZhuang <zty0826@gmail.com>
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM

src/connector/src/sink/deltalake.rs Outdated Show resolved Hide resolved
let mut parsed_be_url =
Url::parse(&be_url).map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;

if fe_host != LOCALHOST && fe_host != LOCALHOST_IP {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, actually any IP of 127. refers to localhost i.e. loop-back. Does SR guarantee to return 127.0.0.1?

Copy link
Contributor Author

@ly9chee ly9chee May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This snippet is actually borrowed from previous version 😅。I think it may related to solve some starrocks cluster network issues that deployed for testing, which may redirect BE to a localhost address.

if be_host == LOCALHOST || be_host == LOCALHOST_IP {
// if be host is 127.0.0.1, we may can't connect to it directly,
// so replace it with fe host
parsed_be_url
.set_host(Some(self.fe_host.as_str()))
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
be_url = parsed_be_url.as_str().into();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related PR #16018

Comment on lines 276 to 278
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",))
})?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use the error returned by try_get_be_url, rather than anyhow!

Suggested change
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",))
})?;
let be_url = try_get_be_url(&resp, self.fe_host.clone())?;

Copy link
Contributor Author

@ly9chee ly9chee May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, the returned error should not be wrapped twice.
try_get_be_url will return a Result<Option<Url>>, returning Ok(None) indicates this request does not redirect. For exmaple, /api/transaction/commit interface sometimes will not redirect to BE. In this case, commit request should be treated as finished. But other interfaces /api/transaction/begin, /api/transaction/load and /api/db/table/_stream_load (now only used in Doris sink) are expected redirect to BE. ok_or_else is used to handle this case where no redirection is considered an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I don't find any documents in starrocks that describe BE redirection behavior. we cannot make sure in which situation the FE should redirect to BE or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, if reqwest supports auto redirection without removing sensitive headers like Authorization, we may make our code more elegant.

Comment on lines 569 to 571
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",))
})?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Suggested change
let be_url = try_get_be_url(&resp, self.fe_host.clone())?.ok_or_else(|| {
SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't get doris BE url",))
})?;
let be_url = try_get_be_url(&resp, self.fe_host.clone())?;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, why not do try_get_be_url() when the connector initialize and use the URL in later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because we send requests to FE, FE acts like a balancer and will redirect the request to a BE based on its internal balance algorithm, so we need to get the BE url for each request.

Comment on lines 600 to 601
String::from_utf8(raw)
.map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal err was lost. Please either pass the err as source (recommended) or use err.as_report() to get the proper error message.

See also https://www.notion.so/risingwave-labs/A-Guide-to-Error-Handling-that-Just-Works-Part-I-e108f8d0845b4bddaa0e5843791a8cac

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just send an access request for this doc, please check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The owner of the doc is vacationing. Let me export it.
A_Guide_to_Error_Handling_that_Just_Works_(Part_I).pdf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have replaced SinkError::DorisStarrocksConnect(err.into()) with SinkError::DorisStarrocksConnect(anyhow!(err)) to keep the uniformity with other sinks.

src/connector/src/sink/deltalake.rs Show resolved Hide resolved
@ly9chee
Copy link
Contributor Author

ly9chee commented May 21, 2024

Should we run doris sink e2e test as well?, There're some refactors made to doris_starrocks_connector::InserterInnerBuilder @xxhZs

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@hzxa21 hzxa21 added the user-facing-changes Contains changes that are visible to users label May 22, 2024
@StrikeW
Copy link
Contributor

StrikeW commented Jun 5, 2024

@ly9chee Hi, shall we merge this PR?

@ly9chee
Copy link
Contributor Author

ly9chee commented Jun 5, 2024

@StrikeW
Yes, I think it's done. This PR has actually been running in our environment for a couple of weeks and it seems stable.
BTW, we have added some new features on top of this PR, we may merge this PR first, and then I will raise another PR for the new improvements.

@StrikeW StrikeW enabled auto-merge June 5, 2024 10:13
@StrikeW StrikeW added this pull request to the merge queue Jun 5, 2024
Merged via the queue into risingwavelabs:main with commit e01eb2e Jun 5, 2024
29 of 30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants