Skip to content

Commit

Permalink
Add protocol message compression support:
Browse files Browse the repository at this point in the history
* Peers negotiate compression via HTTP Header "X-Offer-Compression: lz4"
* Messages greater than 70 bytes and protocol type messages MANIFESTS,
  ENDPOINTS, TRANSACTION, GET_LEDGER, LEDGER_DATA, GET_OBJECT,
  and VALIDATORLIST are compressed
* If the compressed message is larger than the uncompressed message
  then the uncompressed message is sent
* Compression flag and the compression algorithm type are included
  in the message header
* Only LZ4 block compression is currently supported
  • Loading branch information
gregtatcam authored and manojsdoshi committed Apr 7, 2020
1 parent ade5eb7 commit 758a379
Show file tree
Hide file tree
Showing 14 changed files with 873 additions and 35 deletions.
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ target_sources (rippled PRIVATE
src/test/overlay/ProtocolVersion_test.cpp
src/test/overlay/cluster_test.cpp
src/test/overlay/short_read_test.cpp
src/test/overlay/compression_test.cpp
#[===============================[
test sources:
subdir: peerfinder
Expand Down
153 changes: 153 additions & 0 deletions src/ripple/basics/CompressionAlgorithms.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED
#define RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED

#include <ripple/basics/contract.h>
#include <lz4.h>
#include <algorithm>

namespace ripple {

namespace compression_algorithms {

/** Convenience wrapper for Throw
* @param message Message to log/throw
*/
inline void doThrow(const char *message)
{
Throw<std::runtime_error>(message);
}

/** LZ4 block compression.
* @tparam BufferFactory Callable object or lambda.
* Takes the requested buffer size and returns allocated buffer pointer.
* @param in Data to compress
* @param inSize Size of the data
* @param bf Compressed buffer allocator
* @return Size of compressed data, or zero if failed to compress
*/
template<typename BufferFactory>
std::size_t
lz4Compress(void const* in,
std::size_t inSize, BufferFactory&& bf)
{
if (inSize > UINT32_MAX)
doThrow("lz4 compress: invalid size");

auto const outCapacity = LZ4_compressBound(inSize);

// Request the caller to allocate and return the buffer to hold compressed data
auto compressed = bf(outCapacity);

auto compressedSize = LZ4_compress_default(
reinterpret_cast<const char*>(in),
reinterpret_cast<char*>(compressed),
inSize,
outCapacity);
if (compressedSize == 0)
doThrow("lz4 compress: failed");

return compressedSize;
}

/**
* @param in Compressed data
* @param inSize Size of compressed data
* @param decompressed Buffer to hold decompressed data
* @param decompressedSize Size of the decompressed buffer
* @return size of the decompressed data
*/
inline
std::size_t
lz4Decompress(std::uint8_t const* in, std::size_t inSize,
std::uint8_t* decompressed, std::size_t decompressedSize)
{
auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in),
reinterpret_cast<char*>(decompressed), inSize, decompressedSize);

if (ret <= 0 || ret != decompressedSize)
doThrow("lz4 decompress: failed");

return decompressedSize;
}

/** LZ4 block decompression.
* @tparam InputStream ZeroCopyInputStream
* @param in Input source stream
* @param inSize Size of compressed data
* @param decompressed Buffer to hold decompressed data
* @param decompressedSize Size of the decompressed buffer
* @return size of the decompressed data
*/
template<typename InputStream>
std::size_t
lz4Decompress(InputStream& in, std::size_t inSize,
std::uint8_t* decompressed, std::size_t decompressedSize)
{
std::vector<std::uint8_t> compressed;
std::uint8_t const* chunk = nullptr;
int chunkSize = 0;
int copiedInSize = 0;
auto const currentBytes = in.ByteCount();

// Use the first chunk if it is >= inSize bytes of the compressed message.
// Otherwise copy inSize bytes of chunks into compressed buffer and
// use the buffer to decompress.
while (in.Next(reinterpret_cast<void const**>(&chunk), &chunkSize))
{
if (copiedInSize == 0)
{
if (chunkSize >= inSize)
{
copiedInSize = inSize;
break;
}
compressed.resize(inSize);
}

chunkSize = chunkSize < (inSize - copiedInSize) ? chunkSize : (inSize - copiedInSize);

std::copy(chunk, chunk + chunkSize, compressed.data() + copiedInSize);

copiedInSize += chunkSize;

if (copiedInSize == inSize)
{
chunk = compressed.data();
break;
}
}

// Put back unused bytes
if (in.ByteCount() > (currentBytes + copiedInSize))
in.BackUp(in.ByteCount() - currentBytes - copiedInSize);

if ((copiedInSize == 0 && chunkSize < inSize) || (copiedInSize > 0 && copiedInSize != inSize))
doThrow("lz4 decompress: insufficient input size");

return lz4Decompress(chunk, inSize, decompressed, decompressedSize);
}

} // compression

} // ripple

#endif //RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED
3 changes: 3 additions & 0 deletions src/ripple/core/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class Config : public BasicConfig
std::string SSL_VERIFY_FILE;
std::string SSL_VERIFY_DIR;

// Compression
bool COMPRESSION = false;

// Thread pool configuration
std::size_t WORKERS = 0;

Expand Down
1 change: 1 addition & 0 deletions src/ripple/core/ConfigSections.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ConfigSection
// VFALCO TODO Rename and replace these macros with variables.
#define SECTION_AMENDMENTS "amendments"
#define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_COMPRESSION "compression"
#define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_ELB_SUPPORT "elb_support"
#define SECTION_FEE_DEFAULT "fee_default"
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/core/impl/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ void Config::loadFromString (std::string const& fileContents)
if (getSingleSection (secConfig, SECTION_WORKERS, strTemp, j_))
WORKERS = beast::lexicalCastThrow <std::size_t> (strTemp);

if (getSingleSection (secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow <bool> (strTemp);

// Do not load trusted validator configuration for standalone mode
if (! RUN_STANDALONE)
{
Expand Down
103 changes: 103 additions & 0 deletions src/ripple/overlay/Compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLED_COMPRESSION_H_INCLUDED
#define RIPPLED_COMPRESSION_H_INCLUDED

#include <ripple/basics/CompressionAlgorithms.h>
#include <ripple/basics/Log.h>
#include <lz4frame.h>

namespace ripple {

namespace compression {

std::size_t constexpr headerBytes = 6;
std::size_t constexpr headerBytesCompressed = 10;

enum class Algorithm : std::uint8_t {
None = 0x00,
LZ4 = 0x01
};

enum class Compressed : std::uint8_t {
On,
Off
};

/** Decompress input stream.
* @tparam InputStream ZeroCopyInputStream
* @param in Input source stream
* @param inSize Size of compressed data
* @param decompressed Buffer to hold decompressed message
* @param algorithm Compression algorithm type
* @return Size of decompressed data or zero if failed to decompress
*/
template<typename InputStream>
std::size_t
decompress(InputStream& in, std::size_t inSize, std::uint8_t* decompressed,
std::size_t decompressedSize, Algorithm algorithm = Algorithm::LZ4) {
try
{
if (algorithm == Algorithm::LZ4)
return ripple::compression_algorithms::lz4Decompress(in, inSize,
decompressed, decompressedSize);
else
{
JLOG(debugLog().warn()) << "decompress: invalid compression algorithm "
<< static_cast<int>(algorithm);
assert(0);
}
}
catch (...) {}
return 0;
}

/** Compress input data.
* @tparam BufferFactory Callable object or lambda.
* Takes the requested buffer size and returns allocated buffer pointer.
* @param in Data to compress
* @param inSize Size of the data
* @param bf Compressed buffer allocator
* @param algorithm Compression algorithm type
* @return Size of compressed data, or zero if failed to compress
*/
template<class BufferFactory>
std::size_t
compress(void const* in,
std::size_t inSize, BufferFactory&& bf, Algorithm algorithm = Algorithm::LZ4) {
try
{
if (algorithm == Algorithm::LZ4)
return ripple::compression_algorithms::lz4Compress(in, inSize, std::forward<BufferFactory>(bf));
else
{
JLOG(debugLog().warn()) << "compress: invalid compression algorithm"
<< static_cast<int>(algorithm);
assert(0);
}
}
catch (...) {}
return 0;
}
} // compression

} // ripple

#endif //RIPPLED_COMPRESSION_H_INCLUDED
53 changes: 44 additions & 9 deletions src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED

#include <ripple/overlay/Compression.h>
#include <ripple/protocol/messages.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp>
Expand Down Expand Up @@ -47,27 +48,61 @@ namespace ripple {

class Message : public std::enable_shared_from_this <Message>
{
using Compressed = compression::Compressed;
using Algorithm = compression::Algorithm;
public:
/** Constructor
* @param message Protocol message to serialize
* @param type Protocol message type
*/
Message (::google::protobuf::Message const& message, int type);

public:
/** Retrieve the packed message data. */
/** Retrieve the packed message data. If compressed message is requested but the message
* is not compressible then the uncompressed buffer is returned.
* @param compressed Request compressed (Compress::On) or
* uncompressed (Compress::Off) payload buffer
* @return Payload buffer
*/
std::vector <uint8_t> const&
getBuffer () const
{
return mBuffer;
}
getBuffer (Compressed tryCompressed);

/** Get the traffic category */
std::size_t
getCategory () const
{
return mCategory;
return category_;
}

private:
std::vector <uint8_t> mBuffer;
std::size_t mCategory;
std::vector <uint8_t> buffer_;
std::vector <uint8_t> bufferCompressed_;
std::size_t category_;
std::once_flag once_flag_;

/** Set the payload header
* @param in Pointer to the payload
* @param payloadBytes Size of the payload excluding the header size
* @param type Protocol message type
* @param comprAlgorithm Compression algorithm used in compression,
* currently LZ4 only. If None then the message is uncompressed.
* @param uncompressedBytes Size of the uncompressed message
*/
void setHeader(std::uint8_t* in, std::uint32_t payloadBytes, int type,
Algorithm comprAlgorithm, std::uint32_t uncompressedBytes);

/** Try to compress the payload.
* Can be called concurrently by multiple peers but is compressed once.
* If the message is not compressible then the serialized buffer_ is used.
*/
void compress();

/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @param in Payload header pointer
* @return Message type
*/
int getType(std::uint8_t const* in) const;
};

}
Expand Down
6 changes: 4 additions & 2 deletions src/ripple/overlay/impl/ConnectAttempt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ ConnectAttempt::onHandshake (error_code ec)
if (! sharedValue)
return close(); // makeSharedValue logs

req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate);
req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION);

buildHandshake(req_, *sharedValue, overlay_.setup().networkID,
overlay_.setup().public_ip, remote_endpoint_.address(), app_);
Expand Down Expand Up @@ -264,7 +264,7 @@ ConnectAttempt::onShutdown (error_code ec)
//--------------------------------------------------------------------------

auto
ConnectAttempt::makeRequest (bool crawl) -> request_type
ConnectAttempt::makeRequest (bool crawl, bool compressionEnabled) -> request_type
{
request_type m;
m.method(boost::beast::http::verb::get);
Expand All @@ -275,6 +275,8 @@ ConnectAttempt::makeRequest (bool crawl) -> request_type
m.insert ("Connection", "Upgrade");
m.insert ("Connect-As", "Peer");
m.insert ("Crawl", crawl ? "public" : "private");
if (compressionEnabled)
m.insert("X-Offer-Compression", "lz4");
return m;
}

Expand Down
Loading

1 comment on commit 758a379

@intelliot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.