Skip to content

Commit

Permalink
Merge pull request #73 from confluentinc/dev_electLeaders
Browse files Browse the repository at this point in the history
Dev elect leaders
  • Loading branch information
PratRanj07 authored Oct 9, 2024
2 parents 25db856 + c62e9cf commit 50f6d1e
Show file tree
Hide file tree
Showing 18 changed files with 1,188 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ librdkafka v2.6.0 is a feature release:
of given types.
* Fix for permanent fetch errors when using a newer Fetch RPC version with an older
inter broker protocol (#4806).
* [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#4845)


## Fixes
Expand Down
3 changes: 2 additions & 1 deletion INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-436 - Start time in stats | 2.3.0 | Supported |
| KIP-447 - Producer scalability for EOS | 2.5.0 | Supported |
| KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported |
| KIP-460 - AdminAPI: electPreferredLeader | 2.4.0 | Not supported |
| KIP-460 - AdminAPI: electLeaders | 2.6.0 | Supported |
| KIP-464 - AdminAPI: defaults for createTopics | 2.4.0 | Supported |
| KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 | Supported |
| KIP-480 - Sticky partitioner | 2.4.0 | Supported |
Expand Down Expand Up @@ -2102,6 +2102,7 @@ release of librdkafka.
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 43 | ElectLeaders | 2 | 2 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
Expand Down
3 changes: 2 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ list_consumer_group_offsets
alter_consumer_group_offsets
incremental_alter_configs
user_scram
list_offsets
list_offsets
elect_leaders
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ target_link_libraries(describe_cluster PUBLIC rdkafka)
add_executable(list_offsets list_offsets.c ${win32_sources})
target_link_libraries(list_offsets PUBLIC rdkafka)

add_executable(elect_leaders elect_leaders.c ${win32_sources})
target_link_libraries(elect_leaders PUBLIC rdkafka)

# The targets below has Unix include dirs and do not compile on Windows.
if(NOT WIN32)
add_executable(rdkafka_example rdkafka_example.c)
Expand Down
5 changes: 5 additions & 0 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \
incremental_alter_configs \
user_scram \
list_offsets \
elect_leaders \
misc

all: $(EXAMPLES)
Expand Down Expand Up @@ -153,6 +154,10 @@ list_offsets: ../src/librdkafka.a list_offsets.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

elect_leaders: ../src/librdkafka.a elect_leaders.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)

misc: ../src/librdkafka.a misc.c
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
../src/librdkafka.a $(LIBS)
Expand Down
320 changes: 320 additions & 0 deletions examples/elect_leaders.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2024, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SH THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/**
* Example utility that shows how to use Elect Leaders (AdminAPI)
* to trigger preffered or unclean elections for
* one or more topic partitions.
*/

#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef _WIN32
#include "../win32/wingetopt.h"
#else
#include <getopt.h>
#endif

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"

const char *argv0;

static rd_kafka_queue_t *queue = NULL; /** Admin result queue.
* This is a global so we can
* yield in stop() */
static volatile sig_atomic_t run = 1;

/**
* @brief Signal termination of program
*/
static void stop(int sig) {
if (!run) {
fprintf(stderr, "%% Forced termination\n");
exit(2);
}
run = 0;
if (queue)
rd_kafka_queue_yield(queue);
}

static void usage(const char *reason, ...) {

fprintf(stderr,
"Elect Leaders usage examples\n"
"\n"
"Usage: %s <options> <election_type> "
"<topic1> <partition1> ...\n"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
" -X <prop=val> Set librdkafka configuration property.\n"
" See CONFIGURATION.md for full list.\n"
" -d <dbg,..> Enable librdkafka debugging (%s).\n"
"\n",
argv0, rd_kafka_get_debug_contexts());

if (reason) {
va_list ap;
char reasonbuf[512];

va_start(ap, reason);
vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap);
va_end(ap);

fprintf(stderr, "ERROR: %s\n", reasonbuf);
}

exit(reason ? 1 : 0);
}


#define fatal(...) \
do { \
fprintf(stderr, "ERROR: "); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
exit(2); \
} while (0)


/**
* @brief Set config property. Exit on failure.
*/
static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) {
char errstr[512];

if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK)
fatal("Failed to set %s=%s: %s", name, val, errstr);
}

static int
print_elect_leaders_result(const rd_kafka_ElectLeaders_result_t *result) {
const rd_kafka_topic_partition_result_t **results;
size_t results_cnt;
size_t i;
const rd_kafka_ElectLeadersResult_t *res;

res = rd_kafka_ElectLeaders_result(result);

results = rd_kafka_ElectLeadersResult_partitions(res, &results_cnt);
for (i = 0; i < results_cnt; i++) {
const rd_kafka_topic_partition_t *partition =
rd_kafka_topic_partition_result_partition(results[i]);
const rd_kafka_error_t *err =
rd_kafka_topic_partition_result_error(results[i]);
if (rd_kafka_error_code(err)) {
printf("%% ElectLeaders failed for %s [%" PRId32
"] : %s\n",
partition->topic, partition->partition,
rd_kafka_error_string(err));
} else {
printf("%% ElectLeaders succeeded for %s [%" PRId32
"]\n",
partition->topic, partition->partition);
}
}

return 0;
}

/**
* @brief Parse an integer or fail.
*/
int64_t parse_int(const char *what, const char *str) {
char *end;
unsigned long n = strtoull(str, &end, 0);

if (end != str + strlen(str)) {
fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n",
what, str);
exit(1);
}

return (int64_t)n;
}

static void cmd_elect_leaders(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_t *rk;
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *event = NULL;
rd_kafka_topic_partition_list_t *partitions = NULL;
rd_kafka_ElectionType_t election_type;
rd_kafka_ElectLeaders_t *elect_leaders;
int i;
int retval = 0;

if ((argc - 1) % 2 != 0) {
usage("Invalid number of arguments");
}

election_type = parse_int("election_type", argv[0]);

argc--;
argv++;
if (argc > 0) {
partitions = rd_kafka_topic_partition_list_new(argc / 2);
for (i = 0; i < argc; i += 2) {
rd_kafka_topic_partition_list_add(
partitions, argv[i],
parse_int("partition", argv[i + 1]));
}
}

elect_leaders = rd_kafka_ElectLeaders_new(election_type, partitions);

if (partitions) {
rd_kafka_topic_partition_list_destroy(partitions);
}

/*
* Create consumer instance
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk)
fatal("Failed to create new consumer: %s", errstr);

/*
* Elect Leaders
*/
queue = rd_kafka_queue_new(rk);

/* Signal handler for clean shutdown */
signal(SIGINT, stop);


options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ELECTLEADERS);

if (rd_kafka_AdminOptions_set_request_timeout(
options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
goto exit;
}

if (rd_kafka_AdminOptions_set_operation_timeout(
options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) {
fprintf(stderr, "%% Failed to set operation timeout: %s\n",
errstr);
goto exit;
}

rd_kafka_ElectLeaders(rk, elect_leaders, options, queue);

rd_kafka_ElectLeaders_destroy(elect_leaders);
rd_kafka_AdminOptions_destroy(options);

/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by
* the request timeout set
* above (10s) */);

if (!event) {
/* User hit Ctrl-C,
* see yield call in stop() signal handler */
fprintf(stderr, "%% Cancelled by user\n");

} else if (rd_kafka_event_error(event)) {
rd_kafka_resp_err_t err = rd_kafka_event_error(event);
/* ElectLeaders request failed */
fprintf(stderr, "%% ElectLeaders failed[%" PRId32 "]: %s\n",
err, rd_kafka_event_error_string(event));
goto exit;
} else {
/* ElectLeaders request succeeded */
const rd_kafka_ElectLeaders_result_t *result;
result = rd_kafka_event_ElectLeaders_result(event);
retval = print_elect_leaders_result(result);
}


exit:
if (event)
rd_kafka_event_destroy(event);

rd_kafka_queue_destroy(queue);
/* Destroy the client instance */
rd_kafka_destroy(rk);

exit(retval);
}


int main(int argc, char **argv) {
rd_kafka_conf_t *conf; /**< Client configuration object */
int opt;
argv0 = argv[0];

/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();

/*
* Parse common options
*/
while ((opt = getopt(argc, argv, "b:X:d:")) != -1) {
switch (opt) {
case 'b':
conf_set(conf, "bootstrap.servers", optarg);
break;

case 'X': {
char *name = optarg, *val;

if (!(val = strchr(name, '=')))
fatal("-X expects a name=value argument");

*val = '\0';
val++;

conf_set(conf, name, val);
break;
}

case 'd':
conf_set(conf, "debug", optarg);
break;

default:
usage("Unknown option %c", (char)opt);
}
}

cmd_elect_leaders(conf, argc - optind, &argv[optind]);

return 0;
}
Loading

0 comments on commit 50f6d1e

Please sign in to comment.