From 3c92c29bec45fcc78784cd1642305ba4bc98ac8f Mon Sep 17 00:00:00 2001 From: neverchanje Date: Sun, 28 Feb 2021 14:26:35 +0800 Subject: [PATCH 1/4] refactor: move primary's learning preparation of durable states into another function --- src/replica/replica.h | 5 + src/replica/replica_learn.cpp | 181 +++++++++++++++++----------------- 2 files changed, 94 insertions(+), 92 deletions(-) diff --git a/src/replica/replica.h b/src/replica/replica.h index 5886f7ee6b..86b32ff919 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -287,6 +287,11 @@ class replica : public serverlet, public ref_counter, public replica_ba void notify_learn_completion(); error_code apply_learned_state_from_private_log(learn_state &state); + void prepare_durable_learn_state(decree learn_start_decree, + const learn_request &request, + learn_response &response, + remote_learner_state &learn_state); + // Gets the position where this round of the learning process should begin. // This method is called on primary-side. // TODO(wutao1): mark it const diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index ca8c79f544..ce2b1199ee 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -493,98 +493,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) // learn delta state or checkpoint // in this case, the state on the PS is still incomplete else { - if (learn_start_decree > _app->last_durable_decree()) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 - ")", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - _app->last_durable_decree()); - _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state); - response.type = learn_type::LT_LOG; - } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because mutation_log::get_learn_state() returns true", - name(), - request.signature, - request.learner.to_string()); - response.type = learn_type::LT_LOG; - } else if (learn_start_decree < request.last_committed_decree_in_app + 1) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because learn_start_decree steps back for duplication", - name(), - request.signature, - request.learner.to_string()); - response.type = learn_type::LT_LOG; - } else { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " - "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 - "), " - "and mutation_log::get_learn_state() returns false", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - _app->last_durable_decree()); - response.type = learn_type::LT_APP; - response.state = learn_state(); - } - - if (response.type == learn_type::LT_LOG) { - response.base_local_dir = _private_log->dir(); - if (response.state.files.size() > 0) { - auto &last_file = response.state.files.back(); - if (last_file == learner_state.last_learn_log_file) { - ddebug( - "%s: on_learn[%016" PRIx64 - "]: learner = %s, learn the same file %s repeatedly, hint to switch file", - name(), - request.signature, - request.learner.to_string(), - last_file.c_str()); - _private_log->hint_switch_file(); - } else { - learner_state.last_learn_log_file = last_file; - } - } - // it is safe to commit to last_committed_decree() now - response.state.to_decree_included = last_committed_decree(); - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, " - "learned_meta_size = %u, learned_file_count = %u, " - "to_decree_included = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); - } else { - ::dsn::error_code err = _app->get_checkpoint( - learn_start_decree, request.app_specific_learn_request, response.state); - - if (err != ERR_OK) { - response.err = ERR_GET_LEARN_STATE_FAILED; - derror("%s: on_learn[%016" PRIx64 - "]: learner = %s, get app checkpoint failed, error = %s", - name(), - request.signature, - request.learner.to_string(), - err.to_string()); - } else { - response.base_local_dir = _app->data_dir(); - ddebug( - "%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " - "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); - } - } + prepare_durable_learn_state(learn_start_decree, request, response, learner_state); } for (auto &file : response.state.files) { @@ -599,6 +508,94 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) } } +void replica::prepare_durable_learn_state(decree learn_start_decree, + const learn_request &request, + learn_response &response, + remote_learner_state &learner_state) +{ + if (learn_start_decree > _app->last_durable_decree()) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 ")", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + _app->last_durable_decree()); + _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state); + response.type = learn_type::LT_LOG; + } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because mutation_log::get_learn_state() returns true", + name(), + request.signature, + request.learner.to_string()); + response.type = learn_type::LT_LOG; + } else { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " + "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 "), " + "and mutation_log::get_learn_state() returns false", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + _app->last_durable_decree()); + response.type = learn_type::LT_APP; + response.state = learn_state(); + } + + if (response.type == learn_type::LT_LOG) { + response.base_local_dir = _private_log->dir(); + if (response.state.files.size() > 0) { + auto &last_file = response.state.files.back(); + if (last_file == learner_state.last_learn_log_file) { + ddebug("%s: on_learn[%016" PRIx64 + "]: learner = %s, learn the same file %s repeatedly, hint to switch file", + name(), + request.signature, + request.learner.to_string(), + last_file.c_str()); + _private_log->hint_switch_file(); + } else { + learner_state.last_learn_log_file = last_file; + } + } + // it is safe to commit to last_committed_decree() now + response.state.to_decree_included = last_committed_decree(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, " + "learned_meta_size = %u, learned_file_count = %u, " + "to_decree_included = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); + } else { + ::dsn::error_code err = _app->get_checkpoint( + learn_start_decree, request.app_specific_learn_request, response.state); + + if (err != ERR_OK) { + response.err = ERR_GET_LEARN_STATE_FAILED; + derror("%s: on_learn[%016" PRIx64 + "]: learner = %s, get app checkpoint failed, error = %s", + name(), + request.signature, + request.learner.to_string(), + err.to_string()); + } else { + response.base_local_dir = _app->data_dir(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " + "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); + } + } +} + void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp) { _checker.only_one_thread_access(); From 3a1d2cf025a97780ff486eb614ab45ef13b2fdaa Mon Sep 17 00:00:00 2001 From: neverchanje Date: Sun, 28 Feb 2021 14:32:09 +0800 Subject: [PATCH 2/4] fix --- src/replica/replica_learn.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index ce2b1199ee..b6e157b865 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -530,6 +530,13 @@ void replica::prepare_durable_learn_state(decree learn_start_decree, request.signature, request.learner.to_string()); response.type = learn_type::LT_LOG; + } else if (learn_start_decree < request.last_committed_decree_in_app + 1) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because learn_start_decree steps back for duplication", + name(), + request.signature, + request.learner.to_string()); + response.type = learn_type::LT_LOG; } else { ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 "), " From 67ced7ce40500fa00d1ab0e41ebe305efd54a8b3 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Sun, 28 Feb 2021 14:43:06 +0800 Subject: [PATCH 3/4] add comments --- src/replica/replica.h | 5 +++-- src/replica/replica_learn.cpp | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/replica/replica.h b/src/replica/replica.h index 86b32ff919..a55d12097f 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -287,10 +287,11 @@ class replica : public serverlet, public ref_counter, public replica_ba void notify_learn_completion(); error_code apply_learned_state_from_private_log(learn_state &state); + // Prepares the files on disk that will participate in the replica's learning. void prepare_durable_learn_state(decree learn_start_decree, const learn_request &request, - learn_response &response, - remote_learner_state &learn_state); + /*out*/ learn_response &response, + /*out*/ remote_learner_state &learn_state); // Gets the position where this round of the learning process should begin. // This method is called on primary-side. diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index b6e157b865..fe30704938 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -510,8 +510,8 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) void replica::prepare_durable_learn_state(decree learn_start_decree, const learn_request &request, - learn_response &response, - remote_learner_state &learner_state) + /*out*/ learn_response &response, + /*out*/ remote_learner_state &learner_state) { if (learn_start_decree > _app->last_durable_decree()) { ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " From bd62a6900ca0be0a04501ceadd8cdddfd1b8ca92 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Wed, 3 Mar 2021 13:09:50 +0800 Subject: [PATCH 4/4] fix comment --- src/replica/replica_learn.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index 4fb9c1d981..49b61e6f17 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -438,7 +438,6 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) response.err = ERR_OK; // learn delta state or checkpoint - // in this case, the state on the PS is still incomplete bool should_learn_cache = prepare_cached_learn_state(request, learn_start_decree, local_committed_decree,