Skip to content

Commit

Permalink
[fix](move-memtable) immediately return error when close wait failed (a…
Browse files Browse the repository at this point in the history
…pache#44344)


Problem Summary:

apache#38003 introduced a problem where the last sink node could report
success even when close wait timeout, which may cause data loss.

Previously we made that change hoping to tolerate minority replica
failure in this step.
However, it turns out the last sink node could miss tablet reports from
downstreams in case of close wait failure.

This PR fixes the problem by return the close_wait error immediately.
The most common error in close wait is timeout, and it should not be
fault tolerant on a replica basis anyways.
  • Loading branch information
kaijchen authored and yiguolei committed Nov 21, 2024
1 parent d980d39 commit 53be6d1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
7 changes: 4 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ Status VTabletWriterV2::close(Status exec_status) {
// close_wait on all non-incremental streams, even if this is not the last sink.
// because some per-instance data structures are now shared among all sinks
// due to sharing delta writers and load stream stubs.
_close_wait(false);
RETURN_IF_ERROR(_close_wait(false));

// send CLOSE_LOAD on all incremental streams if this is the last sink.
// this must happen after all non-incremental streams are closed,
Expand All @@ -614,7 +614,7 @@ Status VTabletWriterV2::close(Status exec_status) {
}

// close_wait on all incremental streams, even if this is not the last sink.
_close_wait(true);
RETURN_IF_ERROR(_close_wait(true));

// calculate and submit commit info
if (is_last_sink) {
Expand Down Expand Up @@ -663,7 +663,7 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}

void VTabletWriterV2::_close_wait(bool incremental) {
Status VTabletWriterV2::_close_wait(bool incremental) {
SCOPED_TIMER(_close_load_timer);
auto st = _load_stream_map->for_each_st(
[this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
Expand All @@ -688,6 +688,7 @@ void VTabletWriterV2::_close_wait(bool incremental) {
if (!st.ok()) {
LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
}
return st;
}

void VTabletWriterV2::_calc_tablets_to_commit() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {

void _calc_tablets_to_commit();

void _close_wait(bool incremental);
Status _close_wait(bool incremental);

void _cancel(Status status);

Expand Down

0 comments on commit 53be6d1

Please sign in to comment.