
Working toward a fully zero copy receive path #320

Closed
jonesmz opened this issue Jul 11, 2019 · 51 comments
@jonesmz
Contributor

jonesmz commented Jul 11, 2019

This is related to #248, #191, and #289

I was working more with the v5 properties, and realized that when we receive the properties from the network, we allocate a new buffer for the std::vector<> that stores the mqtt::v5::property_variants, even if the property_variant only holds ref properties.

I'd like to see mqtt_cpp support zero allocations for handling the data from incoming packets.

We can do that with the following changes:

  1. Store a pool of shared_ptr<char[]>, which each message is written into.
  2. All callbacks that endpoint.hpp calls when passing messages up to the user level code pass "ref" objects, such as mqtt::string_view, mqtt::will_ref, mqtt::property_ref
  3. The callbacks also pass an mqtt::any, which holds the shared_ptr<char[]> for the message that all of the ref objects refer to.

A more complicated, but "better" way to handle this is to:

  1. Store a pool of shared_ptr<char[]>, which each divisible part of a message is written into.
  2. This means that each message's contents, username, password, v5::property, and so on are written into their own buffers. We'll need each buffer to be contiguous so that higher level classes like string_view can refer to them directly, but we should be able to keep memory fragmentation down by setting a minimum buffer size of e.g. 256 bytes and always retrieving buffers in power-of-two sizes.
  3. All callbacks that endpoint.hpp calls when passing messages up to the user level code pass "ref" objects, such as mqtt::string_view, mqtt::will_ref, mqtt::properties.
  4. Create some kind of mqtt::owning_string_view that implements the API from mqtt::string_view, but also holds a handle to the std::shared_ptr<char[]>
  5. Modify mqtt::will and mqtt::property to always hold a reference to the std::shared_ptr<char[]>, and when they are created by user code, serialize directly into a newly allocated std::shared_ptr<char[]>.
  6. Each of the callbacks that endpoint.hpp calls don't need to be modified because each of the arguments already holds a std::shared_ptr<char[]>.
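The mqtt::owning_string_view named in step 4 doesn't exist yet; here is a minimal sketch of the idea, with std:: types standing in for the mqtt:: aliases (names are illustrative, not mqtt_cpp API):

```cpp
#include <cstddef>
#include <cstring>
#include <memory>
#include <string_view>

// Sketch of step 4's owning_string_view: exposes the string_view API while
// keeping the backing receive buffer alive through a shared_ptr.
class owning_string_view : public std::string_view {
public:
    owning_string_view(std::shared_ptr<char[]> buf, std::size_t len)
        : std::string_view(buf.get(), len), buf_(std::move(buf)) {}

private:
    std::shared_ptr<char[]> buf_;  // shares ownership of the receive buffer
};

// Simulates "receiving" bytes into a freshly allocated buffer.
inline owning_string_view receive(const char* data, std::size_t len) {
    std::shared_ptr<char[]> buf(new char[len]);
    std::memcpy(buf.get(), data, len);
    return owning_string_view(std::move(buf), len);
}
```

Because the view and the ownership travel together, a callback can stash the object for later use without copying the bytes.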

Further, for the above handling of properties we have two ways to avoid allocating that std::vector.

We can either have the std::vector<> pull its storage from the same memory pool of char[] buffers, or we can create a new custom data type, "property_cursor", that has a pointer to the entire message and iterates over the message on an as-needed basis to construct the property objects on the fly.

If we implement this by giving each chunk of the message its own buffer (more complicated, but better overall in my opinion), then we should have the std::vector<> use the memory pool as an allocator.

If we have each message stored in a single buffer, we should implement this using the "property_cursor" concept.
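A sketch of what such a "property_cursor" could look like. This is hypothetical: it assumes a simplified wire format of a 1-byte id followed by a 2-byte big-endian length and the payload, which is not the exact v5 property encoding; the point is only the lazy, allocation-free iteration.

```cpp
#include <cstddef>
#include <cstdint>
#include <string_view>
#include <utility>

// Hypothetical "property_cursor": walks a serialized property block lazily,
// constructing views on demand instead of materializing a std::vector.
// Simplified wire format for illustration: [id:1][len:2 big-endian][payload].
class property_cursor {
public:
    explicit property_cursor(std::string_view block) : rest_(block) {}

    bool done() const { return rest_.empty(); }

    // Returns the current property's id and payload without copying,
    // then advances to the next property.
    std::pair<std::uint8_t, std::string_view> next() {
        std::uint8_t id = static_cast<std::uint8_t>(rest_[0]);
        std::size_t len = (static_cast<unsigned char>(rest_[1]) << 8)
                        |  static_cast<unsigned char>(rest_[2]);
        std::string_view payload = rest_.substr(3, len);
        rest_.remove_prefix(3 + len);
        return {id, payload};
    }

private:
    std::string_view rest_;  // unparsed remainder of the property block
};
```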

@redboltz I'd like to hear your thoughts on the matter.

@redboltz
Owner

Could you give me an example?

I think the PUBLISH packet (https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100) is a good one to use for the example.

@redboltz
Owner

redboltz commented Jul 11, 2019

I expect as follows:

  1. allocate std::shared_ptr<char[1]> for fixed_header. (Or different way).
  2. call async_read() for 1 byte.
  3. allocate std::shared_ptr<char[4]> for remaining_length.
  4. call async_read() for 1 byte. (or 4 bytes?)
  5. ..... then read topic properties payload ..
    I want to know how many std::shared_ptr<char[]>s are allocated.
    And the relationship of the async_read bytes and allocated size.

@jonesmz
Contributor Author

jonesmz commented Jul 11, 2019

In your example:

I expect as follows:

  1. allocate std::shared_ptr<char[1]> for fixed_header. (Or different way).

Not needed, since we only need that data on the stack.

  2. call async_read() for 1 byte.

Right, this is how it's currently done.

  3. allocate std::shared_ptr<char[4]> for remaining_length.

Also unnecessary, since the length is only used on the stack, like we currently do things.

  4. call async_read() for 1 byte. (or 4 bytes?)

Right, this is how it's currently done.

  5. ..... then read topic properties payload ..
I want to know how many std::shared_ptr<char[]>s are allocated.
And the relationship of the async_read bytes and allocated size.

For a publish message, there are 3 categories of things that would be given an std::shared_ptr<char[]>

  1. Topic
  2. Message
  3. Properties.

But not all properties would involve allocating a std::shared_ptr<char[]>:

Only the properties that have variable length contents, such as ContentType, UserProperty, CorrelationID, and so on, would have a std::shared_ptr<char[]> created.

For Connect messages we also have

  1. Username
  2. Password

What motivates me to discuss this is

  1. Copying is expensive
  2. Allocations are expensive (and can block)

Copying:

We mitigate copying being an expensive operation by ensuring that data enters the program (from boost::asio) directly into the one and only buffer that it will ever need to live in. Having the buffer stored as a std::shared_ptr<char[]> allows for access to the buffer to be shared among multiple data structures cheaply.

For example, we store topics in test_broker.hpp in multiple places, and those topics are stored for potentially a long time.

Messages and properties are stored in test_broker.hpp if they are retained, or as part of a will.

All of the data that's passed to a user program (server, client, whatever) can potentially be used for a long time as well. Allocating storage for it in this way allows a program using mqtt_cpp to never need to copy the data, only pass the shared_ptr<> to the data around, which is significantly less expensive than copying large multi-megabyte strings.

(The max size of a topic is 65,535 bytes (64KB), and the max message size is 268,435,456 bytes (256MB).)

Copying data that approaches those sizes is very CPU intensive.


Allocations:

By having endpoint accept an allocator as one of its class template and constructor parameters (defaulting to std::allocator, of course), and then routing all memory allocations through the provided allocator, end user code can achieve better control over the allocation behavior of the program.

As I mentioned in my initial post: a good allocator design in this situation is to always return regions of memory in powers of two (probably no smaller than 32 bytes). You just round up to the nearest power of two any time user code requests an allocation.

Further, any time you need to allocate from the OS, you allocate many multiples of the requested size. E.g., if the requested allocation is X bytes, you round X up to the next power of two and then allocate 10x the result: 9 chunks are saved in your memory pool for later, and 1 chunk is returned to the code requesting the allocation.

Finally, when your std::shared_ptr<char[]> is destructed, the custom deleter you provide to std::shared_ptr<char[]> returns the freed memory back to your allocator, instead of to the global allocator. You do this by calling std::allocate_shared, instead of std::make_shared. The memory returned like that becomes available for use in the next message processed.

But mqtt_cpp doesn't need to know anything about how the provided allocator behaves. All it needs to do is use it to allocate the storage for the memory buffers used to hold the various chunks of messages received from the network.
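A minimal sketch of that pooling scheme (buffer_pool and round_up_pow2 are illustrative names, not mqtt_cpp API; thread safety and the pluggable-allocator template parameter are omitted):

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <vector>

// Sizes round up to powers of two, with a 32-byte floor.
inline std::size_t round_up_pow2(std::size_t n, std::size_t min = 32) {
    std::size_t p = min;
    while (p < n) p *= 2;
    return p;
}

// Freed buffers return to the pool via the shared_ptr deleter instead of
// going back to the global allocator. Buffers must not outlive the pool
// in this sketch.
class buffer_pool {
public:
    std::shared_ptr<char[]> acquire(std::size_t n) {
        std::size_t sz = round_up_pow2(n);
        auto& bucket = free_[sz];
        char* raw;
        if (bucket.empty()) {
            raw = new char[sz];        // fall back to the system allocator
        } else {
            raw = bucket.back();       // reuse a pooled buffer
            bucket.pop_back();
        }
        // The deleter hands the memory back to the pool, not to delete[].
        return std::shared_ptr<char[]>(raw,
            [this, sz](char* p) { free_[sz].push_back(p); });
    }

    ~buffer_pool() {
        for (auto& [sz, bucket] : free_)
            for (char* p : bucket) delete[] p;
    }

private:
    std::map<std::size_t, std::vector<char*>> free_;  // size -> free buffers
};
```

The key point is the custom deleter: when the last shared_ptr drops, the bytes go back to the pool's free list, ready to serve the next message.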


The way I would control the allocation behavior in my broker is like this:

  1. One or more boost asio threads running mqtt_cpp::endpoints
  2. One management thread (either running on boost::asio, or separate)

A single allocator would be provided to all mqtt_cpp::endpoints, and the broker.

When the allocator reaches a pre-specified low-water mark, a signal would be sent to the management thread indicating that more buffers are needed. The management thread would then allocate more memory, and feed the new buffers to the allocator. The reason this would be done on the management thread is to avoid waiting on the system allocator on the main processing thread.

If an mqtt_cpp::endpoint requests memory from the allocator and the allocator doesn't have enough available, the allocator does an allocation from the system immediately, since continuing processing is more important than waiting on another thread to provide memory.

Eventually the system would reach a steady state, where no allocations come from the system allocator, and would instead be served from the pooled buffers in the custom allocator.
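The low-water-mark behavior could be sketched like this (watermarked_pool and its callback are hypothetical; real code would lock the free list and deliver the notification to the management thread):

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// When the count of pooled buffers drops below the low-water mark, notify
// the management thread; if the pool is empty, allocate from the system
// immediately rather than wait.
class watermarked_pool {
public:
    watermarked_pool(std::size_t low_water, std::function<void()> notify)
        : low_water_(low_water), notify_(std::move(notify)) {}

    // Management thread feeds freshly allocated buffers to the pool.
    void refill(std::size_t buf_size, std::size_t count) {
        while (count--) free_.push_back(new char[buf_size]);
    }

    char* acquire(std::size_t buf_size) {
        char* p;
        if (free_.empty()) {
            p = new char[buf_size];    // can't wait: allocate immediately
        } else {
            p = free_.back();
            free_.pop_back();
        }
        if (free_.size() < low_water_) notify_();  // ask for more buffers
        return p;
    }

    void release(char* p) { free_.push_back(p); }

    ~watermarked_pool() { for (char* p : free_) delete[] p; }

private:
    std::size_t low_water_;
    std::function<void()> notify_;
    std::vector<char*> free_;
};
```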


What we need to do in order to support this is to

  1. Modify mqtt_cpp::endpoint to use a custom allocator for all allocations (including containers from the standard library)
  2. Modify mqtt_cpp::endpoint so that every variable-length part of a message is received directly into an std::shared_ptr<char[]>, so that user level code doesn't need to make copies, but instead just shares pointers.

@redboltz
Owner

I think that basically this approach is good. Let me confirm that I understand the concept correctly. And I have some comments.

  1. Use std::shared_ptr<char[]> for string/binary types from the initial point. The initial point means: before the async_read() call, create a std::shared_ptr<char[]> and then pass it to async_read(). std::shared_ptr<char[]> works well with Boost.Asio async APIs.
    However, AFAIK, std::shared_ptr<char[]> is a C++17 feature. boost::shared_ptr supports it. See https://www.boost.org/doc/libs/1_70_0/libs/smart_ptr/doc/html/smart_ptr.html#shared_array
    So, we might need to use the mqtt::shared_ptr pattern.

  2. How do we know the allocated size from the allocated object? I think that there is no way to get the size from a std::shared_ptr<char[]>. Why don't we use std::shared_ptr<std::vector<char>> or std::shared_ptr<std::string>? I guess the reason is to minimize the number of memory allocations. But I think that std::shared_ptr<std::string> is an acceptable choice because it is supported in C++14 and it is easy to get the allocated size. In addition, std::shared_ptr<char[]> cannot work with std::make_shared, but std::shared_ptr<std::string> can. See https://wandbox.org/permlink/3adDc4YrqQwK4nAr I think the number of memory allocations is the same as with std::shared_ptr<char[]>.
    What do you think?

  3. I guess that we create the std::shared_ptr<char[]> for string/binary objects after we know their size. There are two types of length expression in MQTT. One is the up-to-4-byte variable length; the other is a fixed 2 bytes. For the former, read one byte per async_read() until the finish indicator is detected. For the latter, prepare a 2-byte member variable for the size, something like char len_buf_[2] (similar to fixed_header_), and call async_read() for two bytes. Then we know the size, and allocate the shared_ptr<char[]> with the known size. Is that right?
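For reference, the variable-length case in point 3 is MQTT's Variable Byte Integer encoding: each byte carries 7 data bits, bit 7 set means another byte follows, and at most 4 bytes are allowed. A synchronous decoder sketch (the async version would issue one 1-byte async_read per iteration):

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Decodes an MQTT "remaining length" (Variable Byte Integer).
// Returns the value and reports how many bytes were consumed.
inline std::uint32_t decode_remaining_length(const unsigned char* bytes,
                                             std::size_t avail,
                                             std::size_t& consumed) {
    std::uint32_t value = 0;
    std::uint32_t multiplier = 1;
    consumed = 0;
    for (;;) {
        if (consumed == avail) throw std::runtime_error("need more bytes");
        unsigned char b = bytes[consumed++];
        value += static_cast<std::uint32_t>(b & 0x7f) * multiplier;
        if ((b & 0x80) == 0) return value;           // finish indicator
        if (consumed == 4) throw std::runtime_error("malformed length");
        multiplier *= 128;
    }
}
```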

@jonesmz
Contributor Author

jonesmz commented Jul 12, 2019

  1. Yes, we'll read the size from the packet, and then allocate space, and then pass the allocated buffer directly to boost::asio.

I didn't know that shared_ptr to an array type was a C++17 feature. It looks like there's a way to do this without using boost::shared_ptr, but I think boost::shared_ptr will support "allocate_shared" better, and that's probably important. So the mqtt::shared_ptr approach is reasonable.

https://stackoverflow.com/questions/13061979/shared-ptr-to-an-array-should-it-be-used

  2. Having too many levels of pointers can also cause performance problems. I don't want to have a shared_ptr -> string -> actual data. Instead it would be nice to have only shared_ptr -> actual data.

How about something like this? (Not a complete implementation...)
https://pastebin.com/USspNpSs

  3. Yes, exactly.

@redboltz
Owner

  1. It seems that the combination of clang++ and libc++ doesn't work even with C++17. But Boost works fine. The mqtt::shared_ptr approach is fine for me.
    See https://wandbox.org/permlink/eiWC4ZPuzxDO1WkS

  2. I think that

struct size_sp {
    mqtt::string_view as_string_view() const {
        return mqtt::string_view(data.get(), size);
    }
    std::size_t size;
    mqtt::shared_ptr<char[]> data;
};

is good enough. What do you think?

@jonesmz
Contributor Author

jonesmz commented Jul 12, 2019

  1. Sure
  2. Sure, that's sufficient. My version is a bit more user friendly though :-) I don't have a strong preference. Simple is fine.

BTW, it's important that the data is const after it's read out of boost::asio. If we don't make it const, then users might edit the data in a way that isn't thread safe.

@redboltz
Owner

I tried yet another implementation of 2.

See
https://wandbox.org/permlink/duR5ze8iicTJms9X

What do you think?

@jonesmz
Contributor Author

jonesmz commented Jul 12, 2019

Yeah, that looks like a good approach. Let's use that!

@jonesmz
Contributor Author

jonesmz commented Jul 12, 2019

Though, I think for the constructor, it would be wise to do something like:

    buffer(mqtt::shared_ptr<const char[]> ptr, std::size_t size)
        : mqtt::string_view(ptr.get(), size)
        , ptr_(std::move(ptr))
        , size_(size)
    { }

We don't want the buffer class to ever have a writable buffer. It's basically a read-only "view" class. Passing in a shared_ptr<const char[]> (instead of having the buffer class allocate its own) guarantees that immutability property.

We could actually extend this a bit to have the buffer class contain not only a size, but also an offset. E.g. maybe someone only wants a portion of the buffer after we've read it from boost::asio. If we let them provide an offset / start position, then this buffer class can represent any arbitrary range inside the buffer owned by ptr_.

Imagine a message that is comma separated values, for example. User code may want to grab the 3rd value out of the comma separated values, but that value is 10MB out of a 200MB string. If we support an offset indicator, the user can make a read-only substring that's attached to the same shared_ptr<const char[]> as the buffer that mqtt_cpp::endpoint provided to the callback, but which only represents the specific part of the string that they want.
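The comma-separated-values scenario might look like this with an illustrative buffer type (close to, but not identical to, the versions sketched elsewhere in this thread):

```cpp
#include <cstddef>
#include <cstring>
#include <memory>
#include <string_view>

// Read-only view that shares ownership of the underlying receive buffer,
// so a substring keeps the whole allocation alive without copying bytes.
class buffer : public std::string_view {
public:
    buffer(std::shared_ptr<const char[]> ptr, std::string_view view)
        : std::string_view(view), ptr_(std::move(ptr)) {}

    buffer(std::shared_ptr<const char[]> ptr, std::size_t len)
        : std::string_view(ptr.get(), len), ptr_(std::move(ptr)) {}

    // Substring that still co-owns the original allocation.
    buffer substr(std::size_t pos, std::size_t len) const {
        return buffer(ptr_, std::string_view::substr(pos, len));
    }

    const std::shared_ptr<const char[]>& owner() const { return ptr_; }

private:
    std::shared_ptr<const char[]> ptr_;
};
```

The substring shares the original allocation; no bytes are copied, and the allocation stays alive for as long as any substring does.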

@jonesmz
Contributor Author

jonesmz commented Jul 12, 2019

Ah, actually, now that I think about it. We maybe shouldn't inherit from mqtt::string_view.

There is a lot of code out there that accepts std::string_view by value and not by reference. Inheriting from mqtt::string_view like this will cause slicing of the object: the std::shared_ptr<const char[]> won't be passed to the function, only the mqtt::string_view that's being inherited from.

If we use private inheritance to hide the fact that mqtt::buffer (or whatever we name it) is an mqtt::string_view, then all of those functions won't be called automatically.

But then again, that's kind of the whole point of mqtt::string_view, that it doesn't have the ownership information. So, it's probably totally fine.
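The slicing behavior can be shown directly with a minimal illustrative type: passing it to a function that takes std::string_view by value drops the shared_ptr, which is safe for the duration of the call because the caller's object still owns the bytes.

```cpp
#include <cstddef>
#include <memory>
#include <string_view>

// Minimal type to demonstrate the slicing discussed above (illustrative).
struct buffer : std::string_view {
    buffer(std::shared_ptr<const char[]> p, std::size_t n)
        : std::string_view(p.get(), n), ptr_(std::move(p)) {}
    std::shared_ptr<const char[]> ptr_;
};

// Takes string_view BY VALUE: a buffer argument is sliced down to its
// string_view base; the callee never sees ptr_. That's fine here because
// the caller's buffer keeps the allocation alive during the call.
inline std::size_t count_chars(std::string_view sv) { return sv.size(); }
```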

@redboltz
Owner

Could you show me actual working code? That would be easy for me to understand.

You can edit https://wandbox.org/permlink/duR5ze8iicTJms9X and then click run and share button, then update the permalink.

@jonesmz
Contributor Author

jonesmz commented Jul 12, 2019

https://wandbox.org/permlink/8ohhfDwKHernxJhT


#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>

namespace as = boost::asio;

namespace mqtt
{

class buffer : public std::string_view {
public:
    buffer(std::shared_ptr<const char[]> ptr, std::size_t const length, std::size_t const offset = 0)
     : std::string_view(ptr.get()+offset, length-offset)
     , ptr_(std::move(ptr))
     , length_(length)
     , offset_(offset)
    {
      // If offset > size, throw exception.
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, offset_+offset, length);
    }

public:
    std::shared_ptr<const char[]> ptr_;
    std::size_t length_;
    std::size_t offset_;
};

} // namespace mqtt

int main() {
    // Common io_service
    as::io_service ios;

    // Common Config
    constexpr std::uint16_t port = 12345;

    // Server
    as::ip::tcp::acceptor ac(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port));
    as::ip::tcp::socket ss(ios);
    std::function<void()> do_accept;
    std::vector<as::ip::tcp::socket> connections;

    do_accept = [&] {
        ac.async_accept(
            ss,
            [&]
            (boost::system::error_code const& e) {
                std::cout << e.message() << std::endl;
                // read size of topic from boost::asio. Assume 5 bytes.
                std::size_t topicLen = 5;
                // Allocate space to read the topic into.
                std::shared_ptr<char[]> topicBuf = std::make_shared<char[]>(topicLen);
                as::async_read(
                    ss,
                    as::buffer(topicBuf.get(), topicLen),
                    [&, topicBuf, topicLen](boost::system::error_code const& e, std::size_t) {
                        std::cout << e.message() << std::endl;
                        std::cout << mqtt::buffer(topicBuf, topicLen) << std::endl;
                        
                        // read size of payload from boost::asio. Assume 7 bytes.
                        std::size_t payloadLen = 7;
                        // Allocate space to read the topic into.
                        std::shared_ptr<char[]> payloadBuf = std::make_shared<char[]>(payloadLen);
                        as::async_read(
                            ss,
                            as::buffer(payloadBuf.get(), payloadLen),
                            [payloadBuf, payloadLen](boost::system::error_code const& e, std::size_t) {
                                std::cout << e.message() << std::endl;
                                std::cout << mqtt::buffer(payloadBuf, payloadLen) << std::endl;
                            }
                        );
                    }
                );
            }
        );
    };
    do_accept();

    // Client
    as::ip::address address = boost::asio::ip::address::from_string("127.0.0.1");
    as::ip::tcp::endpoint ep(address, port);
    as::ip::tcp::socket cs(ios);
    cs.async_connect(
        ep,
        [&]
        (boost::system::error_code const& e) {
            std::cout << e.message() << std::endl;
            as::write(cs, as::buffer(std::string("topicpayload")));
        }
    );

    // Start
    ios.run();
}

@redboltz
Owner

Thank you!

I think that this approach is very good.

  1. User can access buffer as read only.
  2. Zero copy.
  3. Simple.
  4. Offset functionality is useful.

Keeping the original length and modifying the copy of the buffer as follows:

     : std::string_view(ptr.get()+offset, length-offset)

It's so elegant!

@jonesmz
Contributor Author

jonesmz commented Jul 13, 2019

I think there is a bug in my constructor.

std::string_view(ptr.get()+offset, length-offset)

Should be std::string_view(ptr.get()+offset, length)

Because length is the length of the "string", not the length of the whole buffer.

@redboltz
Owner

I updated your example.

See https://wandbox.org/permlink/v2ktJq5GsVeGdlbl

I think that your original constructor is right. But the argument order of the buffer() call in substr() seems wrong.

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, offset_+offset, length);
    }

should be

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, length_, offset);
    }

@redboltz
Owner

Still something wrong...

@redboltz
Owner

Finally, I understand that your constructor fix is right. In addition, the substr implementation should be fixed.
I added the updated code.

First, I wrote a test for buffer:

https://wandbox.org/permlink/YgLC8uTB6lKbq625

Then updated the code:

https://wandbox.org/permlink/2n19bFiohTsBymbY

It works as I expected.

class buffer : public std::string_view {
public:
    buffer(std::shared_ptr<const char[]> ptr, std::size_t const length, std::size_t const offset = 0)
     : std::string_view(ptr.get()+offset, length)
     , ptr_(std::move(ptr))
     , length_(length)
     , offset_(offset)
    {
      // If offset > size, throw exception.
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, length, offset_+offset);
    }

public:
    std::shared_ptr<const char[]> ptr_;
    std::size_t length_;
    std::size_t offset_;
};

@redboltz
Owner

In order to get the out-of-range checking right, I updated the buffer as follows:
https://wandbox.org/permlink/WxNZ9s2Wf27RXlDr

offset_ is no longer needed. It is covered by string_view.

But I'd like to confirm the purpose of offset.

If offset is only a convenience for the user reading the buffer, then offset_ isn't needed.

If offset is for the endpoint.hpp developers (us), it might be needed. I mean: allocate a big chunk of memory, such as all of remaining_length, and give a part of that memory to boost::async_read().
I think that we don't use such an approach. I think that we allocate the memory for each individual variable-length binary/string, such as ClientId, TopicFilter, Payload, Properties (string/binary type), and so on.

What do you think?

@redboltz
Owner

The basic concept of my approach is that the member variables always keep the initial values, and mqtt::string_view provides various views for users.

@jonesmz
Contributor Author

jonesmz commented Jul 13, 2019

Offset would not be used by endpoint.hpp.

Actually, we don't even need to store the length (aside from informing string_view), because shared_ptr knows the size of the allocation internally; no need to carry that around on our own.

@jonesmz
Contributor Author

jonesmz commented Jul 13, 2019

How about this?

class buffer : public mqtt::string_view {
public:
    buffer(mqtt::shared_ptr_const_array ptr, std::size_t length)
        : mqtt::string_view(ptr.get(), length),
          ptr_(std::move(ptr))
    {
    }
    buffer(mqtt::shared_ptr_const_array ptr, mqtt::string_view view)
        : mqtt::string_view(view),
          ptr_(std::move(ptr))
    {
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        return buffer(ptr_, mqtt::string_view::substr(offset, length));
    }

private:
    mqtt::shared_ptr_const_array ptr_;
};

We could make buffer(mqtt::shared_ptr_const_array ptr, mqtt::string_view view) private to avoid mis-use, but I don't think there's any reason to do so.

There's no substantial reason that mqtt::buffer's shared_ptr needs to own what the provided string_view points to. It would be really strange if it didn't, but any situation where mqtt::buffer converts to mqtt::string_view has no ownership semantics anyway, and then it doesn't matter what buffer the mqtt::string_view points to.

So we don't need to force users of the code to construct the mqtt::buffer as a string_view to the entire buffer if they really don't want to.

I'd say we can go even further and remove buffer(mqtt::shared_ptr_const_array ptr, std::size_t length), but you might prefer the simpler constructor.

@jonesmz
Contributor Author

jonesmz commented Jul 13, 2019

I suppose that we could also technically do this:

class buffer : public mqtt::shared_ptr_const_array, public mqtt::string_view {
public:
    buffer(mqtt::shared_ptr_const_array ptr, std::size_t length)
        : mqtt::shared_ptr_const_array(std::move(ptr))
        , mqtt::string_view(mqtt::shared_ptr_const_array::get(), length)
    {
    }

    buffer(mqtt::shared_ptr_const_array ptr, mqtt::string_view view)
        : mqtt::shared_ptr_const_array(std::move(ptr))
        , mqtt::string_view(view)
    {
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        return buffer(*this, mqtt::string_view::substr(offset, length));
    }
};

template<class E, class T>
std::basic_ostream<E, T> & operator<< (std::basic_ostream<E, T> & os, buffer const & p)
{
    os << static_cast<mqtt::string_view const&>(p);
    return os;
}

Not saying that we should. Just that I think that would work.

Doing this provides user level code direct access to member functions of std::shared_ptr. Things like .get(), .reset(), and so on.


@redboltz
Owner

I'm still thinking about the comment:
#320 (comment)

This is a response to below 2 comments:
#320 (comment)
#320 (comment)

Doing this provides user level code direct access to member functions of std::shared_ptr. Things like .get(), .reset(), and so on.

I don't understand the above yet.

Consider the publish_handler.

Current code is

    using v5_publish_handler = std::function<
        bool(std::uint8_t fixed_header,
             mqtt::optional<packet_id_t> packet_id,
             mqtt::string_view topic_name,
             mqtt::string_view contents,
             std::vector<v5::property_variant> props)
    >;

and our idea before the comment is

    using v5_publish_handler = std::function<
        bool(std::uint8_t fixed_header,
             mqtt::optional<packet_id_t> packet_id,
             mqtt::buffer topic_name,
             mqtt::buffer contents,
             std::vector<v5::property_variant> props) // props contains mqtt::buffer
    >;

so far, so good.

But if we removed the shared_ptr from mqtt::buffer, then we would need to provide the shared_ptr to users as follows:

    using v5_publish_handler = std::function<
        bool(std::uint8_t fixed_header,
             mqtt::optional<packet_id_t> packet_id,
             mqtt::buffer topic_name,
             mqtt::shared_ptr_const_array sp_topic_name,
             mqtt::buffer contents,
             mqtt::shared_ptr_const_array sp_contents,
             std::vector<v5::property_variant> props, // props contains mqtt::buffer
             std::vector<mqtt::shared_ptr_const_array> sp_props)
    >;

because when we implement a broker, a publish message is copied to all subscribers, and in order to avoid copying, we need mqtt::shared_ptr_const_array.

Am I missing something?

@redboltz
Owner

Ah, I overlooked this multiple inheritance. Never mind my comment above.

class buffer : public mqtt::shared_ptr_const_array, public mqtt::string_view {

@redboltz
Owner

Doing this provides user level code direct access to member functions of std::shared_ptr. Things like .get(), .reset(), and so on.

I'm not sure it is worth having. But I don't strongly disagree. It might help some use case that is unpredictable (at least for me, currently).

However, since you added the following operator,

template<class E, class T>
std::basic_ostream<E, T> & operator<< (std::basic_ostream<E, T> & os, buffer const & p)
{
    os << static_cast<mqtt::string_view const&>(p);
    return os;
}

some other operators should be provided.

Consider operator==.

    auto b = mqtt::buffer(sp, 8);
    b == b;

If we insert the above comparison, a compile error occurs.
I think that ordinary users expect string_view comparison.

But an advanced user might want to compare the original pointer. That is the same as comparing ownership, as long as we don't use shared_ptr's aliasing constructor https://en.cppreference.com/w/cpp/memory/shared_ptr/shared_ptr (8).
We have no plan to use it.

So I think that having mqtt::shared_ptr_const_array as a member variable is better than inheriting from it.

buffer could have a getter for ptr_ e.g. get_shared_ptr_const_array() const { return ptr_; }.
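A sketch of the member-variable variant with an explicit getter (illustrative, not mqtt_cpp code): operator== compares contents via the string_view base, while ownership comparison is available separately.

```cpp
#include <cstddef>
#include <memory>
#include <string_view>

class buffer : public std::string_view {
public:
    buffer(std::shared_ptr<const char[]> ptr, std::size_t len)
        : std::string_view(ptr.get(), len), ptr_(std::move(ptr)) {}

    // Explicit getter instead of exposing shared_ptr's whole interface.
    const std::shared_ptr<const char[]>& get_underlying_shared_ptr() const {
        return ptr_;
    }

private:
    std::shared_ptr<const char[]> ptr_;  // member, not a base class
};

// Ordinary comparison: contents, via the string_view base.
inline bool operator==(const buffer& a, const buffer& b) {
    return static_cast<std::string_view>(a) == static_cast<std::string_view>(b);
}

// Advanced comparison: same underlying allocation (ownership).
inline bool same_owner(const buffer& a, const buffer& b) {
    return a.get_underlying_shared_ptr() == b.get_underlying_shared_ptr();
}
```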

@redboltz
Owner

See
https://wandbox.org/permlink/QzDwIvIuqmqyZN6F

I added back length check and ownership comparison.

The name get_shared_ptr_const_array() might depend on the type information too much.
get_underlying_shared_ptr() could be better.

@redboltz
Owner

redboltz commented Jul 14, 2019

BTW, if you don't mind, could you tell me where you live? I'd like to know your timezone so we can communicate more smoothly. I live in Tokyo, Japan. My timezone is JST.

@jonesmz
Contributor Author

jonesmz commented Jul 14, 2019

I'm in Chicago. United States. (CST)

@jonesmz
Contributor Author

jonesmz commented Jul 15, 2019

I think we can implement this using a finite state machine.

enum process_connect_phase
{
    phase_start, phase_header, phase_username, phase_password, phase_will, phase_properties, phase_end
};
struct ConnectData { // flags, mqtt::optional<mqtt::buffer> username, mqtt::optional<mqtt::buffer> password, mqtt::optional<will>, mqtt::v5::properties
};
void process_connect(async_handler_t func, process_connect_phase phase, ConnectData data = {})
{
    switch(phase)
    {
        case phase_start: // do any initialization here
        case phase_header:  boost::asio::async_read(..., [&](...) {
                                // verify that the header is valid.
                                // store flags in ConnectData
                                process_connect(std::move(func), phase_username, std::move(data));
                            });
        case phase_username: boost::asio::async_read(..., [&](...) {
                                 // read username length
                                 async_read_string(length, [](mqtt::buffer username) {
                                     data.username = std::move(username);
                                     process_connect(std::move(func), phase_password, std::move(data));
                                 });
                             });
        case phase_will: // Call a dedicated function that reads Wills and then calls a callback with the result.
                         // The callback will put the retrieved data into the ConnectData structure, and then
                         // call process_connect for the next phase.
        case phase_properties: // Call a dedicated function that reads properties and then calls a callback with the result.
                               // The callback will put the retrieved data into the ConnectData structure and then
                               // call process_connect for the next phase.
        case phase_end: // any cleanup happens here
                        // and finally the end user's callback gets called with the contents of ConnectData.
            break;
    }
}

@redboltz
Owner

Thank you for the advice. I think it is a good approach too.
The incremental receiving process is a kind of skippable sequence, so switch-case based state management is good enough.

In addition, it could help avoid code duplication. If the message is small enough to receive all at once, we can call something like ios.post([] { process_connect(..., string_view); });. The string_view is sliding (shifting).

I'll restart the implementation this way.


@jonesmz
Contributor Author

jonesmz commented Jul 16, 2019

In addition, it could help avoid code duplication. If the message is small enough to receive all at once, we can call something like this: ios.post([] { process_connect(..., string_view) });. The string_view is sliding (shifting) over the buffer.

How would we detect that the message is small enough to receive all at once?

Is there some function that can be called from boost::asio to ask how much data is available to read immediately?

The way I envisioned this working is that each packet type has a known structure. So for packets like the CONNECT packet, we would read the packet in this way:

  1. async_read the fixed header
  2. async_read the length of the username
  3. allocate space for username, async_read the username
  4. async_read the length of the password
  5. allocate space for password, async_read the password

@jonesmz
Contributor Author

jonesmz commented Jul 16, 2019

It's unfortunate that the MQTT protocol doesn't list all of the length values at the beginning of the packet, since that would allow us to call boost::asio::async_read with multiple different buffers all at once.

But there's no way to do that with the mqtt protocol :(

@redboltz
Owner

How about the remaining length?
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024

We need at most five 1-byte reads in the first phase.
Then mqtt_cpp stores it as a member variable remaining_length.
It is expressed as size in my PoC code.

If size is less than lim, then choose bulk read mode. Otherwise, chunk read mode.
In bulk read mode, the whole payload https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901030 is written into one shared_ptr buffer. Each element, such as topic_filter, can be read via string_view using the buffer mechanism.

If size is not less than lim, then chunk read mode is used. That means reading MQTT elements that have a small size, such as the first 10 bytes of the CONNECT message payload, using an internal (member variable) buffer.
If the reading process finds a variable-length element such as a string https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901010, then read 2 bytes using the internal buffer to get the string length. It is up to 65535 bytes. Then allocate a shared_ptr and call async_read. When the async_read handler is called, call the parse function. This is expressed as step1 to step3 in my PoC code. The parsing functions are shared by bulk read mode and chunk read mode.
In chunk read mode, after parsing, we get some information from the parse result. We store it somewhere (I use a tuple in my PoC code), choose the next state depending on the result, and then call the state machine function.

In chunk read mode, some cases could be inefficient.
For example, in the SUBSCRIBE message, https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168
the payload can contain many topic filter entries where each of them is very small but the last one has 64K bytes.
That total is reflected in the remaining length, so chunk read mode is used. As a result, a shared_ptr is allocated for each topic filter entry. So far, I accept this inefficiency. I guess it is a rare case, because in my experience the lengths of topic filters are not so different from each other.

However, the same thing could happen with properties, which have more varied lengths. Fortunately, before the properties, we can read the property length field https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901028. It has the same format as the remaining length. If it is small enough, we can allocate all properties in one shared_ptr. That is a recursive pattern within the MQTT message.

It is still just an idea. I will write more PoC code and share it with you.

@redboltz
Owner

My answer is

How would we detect that the message is small enough to receive all at once?

It is the remaining length. In addition, we can use the property length.

Is there some function that can be called from boost::asio to ask how much data is available to read immediately?

I don't understand why we need to know that. The version of async_read() I'm using does not call the callback handler until the expected number of bytes has been received. So we don't need to care about what can be read immediately.
If the rest of the message has not been received, we just wait, but asynchronously, so mqtt_cpp can do other things while waiting.

@jonesmz
Contributor Author

jonesmz commented Jul 16, 2019

How about the remaining length?
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024

Perfect!

I understand now.

If the total message size is below $limit, as specified by end-user code, then read the entire message into a single allocation.

If the total message size is above $limit, then allocate each variable-length object separately.

I don't understand why we need to know that. The version of async_read() I'm using does not call the callback handler until the expected number of bytes has been received. So we don't need to care about what can be read immediately.
If the rest of the message has not been received, we just wait, but asynchronously, so mqtt_cpp can do other things while waiting.

I didn't understand before.

It is now clear that there is no need to do this.

Using the "remaining_length" field of the packet fixed header will work the way you explained.

@redboltz
Owner

I noticed that my PoC code's "chunk read" and "bulk read" prints are swapped, sorry.

Anyways, I want to share a subtle design choice.

See

void handle_receive(as::io_context& ioc, std::size_t size, std::size_t lim, cb_t cb) {
    if (size < lim) {
        std::cout << "chunk read" << std::endl; // misprint, actually bulk read
        auto buf = std::make_shared<std::string>(size, '\0');
        ioc.post(
            [buf, cb = std::move(cb)] {
                // Let's say buf is filled
                std::string_view v1(*buf);
                auto r1 = step1(v1);
                auto v2 = v1.substr(r1.consumed);
                auto r2 = step2(v2);
                auto v3 = v2.substr(r2.consumed);
                auto r3 = step3(v3);
                assert(r3.consumed == v3.size());
                cb(r1.result, r2.result1, r2.result2, r3.result);
            }
        );
    }
    else {
        std::cout << "bulk read" << std::endl; // misprint, actually chunk read
        handle_receive_impl(ioc,  size, std::move(cb));
    }
}

I considered three choices here.

At first, I wanted to use the callback-based approach for both bulk read and chunk read because it minimizes code duplication. However, there is a problem: from where should the callback be called on bulk read? In the previous handler? That is not good because callbacks stack up; if there are many properties or topic_filters in SUBSCRIBE, a stack overflow could happen. The next idea is to call io_context::post() with a lambda expression that calls the state machine function with the next phase. However, post() en-queues at the end of the io_context queue, so another incoming message could be interleaved. To avoid that, we could use a priority queue; I considered https://stackoverflow.com/questions/43385897/how-to-combine-strand-wrapper-and-priority-wrapper-on-boost-asio
However, it is too complicated and requires some special notation in the main loop, not just ioc.run(). It's difficult to use.

Finally, I chose the simple return-value-based approach. The shared code is limited to step1() through step3() in my PoC code, but I believe it is a reasonable choice.

@jonesmz
Contributor Author

jonesmz commented Jul 17, 2019

However, there is a problem: from where should the callback be called on bulk read?

  1. In the previous handler? It is not good because callbacks stack up. If there are many properties or topic_filters in SUBSCRIBE, a stack overflow could happen.
  2. The next idea is to call io_context::post() with a lambda expression that calls the state machine function with the next phase. However, post() en-queues at the end of the io_context queue, so another incoming message could be interleaved.
  3. To avoid that, we could use a priority queue, as in https://stackoverflow.com/questions/43385897/how-to-combine-strand-wrapper-and-priority-wrapper-on-boost-asio
    However, it is too complicated and requires some special notation in the main loop, not just ioc.run(). It's difficult to use.

I think 2. is the best.

In the chunk read model we're always using callbacks, so we need the callback infrastructure no matter what.

At this link, the documentation of async_read says https://www.boost.org/doc/libs/1_65_0/doc/html/boost_asio/reference/async_read/overload1.html

The function call always returns immediately.

This implies that the function call reads no data from the stream until after it returns; the reading happens later, before the provided callback is invoked.

This should mean that the operation for asynchronously reading the data is being added to the end of the io_context's queue, just like any other operation that uses io_context::post().

So the other coming message could be interleaved.

I don't believe that this is possible. boost::asio TCP sockets are "streams" of data. So if any new data arrives before we've finished reading from the stream, the newly arrived data is queued behind the data we are currently reading; the data we are reading stays in place.

As long as we don't have any of the following:

  1. Multiple calls to boost::asio::async_read at the same time, on any number of threads
  2. A call to boost::asio::async_read and a call to boost::asio::read (the synchronous version) happening at the same time, on any number of threads
  3. Multiple calls to boost::asio::read (The synchronous version) happening on two or more threads at the same time

Then I believe we are guaranteed that the data is read from the stream in order, with no possibility of messages overlapping.

@redboltz
Owner

I don't believe that this is possible. boost::asio TCP sockets are "streams" of data. So if any new data arrives before we've finished reading from the stream, the newly arrived data is queued behind the data we are currently reading; the data we are reading stays in place.

Nice comment and nice timing! I can save a lot of time :)
I had forgotten that I call the next async_read() after all the io_service::post() sequences have finished.
So interleaving can never happen.

I chose approach 2.
Thank you!

redboltz added a commit that referenced this issue Jul 18, 2019
redboltz added a commit that referenced this issue Jul 22, 2019
redboltz added a commit that referenced this issue Aug 13, 2019
Added buffer class that supports `mqtt::string_view` compatible
behavior and a life keeping mechanism (optional).

Callback functions for users hold receive buffer directly via
`buffer`.

Removed `*_ref` properties. Ref or not ref is hidden by `buffer`.
redboltz added a commit that referenced this issue Aug 18, 2019
Added buffer class that supports `mqtt::string_view` compatible
behavior and a life keeping mechanism (optional).

Callback functions for users hold the receive buffer directly via
`buffer`.

Removed `*_ref` properties. Ref or not ref is hidden by `buffer`.
Added `""_mb` user-defined literals for `buffer`.

Added boost type_erasure.
`mqtt::socket` is a type-erased socket. It covers tcp, tls, ws, and wss.
Compile times become faster.

Replaced static_cast with boost::numeric_cast where overflow could happen.
Removed redundant static_cast.

Implemented efficient shared_ptr_array allocation.

If using boost (default), then `boost::make_shared<char[]>(size)` is used.
If the user defines MQTT_STD_SHARED_PTR_ARRAY,
    if __cplusplus is greater than 201703L (meaning C++20 or later),
        then `std::make_shared<char[]>(size)`,
        otherwise `std::shared_ptr<char[]>(new char[size])`
    is called.
redboltz added a commit that referenced this issue Aug 21, 2019
redboltz added a commit that referenced this issue Aug 22, 2019
@jonesmz jonesmz closed this as completed Aug 23, 2019