Generalise handling of serialisation errors #577

Merged (6 commits) on Nov 21, 2019
src/apps/batched/batched.lua (5 additions & 3 deletions)
@@ -15,10 +15,12 @@ return {
   BATCH_submit = [[
     tables, gov_tables, args = ...
     count = 0
-    for n, e in ipairs(args.params) do
+    for n, e in ipairs(args.params.entries) do
       id = e.id
-      msg = e.msg
-      tables.priv0:put(id, msg)
+      if id % args.params.write_key_divisor == 0 then
+        msg = string.rep(e.msg, args.params.write_size_multiplier)
+        tables.priv0:put(id, msg)
+      end
       count = count + 1
     end
     return env.jsucc(count)
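BATCH_submit now takes an object rather than a bare list of entries: something like {"entries": [...], "write_key_divisor": 10, "write_size_multiplier": 10} (values illustrative). Only entries whose id is divisible by write_key_divisor are written, and each written message is repeated write_size_multiplier times, so the updated tests/e2e_batched.py at the end of this diff can scale a transaction's write set up or down.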
src/kv/genericserialisewrapper.h (0 additions & 14 deletions)
@@ -38,20 +38,6 @@ namespace kv
       static_cast<KotBase>(a) | static_cast<KotBase>(b));
   }
 
-  class KvSerialiserException : public std::exception
-  {
-  private:
-    std::string msg;
-
-  public:
-    KvSerialiserException(const std::string& msg_) : msg(msg_) {}
-
-    virtual const char* what() const throw()
-    {
-      return msg.c_str();
-    }
-  };
-
   template <typename K, typename V, typename Version>
   struct KeyValVersion
   {
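The class body is unchanged; it simply moves to kvtypes.h (below), presumably so the commit path in kv.h can throw it without depending on the serialise wrapper.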
src/kv/kv.h (35 additions & 18 deletions)
@@ -1010,29 +1010,46 @@ namespace kv
         LOG_FAIL_FMT("Could not commit transaction {}", version);
         return CommitSuccess::CONFLICT;
       }
-      else
-      {
-        version = c.value();
-      }
 
-      auto data = serialise();
+      version = c.value();
 
-      if (data.empty())
-      {
-        auto h = store->get_history();
-        if (h != nullptr)
-        {
-          // This tx does not have a write set, so this is a read only tx
-          // because of this we are returning NoVersion
-          h->add_result(req_id, NoVersion);
-        }
-        return CommitSuccess::OK;
-      }
+      // From here, we have received a unique commit version and made
+      // modifications to our local kv. If we fail in any way, we cannot
+      // recover.
+      try
+      {
+        auto data = serialise();
 
-      return store->commit(
-        version,
-        [data = std::move(data), req_id = std::move(req_id)]()
-          -> std::tuple<CommitSuccess, TxHistory::RequestID, SerialisedMaps> {
-          return {CommitSuccess::OK, std::move(req_id), std::move(data)};
-        },
-        false);
+        if (data.empty())
+        {
+          auto h = store->get_history();
+          if (h != nullptr)
+          {
+            // This tx does not have a write set, so this is a read only tx
+            // because of this we are returning NoVersion
+            h->add_result(req_id, NoVersion);
+          }
+          return CommitSuccess::OK;
+        }
+
+        return store->commit(
+          version,
+          [data = std::move(data),
+           req_id = std::move(req_id)]() -> PendingTx::result_type {
+            return {CommitSuccess::OK, std::move(req_id), std::move(data)};
+          },
+          false);
+      }
+      catch (const std::exception& e)
+      {
+        LOG_FAIL_FMT("Error during serialisation: {}", e.what());
+
+        // Discard original exception type, throw as now fatal
+        // KvSerialiserException
+        throw KvSerialiserException(e.what());
+      }
     }
 
     /** Commit version if committed
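With the exception class available from kvtypes.h (next file), the net effect is that any exception escaping serialise() after a commit version has been reserved is logged and rethrown as kv::KvSerialiserException. A minimal sketch of how a caller might react, assuming the Store/Tx types exercised in this PR's tests; try_commit and the abort-on-failure policy are illustrative, not part of this PR:

#include <cstdlib> // std::abort
// Assumes repo headers: kv/kv.h (Store, kv::CommitSuccess) and
// kv/kvtypes.h (kv::KvSerialiserException), as modified in this PR.

kv::CommitSuccess try_commit(Store::Tx& tx)
{
  try
  {
    return tx.commit();
  }
  catch (const kv::KvSerialiserException& e)
  {
    // A unique commit version was already reserved and the local kv was
    // modified, so this store cannot recover; fail fast.
    LOG_FAIL_FMT("Fatal serialisation error: {}", e.what());
    std::abort();
  }
}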
src/kv/kvtypes.h (14 additions & 0 deletions)
@@ -77,6 +77,20 @@ namespace kv
     }
   };
 
+  class KvSerialiserException : public std::exception
+  {
+  private:
+    std::string msg;
+
+  public:
+    KvSerialiserException(const std::string& msg_) : msg(msg_) {}
+
+    virtual const char* what() const throw()
+    {
+      return msg.c_str();
+    }
+  };
+
   class TxHistory
   {
   public:
src/kv/test/kv_serialisation.cpp (58 additions & 0 deletions)
@@ -479,4 +479,62 @@ TEST_CASE("replicated and derived table serialisation")
     REQUIRE(serialised.replicated.size() > 0);
     REQUIRE(serialised.derived.size() > 0);
   }
 }
+
+struct NonSerialisable
+{};
+
+namespace msgpack
+{
+  MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS)
+  {
+    namespace adaptor
+    {
+      // msgpack conversion for NonSerialisable, which always throws
+      template <>
+      struct convert<NonSerialisable>
+      {
+        msgpack::object const& operator()(
+          msgpack::object const& o, NonSerialisable& ns) const
+        {
+          throw std::runtime_error("Deserialise failure");
+        }
+      };
+
+      template <>
+      struct pack<NonSerialisable>
+      {
+        template <typename Stream>
+        packer<Stream>& operator()(
+          msgpack::packer<Stream>& o, NonSerialisable const& ns) const
+        {
+          throw std::runtime_error("Serialise failure");
+        }
+      };
+    }
+  }
+}
+
+TEST_CASE("Exceptional serdes" * doctest::test_suite("serialisation"))
+{
+  auto encryptor = std::make_shared<ccf::NullTxEncryptor>();
+  auto consensus = std::make_shared<kv::StubConsensus>();
+
+  Store store(consensus);
+  store.set_encryptor(encryptor);
+
+  auto& good_map = store.create<size_t, size_t>("good_map");
+  auto& bad_map = store.create<size_t, NonSerialisable>("bad_map");
+
+  {
+    Store::Tx tx;
+
+    auto good_view = tx.get_view(good_map);
+    good_view->put(1, 2);
+
+    auto bad_view = tx.get_view(bad_map);
+    bad_view->put(0, {});
+
+    REQUIRE_THROWS_AS(tx.commit(), kv::KvSerialiserException);
+  }
+}
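In this new test, the pack/convert specialisations make any value of type NonSerialisable throw at (de)serialisation time, so a put into bad_map succeeds locally but fails once commit reaches serialise(); REQUIRE_THROWS_AS checks that the failure surfaces as kv::KvSerialiserException rather than the original std::runtime_error.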
tests/e2e_batched.py (51 additions & 9 deletions)
@@ -18,32 +18,40 @@
 
 
 @reqs.supports_methods("BATCH_submit", "BATCH_fetch")
-def test(network, args, batch_size=100):
+def test(network, args, batch_size=100, write_key_divisor=1, write_size_multiplier=1):
     LOG.info(f"Running batch submission of {batch_size} new entries")
     primary, _ = network.find_primary()
 
     with primary.user_client() as c:
+        check = infra.checker.Checker()
+
         message_ids = [next(id_gen) for _ in range(batch_size)]
         messages = [
             {"id": i, "msg": f"A unique message: {md5(bytes(i)).hexdigest()}"}
             for i in message_ids
         ]
 
         pre_submit = time.time()
-        submit_response = c.rpc("BATCH_submit", messages)
+        check(
+            c.rpc(
+                "BATCH_submit",
+                {
+                    "entries": messages,
+                    "write_key_divisor": write_key_divisor,
+                    "write_size_multiplier": write_size_multiplier,
+                },
+            ),
+            result=len(messages),
+        )
         post_submit = time.time()
         LOG.warning(
             f"Submitting {batch_size} new keys took {post_submit - pre_submit}s"
         )
-        assert submit_response.result == len(messages)
 
         fetch_response = c.rpc("BATCH_fetch", message_ids)
         assert fetch_response.result is not None
         assert len(fetch_response.result) == len(message_ids)
-        for n, m in enumerate(messages):
-            fetched = fetch_response.result[n]
-            assert m["id"] == fetched["id"]
-            assert m["msg"] == fetched["msg"]
+
+        if write_key_divisor == 1 and write_size_multiplier == 1:
+            check(fetch_response, result=messages)
 
     return network
 
@@ -61,6 +69,16 @@ def run(args):
     network = test(network, args, batch_size=100)
     network = test(network, args, batch_size=1000)
 
+    network = test(network, args, batch_size=1000, write_key_divisor=10)
+    network = test(network, args, batch_size=1000, write_size_multiplier=10)
+    network = test(
+        network,
+        args,
+        batch_size=1000,
+        write_key_divisor=10,
+        write_size_multiplier=10,
+    )
+
     # TODO: CI already takes ~25s for batch of 10k, so avoid large batches for now
     # bs = 10000
     # step_size = 10000
@@ -72,9 +90,33 @@ def run(args):
     #     bs += step_size
 
 
+def run_to_destruction(args):
+    hosts = ["localhost", "localhost", "localhost"]
+
+    with infra.ccf.network(
+        hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
+    ) as network:
+        network.start_and_join(args)
+
+        try:
+            wsm = 5000
+            while True:
+                LOG.info(f"Trying with writes scaled by {wsm}")
+                network = test(network, args, batch_size=10, write_size_multiplier=wsm)
+                wsm += 5000
+        except Exception as e:
+            LOG.info(f"Large write set caused an exception, as expected")
+            time.sleep(3)
+            assert (
+                network.nodes[0].remote.remote.proc.poll() is not None
+            ), "Primary should have been terminated"
+
+
 if __name__ == "__main__":
     args = e2e_args.cli_args()
     args.package = "libluagenericenc"
     args.enforce_reqs = True
 
     run(args)
+
+    run_to_destruction(args)
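run_to_destruction escalates write_size_multiplier in steps of 5000 until a transaction's write set is large enough that committing it kills the primary; the final assert checks that the primary's process has actually exited, so an oversized (unserialisable) write set is shown to be fatal end-to-end rather than silently dropped.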