diff --git a/src/apps/batched/batched.lua b/src/apps/batched/batched.lua
index 2e462db734e1..8f5e13b9d142 100644
--- a/src/apps/batched/batched.lua
+++ b/src/apps/batched/batched.lua
@@ -15,10 +15,12 @@ return {
   BATCH_submit = [[
     tables, gov_tables, args = ...
     count = 0
-    for n, e in ipairs(args.params) do
+    for n, e in ipairs(args.params.entries) do
       id = e.id
-      msg = e.msg
-      tables.priv0:put(id, msg)
+      if id % args.params.write_key_divisor == 0 then
+        msg = string.rep(e.msg, args.params.write_size_multiplier)
+        tables.priv0:put(id, msg)
+      end
       count = count + 1
     end
     return env.jsucc(count)
diff --git a/src/kv/genericserialisewrapper.h b/src/kv/genericserialisewrapper.h
index e37653ad3d6b..a4bad97b9281 100644
--- a/src/kv/genericserialisewrapper.h
+++ b/src/kv/genericserialisewrapper.h
@@ -38,20 +38,6 @@ namespace kv
       static_cast<uint8_t>(a) | static_cast<uint8_t>(b));
   }
 
-  class KvSerialiserException : public std::exception
-  {
-  private:
-    std::string msg;
-
-  public:
-    KvSerialiserException(const std::string& msg_) : msg(msg_) {}
-
-    virtual const char* what() const throw()
-    {
-      return msg.c_str();
-    }
-  };
-
   template <class K, class V>
   struct KeyValVersion
   {
diff --git a/src/kv/kv.h b/src/kv/kv.h
index 7f5a7a3ad75f..290995f5b084 100644
--- a/src/kv/kv.h
+++ b/src/kv/kv.h
@@ -1010,29 +1010,46 @@ namespace kv
         LOG_FAIL_FMT("Could not commit transaction {}", version);
         return CommitSuccess::CONFLICT;
       }
+      else
+      {
+        version = c.value();
 
-      version = c.value();
+        // From here, we have received a unique commit version and made
+        // modifications to our local kv. If we fail in any way, we cannot
+        // recover.
+        try
+        {
+          auto data = serialise();
 
-      auto data = serialise();
-      if (data.empty())
-      {
-        auto h = store->get_history();
-        if (h != nullptr)
+          if (data.empty())
+          {
+            auto h = store->get_history();
+            if (h != nullptr)
+            {
+              // This tx does not have a write set, so this is a read only tx
+              // because of this we are returning NoVersion
+              h->add_result(req_id, NoVersion);
+            }
+            return CommitSuccess::OK;
+          }
+
+          return store->commit(
+            version,
+            [data = std::move(data),
+             req_id = std::move(req_id)]() -> PendingTx::result_type {
+              return {CommitSuccess::OK, std::move(req_id), std::move(data)};
+            },
+            false);
+        }
+        catch (const std::exception& e)
         {
-          // This tx does not have a write set, so this is a read only tx
-          // because of this we are returning NoVersion
-          h->add_result(req_id, NoVersion);
+          LOG_FAIL_FMT("Error during serialisation: {}", e.what());
+
+          // Discard original exception type, throw as now fatal
+          // KvSerialiserException
+          throw KvSerialiserException(e.what());
         }
-        return CommitSuccess::OK;
       }
-
-      return store->commit(
-        version,
-        [data = std::move(data), req_id = std::move(req_id)]()
-          -> std::tuple<CommitSuccess, TxHistory::RequestID, std::vector<uint8_t>> {
-          return {CommitSuccess::OK, std::move(req_id), std::move(data)};
-        },
-        false);
     }
 
     /** Commit version if committed
diff --git a/src/kv/kvtypes.h b/src/kv/kvtypes.h
index 2a9a1014b051..d8662231b45f 100644
--- a/src/kv/kvtypes.h
+++ b/src/kv/kvtypes.h
@@ -77,6 +77,20 @@ namespace kv
     }
   };
 
+  class KvSerialiserException : public std::exception
+  {
+  private:
+    std::string msg;
+
+  public:
+    KvSerialiserException(const std::string& msg_) : msg(msg_) {}
+
+    virtual const char* what() const throw()
+    {
+      return msg.c_str();
+    }
+  };
+
   class TxHistory
   {
   public:
diff --git a/src/kv/test/kv_serialisation.cpp b/src/kv/test/kv_serialisation.cpp
index da7d2c8c5f47..4287b574f365 100644
--- a/src/kv/test/kv_serialisation.cpp
+++ b/src/kv/test/kv_serialisation.cpp
@@ -479,4 +479,62 @@ TEST_CASE("replicated and derived table serialisation")
serialisation") REQUIRE(serialised.replicated.size() > 0); REQUIRE(serialised.derived.size() > 0); } +} + +struct NonSerialisable +{}; + +namespace msgpack +{ + MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) + { + namespace adaptor + { + // msgpack conversion for uint256_t + template <> + struct convert + { + msgpack::object const& operator()( + msgpack::object const& o, NonSerialisable& ns) const + { + throw std::runtime_error("Deserialise failure"); + } + }; + + template <> + struct pack + { + template + packer& operator()( + msgpack::packer& o, NonSerialisable const& ns) const + { + throw std::runtime_error("Serialise failure"); + } + }; + } + } +} + +TEST_CASE("Exceptional serdes" * doctest::test_suite("serialisation")) +{ + auto encryptor = std::make_shared(); + auto consensus = std::make_shared(); + + Store store(consensus); + store.set_encryptor(encryptor); + + auto& good_map = store.create("good_map"); + auto& bad_map = store.create("bad_map"); + + { + Store::Tx tx; + + auto good_view = tx.get_view(good_map); + good_view->put(1, 2); + + auto bad_view = tx.get_view(bad_map); + bad_view->put(0, {}); + + REQUIRE_THROWS_AS(tx.commit(), kv::KvSerialiserException); + } } \ No newline at end of file diff --git a/tests/e2e_batched.py b/tests/e2e_batched.py index 19821378ff90..73b3d0be215b 100644 --- a/tests/e2e_batched.py +++ b/tests/e2e_batched.py @@ -18,11 +18,13 @@ @reqs.supports_methods("BATCH_submit", "BATCH_fetch") -def test(network, args, batch_size=100): +def test(network, args, batch_size=100, write_key_divisor=1, write_size_multiplier=1): LOG.info(f"Running batch submission of {batch_size} new entries") primary, _ = network.find_primary() with primary.user_client() as c: + check = infra.checker.Checker() + message_ids = [next(id_gen) for _ in range(batch_size)] messages = [ {"id": i, "msg": f"A unique message: {md5(bytes(i)).hexdigest()}"} @@ -30,20 +32,26 @@ def test(network, args, batch_size=100): ] pre_submit = time.time() - submit_response = c.rpc("BATCH_submit", messages) + check( + c.rpc( + "BATCH_submit", + { + "entries": messages, + "write_key_divisor": write_key_divisor, + "write_size_multiplier": write_size_multiplier, + }, + ), + result=len(messages), + ) post_submit = time.time() LOG.warning( f"Submitting {batch_size} new keys took {post_submit - pre_submit}s" ) - assert submit_response.result == len(messages) fetch_response = c.rpc("BATCH_fetch", message_ids) - assert fetch_response.result is not None - assert len(fetch_response.result) == len(message_ids) - for n, m in enumerate(messages): - fetched = fetch_response.result[n] - assert m["id"] == fetched["id"] - assert m["msg"] == fetched["msg"] + + if write_key_divisor == 1 and write_size_multiplier == 1: + check(fetch_response, result=messages) return network @@ -61,6 +69,16 @@ def run(args): network = test(network, args, batch_size=100) network = test(network, args, batch_size=1000) + network = test(network, args, batch_size=1000, write_key_divisor=10) + network = test(network, args, batch_size=1000, write_size_multiplier=10) + network = test( + network, + args, + batch_size=1000, + write_key_divisor=10, + write_size_multiplier=10, + ) + # TODO: CI already takes ~25s for batch of 10k, so avoid large batches for now # bs = 10000 # step_size = 10000 @@ -72,9 +90,33 @@ def run(args): # bs += step_size +def run_to_destruction(args): + hosts = ["localhost", "localhost", "localhost"] + + with infra.ccf.network( + hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + 
+        network.start_and_join(args)
+
+        try:
+            wsm = 5000
+            while True:
+                LOG.info(f"Trying with writes scaled by {wsm}")
+                network = test(network, args, batch_size=10, write_size_multiplier=wsm)
+                wsm += 5000
+        except Exception as e:
+            LOG.info(f"Large write set caused an exception, as expected")
+            time.sleep(3)
+            assert (
+                network.nodes[0].remote.remote.proc.poll() is not None
+            ), "Primary should have been terminated"
+
 if __name__ == "__main__":
     args = e2e_args.cli_args()
     args.package = "libluagenericenc"
     args.enforce_reqs = True
 
     run(args)
+
+    run_to_destruction(args)
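
Reviewer note, not part of the patch: two behavioural points worth calling out.
First, BATCH_submit now takes a wrapper object rather than a bare list of
messages. A minimal sketch of the new request shape, reusing the user_client
API already used in tests/e2e_batched.py (the message contents here are
illustrative only):

    # Sketch: "c" is a user client obtained via primary.user_client(), as in test().
    messages = [{"id": i, "msg": f"msg-{i}"} for i in range(10)]
    response = c.rpc(
        "BATCH_submit",
        {
            "entries": messages,
            "write_key_divisor": 2,  # the Lua app only writes ids divisible by 2
            "write_size_multiplier": 3,  # stored value is e.msg repeated 3 times
        },
    )
    # The Lua app increments count for every entry, including those skipped by
    # the divisor, so the result equals len(messages) regardless of it.
    assert response.result == len(messages)

Second, serialisation failures during Tx::commit are no longer unhandled: kv.h
now catches any std::exception thrown while serialising the write set and
rethrows it as the fatal kv::KvSerialiserException (moved from
genericserialisewrapper.h to kvtypes.h so callers can name it), which the
"Exceptional serdes" test case and run_to_destruction both exercise.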