Skip to content

Commit

Permalink
Add more tests for invalid changesets
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed Aug 30, 2022
1 parent 549e81e commit 8c3bffc
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 56 deletions.
11 changes: 1 addition & 10 deletions src/realm/sync/changeset_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,16 +306,7 @@ void ChangesetEncoder::append_path_instr(Instruction::Type t, const Instruction:
template <class T>
void ChangesetEncoder::append_int(T integer)
{
// One sign bit plus number of value bits
const int num_bits = 1 + std::numeric_limits<T>::digits;
// Only the first 7 bits are available per byte. Had it not been
// for the fact that maximum guaranteed bit width of a char is 8,
// this value could have been increased to 15 (one less than the
// number of value bits in 'unsigned').
const int bits_per_byte = 7;
const int max_bytes = (num_bits + (bits_per_byte - 1)) / bits_per_byte;

char buffer[max_bytes];
char buffer[_impl::encode_int_max_bytes<T>()];
std::size_t n = _impl::encode_int(buffer, integer);
append_bytes(buffer, n);
}
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/changeset_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ void State::parse_one()

if (t == InstrTypeInternString) {
uint32_t index = read_int<uint32_t>();
StringData str = read_string();
if (index != m_intern_strings.size()) {
parser_error("Unexpected intern index");
}
StringData str = read_string();
if (!m_intern_strings.insert(str).second) {
parser_error("Unexpected intern string");
}
Expand Down
3 changes: 1 addition & 2 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,8 +809,7 @@ void SessionImpl::process_pending_flx_bootstrap()


history.integrate_server_changesets(
*pending_batch.progress, &downloadable_bytes, pending_batch.changesets.data(),
pending_batch.changesets.size(), new_version, batch_state, logger,
*pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
[&](const TransactionRef& tr) {
bootstrap_store->pop_front_pending(tr, pending_batch.changesets.size());
},
Expand Down
18 changes: 9 additions & 9 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,27 +378,27 @@ void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, ve

void ClientHistory::integrate_server_changesets(const SyncProgress& progress,
const std::uint_fast64_t* downloadable_bytes,
const RemoteChangeset* incoming_changesets,
std::size_t num_changesets, VersionInfo& version_info,
DownloadBatchState batch_state, util::Logger& logger,
util::Span<const RemoteChangeset> incoming_changesets,
VersionInfo& version_info, DownloadBatchState batch_state,
util::Logger& logger,
util::UniqueFunction<void(const TransactionRef&)> run_in_write_tr,
SyncTransactReporter* transact_reporter)
{
REALM_ASSERT(num_changesets != 0);
REALM_ASSERT(incoming_changesets.size() != 0);

std::uint_fast64_t downloaded_bytes_in_message = 0;
std::vector<Changeset> changesets;
changesets.resize(num_changesets); // Throws
changesets.resize(incoming_changesets.size()); // Throws

try {
for (std::size_t i = 0; i < num_changesets; ++i) {
for (std::size_t i = 0; i < incoming_changesets.size(); ++i) {
const RemoteChangeset& changeset = incoming_changesets[i];
downloaded_bytes_in_message += changeset.original_changeset_size;
parse_remote_changeset(changeset, changesets[i]); // Throws
changesets[i].transform_sequence = i;
}
}
catch (BadChangesetError& e) {
catch (const TransformError& e) {
throw IntegrationException(ClientError::bad_changeset,
util::format("Failed to parse received changeset: %1", e.what()));
}
Expand All @@ -420,7 +420,7 @@ void ClientHistory::integrate_server_changesets(const SyncProgress& progress,

ChangesetEncoder::Buffer transformed_changeset;
try {
for (std::size_t i = 0; i < num_changesets; ++i) {
for (std::size_t i = 0; i < incoming_changesets.size(); ++i) {
const RemoteChangeset& changeset = incoming_changesets[i];
REALM_ASSERT(changeset.last_integrated_local_version <= local_version);
REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id);
Expand All @@ -446,7 +446,7 @@ void ClientHistory::integrate_server_changesets(const SyncProgress& progress,

TempShortCircuitReplication tscr{m_replication};
InstructionApplier applier{*transact};
for (std::size_t i = 0; i < num_changesets; ++i) {
for (std::size_t i = 0; i < incoming_changesets.size(); ++i) {
encode_changeset(changesets[i], transformed_changeset);
applier.apply(changesets[i], &logger); // Throws
}
Expand Down
6 changes: 2 additions & 4 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,12 @@ class ClientHistory final : public _impl::History, public TransformHistory {
/// about byte-level progress, this function updates the persistent record
/// of the estimate of the number of remaining bytes to be downloaded.
///
/// \param num_changesets The number of passed changesets. Must be non-zero.
///
/// \param transact_reporter An optional callback which will be called with the
/// version immediately processing the sync transaction and that of the sync
/// transaction.
void integrate_server_changesets(const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes,
const RemoteChangeset* changesets, std::size_t num_changesets,
VersionInfo& new_version, DownloadBatchState download_type, util::Logger&,
util::Span<const RemoteChangeset> changesets, VersionInfo& new_version,
DownloadBatchState download_type, util::Logger&,
util::UniqueFunction<void(const TransactionRef&)> run_in_write_tr = nullptr,
SyncTransactReporter* transact_reporter = nullptr);

Expand Down
8 changes: 3 additions & 5 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,17 +1394,15 @@ void Session::integrate_changesets(ClientReplication& repl, const SyncProgress&
history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
return;
}
const Transformer::RemoteChangeset* changesets = received_changesets.data();
std::size_t num_changesets = received_changesets.size();
history.integrate_server_changesets(progress, &downloadable_bytes, changesets, num_changesets, version_info,
history.integrate_server_changesets(progress, &downloadable_bytes, received_changesets, version_info,
download_batch_state, logger, {}, get_transact_reporter()); // Throws
if (num_changesets == 1) {
if (received_changesets.size() == 1) {
logger.debug("1 remote changeset integrated, producing client version %1",
version_info.sync_version.version); // Throws
}
else {
logger.debug("%2 remote changesets integrated, producing client version %1",
version_info.sync_version.version, num_changesets); // Throws
version_info.sync_version.version, received_changesets.size()); // Throws
}
}

Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/noinst/protocol_codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ struct ProtocolCodecException : public std::runtime_error {
};
class HeaderLineParser {
public:
HeaderLineParser() = default;
explicit HeaderLineParser(std::string_view line)
: m_sv(line)
{
Expand Down
29 changes: 14 additions & 15 deletions src/realm/sync/tools/apply_to_state_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,23 @@ DownloadMessage DownloadMessage::parse(HeaderLineParser& msg, Logger& logger, bo
ret.progress.upload.last_integrated_server_version, ret.progress.upload.client_version,
ret.latest_server_version.version);

HeaderLineParser body;
std::string_view body_str;
if (is_body_compressed) {
ret.uncompressed_body_buffer.set_size(uncompressed_body_size);
auto compressed_body = msg.read_sized_data<BinaryData>(compressed_body_size);
std::error_code ec = util::compression::decompress(
compressed_body, {ret.uncompressed_body_buffer.data(), uncompressed_body_size});
std::error_code ec = util::compression::decompress(compressed_body, ret.uncompressed_body_buffer);

if (ec) {
throw ProtocolCodecException("error decompressing download message");
}

body = HeaderLineParser(std::string_view(ret.uncompressed_body_buffer.data(), uncompressed_body_size));
body_str = std::string_view(ret.uncompressed_body_buffer.data(), uncompressed_body_size);
}
else {
body = HeaderLineParser(msg.read_sized_data<std::string_view>(uncompressed_body_size));
body_str = msg.read_sized_data<std::string_view>(uncompressed_body_size);
}

HeaderLineParser body(body_str);
while (!body.at_end()) {
realm::sync::Transformer::RemoteChangeset cur_changeset;
cur_changeset.remote_version = body.read_next<sync::version_type>();
Expand Down Expand Up @@ -161,25 +161,24 @@ UploadMessage UploadMessage::parse(HeaderLineParser& msg, Logger& logger)
ret.upload_progress.last_integrated_server_version = msg.read_next<sync::version_type>();
ret.locked_server_version = msg.read_next<sync::version_type>('\n');

HeaderLineParser body;
// if is_body_compressed == true, we must decompress the received body.
std::string_view body_str;
if (is_body_compressed) {
ret.uncompressed_body_buffer.set_size(uncompressed_body_size);
auto compressed_body = msg.read_sized_data<BinaryData>(compressed_body_size);

std::error_code ec = util::compression::decompress(
compressed_body, {ret.uncompressed_body_buffer.data(), uncompressed_body_size});
std::error_code ec = util::compression::decompress(compressed_body, ret.uncompressed_body_buffer);

if (ec) {
throw ProtocolCodecException("error decompressing upload message");
}

body = HeaderLineParser(std::string_view(ret.uncompressed_body_buffer.data(), uncompressed_body_size));
body_str = std::string_view(ret.uncompressed_body_buffer.data(), uncompressed_body_size);
}
else {
body = HeaderLineParser(msg.read_sized_data<std::string_view>(uncompressed_body_size));
body_str = msg.read_sized_data<std::string_view>(uncompressed_body_size);
}

HeaderLineParser body(body_str);
while (!body.at_end()) {
realm::sync::Changeset cur_changeset;
cur_changeset.version = body.read_next<sync::version_type>();
Expand Down Expand Up @@ -292,10 +291,10 @@ int main(int argc, const char** argv)
mpark::visit(realm::util::overload{
[&](const DownloadMessage& download_message) {
realm::sync::VersionInfo version_info;
history.integrate_server_changesets(
download_message.progress, &download_message.downloadable_bytes,
download_message.changesets.data(), download_message.changesets.size(), version_info,
download_message.batch_state, *logger, nullptr);
history.integrate_server_changesets(download_message.progress,
&download_message.downloadable_bytes,
download_message.changesets, version_info,
download_message.batch_state, *logger, nullptr);
},
[&](const UploadMessage& upload_message) {
for (const auto& changeset : upload_message.changesets) {
Expand Down
147 changes: 146 additions & 1 deletion test/test_changeset_encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <realm/sync/changeset.hpp>
#include <realm/sync/changeset_encoder.hpp>
#include <realm/sync/changeset_parser.hpp>
#include <realm/sync/noinst/integer_codec.hpp>

using namespace realm;
using namespace realm::sync::instr;
Expand All @@ -20,7 +21,6 @@ Changeset encode_then_parse(const Changeset& changeset)
parse_changeset(stream, parsed);
return parsed;
}
} // namespace

TEST(ChangesetEncoding_AddTable)
{
Expand Down Expand Up @@ -294,3 +294,148 @@ TEST(ChangesetEncoding_AccentWords)
// This will throw if a string is interned twice.
CHECK_NOTHROW(parse_changeset(stream, parsed));
}

void encode_instruction(util::AppendBuffer<char>& buffer, char instr)
{
buffer.append(&instr, 1);
}

void encode_int(util::AppendBuffer<char>& buffer, int64_t value)
{
char buf[_impl::encode_int_max_bytes<int64_t>()];
size_t written = _impl::encode_int(buf, value);
buffer.append(buf, written);
}

void encode_string(util::AppendBuffer<char>& buffer, uint32_t index, std::string_view value)
{
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, index); // Index
encode_int(buffer, value.size()); // String length
buffer.append(value.data(), value.size());
}

#define CHECK_BADCHANGESET(buffer, msg) \
do { \
util::SimpleNoCopyInputStream stream{buffer}; \
Changeset parsed; \
CHECK_THROW_EX(parse_changeset(stream, parsed), sync::BadChangesetError, \
StringData(e.what()).contains(msg)); \
} while (0)

TEST(ChangesetParser_BadInstruction)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, 0x3e);
CHECK_BADCHANGESET(buffer, "unknown instruction");
}

TEST(ChangesetParser_GoodInternString)
{
util::AppendBuffer<char> buffer;
encode_string(buffer, 0, "a");
encode_string(buffer, 1, "b");

util::SimpleNoCopyInputStream stream{buffer};
Changeset parsed;
CHECK_NOTHROW(parse_changeset(stream, parsed));
}

TEST(ChangesetParser_BadInternString_MissingIndex)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
CHECK_BADCHANGESET(buffer, "bad changeset - integer decoding failure");
}

TEST(ChangesetParser_BadInternString_IndexTooLarge)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, std::numeric_limits<int64_t>::max()); // Index
encode_int(buffer, 0); // String length
CHECK_BADCHANGESET(buffer, "bad changeset - integer decoding failure");
}

TEST(ChangesetParser_BadInternString_UnorderedIndex)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, 1); // Index
CHECK_BADCHANGESET(buffer, "Unexpected intern index");
}

TEST(ChangesetParser_BadInternString_MissingLength)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, 1); // Index
CHECK_BADCHANGESET(buffer, "Unexpected intern index");
}

TEST(ChangesetParser_BadInternString_LengthTooLong)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, 0); // Index
encode_int(buffer, Table::max_string_size + 1); // String length
CHECK_BADCHANGESET(buffer, "string too long");
}

TEST(ChangesetParser_BadInternString_NegativeLength)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, 0); // Index
encode_int(buffer, -1); // String length
CHECK_BADCHANGESET(buffer, "bad changeset - integer decoding failure");
}

TEST(ChangesetParser_BadInternString_TruncatedLength)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, 0); // Index

char buf[_impl::encode_int_max_bytes<uint32_t>()];
size_t written = _impl::encode_int(buf, Table::max_string_size);
buffer.append(buf, written - 1);

CHECK_BADCHANGESET(buffer, "bad changeset - integer decoding failure");
}

TEST(ChangesetParser_BadInternString_MissingBody)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_int(buffer, 0); // Index
encode_int(buffer, 1); // String length
CHECK_BADCHANGESET(buffer, "truncated input");
}

TEST(ChangesetParser_BadInternString_RepeatedIndex)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, sync::InstrTypeInternString);
encode_string(buffer, 0, "a");
encode_string(buffer, 0, "b");
CHECK_BADCHANGESET(buffer, "Unexpected intern index");
}

TEST(ChangesetParser_BadInternString_RepeatedBody)
{
util::AppendBuffer<char> buffer;
encode_string(buffer, 0, "a");
encode_string(buffer, 1, "a");
CHECK_BADCHANGESET(buffer, "Unexpected intern string");
}

TEST(ChangesetParser_BadInternString_InvalidUse)
{
util::AppendBuffer<char> buffer;
encode_instruction(buffer, char(sync::Instruction::Type::CreateObject));
encode_int(buffer, 0); // Index
CHECK_BADCHANGESET(buffer, "Invalid interned string");
}

} // namespace
Loading

0 comments on commit 8c3bffc

Please sign in to comment.