Skip to content

Commit

Permalink
Topic partition and Uuid additional common functions (#4621)
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab authored Mar 18, 2024
1 parent 91a423a commit 267367c
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 1 deletion.
12 changes: 12 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#endif

#include "rdtime.h"
#include "rdmap.h"
#include "crc32c.h"
#include "rdunittest.h"

Expand Down Expand Up @@ -5178,6 +5179,17 @@ const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
return uuid->base64str;
}

/**
 * @brief Hashes the 128 bits of \p uuid.
 *
 * The most significant 64 bits are copied into the first half of a
 * 16-byte scratch buffer and the least significant 64 bits into the
 * second half, both as they are laid out in memory, and the buffer is
 * then handed to rd_bytes_hash().
 *
 * @param uuid Uuid to hash, must not be NULL.
 *
 * @returns the value rd_bytes_hash() yields for those 16 bytes.
 */
unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid) {
        unsigned char buf[16];
        unsigned char *msb_dst = buf;
        unsigned char *lsb_dst = buf + 8;

        memcpy(msb_dst, &uuid->most_significant_bits, 8);
        memcpy(lsb_dst, &uuid->least_significant_bits, 8);

        return rd_bytes_hash(buf, sizeof(buf));
}

unsigned int rd_kafka_Uuid_map_hash(const void *key) {
return rd_kafka_Uuid_hash(key);
}

/**
 * @returns the least significant bits field of \p uuid.
 */
int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {
        const int64_t lsb = uuid->least_significant_bits;

        return lsb;
}
Expand Down
181 changes: 180 additions & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -3087,6 +3087,12 @@ int rd_kafka_topic_partition_by_id_cmp(const void *_a, const void *_b) {
return are_topic_ids_different || RD_CMP(a->partition, b->partition);
}

/**
 * @brief rd_kafka_topic_partition_by_id_cmp() with an additional
 *        \p opaque argument, which is ignored.
 *
 * @returns whatever rd_kafka_topic_partition_by_id_cmp() returns
 *          for \p _a and \p _b.
 */
static int rd_kafka_topic_partition_by_id_cmp_opaque(const void *_a,
                                                     const void *_b,
                                                     void *opaque) {
        const int cmp_result = rd_kafka_topic_partition_by_id_cmp(_a, _b);

        return cmp_result;
}

/** @brief Compare only the topic */
int rd_kafka_topic_partition_cmp_topic(const void *_a, const void *_b) {
const rd_kafka_topic_partition_t *a = _a;
Expand All @@ -3100,13 +3106,22 @@ static int rd_kafka_topic_partition_cmp_opaque(const void *_a,
return rd_kafka_topic_partition_cmp(_a, _b);
}

/**
 * @brief Hash of a topic partition based on its topic name and partition.
 *
 * The mixing is done in unsigned arithmetic so that large partition
 * values wrap around instead of overflowing a signed int, which would be
 * undefined behavior. The result is the same as with signed arithmetic
 * whenever the latter does not overflow.
 *
 * @param _a Pointer to a rd_kafka_topic_partition_t.
 *
 * @returns a hash of the topic name and partition
 */
unsigned int rd_kafka_topic_partition_hash(const void *_a) {
        const rd_kafka_topic_partition_t *a = _a;
        unsigned int r = 31u * 17u + (unsigned int)a->partition;

        return 31u * r + rd_string_hash(a->topic, -1);
}

/**
 * @brief Hash of a topic partition based on its topic id and partition.
 *
 * The mixing is done in unsigned arithmetic so that large partition
 * values wrap around instead of overflowing a signed int, which would be
 * undefined behavior. The result is the same as with signed arithmetic
 * whenever the latter does not overflow.
 *
 * @param _a Pointer to a rd_kafka_topic_partition_t.
 *
 * @returns a hash of the topic id and partition
 */
unsigned int rd_kafka_topic_partition_hash_by_id(const void *_a) {
        const rd_kafka_topic_partition_t *a = _a;
        const rd_kafka_Uuid_t topic_id =
            rd_kafka_topic_partition_get_topic_id(a);
        unsigned int r = 31u * 17u + (unsigned int)a->partition;

        return 31u * r + rd_kafka_Uuid_hash(&topic_id);
}



/**
Expand Down Expand Up @@ -3313,6 +3328,12 @@ void rd_kafka_topic_partition_list_sort_by_topic(
rktparlist, rd_kafka_topic_partition_cmp_opaque, NULL);
}

/**
 * @brief Sorts \p rktparlist in place using the topic id based
 *        comparator rd_kafka_topic_partition_by_id_cmp_opaque().
 *
 * @param rktparlist List to sort.
 */
void rd_kafka_topic_partition_list_sort_by_topic_id(
    rd_kafka_topic_partition_list_t *rktparlist) {
        /* The comparator ignores its opaque argument. */
        void *cmp_opaque = NULL;

        rd_kafka_topic_partition_list_sort(
            rktparlist, rd_kafka_topic_partition_by_id_cmp_opaque,
            cmp_opaque);
}

rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
Expand Down Expand Up @@ -4479,3 +4500,161 @@ const char *rd_kafka_fetch_pos2str(const rd_kafka_fetch_pos_t fetchpos) {

return ret[idx];
}

/**
 * @brief Map type declared with a topic partition pointer as first type
 *        argument and a generic pointer as second one.
 *
 * Used by the list set operations in this file to look up which
 * topic partitions are present in a list.
 */
typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
void *) map_toppar_void_t;

/**
 * @brief Builds \p a ∩ \p b, with element equality decided by \p cmp
 *        and \p hash .
 *
 * Every element of \p b is first stored in a temporary map, then \p a
 * is walked and each of its elements found in the map is copied to
 * the result. The result therefore follows \p a order and holds
 * copies of elements of \p a.
 *
 * @returns a new list with the elements of \p a that are also in \p b.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection0(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b,
    int(cmp)(const void *_a, const void *_b),
    unsigned int(hash)(const void *_a)) {
        rd_kafka_topic_partition_t *elem;
        rd_kafka_topic_partition_list_t *common;
        map_toppar_void_t members_of_b =
            RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL);

        /* The intersection cannot be larger than the smaller list. */
        if (a->cnt < b->cnt)
                common = rd_kafka_topic_partition_list_new(a->cnt);
        else
                common = rd_kafka_topic_partition_list_new(b->cnt);

        RD_KAFKA_TPLIST_FOREACH(elem, b) {
                RD_MAP_SET(&members_of_b, elem, elem);
        }

        RD_KAFKA_TPLIST_FOREACH(elem, a) {
                if (RD_MAP_GET(&members_of_b, elem) != NULL)
                        rd_kafka_topic_partition_list_add_copy(common, elem);
        }

        RD_MAP_DESTROY(&members_of_b);
        return common;
}

/**
 * @brief Builds \p a - \p b, with element equality decided by \p cmp
 *        and \p hash .
 *
 * Every element of \p b is first stored in a temporary map, then \p a
 * is walked and each of its elements not found in the map is copied to
 * the result. The result therefore follows \p a order and holds
 * copies of elements of \p a.
 *
 * @returns a new list with the elements of \p a that are not in \p b.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_difference0(rd_kafka_topic_partition_list_t *a,
                                          rd_kafka_topic_partition_list_t *b,
                                          int(cmp)(const void *_a,
                                                   const void *_b),
                                          unsigned int(hash)(const void *_a)) {
        rd_kafka_topic_partition_t *elem;
        rd_kafka_topic_partition_list_t *only_in_a =
            rd_kafka_topic_partition_list_new(a->cnt);
        map_toppar_void_t members_of_b =
            RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL);

        RD_KAFKA_TPLIST_FOREACH(elem, b) {
                RD_MAP_SET(&members_of_b, elem, elem);
        }

        RD_KAFKA_TPLIST_FOREACH(elem, a) {
                if (RD_MAP_GET(&members_of_b, elem) == NULL)
                        rd_kafka_topic_partition_list_add_copy(only_in_a,
                                                               elem);
        }

        RD_MAP_DESTROY(&members_of_b);
        return only_in_a;
}

/**
 * @brief Builds \p a ∪ \p b, with element equality decided by \p cmp
 *        and \p hash .
 *
 * The result starts with all the elements of \p a, in \p a order,
 * followed by the elements that are only in \p b, in \p b order.
 * The latter are computed as \p b - \p a in a temporary list that is
 * destroyed before returning.
 *
 * @returns a new list with the elements of both lists.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union0(rd_kafka_topic_partition_list_t *a,
                                     rd_kafka_topic_partition_list_t *b,
                                     int(cmp)(const void *_a, const void *_b),
                                     unsigned int(hash)(const void *_a)) {
        rd_kafka_topic_partition_list_t *only_in_b;
        rd_kafka_topic_partition_list_t *all;

        only_in_b = rd_kafka_topic_partition_list_difference0(b, a, cmp, hash);
        all = rd_kafka_topic_partition_list_new(a->cnt + only_in_b->cnt);

        rd_kafka_topic_partition_list_add_list(all, a);
        rd_kafka_topic_partition_list_add_list(all, only_in_b);

        rd_kafka_topic_partition_list_destroy(only_in_b);
        return all;
}

/**
 * @brief Builds \p a ∩ \p b, matching elements by topic name and
 *        partition id.
 *
 * @returns a new list, following \p a order, with copies of the
 *          elements of \p a that are also in \p b.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_name(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        rd_kafka_topic_partition_list_t *in_both =
            rd_kafka_topic_partition_list_intersection0(
                a, b, rd_kafka_topic_partition_cmp,
                rd_kafka_topic_partition_hash);

        return in_both;
}

/**
 * @brief Builds \p a - \p b, matching elements by topic name and
 *        partition id.
 *
 * @returns a new list, following \p a order, with copies of the
 *          elements of \p a that are not in \p b.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_difference_by_name(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        rd_kafka_topic_partition_list_t *only_in_a =
            rd_kafka_topic_partition_list_difference0(
                a, b, rd_kafka_topic_partition_cmp,
                rd_kafka_topic_partition_hash);

        return only_in_a;
}

/**
 * @brief Builds \p a ∪ \p b, matching elements by topic name and
 *        partition id.
 *
 * @returns a new list with the elements of \p a, in \p a order,
 *          followed by the elements only in \p b, in \p b order.
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_union_by_name(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        rd_kafka_topic_partition_list_t *in_either =
            rd_kafka_topic_partition_list_union0(
                a, b, rd_kafka_topic_partition_cmp,
                rd_kafka_topic_partition_hash);

        return in_either;
}

/**
 * @brief Builds \p a ∩ \p b, matching elements by topic id and
 *        partition id.
 *
 * @returns a new list, following \p a order, with copies of the
 *          elements of \p a that are also in \p b.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_id(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        rd_kafka_topic_partition_list_t *in_both =
            rd_kafka_topic_partition_list_intersection0(
                a, b, rd_kafka_topic_partition_by_id_cmp,
                rd_kafka_topic_partition_hash_by_id);

        return in_both;
}

/**
 * @brief Builds \p a - \p b, matching elements by topic id and
 *        partition id.
 *
 * @returns a new list, following \p a order, with copies of the
 *          elements of \p a that are not in \p b.
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        rd_kafka_topic_partition_list_t *only_in_a =
            rd_kafka_topic_partition_list_difference0(
                a, b, rd_kafka_topic_partition_by_id_cmp,
                rd_kafka_topic_partition_hash_by_id);

        return only_in_a;
}

/**
 * @brief Builds \p a ∪ \p b, matching elements by topic id and
 *        partition id.
 *
 * @returns a new list with the elements of \p a, in \p a order,
 *          followed by the elements only in \p b, in \p b order.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a,
                                          rd_kafka_topic_partition_list_t *b) {
        rd_kafka_topic_partition_list_t *in_either =
            rd_kafka_topic_partition_list_union0(
                a, b, rd_kafka_topic_partition_by_id_cmp,
                rd_kafka_topic_partition_hash_by_id);

        return in_either;
}
30 changes: 30 additions & 0 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,9 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic(
void rd_kafka_topic_partition_list_sort_by_topic(
rd_kafka_topic_partition_list_t *rktparlist);

/**
 * @brief Sorts \p rktparlist using the topic id and partition based
 *        comparator.
 */
void rd_kafka_topic_partition_list_sort_by_topic_id(
rd_kafka_topic_partition_list_t *rktparlist);

void rd_kafka_topic_partition_list_reset_offsets(
rd_kafka_topic_partition_list_t *rktparlist,
int64_t offset);
Expand Down Expand Up @@ -1122,4 +1125,31 @@ static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_offset_validation_position(
rktp->rktp_offset_validation_pos = offset_validation_pos;
}

/**
 * @brief Calculates \p a ∩ \p b using topic name and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly created list.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_name(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

/**
 * @brief Calculates \p a - \p b using topic name and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly created list.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_difference_by_name(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

/**
 * @brief Calculates \p a ∪ \p b using topic name and partition id.
 *        Ordered following \p a order for elements in \p a
 *        and \p b order for elements only in \p b.
 *
 * @returns a newly created list.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union_by_name(rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

/**
 * @brief Calculates \p a ∩ \p b using topic id and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly created list.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_id(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

/**
 * @brief Calculates \p a - \p b using topic id and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly created list.
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

/**
 * @brief Calculates \p a ∪ \p b using topic id and partition id.
 *        Ordered following \p a order for elements in \p a
 *        and \p b order for elements only in \p b.
 *
 * @returns a newly created list.
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

#endif /* _RDKAFKA_PARTITION_H_ */
4 changes: 4 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,10 @@ rd_kafka_Uuid_t rd_kafka_Uuid_random();

const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid);

/**
 * @returns a hash of \p uuid computed from its most and least
 *          significant bits.
 */
unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid);

/**
 * @brief Same as rd_kafka_Uuid_hash() but taking the Uuid through a
 *        generic pointer \p key.
 */
unsigned int rd_kafka_Uuid_map_hash(const void *key);

/**
* @name Producer ID and Epoch for the Idempotent Producer
* @{
Expand Down
16 changes: 16 additions & 0 deletions src/rdmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2020-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -237,6 +238,21 @@ unsigned int rd_map_str_hash(const void *key) {
}


/**
 * @brief djb2 hash over a byte buffer.
 *
 * Starts from 5381 and, for each byte, multiplies the running hash by 33
 * and adds the byte value, wrapping around on unsigned overflow.
 *
 * @param bytes Buffer to hash.
 * @param len Number of bytes of \p bytes to include in the hash.
 *
 * @returns the hash of the first \p len bytes, 5381 when \p len is 0.
 */
unsigned int rd_bytes_hash(unsigned char *bytes, size_t len) {
        unsigned int acc = 5381;
        const unsigned char *cur = bytes;
        size_t remaining = len;

        while (remaining > 0) {
                acc = acc * 33 + *cur;
                cur++;
                remaining--;
        }

        return acc;
}


/**
* @name Unit tests
Expand Down
5 changes: 5 additions & 0 deletions src/rdmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2020-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -249,6 +250,10 @@ int rd_map_str_cmp(const void *a, const void *b);
*/
unsigned int rd_map_str_hash(const void *a);

/**
 * @brief Bytes hash function (djb2).
 *
 * @param bytes Buffer to hash.
 * @param len Number of bytes of \p bytes to hash.
 *
 * @returns the hash of the first \p len bytes of \p bytes.
 */
unsigned int rd_bytes_hash(unsigned char *bytes, size_t len);


/**
Expand Down

0 comments on commit 267367c

Please sign in to comment.