Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,6 @@ CONF_mString(ca_cert_file_paths,
"/etc/ssl/ca-bundle.pem");

CONF_Bool(enable_check_fe_drop_in_safe_time, "true");
CONF_mBool(enable_logging_conflict_keys, "false");

} // namespace doris::cloud::config
94 changes: 92 additions & 2 deletions cloud/src/meta-store/txn_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,18 @@ TxnErrorCode Transaction::init() {
return cast_as_txn_code(err);
}

if (config::enable_logging_conflict_keys) {
err = fdb_transaction_set_option(
txn_, FDBTransactionOption::FDB_TR_OPTION_REPORT_CONFLICTING_KEYS, nullptr, 0);
if (err) {
LOG_WARNING("fdb_transaction_set_option error: ")
.tag("option", "FDB_TR_OPTION_REPORT_CONFLICTING_KEYS")
.tag("code", err)
.tag("msg", fdb_get_error(err));
return cast_as_txn_code(err);
}
}

return TxnErrorCode::TXN_OK;
}

Expand Down Expand Up @@ -551,10 +563,15 @@ TxnErrorCode Transaction::commit() {

if (err) {
LOG(WARNING) << "fdb commit error, code=" << err << " msg=" << fdb_get_error(err);
fdb_error_is_txn_conflict(err) ? g_bvar_txn_kv_commit_conflict_counter << 1
: g_bvar_txn_kv_commit_error_counter << 1;
if (fdb_error_is_txn_conflict(err)) {
g_bvar_txn_kv_commit_conflict_counter << 1;
static_cast<void>(report_conflicting_range()); // don't overwrite the original error.
} else {
g_bvar_txn_kv_commit_error_counter << 1;
}
return cast_as_txn_code(err);
}

return TxnErrorCode::TXN_OK;
}

Expand Down Expand Up @@ -596,6 +613,79 @@ TxnErrorCode Transaction::abort() {
return TxnErrorCode::TXN_OK;
}

TxnErrorCode Transaction::get_conflicting_range(
std::vector<std::pair<std::string, std::string>>* values) {
constexpr std::string_view start = "\xff\xff/transaction/conflicting_keys/";
constexpr std::string_view end = "\xff\xff/transaction/conflicting_keys/\xff";

int limit = 0;
int target_bytes = 0;
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
int iteration = 0;
fdb_bool_t snapshot = 0;
fdb_bool_t reverse = 0;
FDBFuture* future = fdb_transaction_get_range(
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
target_bytes, mode, iteration, snapshot, reverse);

DORIS_CLOUD_DEFER {
fdb_future_destroy(future);
};

RETURN_IF_ERROR(await_future(future));

FDBKeyValue const* out_kvs;
int out_kvs_count;
fdb_bool_t out_more;
do {
fdb_error_t err =
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
if (err) {
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
<< fdb_get_error(err);
return cast_as_txn_code(err);
}
for (int i = 0; i < out_kvs_count; i++) {
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
key.remove_prefix(start.size());
values->emplace_back(key, value);
}
} while (out_more);

return TxnErrorCode::TXN_OK;
}

TxnErrorCode Transaction::report_conflicting_range() {
if (!config::enable_logging_conflict_keys) {
return TxnErrorCode::TXN_OK;
}

std::vector<std::pair<std::string, std::string>> key_values;
RETURN_IF_ERROR(get_conflicting_range(&key_values));

// See https://github.com/apple/foundationdb/pull/2257/files for detail.
if (key_values.size() % 2 != 0) {
LOG(WARNING) << "the conflicting range is not well-formed, size=" << key_values.size();
return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}

std::string out;
for (size_t i = 0; i < key_values.size(); i += 2) {
std::string_view start = key_values[i].first;
std::string_view end = key_values[i + 1].first;
std::string_view conflict_count = key_values[i].second;
if (!out.empty()) {
out += ", ";
}
out += fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
}
LOG(WARNING) << "conflicting key ranges: " << out;

return TxnErrorCode::TXN_OK;
}

TxnErrorCode RangeGetIterator::init() {
if (fut_ == nullptr) return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
idx_ = 0;
Expand Down
10 changes: 10 additions & 0 deletions cloud/src/meta-store/txn_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <foundationdb/fdb_c.h>
#include <foundationdb/fdb_c_options.g.h>
#include <gtest/gtest_prod.h>

#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -503,6 +504,8 @@ class RangeGetIterator : public cloud::RangeGetIterator {
};

class Transaction : public cloud::Transaction {
FRIEND_TEST(TxnKvTest, ReportConflictingRange);

public:
friend class Database;
friend class FullRangeGetIterator;
Expand Down Expand Up @@ -604,6 +607,13 @@ class Transaction : public cloud::Transaction {
size_t get_bytes() const override { return get_bytes_; }

private:
// Return the conflicting range when the transaction commit returns TXN_CONFLICT.
//
// It only works when the report_conflicting_ranges option is enabled.
TxnErrorCode get_conflicting_range(
std::vector<std::pair<std::string, std::string>>* key_values);
TxnErrorCode report_conflicting_range();

std::shared_ptr<Database> db_ {nullptr};
bool commited_ = false;
bool aborted_ = false;
Expand Down
39 changes: 39 additions & 0 deletions cloud/test/txn_kv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,3 +819,42 @@ TEST(TxnKvTest, FullRangeGetIterator) {
<< "ms" << std::endl;
}
}

TEST(TxnKvTest, ReportConflictingRange) {
config::enable_logging_conflict_keys = true;

constexpr std::string_view key_prefix = "txn_kv_test__report_conflicting_range";
std::string key = std::string(key_prefix) + std::to_string(time(nullptr));

{
// 1. write a common key
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(key, "value0");
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}

// 2. two txns, conflicting writes
std::unique_ptr<Transaction> txn1, txn2;
ASSERT_EQ(txn_kv->create_txn(&txn1), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn_kv->create_txn(&txn2), TxnErrorCode::TXN_OK);

std::string val1, val2;
ASSERT_EQ(txn1->get(key, &val1), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn2->get(key, &val2), TxnErrorCode::TXN_OK);

txn1->put(key, "value1");
txn2->put(key, "value2");

ASSERT_EQ(txn1->commit(), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn2->commit(), TxnErrorCode::TXN_CONFLICT);

// 3. get the conflicting ranges.
std::vector<std::pair<std::string, std::string>> values;
ASSERT_EQ(reinterpret_cast<fdb::Transaction*>(txn2.get())->get_conflicting_range(&values),
TxnErrorCode::TXN_OK);
ASSERT_EQ(values.size(), 2);
ASSERT_EQ(values[0].first, key);
ASSERT_EQ(values[1].second, "0");
ASSERT_TRUE(values[1].first.starts_with(key));
}
Loading