Skip to content

Commit

Permalink
fix: handle echo message split over multiple streams (#481)
Browse files Browse the repository at this point in the history
* fix: handle echo message split over multiple streams

* protobuf/echo/Echo: Fix function getting out of scope

(cherry picked from commit 8217500)
  • Loading branch information
richardapeters authored Nov 30, 2023
1 parent be6fdde commit 37d7878
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 7 deletions.
13 changes: 9 additions & 4 deletions infra/stream/BufferingStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ namespace infra
{
if (index != buffer.size())
{
Read(infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size()), range);
bufferIndex += Read(infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size()), range);
// Perhaps the deque just wrapped around, try once more
Read(infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size()), range);
bufferIndex += Read(infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size()), range);
}

if (!range.empty())
Expand All @@ -45,6 +45,7 @@ namespace infra
{
auto from = infra::Head(buffer.contiguous_range(buffer.begin() + index), max);
index += from.size();
bufferIndex += from.size();
return from;
}

Expand All @@ -68,7 +69,7 @@ namespace infra

std::size_t BufferingStreamReader::Available() const
{
return buffer.size() + input.Available();
return buffer.size() - bufferIndex + input.Available();
}

std::size_t BufferingStreamReader::ConstructSaveMarker() const
Expand All @@ -86,14 +87,18 @@ namespace infra
}

if (marker < buffer.size())
{
index = marker;
bufferIndex = index;
}
}

void BufferingStreamReader::Read(infra::ConstByteRange from, infra::ByteRange& to)
std::size_t BufferingStreamReader::Read(infra::ConstByteRange from, infra::ByteRange& to)
{
infra::Copy(from, infra::Head(to, from.size()));
to.pop_front(from.size());
index += from.size();
return from.size();
}

void BufferingStreamReader::StoreRemainder()
Expand Down
3 changes: 2 additions & 1 deletion infra/stream/BufferingStreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ namespace infra
void Rewind(std::size_t marker) override;

private:
void Read(infra::ConstByteRange range, infra::ByteRange& to);
std::size_t Read(infra::ConstByteRange range, infra::ByteRange& to);
void StoreRemainder();

private:
infra::BoundedDeque<uint8_t>& buffer;
infra::StreamReaderWithRewinding& input;
std::size_t index = 0;
std::size_t bufferIndex = 0;
};
}

Expand Down
4 changes: 4 additions & 0 deletions infra/stream/test/TestBufferingStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ TEST_F(BufferingStreamReaderTest, Available)

EXPECT_CALL(input, Available()).WillOnce(testing::Return(1));
EXPECT_FALSE(reader.Empty());

reader.ExtractContiguousRange(1);
EXPECT_CALL(input, Available()).WillOnce(testing::Return(1));
EXPECT_EQ(2, reader.Available());
}

TEST_F(BufferingStreamReaderTest, ConstructSaveMarker)
Expand Down
13 changes: 12 additions & 1 deletion protobuf/echo/Echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ namespace services

void EchoOnStreams::DataReceived()
{
if (limitedReader != infra::none)
ContinueReceiveMessage();

while (readerPtr != nullptr && methodDeserializer == nullptr && !readerAccess.Referenced())
{
if (limitedReader == infra::none)
Expand All @@ -149,6 +152,12 @@ namespace services
if (limitedReader != infra::none)
ContinueReceiveMessage();
}

if (!readerAccess.Referenced() && limitedReader != infra::none)
{
bufferedReader = infra::none;
readerPtr = nullptr;
}
}

void EchoOnStreams::StartReceiveMessage()
Expand Down Expand Up @@ -190,8 +199,10 @@ namespace services
if (readerAccess.Referenced())
readerAccess.SetAction([this]()
{
auto& self = *this;
ReaderDone();
DataReceived();
// ReaderDone() may result in readerAccess' completion callback being reset, which invalidates the saved this pointer
self.DataReceived();
});
else
ReaderDone();
Expand Down
2 changes: 1 addition & 1 deletion services/network/EchoOnConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace services
private:
struct LimitedReader
{
LimitedReader(infra::SharedPtr<infra::StreamReaderWithRewinding>&& reader);
explicit LimitedReader(infra::SharedPtr<infra::StreamReaderWithRewinding>&& reader);

infra::SharedPtr<infra::StreamReaderWithRewinding> reader;
infra::LimitedStreamReaderWithRewinding limitedReader;
Expand Down
18 changes: 18 additions & 0 deletions services/network/test/TestEchoOnConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,21 @@ TEST_F(EchoOnConnectionTest, DataReceived_while_service_method_is_executing)

service.MethodDone();
}

TEST_F(EchoOnConnectionTest, DataReceived_to_complete_service_method)
{
infra::StdVectorInputStream::WithStorage stream(infra::inPlace, std::vector<uint8_t>{ 1, 10, 2, 8 });
auto readerPtr = infra::UnOwnedSharedPtr(stream.Reader());
EXPECT_CALL(connection, ReceiveStream()).WillOnce(testing::Return(readerPtr));
EXPECT_CALL(connection, AckReceived());
connection.Observer().DataReceived();

infra::StdVectorInputStream::WithStorage stream2(infra::inPlace, std::vector<uint8_t>{ 5 });
readerPtr = infra::UnOwnedSharedPtr(stream2.Reader());
EXPECT_CALL(connection, ReceiveStream()).WillOnce(testing::Return(readerPtr));
EXPECT_CALL(connection, AckReceived());
EXPECT_CALL(service, Method(5));
connection.Observer().DataReceived();

service.MethodDone();
}

0 comments on commit 37d7878

Please sign in to comment.