Skip to content

Commit

Permalink
feat: add protobuf/echo/ProtoMessageBuilder (#416)
Browse files Browse the repository at this point in the history
* feat: add protobuf/echo/ProtoMessageBuilder

* ProtoCEchoPlugin: Fix superfluous typename

* protobuf/echoProtoMessageBuilder: Parse a lot of different types

* protobuf/echoProtoMessageBuilder: Parse enums, strings, extract BuferingStreamReader

* Resolve code warnings, increase coverage

* protobuf/echo: Add BufferingStreamWriter

* protobuf/echo: Add half of ProtoMessageSender

* protobuf/echo/ProtoMessageSender: Serialize lots of types

* protobuf/echo/ProtoMessageSender: Fix missing template keyword

* protobuf/echo/ProtoMessageSender: Serialize lots of types

* protobuf/protoc_echo_plugin/ProtoCEchoPlugin: Modify result of MessageReference::Get() const to avoid a warning

* Apply suggestions from code review

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* protobuf/echo/BufferingStreamReader: Make BufferingStreamReader accept a stream reader instead of a byte range

* protobuf/echo/BufferingStreamReader: Remove useless comment

* Resolve Sonar warnings

* Resolve Sonar warnings

* Move BufferingSteamReader and BufferingStreamWriter to infra/stream

* infra/stream/test: Add tests for BufferingStreamReader and BufferingStreamWriter

* Apply suggestions from code review

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Resolve Sonar warnings

* Resolve Sonar warnings

* Resolve Sonar warnings

* Reduce duplication

* Update services/network/test_doubles/Certificates.cpp

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Ron <45816308+rjaegers@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 28, 2023
1 parent 7b2ada0 commit f5260dd
Show file tree
Hide file tree
Showing 34 changed files with 2,393 additions and 414 deletions.
107 changes: 107 additions & 0 deletions infra/stream/BufferingStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#include "infra/stream/BufferingStreamReader.hpp"

namespace infra
{
BufferingStreamReader::BufferingStreamReader(infra::BoundedDeque<uint8_t>& buffer, infra::StreamReaderWithRewinding& input)
: buffer(buffer)
, input(input)
{}

BufferingStreamReader::~BufferingStreamReader()
{
StoreRemainder();
}

void BufferingStreamReader::Extract(infra::ByteRange range, infra::StreamErrorPolicy& errorPolicy)
{
if (index != buffer.size())
{
Read(infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size()), range);
// Perhaps the deque just wrapped around, try once more
Read(infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size()), range);
}

if (!range.empty())
Read(input.ExtractContiguousRange(range.size()), range);

errorPolicy.ReportResult(range.empty());
}

uint8_t BufferingStreamReader::Peek(infra::StreamErrorPolicy& errorPolicy)
{
auto range = PeekContiguousRange(0);

errorPolicy.ReportResult(!range.empty());

if (range.empty())
return 0;
else
return range.front();
}

infra::ConstByteRange BufferingStreamReader::ExtractContiguousRange(std::size_t max)
{
if (index < buffer.size())
{
auto from = infra::Head(buffer.contiguous_range(buffer.begin() + index), max);
index += from.size();
return from;
}

return input.ExtractContiguousRange(max);
}

infra::ConstByteRange BufferingStreamReader::PeekContiguousRange(std::size_t start)
{
if (index + start < buffer.size())
return buffer.contiguous_range(buffer.begin() + index + start);

return input.PeekContiguousRange(index + start - buffer.size());
}

bool BufferingStreamReader::Empty() const
{
return Available() == 0;
}

std::size_t BufferingStreamReader::Available() const
{
return buffer.size() + input.Available();
}

std::size_t BufferingStreamReader::ConstructSaveMarker() const
{
return index;
}

void BufferingStreamReader::Rewind(std::size_t marker)
{
if (index > buffer.size())
{
auto rewindAmount = std::min(index - marker, index - buffer.size());
input.Rewind(input.ConstructSaveMarker() - rewindAmount);
index -= rewindAmount;
}

if (marker < buffer.size())
index = marker;
}

void BufferingStreamReader::Read(infra::ConstByteRange from, infra::ByteRange& to)
{
infra::Copy(from, infra::Head(to, from.size()));
to.pop_front(from.size());
index += from.size();
}

void BufferingStreamReader::StoreRemainder()
{
std::size_t bufferDecrease = std::min(buffer.size(), index);
buffer.erase(buffer.begin(), buffer.begin() + bufferDecrease);
while (!input.Empty())
{
auto range = input.ExtractContiguousRange(std::numeric_limits<std::size_t>::max());
buffer.insert(buffer.end(), range.begin(), range.end());
}
}
}
39 changes: 39 additions & 0 deletions infra/stream/BufferingStreamReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef INFRA_BUFFERING_STREAM_READER_HPP
#define INFRA_BUFFERING_STREAM_READER_HPP

#include "infra/stream/InputStream.hpp"
#include "infra/util/BoundedDeque.hpp"

namespace infra
{
// Usage: Everything that is not read from the inputData is stored into the buffer upon destruction of the BufferingStreamReader
// Any data already present in the buffer is read first from the reader
class BufferingStreamReader
: public infra::StreamReaderWithRewinding
{
public:
BufferingStreamReader(infra::BoundedDeque<uint8_t>& buffer, infra::StreamReaderWithRewinding& input);
~BufferingStreamReader() override;

// Implementation of StreamReaderWithRewinding
void Extract(infra::ByteRange range, infra::StreamErrorPolicy& errorPolicy) override;
uint8_t Peek(infra::StreamErrorPolicy& errorPolicy) override;
infra::ConstByteRange ExtractContiguousRange(std::size_t max) override;
infra::ConstByteRange PeekContiguousRange(std::size_t start) override;
bool Empty() const override;
std::size_t Available() const override;
std::size_t ConstructSaveMarker() const override;
void Rewind(std::size_t marker) override;

private:
void Read(infra::ConstByteRange range, infra::ByteRange& to);
void StoreRemainder();

private:
infra::BoundedDeque<uint8_t>& buffer;
infra::StreamReaderWithRewinding& input;
std::size_t index = 0;
};
}

#endif
65 changes: 65 additions & 0 deletions infra/stream/BufferingStreamWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "infra/stream/BufferingStreamWriter.hpp"

namespace infra
{
BufferingStreamWriter::BufferingStreamWriter(infra::BoundedDeque<uint8_t>& buffer, infra::StreamWriter& output)
: buffer(buffer)
, output(output)
{
LoadRemainder();
}

void BufferingStreamWriter::Insert(infra::ConstByteRange range, infra::StreamErrorPolicy& errorPolicy)
{
auto first = infra::Head(range, output.Available());
output.Insert(first, errorPolicy);
index += first.size();
range.pop_front(first.size());

buffer.insert(buffer.end(), range.begin(), range.end());
index += range.size();
}

std::size_t BufferingStreamWriter::Available() const
{
return output.Available() + buffer.max_size() - buffer.size();
}

std::size_t BufferingStreamWriter::ConstructSaveMarker() const
{
return index;
}

std::size_t BufferingStreamWriter::GetProcessedBytesSince(std::size_t marker) const
{
return index - marker;
}

[[noreturn]] infra::ByteRange BufferingStreamWriter::SaveState(std::size_t marker)
{
std::abort();
}

[[noreturn]] void BufferingStreamWriter::RestoreState(infra::ByteRange range)
{
std::abort();
}

[[noreturn]] infra::ByteRange BufferingStreamWriter::Overwrite(std::size_t marker)
{
std::abort();
}

void BufferingStreamWriter::LoadRemainder()
{
infra::StreamErrorPolicy errorPolicy;
auto from = infra::Head(buffer.contiguous_range(buffer.begin()), output.Available());
output.Insert(from, errorPolicy);
buffer.erase(buffer.begin(), buffer.begin() + from.size());
from = infra::Head(buffer.contiguous_range(buffer.begin()), output.Available());
output.Insert(from, errorPolicy);
buffer.erase(buffer.begin(), buffer.begin() + from.size());

index = buffer.size();
}
}
36 changes: 36 additions & 0 deletions infra/stream/BufferingStreamWriter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#ifndef INFRA_BUFFERING_STREAM_WRITER_HPP
#define INFRA_BUFFERING_STREAM_WRITER_HPP

#include "infra/stream/OutputStream.hpp"
#include "infra/util/BoundedDeque.hpp"

namespace infra
{
// Usage: Any data that does not fit into the output stream is written to the buffer
// Any data already present in the buffer is written to the output stream upon construction of BufferingStreamWriter
class BufferingStreamWriter
: public infra::StreamWriter
{
public:
BufferingStreamWriter(infra::BoundedDeque<uint8_t>& buffer, infra::StreamWriter& output);

// Implementation of StreamWriter
void Insert(infra::ConstByteRange range, infra::StreamErrorPolicy& errorPolicy) override;
std::size_t Available() const override;
std::size_t ConstructSaveMarker() const override;
std::size_t GetProcessedBytesSince(std::size_t marker) const override;
[[noreturn]] infra::ByteRange SaveState(std::size_t marker) override;
[[noreturn]] void RestoreState(infra::ByteRange range) override;
[[noreturn]] infra::ByteRange Overwrite(std::size_t marker) override;

private:
void LoadRemainder();

private:
infra::BoundedDeque<uint8_t>& buffer;
infra::StreamWriter& output;
std::size_t index = 0;
};
}

#endif
4 changes: 4 additions & 0 deletions infra/stream/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ target_sources(infra.stream PRIVATE
BoundedVectorInputStream.hpp
BoundedVectorOutputStream.cpp
BoundedVectorOutputStream.hpp
BufferingStreamReader.cpp
BufferingStreamReader.hpp
BufferingStreamWriter.cpp
BufferingStreamWriter.hpp
ByteInputStream.cpp
ByteInputStream.hpp
ByteOutputStream.cpp
Expand Down
2 changes: 2 additions & 0 deletions infra/stream/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ target_sources(infra.stream_test PRIVATE
StreamMock.hpp
TestBoundedDequeInputStream.cpp
TestBoundedVectorOutputStream.cpp
TestBufferingStreamReader.cpp
TestBufferingStreamWriter.cpp
TestByteInputStream.cpp
TestByteOutputStream.cpp
TestCountingInputStream.cpp
Expand Down
Loading

0 comments on commit f5260dd

Please sign in to comment.