Client API

This wiki page describes the API and client semantics of the C++ client.
Please see the new Java Tank client library for how to interface with Tank from Java.

To use the tank client

#include <tank_client.h>

and link against libtank

$> clang++ -std=c++1z app.cpp -ltank

Before you use the client to connect to a stand-alone Tank broker (clustered mode will use a different method), you need to set the default broker endpoint, which is used for all topics/partitions.

To set the default broker endpoint:

client.set_default_leader("127.0.0.1:1025");

The only argument is the endpoint. A Tank endpoint can be formatted as, for example, 127.0.0.1, 127.0.0.1:1025, or :1025; you may also use the tank:// scheme. You can explicitly map topics to specific standalone Tank instances (brokers), like so:

client.set_topic_leader("updates", ":11011");
client.set_topic_leader("sales", "10.5.5.25:11011");

so that all consume and produce requests for the "updates" topic will be routed to 127.0.0.1:11011, and all requests for the "sales" topic to 10.5.5.25:11011. These static routing rules take precedence over the default leader route; the default route is used only when no specific route has been set for a topic.
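
Putting the above together, a minimal setup sketch (the endpoints here are placeholders):

TankClient client;

client.set_default_leader("127.0.0.1:1025");         // fallback route for all topics
client.set_topic_leader("updates", ":11011");        // overrides the default for "updates"
client.set_topic_leader("sales", "10.5.5.25:11011"); // and for "sales"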

Core semantics

The low-level client can have multiple requests in flight. That is, once you make a request, the client will not block until it gets back a response from the broker, nor will it disallow new requests until a response for the first has arrived. You can have as many pending requests as you like, and responses for them may come out of order (though they will almost always be returned in order).

Most requests (this is true for publish and consume requests) are identified by a client request ID, which is returned by the respective TankClient method call. When you get a response or an error, you can check the client request ID to identify the matching request. This is a powerful execution model, designed for flexibility and performance.

The client will also deal intelligently with transient network errors or broker unavailability, by retaining the outgoing requests and retrying them if needed and if possible. See later for the retry semantics.

For example, you may be generating hundreds of messages per second. Ideally, you'd use some bundling scheme so that you could take advantage of bundle compression semantics and publish, say, 10 of those at a time as opposed to 1 at a time as soon as each is generated (although support for broker-side batching is planned). But if you choose to publish them one at a time, you won't need to wait for a response; just invoke the produce() method, which returns immediately, and you will later get an acknowledgment for each of those requests.
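
To illustrate that execution model, here is a sketch that fires several produce() requests back to back and matches the acknowledgements (and faults) by request ID later. The "events" topic and the events container are hypothetical:

std::set<uint32_t> pending;

// issue the requests without waiting for responses
for (const auto &e : events)
        pending.insert(client.produce({{{"events", 0}, {{e, Timings::Milliseconds::SysTime()}}}}));

// later: drain the responses; they may arrive out of order
while (!pending.empty())
{
        client.poll(1000);

        for (const auto &ack : client.produce_acks())
                pending.erase(ack.clientReqId);

        for (const auto &fault : client.faults())
                pending.erase(fault.clientReqId); // see faults() later in this page
}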

Issuing Requests

produce()

uint32_t produce(const std::vector<std::pair<topic_partition, std::vector<msg>>> &req);  

Use the produce() method to send messages to Tank. It accepts a single argument, a vector that holds the updates to apply. topic_partition is defined as

using topic_partition = std::pair<strwlen8_t, uint16_t>;

and its first element is the topic name, the second the partition index,

whereas msg is defined as

struct msg
{
        strwlen32_t content;
        uint64_t ts;
        strwlen8_t key;
};

This should be self-explanatory: content is the content of the message, ts the creation timestamp in milliseconds, and key (optional) the key for the message.

This method returns an ID that identifies the request, or 0 for error. See later for request ID semantics.

Example call:

const auto reqId = client.produce(
        {       
                {{"notifications", 0},
                        {       
                                {"new order", Timings::Milliseconds::SysTime()},
                                {"out of stock product", Timings::Milliseconds::SysTime()},
                                {"150", Timings::Milliseconds::SysTime(), "orders_today"}
                        }       
                }       
        });   

In this example, we publish 3 messages to topic "notifications", partition 0. Notice that the third has a key ("orders_today") whereas the others do not. When you use produce(), all messages for each topic/partition you specify are placed together in a bundle. See Core Concepts for bundle semantics.

consume()

uint32_t consume(const std::vector<std::pair<topic_partition, std::pair<uint64_t, uint32_t>>> &req, 
  const uint64_t maxWait, 
  const uint32_t minSize);

Use the consume() method to consume messages from Tank.
The first argument describes the data we want to retrieve. topic_partition is described earlier in this page. The second value in the pair is another std::pair<>, and its values are:

  • the absolute sequence number of the first message you are interested in, and
  • the maximum number of bytes the broker should stream for the specified topic_partition, starting from that base absolute sequence number

When you produce to a broker, you send 1+ messages in a single request. Those messages are stored together in a bundle. Each message is identified by an absolute sequence number, which is monotonically incremented. The first message of a topic is always assigned sequence number 1.

When you want to consume messages, you specify the sequence number of the first message you are interested in, and the broker will return that message and others that follow it; how many it returns is limited by the maximum number of bytes to be streamed (see earlier).

The broker won't return individual messages. Just as it stores messages together in bundles, it returns bundles. The Tank client will process the bundles (which may require decompression and other pre-processing) and extract the messages from them. Because the broker doesn't perform boundary checks, for performance and simplicity, partial bundles may be returned; the Tank client will deal with those and extract only whole messages. This is similar to Kafka's produce and consume semantics.
See Core Concepts for more information.

The second argument specifies how long to wait, in milliseconds, if we have requested only new data (tailing the log). If this is set, the broker will try to respond with data within that time limit; if no data are published in that time, it will respond with an empty response for that topic/partition.

The third argument is related to the second. If you are tailing the topic (i.e. you asked to retrieve new data only), the broker will attempt to collect at least that many bytes and will respond as soon as it has collected that much or more (or the wait time has expired). This is a hint, and is not always taken into account. It helps when you'd like to get multiple messages in the response, not just 1, for efficiency and simplicity of processing.

This method returns an ID that identifies the request, or 0 for error. See later for request ID semantics.

Example call:

const auto reqId = client.consume(
                {
                        {
                                {"notifications", 0},
                                {0, 4 * 1024}
                        },
                        {
                                {"orders", 0}
                                {UINT64_MAX, 4 * 1024}
                        },
                        {
                                {"clicks", 1},
                                {nextSeq, 8 * 1024}
                        }
                }, 2000, 0
        );

In this example, we are asking to consume from 3 different partitions.
The first is for topic "notifications" and partition id 0; we specified that we want the broker to stream messages starting from the first message available, whatever its sequence number, by setting the base absolute sequence number to the special value 0. We also specified that we want up to 4k worth of data.
The second is for topic "orders" and partition id 0; we specified that we want the broker to stream only messages published from now on, by specifying the special sequence number UINT64_MAX, and we'd like up to 4k worth of data.
The third is for topic "clicks" and partition id 1. This time, we have been tracking consumed messages and we know we want to consume starting from the message whose sequence number is stored in the variable nextSeq. We ask for up to 8k worth of data.

The second argument to consume(), the maximum wait time when we are tailing the topic (which applies to the request for "orders"/0), is specified as 2 seconds (2,000 ms), and the third argument is set to 0, because we want the broker to respond immediately when any messages are published, no matter how few bytes that amounts to. See earlier for the description of those arguments.

Alternative methods for accessing a single topic's partition data

You can alternatively use TankClient::consume_from() and TankClient::produce_to() if you only want to consume from, or produce to, a single partition.

However, please make sure to always use the canonical TankClient::consume() and TankClient::produce() methods if you want to consume from or produce to multiple partitions, because those methods batch together the requests that reach the same broker, and therefore require far less transmission and processing time from the broker. Unless you absolutely only want to access a single partition, use the canonical methods.
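
For example, assuming the single-partition variants mirror the parameters of their canonical counterparts (a sketch; check tank_client.h for the exact signatures):

// assumed: produce_to(topic_partition, msgs) and
// consume_from(topic_partition, seqNum, fetchSize, maxWait, minSize)
const auto produceReqId = client.produce_to({"notifications", 0},
        {
                {"new order", Timings::Milliseconds::SysTime()}
        });

const auto consumeReqId = client.consume_from({"notifications", 0}, UINT64_MAX, 4 * 1024, 2000, 0);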

Getting results

void poll(uint32_t timeoutMS);

You should call poll() in a loop, or whenever you want the client to exchange data with brokers. It will read and parse responses from the brokers, and perform other tasks as needed. When you execute poll(), messages consumed from the broker, faults reported by the broker or generated by the client, and produce acknowledgements may become available, and you should check for whichever of them you are interested in.
You can also use should_poll(), which returns true if there's work to be done; keep invoking poll() until all pending requests have been delivered and all pending responses have been returned.

while (tankClient.should_poll())
{
   tankClient.poll(1000);
   // check for responses and faults
}

To access collected messages, faults, and produce acknowledgements, use the following inspection methods:


const std::vector<partition_content> &consumed() const;

After poll() returns, 0 or more messages from earlier consume requests may have been retrieved, parsed, collected, and are now available to the application via consumed().

partition_content is declared as:

        struct partition_content
        {
                uint32_t clientReqId;
                strwlen8_t topic;
                uint16_t partition;
                range_base<consumed_msg *, uint32_t> msgs;

                struct
                {
                        // to get more messages
                        // consume from (topic, partition) starting from seqNum,
                        // and set fetchSize to be at least minFetchSize
                        uint64_t seqNum;
                        uint32_t minFetchSize;
                } next;
        };

clientReqId is the ID of the consume() request that asked for this content; it was the return value of that consume() call.
topic and partition identify the topic and partition that hold the messages.
msgs is a range of 1+ consumed messages. See later.
next{} should be used if you want to retrieve the messages past the message set in this partition_content. It holds the next sequence number you should use in a subsequent consume() call, and minFetchSize is the minimum fetch size you should use for that call. If the consume call responsible for this content specified a very low minimum fetch size and the client wasn't able to extract even a single message, next.seqNum will likely be the sequence number specified in that earlier call, but with an adjusted minFetchSize, so that retrying the call with that minimum fetch size will get you at least 1 new message.

Example call:

for (;;)
{       
        client.poll(1000);

        for (const auto &it : client.consumed())
        {       
                Print("Consumed from ", it.topic, ".", it.partition, ", next {", it.next.seqNum, ", ", it.next.minFetchSize, "}\n"); 
                for (const auto mit : it.msgs)
                {       
                        Print("[", mit->key, "]:", mit->content, "(", mit->seqNum, ")\n"); 
                }       
        }       
}
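
To keep consuming past the returned message set, feed next{} back into a subsequent consume() call. A minimal sketch for a single partition, where process() is a hypothetical application handler:

uint64_t nextSeq = 0;           // 0: start from the first available message
uint32_t fetchSize = 4 * 1024;

for (;;)
{
        client.consume({{{"notifications", 0}, {nextSeq, fetchSize}}}, 2000, 0);

        // keep polling until the pending response has been returned
        while (client.should_poll())
        {
                client.poll(1000);

                for (const auto &it : client.consumed())
                {
                        for (const auto mit : it.msgs)
                                process(mit);

                        // resume from where this response left off, honoring
                        // the adjusted minimum fetch size, if any
                        nextSeq = it.next.seqNum;
                        fetchSize = std::max(fetchSize, it.next.minFetchSize);
                }
        }
}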

const Switch::vector<TankClient::fault> &faults() const;

After poll() returns, 0 or more faults, either reported by the broker or generated by the Tank client, may have been captured; you can access them using faults().

fault is declared as:

struct fault
{
        uint32_t clientReqId;

        enum class Type : uint8_t
        {
                UnknownTopic = 1,
                UnknownPartition,
                Access,
                Network,
                BoundaryCheck
        } type;

        enum class Req : uint8_t
        {
                Consume = 1,
                Produce
        } req;

        strwlen8_t topic;
        uint16_t partition;

        union {
                struct
                {
                        uint64_t firstAvailSeqNum;
                        uint64_t highWaterMark;
                };
        } ctx;
};

This is self-explanatory. ctx{} may be set depending on the type of the fault. For now, it is only set for type BoundaryCheck.

Example call:

client.poll(1000);

for (const auto &it : client.faults())
{
        Print("Fault for ", it.clientReqId, "\n");

        switch (it.type)
        {
                case TankClient::fault::Type::BoundaryCheck:
                        Print("firstAvailSeqNum = ", it.ctx.firstAvailSeqNum, ", hwMark = ", it.ctx.highWaterMark, "\n");
                        break;

                default:
                        break;
        }
}
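
A common way to handle a BoundaryCheck fault for a consume request is to resume from the first available sequence number reported in ctx{}. A sketch:

for (const auto &it : client.faults())
{
        if (it.req == TankClient::fault::Req::Consume && it.type == TankClient::fault::Type::BoundaryCheck)
        {
                // the sequence number we asked for was out of range; resume
                // from the earliest message the broker still holds
                client.consume({{{it.topic, it.partition}, {it.ctx.firstAvailSeqNum, 4 * 1024}}}, 2000, 0);
        }
}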

const std::vector<TankClient::produce_ack> &produce_acks() const;

After poll() returns, 0 or more acknowledgements to earlier produce() requests may have become available. They can be accessed via produce_acks().

produce_ack is declared as:

struct produce_ack
{
        uint32_t clientReqId;
        strwlen8_t topic;
        uint16_t partition;
};

The fields are self-explanatory.

Example call:

client.poll(1000);

for (const auto &it : client.produce_acks())
{
        Print("Produced OK for produce request ", it.clientReqId, ", topic ", it.topic, ", partition ", it.partition, "\n");
}

The should_poll() TankClient method, mentioned earlier, returns true if we should poll now or in the future, because there are pending acknowledgements or responses from the Tank brokers.
Example use:

if (tankClient.should_poll())
    tankClient.poll(8e2);

The dedicated Tank thread pattern

A common pattern is to use a dedicated thread which interfaces with Tank, and you may queue publish or consume requests that are executed in the run loop like so:

std::thread([&client, &queue, &lock]()
{
   for (;;)
   {
        lock.lock();
        // drain the queue, and execute client.produce() or client.consume()
        lock.unlock();

        client.poll(1e3);

        // consider client.consumed(), client.faults(), client.produce_acks(), etc.
   }
}).detach();

This works great, except when you'd like other threads to queue requests and immediately wake up your Tank thread, which is likely asleep in TankClient::poll(), so that it can process the queued requests. Without that functionality, in this example, you may wait for up to 1 second (1e3 milliseconds) until that happens.
There is a useful method you can use to interrupt poll(), so that it returns, waking up your thread to consider the queued requests immediately: TankClient::interrupt_poll(). When invoked, it forces poll() to return, thereby returning control to its caller immediately.
So, continuing our example, if one of the threads wanted to queue, say, a produce request, it could do it like so:

lock.lock();
queue.push_back(new produce_request(...));
lock.unlock();
client.interrupt_poll();

Client configuration

By default, as of version 0.20, the low-level Tank client will automatically queue outgoing requests and will retry them in case of network errors, based on a simple exponential backoff scheme, until it either succeeds again or it fails too many times.

So if, for example, your standalone Tank broker is stopped so that you can update your configuration or create more topics or partitions, and it is restarted afterwards, the client will re-establish the connection and forward the queued pending requests to it, in order (provided that happens within a short amount of time).

If the client fails more than 3 times to establish a connection, the broker will be considered unavailable for 1 minute, and all requests in that time will fail. After that 1 minute, the client will try again, in case the problem has been resolved. The retry counts and back-off thresholds will be configurable in a future update.

If you would like all requests to fail immediately when the client can't communicate with the broker, as opposed to retrying as described earlier, you can use the TankClient::set_retry_strategy() method. When the fail-fast strategy (RetryNever, below) is selected, on network failure the broker will be flagged as unavailable for 8 seconds, and all requests will fail until the connection is retried after those 8 seconds.

For now, two retry strategies are implemented.

  • TankClient::RetryStrategy::RetryAlways
  • TankClient::RetryStrategy::RetryNever

Use TankClient::set_retry_strategy() to select either.
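
For example, to fail fast instead of retrying:

client.set_retry_strategy(TankClient::RetryStrategy::RetryNever);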

Please note that the client will retry all requests (if RetryAlways is selected) if it didn't get a chance to send the request, but it will only retry idempotent requests (e.g. consume requests, not produce requests) if it dispatched the whole request but hasn't gotten back a response.

This is because the client cannot deterministically know whether, after the request was queued by writing it to the socket buffer, the broker received it, processed it, and sent a response, or whether the response failed to reach the client. It could have failed at any of those steps, and we can't know which. Thus, to avoid publishing the same messages more than once, the client will never retry in that case; it will fail the request.
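
If your application can tolerate, or deduplicate, occasional duplicates, you can implement that retry yourself, by retaining the payload of each outstanding produce request and re-publishing it when a Network fault is reported. A sketch (retained and msgs are hypothetical application-side variables; msg is assumed to live in TankClient's scope, like the other structures on this page):

// when producing, remember the payload keyed by the request ID
std::unordered_map<uint32_t, std::vector<TankClient::msg>> retained;

const auto reqId = client.produce({{{"notifications", 0}, msgs}});
retained.emplace(reqId, msgs);

// after poll(): forget whatever was acknowledged, and re-publish
// whatever failed with a network fault
for (const auto &ack : client.produce_acks())
        retained.erase(ack.clientReqId);

for (const auto &it : client.faults())
{
        if (it.req == TankClient::fault::Req::Produce && it.type == TankClient::fault::Type::Network)
        {
                if (const auto search = retained.find(it.clientReqId); search != retained.end())
                {
                        auto payload = std::move(search->second);

                        retained.erase(search);

                        const auto newReqId = client.produce({{{it.topic, it.partition}, payload}});

                        retained.emplace(newReqId, std::move(payload));
                }
        }
}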

In the future, another option will be made available for selecting a different retry strategy for non-idempotent requests.

Managing Brokers

You can use TankClient::discover_partitions() and TankClient::create_topic() directly, although you should use tank-cli to create new topics and discover topic partitions.


This page is a WIP. Expect updates with more examples and more in-depth/expanded explanation of the various methods and semantics. The API will expand soon, to support new functionality.