Round Robin Queries

JamesC edited this page May 19, 2018 · 2 revisions

Unlike the ZMQ REQ socket, a single DEALER socket can connect to multiple endpoints, which lets us query multiple BC servers over a single ZMQ socket. When connected to multiple endpoints, a DEALER socket sends out its requests in round-robin fashion.

[Figure: round-robin queries]

If the client app is aware of or can determine the multi-endpoint topology beforehand, this can be used to implement naive query-balancing in a simple fashion.

In our example below, we send 4 fetch-transaction queries from our DEALER socket, which is connected to 3 endpoints. The script logic is as follows:

  • Submit 4 fetch-tx queries in round-robin fashion
  • While loop (as long as there are pending requests)
    • Poll DEALER socket for received messages
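
With 4 queries and 3 endpoints, a strict round-robin would send queries 0, 1 and 2 to the three endpoints in turn and query 3 back to the first one. The sketch below only models that mapping in plain C++; `assign_round_robin` is a hypothetical helper, not part of ZMQ or libbitcoin. In practice the socket rotates only over endpoints whose connections are ready, so the real mapping may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: models strict round-robin dispatch only.
// Returns, for each request index, the index of the endpoint it would go to.
std::vector<std::size_t> assign_round_robin(std::size_t request_count,
    std::size_t endpoint_count)
{
    // At least one endpoint is required to dispatch anything.
    assert(endpoint_count > 0);

    std::vector<std::size_t> assignment;
    assignment.reserve(request_count);

    for (std::size_t request = 0; request < request_count; ++request)
        assignment.push_back(request % endpoint_count);

    return assignment;
}
```

Calling `assign_round_robin(4, 3)` yields the endpoint indexes 0, 1, 2, 0 for the four queries.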

Note: For simplicity, we have not implemented query resends for pending requests that time out. Also, when multiple clients use the same round-robin logic, each client should randomise the order of its endpoint connections so that they do not all query the endpoints in the same sequence; this is also not implemented in our example.
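
One way the connection order could be randomised is to shuffle the endpoint list before connecting. This is a minimal sketch using only the C++ standard library; `shuffled_endpoints` is a hypothetical helper, and passing the random engine in as a parameter is an assumption made here so the result can be reproduced in tests.

```cpp
#include <algorithm>
#include <cassert>
#include <random>
#include <string>
#include <vector>

// Hypothetical helper: returns the given endpoints in a random order.
// The caller supplies the random engine, so a fixed seed gives a fixed order.
template <typename Engine>
std::vector<std::string> shuffled_endpoints(std::vector<std::string> endpoints,
    Engine& engine)
{
    assert(!endpoints.empty());
    std::shuffle(endpoints.begin(), endpoints.end(), engine);
    return endpoints;
}
```

A client could seed a `std::mt19937` from `std::random_device`, shuffle its list of endpoint URLs, and then connect the DEALER socket to each endpoint in the returned order.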

Our round-robin fetch-transaction query script produces the printout below. Notice that the responses do not necessarily arrive in the order the queries were submitted, since there are no ordering guarantees when querying independent endpoints.

Response 2
Serialised Transaction: 01000000014596c5caf914e0e79999acc20aea6d3356bd4d2223e30d5c2dbc05cd82498e20010000001716001415d94d61f79ede6249525bf3863265f3ab1c300cffffffff0240d2df03000000001976a9143271bdf25c28ec5733b3faac3c3adb2b6fd0d15c88accd9fc6310900000017a914bec3b1aee2a7548b5c06d0106ac49d8ea4a3e65d8700000000

Response 0
Serialised Transaction: 01000000018d502eb92d848d1ab8b65849f10ed401066aedb89c66ffffb7764b6f787f2d6601000000171600142ea369f136693cf850f7355ab0cf57de82a7a300ffffffff0220e9ef01000000001976a9143271bdf25c28ec5733b3faac3c3adb2b6fd0d15c88ac24df21920900000017a91463ee43a4c9d8e9509c9eb1268f956085e0859b118700000000

Response 3
Serialised Transaction: 01000000015559e946e71d9187e2597e691d8acf7b479413a4afb1ed790533515a9d73c029010000001716001457592acab0bb4d9b438d15622ce867e4da45d8b4ffffffff0240d2df03000000001976a9143271bdf25c28ec5733b3faac3c3adb2b6fd0d15c88ac0dd2ccab0a00000017a914a7f7286473897c44347dc3949baf8a20d302320b8700000000

Response 1
Serialised Transaction: 0100000002d5032a3afd3101de17018d35f7328f5879c5506587993d6fc98529321fb24a230100000017160014d7156afdde8ba7344b1aa039ca971e061fa1eda7ffffffff2337773f4bcfe2b4d17e3ab347ed95294e465080e006817f43c25edd591b3574010000001716001488ff41323bf8a0d1512ee7269c644bb607f29901ffffffff0240d2df03000000001976a9143271bdf25c28ec5733b3faac3c3adb2b6fd0d15c88ace44e13940900000017a9143dca02b56711f6225a0dcaeddb8efa037d18dbae8700000000
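
Because replies can arrive in any order, the script matches each reply to its query by the request id it carries. The sketch below models that bookkeeping in isolation, with no sockets involved; `match_replies` is a hypothetical helper, and a `std::set` stands in for the script's `pending_requests` map.

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical helper: consumes reply ids in arrival order, removes each
// known id from `pending`, and returns the ids that were accepted.
// Unknown or duplicate ids are ignored.
std::vector<std::uint32_t> match_replies(std::set<std::uint32_t>& pending,
    const std::vector<std::uint32_t>& arrivals)
{
    std::vector<std::uint32_t> accepted;

    for (const auto id : arrivals)
    {
        // erase() returns 1 only if the id was still pending.
        if (pending.erase(id) == 1)
            accepted.push_back(id);
    }

    return accepted;
}
```

Feeding it the arrival order from the printout above (2, 0, 3, 1) with ids 0 to 3 pending accepts all four replies and leaves nothing pending.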

The script code in its entirety:

#include <chrono>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <bitcoin/protocol.hpp>
#include <bitcoin/bitcoin.hpp>


int main() {

    // Setup of zmq context & socket type.
    //--------------------------------------------------------------------------

    bc::protocol::zmq::context my_context(true); //started
    bc::protocol::zmq::socket my_dealer(
        my_context,
        bc::protocol::zmq::socket::role::dealer
    );

    // Connect the client's dealer socket to the server endpoints.
    //--------------------------------------------------------------------------

    bc::config::endpoint public_endpoint1("tcp://testnet1.libbitcoin.net:19091");
    bc::config::endpoint public_endpoint2("tcp://testnet2.libbitcoin.net:19091");
    bc::config::endpoint public_endpoint3("tcp://testnet3.libbitcoin.net:19091");

    // Omitted: connect error handling
    my_dealer.connect(public_endpoint1);
    my_dealer.connect(public_endpoint2);
    my_dealer.connect(public_endpoint3);

    // TX hashes to be queried (little endian serialisation):
    //--------------------------------------------------------------------------
    auto transaction_hash0 = bc::to_chunk(bc::base16_literal(
        "9457929c26b885b211259b4ab41a17a956ce4d8a67203508cdabcbbb2e446815"));
    auto transaction_hash1 = bc::to_chunk(bc::base16_literal(
        "8d502eb92d848d1ab8b65849f10ed401066aedb89c66ffffb7764b6f787f2d66"));
    auto transaction_hash2 = bc::to_chunk(bc::base16_literal(
        "df1f13c15d101f9b59ea5d8ff3d53967f8b1f9bd85293135723a95872d69a64c"));
    auto transaction_hash3 = bc::to_chunk(bc::base16_literal(
        "0cb0c54b0778ed3e8c9f813b9bf9301563cde28448e8af67f59ea0688de81df7"));

    bc::data_stack transaction_hash_stack =
    {
        transaction_hash0,
        transaction_hash1,
        transaction_hash2,
        transaction_hash3
    };

    // Compose & send request message.
    //--------------------------------------------------------------------------
    bc::code ec;
    uint32_t request_id(0);

    // Create a pending_requests map to track which requests are pending.
    std::map<uint32_t, std::chrono::steady_clock::time_point> pending_requests;

    // Send requests. Automatically sent to 3 endpoints in round-robin fashion
    for (size_t i = 0; i < transaction_hash_stack.size() ; i++)
    {
        bc::protocol::zmq::message my_request;

        std::string command = "blockchain.fetch_transaction";

        // my_request.enqueue(delimiter); // Omit if using dealer socket.
        my_request.enqueue(bc::to_chunk(command));
        my_request.enqueue(bc::to_chunk(bc::to_little_endian(request_id)));
        my_request.enqueue(transaction_hash_stack[i]);

        // Socket send: Success/Failure
        if ((ec = my_request.send(my_dealer)))
        {
            // Report the failure; an unsent request is not tracked as pending.
            std::cout << "Error sending message " << i << ": "
                      << ec.message() << std::endl;
        }
        else
        {
            // Note: Request timeout & resend not implemented:
            pending_requests[request_id] = std::chrono::steady_clock::now();
        }

        request_id++;
    }

    // Poll socket and receive reply.
    //--------------------------------------------------------------------------

    // Note: Lost messages do not trigger a resend, but result in an endless loop.
    // Note: No reply ordering guarantees (async).

    // Setup poller for our socket
    bc::protocol::zmq::poller my_poller;
    my_poller.add(my_dealer);
    bc::protocol::zmq::identifier my_dealer_id = my_dealer.id();
    bc::protocol::zmq::identifiers socket_ids;

    // Main Polling Loop.
    while (pending_requests.size() > 0)
    {

        socket_ids = my_poller.wait(200);

        if (socket_ids.contains(my_dealer_id))
        {

            // Receive the message from the socket
            // zmq::message::receive() is blocking if no message available.
            bc::protocol::zmq::message server_response;
            server_response.receive(my_dealer);

            std::string my_message_command =
                server_response.dequeue_text();

            uint32_t received_request_id;
            server_response.dequeue(received_request_id);

            if (pending_requests.count(received_request_id) == 1)
            {
                std::cout << "Response "
                          << received_request_id
                          << std::endl;

                bc::data_chunk reply_payload;
                server_response.dequeue(reply_payload);

                // Use stream to format response data chunk.
                bc::data_source reply_byte_stream(reply_payload);
                bc::istream_reader reply_byte_stream_reader(reply_byte_stream);

                // istream class stream filter methods, eg error code reading.
                if ((ec = reply_byte_stream_reader.read_error_code()))
                {
                    std::cout << "Error from response "
                              << received_request_id
                              << ": "
                              << ec.message()
                              << std::endl;
                }
                else
                {
                    // Read transaction (only when no error was returned)
                    auto transaction_bytes =
                        reply_byte_stream_reader.read_bytes();

                    std::cout << "Serialised Transaction: "
                              << bc::encode_base16(transaction_bytes)
                              << "\n"
                              << std::endl;
                }

                // Remove pending request from queue.
                pending_requests.erase(received_request_id);

            }
        }
    }

    // Close socket.
    my_dealer.stop();

    // Close context.
    my_context.stop();

    return 0;
}
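
The script records a send time for every request in `pending_requests` but never checks it. As a starting point for the resend logic that is left out, the sketch below shows how timed-out requests could be identified from such a map. `timed_out_requests` and the `send_time_point` alias are hypothetical names, and the timeout value is left to the caller.

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <vector>

using send_time_point = std::chrono::steady_clock::time_point;

// Hypothetical helper: returns the ids of all pending requests whose send
// time lies more than `timeout` before `now`.
std::vector<std::uint32_t> timed_out_requests(
    const std::map<std::uint32_t, send_time_point>& pending,
    send_time_point now, std::chrono::milliseconds timeout)
{
    std::vector<std::uint32_t> expired;

    for (const auto& entry : pending)
    {
        if (now - entry.second > timeout)
            expired.push_back(entry.first);
    }

    return expired;
}
```

The polling loop could call this after each `my_poller.wait(200)` and resend, or give up on, the returned ids, which would also prevent the endless loop when a message is lost.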