Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add protobuf/echo/ProtoMessageBuilder #416

Merged
merged 25 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
caf1d6d
feat: add protobuf/echo/ProtoMessageBuilder
richardapeters Sep 14, 2023
3659327
ProtoCEchoPlugin: Fix superfluous typename
richardapeters Sep 14, 2023
4850e98
protobuf/echoProtoMessageBuilder: Parse a lot of different types
richardapeters Sep 17, 2023
7ad854d
protobuf/echoProtoMessageBuilder: Parse enums, strings, extract Bufer…
richardapeters Sep 17, 2023
7af4bfd
Resolve code warnings, increase coverage
richardapeters Sep 17, 2023
7143d43
protobuf/echo: Add BufferingStreamWriter
richardapeters Sep 18, 2023
9f6c3ad
protobuf/echo: Add half of ProtoMessageSender
richardapeters Sep 18, 2023
5c9f178
protobuf/echo/ProtoMessageSender: Serialize lots of types
richardapeters Sep 21, 2023
2b44b0f
protobuf/echo/ProtoMessageSender: Fix missing template keyword
richardapeters Sep 21, 2023
50f67fa
protobuf/echo/ProtoMessageSender: Serialize lots of types
richardapeters Sep 21, 2023
f3b4b37
protobuf/protoc_echo_plugin/ProtoCEchoPlugin: Modify result of Messag…
richardapeters Sep 21, 2023
7ff3ff4
Apply suggestions from code review
richardapeters Sep 21, 2023
4d866f2
protobuf/echo/BufferingStreamReader: Make BufferingStreamReader accep…
richardapeters Sep 22, 2023
e216526
protobuf/echo/BufferingStreamReader: Remove useless comment
richardapeters Sep 22, 2023
88b5d8a
Resolve Sonar warnings
richardapeters Sep 23, 2023
67182ec
Resolve Sonar warnings
richardapeters Sep 24, 2023
62b5a07
Move BufferingSteamReader and BufferingStreamWriter to infra/stream
richardapeters Sep 25, 2023
d6cf3e4
infra/stream/test: Add tests for BufferingStreamReader and BufferingS…
richardapeters Sep 26, 2023
cf83d99
Apply suggestions from code review
richardapeters Sep 26, 2023
7ec732d
Resolve Sonar warnings
richardapeters Sep 26, 2023
3625a48
Resolve Sonar warnings
richardapeters Sep 27, 2023
a0b2451
Resolve Sonar warnings
richardapeters Sep 27, 2023
2b8cfea
Reduce duplication
richardapeters Sep 28, 2023
84d2f94
Update services/network/test_doubles/Certificates.cpp
richardapeters Sep 28, 2023
9cc8c23
Merge branch 'main' into feature/proto_message_builder
rjaegers Sep 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions infra/syntax/ProtoFormatter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ namespace infra
PutBytes(bytes);
}

void ProtoFormatter::PutLengthDelimitedSize(std::size_t size, uint32_t fieldNumber)
{
PutVarInt((fieldNumber << 3) | 2);
PutVarInt(size);
}

ProtoLengthDelimitedFormatter ProtoFormatter::LengthDelimitedFormatter(uint32_t fieldNumber)
{
return ProtoLengthDelimitedFormatter(*this, fieldNumber);
Expand Down
2 changes: 2 additions & 0 deletions infra/syntax/ProtoFormatter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace infra
{
public:
ProtoLengthDelimitedFormatter(ProtoFormatter& formatter, uint32_t fieldNumber);
ProtoLengthDelimitedFormatter(ProtoFormatter& formatter, uint32_t fieldNumber, std::size_t size);
ProtoLengthDelimitedFormatter(const ProtoLengthDelimitedFormatter& other) = delete;
ProtoLengthDelimitedFormatter(ProtoLengthDelimitedFormatter&& other) noexcept;
ProtoLengthDelimitedFormatter& operator=(const ProtoLengthDelimitedFormatter& other) = delete;
Expand Down Expand Up @@ -43,6 +44,7 @@ namespace infra
void PutLengthDelimitedField(infra::ConstByteRange range, uint32_t fieldNumber);
void PutStringField(infra::BoundedConstString string, uint32_t fieldNumber);
void PutBytesField(infra::ConstByteRange bytes, uint32_t fieldNumber);
void PutLengthDelimitedSize(std::size_t size, uint32_t fieldNumber);
ProtoLengthDelimitedFormatter LengthDelimitedFormatter(uint32_t fieldNumber);

private:
Expand Down
35 changes: 34 additions & 1 deletion infra/syntax/ProtoParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,40 @@ namespace infra
return result;
}

struct MakeFullField
: infra::StaticVisitor<ProtoParser::Field>
{
MakeFullField(infra::DataInputStream inputStream, infra::StreamErrorPolicy& formatErrorPolicy, uint32_t fieldNumber)
: inputStream(inputStream)
, formatErrorPolicy(formatErrorPolicy)
, fieldNumber(fieldNumber)
{}

template<class T>
ProtoParser::Field operator()(T value) const
{
return { value, fieldNumber };
}

ProtoParser::Field operator()(PartialProtoLengthDelimited value) const
{
return { ProtoLengthDelimited(inputStream, formatErrorPolicy, value.length), fieldNumber };
}

private:
infra::DataInputStream inputStream;
infra::StreamErrorPolicy& formatErrorPolicy;
uint32_t fieldNumber;
};

ProtoParser::Field ProtoParser::GetField()
{
auto [value, fieldNumber] = GetPartialField();
MakeFullField visitor(input, formatErrorPolicy, fieldNumber);
return infra::ApplyVisitor(visitor, value);
}

ProtoParser::PartialField ProtoParser::GetPartialField()
{
uint32_t x = static_cast<uint32_t>(GetVarInt());
uint8_t type = x & 7;
Expand All @@ -129,7 +162,7 @@ namespace infra
case 1:
return std::make_pair(GetFixed64(), fieldNumber);
case 2:
return std::make_pair(ProtoLengthDelimited(input, formatErrorPolicy, static_cast<uint32_t>(GetVarInt())), fieldNumber);
return std::make_pair(PartialProtoLengthDelimited{ static_cast<uint32_t>(GetVarInt()) }, fieldNumber);
case 5:
return std::make_pair(GetFixed32(), fieldNumber);
default:
Expand Down
11 changes: 10 additions & 1 deletion infra/syntax/ProtoParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ namespace infra
infra::StreamErrorPolicy& formatErrorPolicy;
};

struct PartialProtoLengthDelimited
{
uint32_t length;
};

class ProtoParser
{
public:
using Field = std::pair<infra::Variant<uint32_t, uint64_t, ProtoLengthDelimited>, uint32_t>;
using FieldVariant = infra::Variant<uint32_t, uint64_t, ProtoLengthDelimited>;
using Field = std::pair<FieldVariant, uint32_t>;
using PartialFieldVariant = infra::Variant<uint32_t, uint64_t, PartialProtoLengthDelimited>;
using PartialField = std::pair<PartialFieldVariant, uint32_t>;

explicit ProtoParser(infra::DataInputStream inputStream);
ProtoParser(infra::DataInputStream inputStream, infra::StreamErrorPolicy& formatErrorPolicy);
Expand All @@ -48,6 +56,7 @@ namespace infra
uint64_t GetFixed64();

Field GetField();
PartialField GetPartialField();

void ReportFormatResult(bool ok);
bool FormatFailed() const;
Expand Down
11 changes: 11 additions & 0 deletions infra/syntax/test/TestProtoFormatter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ TEST(ProtoFormatterTest, PutBytesField)
EXPECT_EQ((std::array<uint8_t, 3>{ 4 << 3 | 2, 1, 5 }), stream.Writer().Processed());
}

TEST(ProtoFormatterTest, PutSubObjectOfKnownSize)
{
infra::ByteOutputStream::WithStorage<20> stream;
infra::ProtoFormatter formatter(stream);

formatter.PutLengthDelimitedSize(2, 4);
formatter.PutVarIntField(2, 4);

EXPECT_EQ((std::array<uint8_t, 4>{ 4 << 3 | 2, 2, 4 << 3, 2 }), stream.Writer().Processed());
}

TEST(ProtoFormatterTest, PutSubObject)
{
infra::ByteOutputStream::WithStorage<20> stream;
Expand Down
7 changes: 7 additions & 0 deletions infra/util/ConstructBin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ namespace infra
return *this;
}

ConstructBin& ConstructBin::Repeat(std::size_t amount, std::initializer_list<uint8_t> v)
{
for (size_t i = 0; i != amount; ++i)
contents.insert(contents.end(), v.begin(), v.end());
return *this;
}

ConstructBin& ConstructBin::RepeatString(std::size_t amount, const std::string& v)
{
for (size_t i = 0; i != amount; ++i)
Expand Down
1 change: 1 addition & 0 deletions infra/util/ConstructBin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace infra
ConstructBin& operator()(std::initializer_list<uint8_t> v);

ConstructBin& Repeat(std::size_t amount, uint8_t v);
ConstructBin& Repeat(std::size_t amount, std::initializer_list<uint8_t> v);
ConstructBin& RepeatString(std::size_t amount, const std::string& v);

template<class T>
Expand Down
2 changes: 1 addition & 1 deletion protobuf/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
add_subdirectory(echo)
add_subdirectory(echo_attributes)
add_subdirectory(echo)
add_subdirectory(protoc_echo_plugin)
add_subdirectory(protoc_echo_plugin_csharp)
add_subdirectory(protoc_echo_plugin_java)
109 changes: 109 additions & 0 deletions protobuf/echo/BufferingStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include "protobuf/echo/BufferingStreamReader.hpp"

namespace services
{
BufferingStreamReader::BufferingStreamReader(infra::BoundedDeque<uint8_t>& buffer, infra::ConstByteRange inputData)
: buffer(buffer)
, inputData(inputData)
{}

BufferingStreamReader::~BufferingStreamReader()
{
StoreRemainder();
}

void BufferingStreamReader::Extract(infra::ByteRange range, infra::StreamErrorPolicy& errorPolicy)
{
if (index != buffer.size())
{
auto from = infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size());
infra::Copy(from, infra::Head(range, from.size()));
range.pop_front(from.size());
index += from.size();

// Perhaps the deque just wrapped around, try once more
from = infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size());
infra::Copy(from, infra::Head(range, from.size()));
range.pop_front(from.size());
index += from.size();
}

if (!range.empty())
{
auto dataIndex = index - buffer.size();
auto from = infra::Head(infra::DiscardHead(inputData, dataIndex), range.size());
infra::Copy(from, infra::Head(range, from.size()));
range.pop_front(from.size());
index += from.size();
}

errorPolicy.ReportResult(range.empty());
}

uint8_t BufferingStreamReader::Peek(infra::StreamErrorPolicy& errorPolicy)
{
auto range = PeekContiguousRange(0);

errorPolicy.ReportResult(!range.empty());

if (range.empty())
return 0;
else
return range.front();
}

infra::ConstByteRange BufferingStreamReader::ExtractContiguousRange(std::size_t max)
{
if (index < buffer.size())
{
auto from = infra::Head(buffer.contiguous_range(buffer.begin() + index), max);
index += from.size();
return from;
}

auto dataIndex = index - buffer.size();
auto from = infra::Head(infra::DiscardHead(inputData, dataIndex), max);
index += from.size();
return from;
}

infra::ConstByteRange BufferingStreamReader::PeekContiguousRange(std::size_t start)
{
if (index + start < buffer.size())
{
auto from = buffer.contiguous_range(buffer.begin() + index + start);
return from;
}

auto dataIndex = index + start - buffer.size();
auto from = infra::DiscardHead(inputData, dataIndex);
return from;
}

bool BufferingStreamReader::Empty() const
{
return Available() == 0;
}

std::size_t BufferingStreamReader::Available() const
{
return buffer.size() + inputData.size() - index;
}

std::size_t BufferingStreamReader::ConstructSaveMarker() const
{
return index;
}

void BufferingStreamReader::Rewind(std::size_t marker)
{
index = marker;
}

void BufferingStreamReader::StoreRemainder()
{
std::size_t bufferDecrease = std::min(buffer.size(), index);
buffer.erase(buffer.begin(), buffer.begin() + bufferDecrease);
buffer.insert(buffer.end(), inputData.begin() + index - bufferDecrease, inputData.end());
}
}
38 changes: 38 additions & 0 deletions protobuf/echo/BufferingStreamReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#ifndef PROTOBUF_BUFFERING_STREAM_READER_HPP
#define PROTOBUF_BUFFERING_STREAM_READER_HPP

#include "infra/stream/InputStream.hpp"
#include "infra/util/BoundedDeque.hpp"

namespace services
{
// Usage: Everything that is not read from the inputData is stored into the buffer upon destruction of the BufferingStreamReader
// Any data already present in the buffer is read first from the reader
class BufferingStreamReader
: public infra::StreamReaderWithRewinding
{
public:
BufferingStreamReader(infra::BoundedDeque<uint8_t>& buffer, infra::ConstByteRange inputData);
~BufferingStreamReader();

// Implementation of StreamReaderWithRewinding
void Extract(infra::ByteRange range, infra::StreamErrorPolicy& errorPolicy) override;
uint8_t Peek(infra::StreamErrorPolicy& errorPolicy) override;
infra::ConstByteRange ExtractContiguousRange(std::size_t max) override;
infra::ConstByteRange PeekContiguousRange(std::size_t start) override;
bool Empty() const override;
std::size_t Available() const override;
std::size_t ConstructSaveMarker() const override;
void Rewind(std::size_t marker) override;

private:
void StoreRemainder();

private:
infra::BoundedDeque<uint8_t>& buffer;
infra::ConstByteRange inputData;
std::size_t index = 0;
};
}

#endif
65 changes: 65 additions & 0 deletions protobuf/echo/BufferingStreamWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "protobuf/echo/BufferingStreamWriter.hpp"

namespace services
{
BufferingStreamWriter::BufferingStreamWriter(infra::BoundedDeque<uint8_t>& buffer, infra::StreamWriter& output)
: buffer(buffer)
, output(output)
{
LoadRemainder();
}

void BufferingStreamWriter::Insert(infra::ConstByteRange range, infra::StreamErrorPolicy& errorPolicy)
{
auto first = infra::Head(range, output.Available());
output.Insert(first, errorPolicy);
index += first.size();
range.pop_front(first.size());

buffer.insert(buffer.end(), range.begin(), range.end());
index += range.size();
}

std::size_t BufferingStreamWriter::Available() const
{
return output.Available() + buffer.max_size() - buffer.size();
}

std::size_t BufferingStreamWriter::ConstructSaveMarker() const
{
return index;
}

std::size_t BufferingStreamWriter::GetProcessedBytesSince(std::size_t marker) const
{
return index - marker;
}

infra::ByteRange BufferingStreamWriter::SaveState(std::size_t marker)
{
std::abort();
}

void BufferingStreamWriter::RestoreState(infra::ByteRange range)
{
std::abort();
}

infra::ByteRange BufferingStreamWriter::Overwrite(std::size_t marker)
{
std::abort();
}

void BufferingStreamWriter::LoadRemainder()
{
infra::StreamErrorPolicy errorPolicy;
auto from = infra::Head(buffer.contiguous_range(buffer.begin()), output.Available());
output.Insert(from, errorPolicy);
buffer.erase(buffer.begin(), buffer.begin() + from.size());
from = infra::Head(buffer.contiguous_range(buffer.begin()), output.Available());
output.Insert(from, errorPolicy);
buffer.clear();

index = 0;
}
}
Loading