
Commit e736138

WIP: txnmgr
(cherry picked from commit 880fca6d0853b8b947ad00bafd206105080a0309)
1 parent c8ea87b commit e736138

54 files changed, +6806 -585 lines changed

CONFIGURATION.md

+2
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,8 @@ rebalance_cb | C | |
113113
offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: pointer*
114114
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
115115
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
116+
transactional.id | P | | | high | The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is null, which means transactions cannot be used. Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor. <br>*Type: string*
117+
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout must be at least 1000 ms larger than `message.timeout.ms` and `socket.timeout.ms`. <br>*Type: integer*
116118
enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantiation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
117119
enable.gapless.guarantee | P | true, false | false | low | **EXPERIMENTAL**: subject to change or removal. When set to `true`, any error that could result in a gap in the produced message series when a batch of messages fails, will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop the producer. Messages failing due to `message.timeout.ms` are not covered by this guarantee. Requires `enable.idempotence=true`. <br>*Type: boolean*
118120
queue.buffering.max.messages | P | 1 .. 10000000 | 100000 | high | Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. <br>*Type: integer*

INTRODUCTION.md

+102-3
@@ -701,15 +701,106 @@ which returns one of the following values:
701701
This method should be called by the application on delivery report error.
702702

703703

704+
### Transactional Producer
704705

706+
#### FIXME: misc
707+
708+
To make sure messages time out (in case of connectivity problems, etc) within
709+
the transaction, the `message.timeout.ms` configuration property must be
710+
set lower than the `transaction.timeout.ms`.
711+
If `message.timeout.ms` is not explicitly configured it will be adjusted
712+
automatically.
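
The timeout rule above can be sketched as a small check. This is a minimal illustration under stated assumptions, not librdkafka's implementation: the helper `txn_adjust_message_timeout()` and its `message_timeout_is_set` flag are hypothetical, and lowering an unset message timeout to just below the transaction timeout is only one possible reading of "adjusted automatically".

```c
#include <stdbool.h>

/* Hypothetical helper (not part of librdkafka).
 * Returns true if message.timeout.ms is lower than transaction.timeout.ms,
 * lowering it first when the application did not set it explicitly. */
bool txn_adjust_message_timeout(int transaction_timeout_ms,
                                int *message_timeout_ms,
                                bool message_timeout_is_set) {
        if (*message_timeout_ms < transaction_timeout_ms)
                return true;  /* already consistent, nothing to do */

        if (message_timeout_is_set)
                return false; /* explicit value conflicts: report an error */

        /* Assumption: pick the largest value that is still lower than
         * the transaction timeout. */
        *message_timeout_ms = transaction_timeout_ms - 1;
        return true;
}
```

An explicitly configured value that conflicts is left untouched and reported, so the application learns about the inconsistency instead of having its setting silently changed.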
713+
714+
715+
#### Old producer fencing
716+
717+
If a new transactional producer instance is started with the same
718+
`transactional.id`, any previous, still-running producer
719+
instance will be fenced off at the next produce, commit or abort attempt, by
720+
raising a fatal error with the error code set to
721+
`RD_KAFKA_RESP_ERR__FENCED`.
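
In the Kafka transaction protocol (KIP-98), fencing is based on a producer epoch that the transaction coordinator bumps each time a producer with the same `transactional.id` initializes. The toy model below illustrates that idea only. All `toy_*` names are hypothetical, nothing here is librdkafka code, and the real client would surface the rejection as the fatal `RD_KAFKA_RESP_ERR__FENCED` error rather than a boolean.

```c
#include <stdbool.h>

/* Toy coordinator state for one transactional.id (hypothetical). */
struct toy_coordinator {
        int current_epoch; /* epoch of the most recently initialized producer */
};

/* Toy producer instance (hypothetical). */
struct toy_producer {
        int epoch;   /* epoch handed out when the instance initialized */
        bool fenced; /* set once the coordinator has rejected this instance */
};

/* A new instance registers: bumping the epoch implicitly fences all
 * older instances that share the transactional.id. */
void toy_init_transactions(struct toy_coordinator *c, struct toy_producer *p) {
        p->epoch = ++c->current_epoch;
        p->fenced = false;
}

/* Stand-in for produce, commit or abort: each attempt is checked against
 * the coordinator's epoch. Returns false if the instance is fenced. */
bool toy_txn_op(const struct toy_coordinator *c, struct toy_producer *p) {
        if (p->epoch < c->current_epoch)
                p->fenced = true; /* stale epoch: treated as a fatal error */
        return !p->fenced;
}
```

Note that the old instance only learns it has been fenced at its next operation, which matches the "at the next produce, commit or abort attempt" wording above.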
722+
723+
724+
725+
### Exactly Once Semantics (EOS) and transactions
726+
727+
librdkafka supports Exactly Once Semantics (EOS) as defined in [KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging).
728+
For more on the use of transactions, see [Transactions in Apache Kafka](https://www.confluent.io/blog/transactions-apache-kafka/).
729+
730+
731+
The transactional consume-process-produce loop roughly boils down to the
732+
following pseudo-code:
733+
734+
```c
735+
/* Producer */
736+
rd_kafka_conf_t *pconf = rd_kafka_conf_new();
737+
rd_kafka_conf_set(pconf, "bootstrap.servers", "mybroker");
738+
rd_kafka_conf_set(pconf, "transactional.id", "my-transactional-id");
739+
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, pconf);
740+
741+
rd_kafka_init_transactions(producer);
742+
743+
744+
/* Consumer */
745+
rd_kafka_conf_t *cconf = rd_kafka_conf_new();
746+
rd_kafka_conf_set(cconf, "bootstrap.servers", "mybroker");
747+
rd_kafka_conf_set(cconf, "group.id", "my-group-id");
748+
rd_kafka_conf_set(cconf, "enable.auto.commit", "false");
749+
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, cconf);
750+
rd_kafka_poll_set_consumer(consumer);
751+
752+
rd_kafka_subscribe(consumer, "inputTopic");
753+
754+
/* Consume-Process-Produce loop */
755+
while (run) {
756+
757+
/* Begin transaction */
758+
rd_kafka_begin_transaction(producer);
759+
760+
while (some_limiting_factor) {
761+
rd_kafka_message_t *in, *out;
762+
763+
/* Consume messages */
764+
in = rd_kafka_consumer_poll(consumer, -1);
765+
766+
/* Process message, generating an output message */
767+
out = process_msg(in);
768+
769+
/* Produce output message to output topic */
770+
rd_kafka_produce(producer, "outputTopic", out);
771+
772+
/* FIXME: or perhaps */
773+
rd_kafka_topic_partition_list_set_from_msg(processed, msg);
774+
/* or */
775+
rd_kafka_transaction_store_offset_from_msg(producer, msg);
776+
}
777+
778+
/* Commit the consumer offset as part of the transaction */
779+
rd_kafka_send_offsets_to_transaction(producer,
780+
"my-group-id",
781+
rd_kafka_position(consumer));
782+
/* or processed */
783+
784+
/* Commit the transaction */
785+
rd_kafka_commit_transaction(producer);
786+
}
787+
788+
rd_kafka_consumer_close(consumer);
789+
rd_kafka_destroy(consumer);
790+
rd_kafka_destroy(producer);
791+
```
792+
793+
**Note**: The above code is a logical representation of transactional
794+
program flow and does not represent the exact API parameter usage.
795+
A proper application will perform error handling, etc.
796+
See [`examples/transactions.cpp`](examples/transactions.cpp) for a proper example.
705797
706798
707799
## Usage
708800
709801
### Documentation
710802
711-
The librdkafka API is documented in the
712-
[`rdkafka.h`](src/rdkafka.h)
803+
The librdkafka API is documented in the [`rdkafka.h`](src/rdkafka.h)
713804
header file, the configuration properties are documented in
714805
[`CONFIGURATION.md`](CONFIGURATION.md)
715806
@@ -777,10 +868,18 @@ Configuration is applied prior to object creation using the
777868
rd_kafka_conf_destroy(rk);
778869
fail("Failed to create producer: %s\n", errstr);
779870
}
780-
871+
781872
/* Note: librdkafka takes ownership of the conf object on success */
782873
```
783874

875+
Configuration properties may be set in any order (except for interceptors) and
876+
may be overwritten before being passed to `rd_kafka_new()`.
877+
`rd_kafka_new()` will verify that the passed configuration is consistent
878+
and will fail and return an error if incompatible configuration properties
879+
are detected. It will also emit log warnings for deprecated and problematic
880+
configuration properties.
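
The "set in any order, validate at creation" behaviour can be sketched with a toy configuration object. This is an illustration only: `struct toy_conf` and `toy_conf_validate()` are hypothetical and do not reflect how `rd_kafka_new()` is implemented. The two rules checked are the idempotence constraints listed in `CONFIGURATION.md`.

```c
#include <stdbool.h>
#include <stdio.h>

/* Toy configuration object (hypothetical, not rd_kafka_conf_t). */
struct toy_conf {
        bool enable_idempotence;
        int max_in_flight; /* max.in.flight.requests.per.connection */
        int retries;
};

/* Checks the configuration as a whole, the way a constructor would after
 * all properties have been set. On failure, writes a human-readable
 * reason to errstr and returns false. */
bool toy_conf_validate(const struct toy_conf *conf,
                       char *errstr, size_t errstr_size) {
        if (!conf->enable_idempotence)
                return true; /* no cross-property constraints to check */

        if (conf->max_in_flight > 5) {
                snprintf(errstr, errstr_size,
                         "max.in.flight must be <= 5 with idempotence");
                return false;
        }
        if (conf->retries <= 0) {
                snprintf(errstr, errstr_size,
                         "retries must be > 0 with idempotence");
                return false;
        }
        return true;
}
```

Because validation runs once over the final state, an intermediate inconsistent value is harmless as long as it is overwritten before the object is created.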
881+
882+
784883
### Termination
785884

786885
librdkafka is asynchronous in its nature and performs most operations in its

src-cpp/rdkafkacpp.h

+48
@@ -279,6 +279,12 @@ enum ErrorCode {
279279
ERR__GAPLESS_GUARANTEE = -148,
280280
/** Maximum poll interval exceeded */
281281
ERR__MAX_POLL_EXCEEDED = -147,
282+
/** Unknown broker */
283+
ERR__UNKNOWN_BROKER = -146,
284+
/** Functionality not configured */
285+
ERR__NOT_CONFIGURED = -145,
286+
/** Instance has been fenced */
287+
ERR__FENCED = -144,
282288

283289
/** End internal error codes */
284290
ERR__END = -100,
@@ -2915,6 +2921,48 @@ class RD_EXPORT Producer : public virtual Handle {
29152921
* purging to finish. */
29162922
};
29172923

2924+
/**
2925+
* Transactional API
2926+
*
2927+
* Requires Kafka broker version v0.11.0 or later
2928+
*/
2929+
2930+
/**
2931+
* @brief
2932+
*
2933+
* @fixme blocking?
2934+
*/
2935+
virtual ErrorCode init_transactions (int timeout_ms,
2936+
std::string &errstr) = 0;
2937+
2938+
/**
2939+
* @fixme blocking?
2940+
*/
2941+
virtual ErrorCode begin_transaction (std::string &errstr) = 0;
2942+
2943+
/**
2944+
* @fixme blocking?
2945+
*/
2946+
virtual ErrorCode send_offsets_to_transaction (
2947+
const std::vector<TopicPartition*> &offsets,
2948+
const std::string &group_id,
2949+
std::string &errstr) = 0;
2950+
2951+
/**
2952+
* @brief
2953+
*
2954+
* @fixme blocking?
2955+
*/
2956+
virtual ErrorCode commit_transaction (int timeout_ms,
2957+
std::string &errstr) = 0;
2958+
2959+
/**
2960+
* @brief
2961+
*
2962+
* @fixme blocking?
2963+
*/
2964+
virtual ErrorCode abort_transaction (int timeout_ms,
2965+
std::string &errstr) = 0;
29182966
};
29192967

29202968
/**@}*/

src-cpp/rdkafkacpp_int.h

+69
@@ -1241,6 +1241,75 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl {
12411241
(int)purge_flags));
12421242
}
12431243

1244+
ErrorCode init_transactions (int timeout_ms, std::string &errstr) {
1245+
rd_kafka_resp_err_t c_err;
1246+
char errbuf[512];
1247+
1248+
c_err = rd_kafka_init_transactions(rk_, timeout_ms,
1249+
errbuf, sizeof(errbuf));
1250+
if (c_err)
1251+
errstr = errbuf;
1252+
1253+
return static_cast<ErrorCode>(c_err);
1254+
}
1255+
1256+
ErrorCode begin_transaction (std::string &errstr) {
1257+
rd_kafka_resp_err_t c_err;
1258+
char errbuf[512];
1259+
1260+
c_err = rd_kafka_begin_transaction(rk_, errbuf, sizeof(errbuf));
1261+
if (c_err)
1262+
errstr = errbuf;
1263+
1264+
return static_cast<ErrorCode>(c_err);
1265+
}
1266+
1267+
ErrorCode send_offsets_to_transaction (
1268+
const std::vector<TopicPartition*> &offsets,
1269+
const std::string &group_id,
1270+
std::string &errstr) {
1271+
rd_kafka_resp_err_t c_err;
1272+
char errbuf[512];
1273+
rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
1274+
1275+
c_err = rd_kafka_send_offsets_to_transaction(rk_, c_offsets,
1276+
group_id.c_str(),
1277+
errbuf, sizeof(errbuf));
1278+
1279+
rd_kafka_topic_partition_list_destroy(c_offsets);
1280+
1281+
if (c_err)
1282+
errstr = errbuf;
1283+
1284+
return static_cast<ErrorCode>(c_err);
1285+
1286+
}
1287+
1288+
ErrorCode commit_transaction (int timeout_ms, std::string &errstr) {
1289+
rd_kafka_resp_err_t c_err;
1290+
char errbuf[512];
1291+
1292+
c_err = rd_kafka_commit_transaction(rk_, timeout_ms,
1293+
errbuf, sizeof(errbuf));
1294+
if (c_err)
1295+
errstr = errbuf;
1296+
1297+
return static_cast<ErrorCode>(c_err);
1298+
1299+
}
1300+
1301+
ErrorCode abort_transaction (int timeout_ms, std::string &errstr) {
1302+
rd_kafka_resp_err_t c_err;
1303+
char errbuf[512];
1304+
1305+
c_err = rd_kafka_abort_transaction(rk_, timeout_ms, errbuf, sizeof(errbuf));
1306+
if (c_err)
1307+
errstr = errbuf;
1308+
1309+
return static_cast<ErrorCode>(c_err);
1310+
1311+
}
1312+
12441313
static Producer *create (Conf *conf, std::string &errstr);
12451314

12461315
};

src/CMakeLists.txt

+1
@@ -42,6 +42,7 @@ set(
4242
rdkafka_background.c
4343
rdkafka_idempotence.c
4444
rdkafka_cert.c
45+
rdkafka_coord.c
4546
rdkafka_mock.c
4647
rdkafka_mock_handlers.c
4748
rdlist.c

src/Makefile

+1
@@ -47,6 +47,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
4747
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
4848
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
4949
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
50+
rdkafka_txnmgr.c rdkafka_coord.c \
5051
rdvarint.c rdbuf.c rdunittest.c \
5152
rdkafka_mock.c rdkafka_mock_handlers.c \
5253
$(SRCS_y)

src/rd.h

+5-3
@@ -344,20 +344,21 @@ static RD_INLINE RD_UNUSED int rd_refcnt_get (rd_refcnt_t *R) {
344344
} while (0)
345345

346346
#if ENABLE_REFCNT_DEBUG
347-
#define rd_refcnt_add(R) \
347+
#define rd_refcnt_add_fl(FUNC,LINE,R) \
348348
( \
349349
printf("REFCNT DEBUG: %-35s %d +1: %16p: %s:%d\n", \
350-
#R, rd_refcnt_get(R), (R), __FUNCTION__,__LINE__), \
350+
#R, rd_refcnt_get(R), (R), (FUNC), (LINE)), \
351351
rd_refcnt_add0(R) \
352352
)
353353

354+
#define rd_refcnt_add(R) rd_refcnt_add_fl(__FUNCTION__, __LINE__, (R))
355+
354356
#define rd_refcnt_add2(R,WHAT) do { \
355357
printf("REFCNT DEBUG: %-35s %d +1: %16p: %16s: %s:%d\n", \
356358
#R, rd_refcnt_get(R), (R), WHAT, __FUNCTION__,__LINE__), \
357359
rd_refcnt_add0(R); \
358360
} while (0)
359361

360-
361362
#define rd_refcnt_sub2(R,WHAT) ( \
362363
printf("REFCNT DEBUG: %-35s %d -1: %16p: %16s: %s:%d\n", \
363364
#R, rd_refcnt_get(R), (R), WHAT, __FUNCTION__,__LINE__), \
@@ -369,6 +370,7 @@ static RD_INLINE RD_UNUSED int rd_refcnt_get (rd_refcnt_t *R) {
369370
rd_refcnt_sub0(R) )
370371

371372
#else
373+
#define rd_refcnt_add_fl(FUNC,LINE,R) rd_refcnt_add0(R)
372374
#define rd_refcnt_add(R) rd_refcnt_add0(R)
373375
#define rd_refcnt_sub(R) rd_refcnt_sub0(R)
374376
#endif
