Client API
There is currently a low level client for C++ available. There will be high-level clients available in the future. This page describes the low level client API. Hopefully, someone will create clients for other languages. PRs are more than welcome.
To use the tank client
#include <tank_client.h>
and link against libtank
$> clang++ -std=c++14 app.cpp -ltank
uint32_t produce(const std::vector<std::pair<topic_partition, std::vector<msg>>> &req);
Use the produce() method to send messages to Tank. It accepts a single argument, a vector that holds the updates to apply. topic_partition is defined as
using topic_partition = std::pair<strwlen8_t, uint16_t>;
where the first element is the topic name and the second is the partition index,
whereas msg is defined as
struct msg
{
    strwlen32_t content;
    uint64_t ts;
    strwlen8_t key;
};
This should be self-explanatory: content is the content of the message, ts is the creation timestamp in milliseconds, and key (optional) is the key for the message.
This method returns an ID that identifies the request, or 0 for error. See later for request ID semantics.
Example call:
const auto reqId = client.produce(
    {
        {
            {"notifications", 0},
            {
                {"new order", Timings::Milliseconds::SysTime()},
                {"out of stock product", Timings::Milliseconds::SysTime()},
                {"150", Timings::Milliseconds::SysTime(), "orders_today"}
            }
        }
    });
In this example, we publish 3 messages to topic "notifications", partition 0.
Notice that the third has a key ("orders_today") whereas the others do not.
When you use produce(), all messages for each topic/partition you specify are placed together in a bundle.
See Core Concepts for bundle semantics.
uint32_t consume(const std::vector<std::pair<topic_partition, std::pair<uint64_t, uint32_t>>> &req,
                 const uint64_t maxWait,
                 const uint32_t minSize);
Use the consume() method to consume messages from Tank.
The first argument describes the data we want to retrieve.
topic_partition is described earlier in this page. The second value in the pair is another std::pair<>, and its values are:
the absolute sequence number of the first message you are interested in and
the maximum number of bytes the broker should stream for the specified topic_partition, based on the base absolute sequence number
When you produce to a broker, you send 1+ messages in a single request. Those messages are stored together in a bundle. Each message is identified by an absolute sequence number, which is monotonically incremented. The first message of a topic is always assigned sequence number 1.
When you want to consume messages, you specify the sequence number of the first message you are interested in, and the broker will return that message and others that follow it; how many it returns is limited by the maximum number of bytes to be streamed (see earlier).
The broker won't return individual messages. Just like it stores messages together into bundles, it will return bundles. The tank client is going to process the bundles (which may require decompression, and other pre-processing) and extract the messages from them. Because the broker doesn't perform boundary checks, for performance and simplicity, partial bundles may be returned; the tank client will deal with those and will extract only whole messages. This is similar to Kafka's produce and consume semantics.
See Core Concepts for more information.
The second argument specifies how long to wait, in milliseconds, if we have requested to receive only new data (tailing the log). If this is set, the broker will try to respond with data within that time limit; if no data are published in that time, it will respond with an empty response for that topic/partition.
The third argument is related to the second. If you are tailing the topic (i.e. asked to retrieve new data only), the broker will attempt to collect at least that many bytes and will respond as soon as it has collected that much or more (or the wait time has expired). This is a hint, and is not always taken into account. It helps in cases where you'd like to get multiple messages in the response, not just one, for efficiency and simplicity of processing.
This method returns an ID that identifies the request, or 0 for error. See later for request ID semantics.
Example call:
const auto reqId = client.consume(
    {
        {
            {"notifications", 0},
            {0, 4 * 1024}
        },
        {
            {"orders", 0},
            {UINT64_MAX, 4 * 1024}
        },
        {
            {"clicks", 1},
            {nextSeq, 8 * 1024}
        }
    }, 2000, 0
);
In this example, we are asking to consume from 3 different partitions.
The first is for topic "notifications" and partition id 0, and we specified that we want the broker to stream messages starting from the first message available, whatever its sequence number, by setting the base absolute sequence number to the special value 0. We also specified that we want up to 4k worth of data for that.
The second is for topic "orders" and partition id 0, and we specified that we want the broker to stream only messages published to the broker from now on, by specifying the special sequence number UINT64_MAX, and we'd like up to 4k worth of data.
The third is for topic "clicks" and partition id 1. This time, we have been tracking consumed messages and we know that we want to consume starting from a message whose sequence number is stored in the variable nextSeq. Here, we ask for up to 8k worth of data.
The 2nd argument to consume(), the maximum wait time when we are tailing the topic (which applies to the request for "orders"/0), is specified as 2 seconds (2,000ms), and the 3rd argument is set to 0, because we want the broker to respond immediately when any messages are published, no matter how much that amounts to in terms of bytes. See earlier for a description of that argument.
void poll(uint32_t timeoutMS);
You should use poll() in a loop, or whenever you want the client to exchange data with brokers. It will read and parse responses from the brokers, and perform other tasks if needed.
When you execute poll(), messages consumed from the broker, faults reported by the broker or generated by the client, and produce acknowledgements may become available; you should check for whichever of these you are interested in.
To access collected messages, faults, and produce acknowledgements, use the following inspection methods:
const std::vector<partition_content> &consumed() const;
After poll() returns, 0 or more messages from earlier consume requests may have been retrieved, parsed, collected, and are now available to the application via consumed().
partition_content is declared as:
struct partition_content
{
    uint32_t clientReqId;
    strwlen8_t topic;
    uint16_t partition;
    range_base<consumed_msg *, uint32_t> msgs;

    struct
    {
        // to get more messages,
        // consume from (topic, partition) starting from seqNum,
        // and set fetchSize to be at least minFetchSize
        uint64_t seqNum;
        uint32_t minFetchSize;
    } next;
};
clientReqId is the request ID of the consume() call that requested this content; it was the return value of that consume() call.
topic is the topic and partition is the partition that holds the messages.
msgs is a range of 1+ consumed messages. See below.
next{} should be used if you want to retrieve the messages past the message set in this partition_content. seqNum is the next sequence number you should use in a subsequent consume() call, and minFetchSize is the minimum fetch size you should use for that call. If the consume call responsible for this content specified a very low fetch size and the client wasn't able to parse/extract even a single message, next.seqNum will likely equal the sequence number specified in that earlier call, but minFetchSize will be adjusted so that retrying with that fetch size will get you at least 1 new message. (See the sketch after the example below.)
Example call:
for (;;)
{
    client.poll(1000);

    for (const auto &it : client.consumed())
    {
        Print("Consumed from ", it.topic, ".", it.partition, ", next {", it.next.seqNum, ", ", it.next.minFetchSize, "}\n");

        for (const auto mit : it.msgs)
        {
            Print("[", mit->key, "]:", mit->content, "(", mit->seqNum, ")\n");
        }
    }
}
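To keep consuming past the message set you just processed, feed next.seqNum and next.minFetchSize back into a follow-up consume() call. Here is a minimal sketch, assuming the same client and the ("clicks", 1) partition from the earlier consume example; the nextSeq and nextMinFetchSize variables are illustrative bookkeeping, not part of the client API:
uint64_t nextSeq = 0;                 // 0: start from the first available message
uint32_t nextMinFetchSize = 8 * 1024; // initial fetch size (illustrative)

// issue the initial request
client.consume({{{"clicks", 1}, {nextSeq, nextMinFetchSize}}}, 2000, 0);

for (;;)
{
    client.poll(1000);

    for (const auto &it : client.consumed())
    {
        for (const auto mit : it.msgs)
        {
            Print("[", mit->key, "]:", mit->content, "(", mit->seqNum, ")\n");
        }

        // continue from where this response left off, using the broker-suggested
        // sequence number and minimum fetch size
        nextSeq = it.next.seqNum;
        nextMinFetchSize = it.next.minFetchSize;
        client.consume({{{"clicks", 1}, {nextSeq, nextMinFetchSize}}}, 2000, 0);
    }
}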
const std::vector<TankClient::fault> &faults() const;
After poll() returns, 0 or more faults, either reported by the broker, or generated by the tank client, may have been captured, and you can access them using faults().
fault is declared as:
struct fault
{
    uint32_t clientReqId;

    enum class Type : uint8_t
    {
        UnknownTopic = 1,
        UnknownPartition,
        Access,
        Network,
        BoundaryCheck
    } type;

    enum class Req : uint8_t
    {
        Consume = 1,
        Produce
    } req;

    strwlen8_t topic;
    uint16_t partition;

    union {
        struct
        {
            uint64_t firstAvailSeqNum;
            uint64_t highWaterMark;
        };
    } ctx;
};
This is self-explanatory. ctx{} may be set based on the type of fault. For now, this is only set for type BoundaryCheck.
Example call:
client.poll(1000);

for (const auto &it : client.faults())
{
    Print("Fault for ", it.clientReqId, "\n");

    switch (it.type)
    {
        case TankClient::fault::Type::BoundaryCheck:
            Print("firstAvailSeqNum = ", it.ctx.firstAvailSeqNum, ", hwMark = ", it.ctx.highWaterMark, "\n");
            break;

        default:
            break;
    }
}
const std::vector<TankClient::produce_ack> &produce_acks() const;
After poll() returns, 0 or more acknowledgements to earlier produce() requests may have become available. They can be accessed via produce_acks().
produce_ack is declared as:
struct produce_ack
{
    uint32_t clientReqId;
    strwlen8_t topic;
    uint16_t partition;
};
The fields are self-explanatory.
Example call:
client.poll(1000);

for (const auto &it : client.produce_acks())
{
    Print("Produced OK for produce request ", it.clientReqId, ", topic ", it.topic, ", partition ", it.partition, "\n");
}
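The request IDs returned by produce() and consume() let you correlate acknowledgements and faults with the request that caused them, via the clientReqId field shown above. Below is a minimal sketch that publishes one message and waits for either an acknowledgement or a fault for that specific request; it assumes the same client and "notifications" topic as the earlier produce example, and the done flag and single-message payload are illustrative, not a prescribed pattern:
const auto reqId = client.produce(
    {
        {
            {"notifications", 0},
            {
                {"new order", Timings::Milliseconds::SysTime()}
            }
        }
    });

bool done = (reqId == 0); // 0 means the request could not be scheduled at all

while (!done)
{
    client.poll(1000);

    for (const auto &it : client.produce_acks())
    {
        if (it.clientReqId == reqId)
        {
            Print("Produced OK to ", it.topic, "/", it.partition, "\n");
            done = true;
        }
    }

    for (const auto &it : client.faults())
    {
        if (it.clientReqId == reqId)
        {
            Print("Produce failed for ", it.topic, "/", it.partition, "\n");
            done = true;
        }
    }
}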
This page is a WIP. Expect updates with more examples and more in-depth/expanded explanations of the various methods and semantics. The API will expand soon to support new functionality.