diff --git a/src/rdkafka.c b/src/rdkafka.c index 7427fa171b..566d2e065d 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -64,6 +64,7 @@ #endif #include "rdtime.h" +#include "rdmap.h" #include "crc32c.h" #include "rdunittest.h" @@ -5178,6 +5179,17 @@ const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) { return uuid->base64str; } +unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid) { + unsigned char bytes[16]; + memcpy(bytes, &uuid->most_significant_bits, 8); + memcpy(&bytes[8], &uuid->least_significant_bits, 8); + return rd_bytes_hash(bytes, 16); +} + +unsigned int rd_kafka_Uuid_map_hash(const void *key) { + return rd_kafka_Uuid_hash(key); +} + int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) { return uuid->least_significant_bits; } diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 357c137db8..f30249f2e3 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -3087,6 +3087,12 @@ int rd_kafka_topic_partition_by_id_cmp(const void *_a, const void *_b) { return are_topic_ids_different || RD_CMP(a->partition, b->partition); } +static int rd_kafka_topic_partition_by_id_cmp_opaque(const void *_a, + const void *_b, + void *opaque) { + return rd_kafka_topic_partition_by_id_cmp(_a, _b); +} + /** @brief Compare only the topic */ int rd_kafka_topic_partition_cmp_topic(const void *_a, const void *_b) { const rd_kafka_topic_partition_t *a = _a; @@ -3100,13 +3106,22 @@ static int rd_kafka_topic_partition_cmp_opaque(const void *_a, return rd_kafka_topic_partition_cmp(_a, _b); } -/** @returns a hash of the topic and partition */ +/** @returns a hash of the topic name and partition */ unsigned int rd_kafka_topic_partition_hash(const void *_a) { const rd_kafka_topic_partition_t *a = _a; int r = 31 * 17 + a->partition; return 31 * r + rd_string_hash(a->topic, -1); } +/** @returns a hash of the topic id and partition */ +unsigned int rd_kafka_topic_partition_hash_by_id(const void *_a) { + const rd_kafka_topic_partition_t *a = _a; + const rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id(a); + int r = 31 * 17 + a->partition; + return 31 * r + rd_kafka_Uuid_hash(&topic_id); +} + /** @@ -3313,6 +3328,12 @@ void rd_kafka_topic_partition_list_sort_by_topic( rktparlist, rd_kafka_topic_partition_cmp_opaque, NULL); } +void rd_kafka_topic_partition_list_sort_by_topic_id( + rd_kafka_topic_partition_list_t *rktparlist) { + rd_kafka_topic_partition_list_sort( + rktparlist, rd_kafka_topic_partition_by_id_cmp_opaque, NULL); +} + rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, @@ -4479,3 +4500,161 @@ const char *rd_kafka_fetch_pos2str(const rd_kafka_fetch_pos_t fetchpos) { return ret[idx]; } + +typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *, + void *) map_toppar_void_t; + +/** + * @brief Calculates \p a ∩ \p b using \p cmp and \p hash . + * Ordered following \p a order. Elements are copied from \p a. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection0( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b, + int(cmp)(const void *_a, const void *_b), + unsigned int(hash)(const void *_a)) { + rd_kafka_topic_partition_t *rktpar; + rd_kafka_topic_partition_list_t *ret = + rd_kafka_topic_partition_list_new(a->cnt < b->cnt ? a->cnt + : b->cnt); + map_toppar_void_t b_map = + RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL); + RD_KAFKA_TPLIST_FOREACH(rktpar, b) { + RD_MAP_SET(&b_map, rktpar, rktpar); + } + RD_KAFKA_TPLIST_FOREACH(rktpar, a) { + if ((RD_MAP_GET(&b_map, rktpar) != NULL) == 1) { + rd_kafka_topic_partition_list_add_copy(ret, rktpar); + } + } + RD_MAP_DESTROY(&b_map); + return ret; +} + +/** + * @brief Calculates \p a - \p b using \p cmp and \p hash . + * Ordered following \p a order. Elements are copied from \p a. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_difference0(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b, + int(cmp)(const void *_a, + const void *_b), + unsigned int(hash)(const void *_a)) { + rd_kafka_topic_partition_t *rktpar; + rd_kafka_topic_partition_list_t *ret = + rd_kafka_topic_partition_list_new(a->cnt); + map_toppar_void_t b_map = + RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL); + RD_KAFKA_TPLIST_FOREACH(rktpar, b) { + RD_MAP_SET(&b_map, rktpar, rktpar); + } + RD_KAFKA_TPLIST_FOREACH(rktpar, a) { + if ((RD_MAP_GET(&b_map, rktpar) != NULL) == 0) { + rd_kafka_topic_partition_list_add_copy(ret, rktpar); + } + } + RD_MAP_DESTROY(&b_map); + return ret; +} + +/** + * @brief Calculates \p a ∪ \p b using \p cmp and \p hash . + * Ordered following \p a order for elements in \p a + * and \p b order for elements only in \p b. + * Elements are copied the same way. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union0(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b, + int(cmp)(const void *_a, const void *_b), + unsigned int(hash)(const void *_a)) { + + rd_kafka_topic_partition_list_t *b_minus_a = + rd_kafka_topic_partition_list_difference0(b, a, cmp, hash); + rd_kafka_topic_partition_list_t *ret = + rd_kafka_topic_partition_list_new(a->cnt + b_minus_a->cnt); + + rd_kafka_topic_partition_list_add_list(ret, a); + rd_kafka_topic_partition_list_add_list(ret, b_minus_a); + + rd_kafka_topic_partition_list_destroy(b_minus_a); + return ret; +} + +/** + * @brief Calculates \p a ∩ \p b using topic name and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_intersection0( + a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash); +} + +/** + * @brief Calculates \p a - \p b using topic name and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_difference_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_difference0( + a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash); +} + +/** + * @brief Calculates \p a ∪ \p b using topic name and partition id. + * Ordered following \p a order for elements in \p a + * and \p b order for elements only in \p b. + * Elements are copied the same way. + */ +rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_union_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_union0( + a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash); +} + +/** + * @brief Calculates \p a ∩ \p b using topic id and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_intersection0( + a, b, rd_kafka_topic_partition_by_id_cmp, + rd_kafka_topic_partition_hash_by_id); +} + +/** + * @brief Calculates \p a - \p b using topic id and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_difference0( + a, b, rd_kafka_topic_partition_by_id_cmp, + rd_kafka_topic_partition_hash_by_id); +} + +/** + * @brief Calculates \p a ∪ \p b using topic id and partition id. + * Ordered following \p a order for elements in \p a + * and \p b order for elements only in \p b. + * Elements are copied the same way. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_union0( + a, b, rd_kafka_topic_partition_by_id_cmp, + rd_kafka_topic_partition_hash_by_id); +} diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 56b4a76138..cdb023d87a 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -773,6 +773,9 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic( void rd_kafka_topic_partition_list_sort_by_topic( rd_kafka_topic_partition_list_t *rktparlist); +void rd_kafka_topic_partition_list_sort_by_topic_id( + rd_kafka_topic_partition_list_t *rktparlist); + void rd_kafka_topic_partition_list_reset_offsets( rd_kafka_topic_partition_list_t *rktparlist, int64_t offset); @@ -1122,4 +1125,31 @@ static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_offset_validation_position( rktp->rktp_offset_validation_pos = offset_validation_pos; } +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_difference_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union_by_name(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + #endif /* _RDKAFKA_PARTITION_H_ */ diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index ee392a6a38..cf4153f03d 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -603,6 +603,10 @@ rd_kafka_Uuid_t rd_kafka_Uuid_random(); const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid); +unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid); + +unsigned int rd_kafka_Uuid_map_hash(const void *key); + /** * @name Producer ID and Epoch for the Idempotent Producer * @{ diff --git a/src/rdmap.c b/src/rdmap.c index 8e1a0546cc..1e82bcb9a2 100644 --- a/src/rdmap.c +++ b/src/rdmap.c @@ -2,6 +2,7 @@ * librdkafka - The Apache Kafka C/C++ library * * Copyright (c) 2020-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -237,6 +238,21 @@ unsigned int rd_map_str_hash(const void *key) { } +/** + * @returns a djb2 hash of \p bytes. + * + * @param len \p bytes will be hashed up to \p len. + */ +unsigned int rd_bytes_hash(unsigned char *bytes, size_t len) { + unsigned int hash = 5381; + size_t i; + + for (i = 0; i < len; i++) + hash = ((hash << 5) + hash) + bytes[i]; + + return hash; +} + /** * @name Unit tests diff --git a/src/rdmap.h b/src/rdmap.h index bea8a1aca6..b8e3feb97b 100644 --- a/src/rdmap.h +++ b/src/rdmap.h @@ -2,6 +2,7 @@ * librdkafka - The Apache Kafka C/C++ library * * Copyright (c) 2020-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -249,6 +250,10 @@ int rd_map_str_cmp(const void *a, const void *b); */ unsigned int rd_map_str_hash(const void *a); +/** + * @brief Bytes hash function (djb2). + */ +unsigned int rd_bytes_hash(unsigned char *bytes, size_t len); /**