Skip to content

Commit

Permalink
Uuid support in mock cluster (confluentinc#4591)
Browse files Browse the repository at this point in the history
also adds the possibility to generate pseudo-random Uuids
  • Loading branch information
emasab authored Feb 13, 2024
1 parent 6d88efd commit a6d85bd
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 2 deletions.
68 changes: 67 additions & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -5077,6 +5077,38 @@ rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
return copy_uuid;
}

/**
* Returns a new non cryptographically secure UUIDv4 (random).
*
* @return A UUIDv4.
*
* @remark Must be freed after use using rd_kafka_Uuid_destroy().
*/
rd_kafka_Uuid_t rd_kafka_Uuid_random() {
int i;
unsigned char rand_values_bytes[16] = {0};
uint64_t *rand_values_uint64 = (uint64_t *)rand_values_bytes;
unsigned char *rand_values_app;
rd_kafka_Uuid_t ret = RD_KAFKA_UUID_ZERO;
for (i = 0; i < 16; i += 2) {
uint16_t rand_uint16 = (uint16_t)rd_jitter(0, INT16_MAX - 1);
/* No need to convert endianess here because it's still only
* a random value. */
rand_values_app = (unsigned char *)&rand_uint16;
rand_values_bytes[i] |= rand_values_app[0];
rand_values_bytes[i + 1] |= rand_values_app[1];
}

rand_values_bytes[6] &= 0x0f; /* clear version */
rand_values_bytes[6] |= 0x40; /* version 4 */
rand_values_bytes[8] &= 0x3f; /* clear variant */
rand_values_bytes[8] |= 0x80; /* IETF variant */

ret.most_significant_bits = be64toh(rand_values_uint64[0]);
ret.least_significant_bits = be64toh(rand_values_uint64[1]);
return ret;
}

/**
* @brief Destroy the provided uuid.
*
Expand All @@ -5086,6 +5118,40 @@ void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
rd_free(uuid);
}

/**
* @brief Computes canonical encoding for the given uuid string.
* Mainly useful for testing.
*
* @param uuid UUID for which canonical encoding is required.
*
* @return canonical encoded string for the given UUID.
*
* @remark Must be freed after use.
*/
const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid) {
int i, j;
unsigned char bytes[16];
char *ret = rd_calloc(37, sizeof(*ret));

for (i = 0; i < 8; i++) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
j = 7 - i;
#elif __BYTE_ORDER == __BIG_ENDIAN
j = i;
#endif
bytes[i] = (uuid->most_significant_bits >> (8 * j)) & 0xFF;
bytes[8 + i] = (uuid->least_significant_bits >> (8 * j)) & 0xFF;
}

rd_snprintf(ret, 37,
"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%"
"02x%02x%02x",
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5],
bytes[6], bytes[7], bytes[8], bytes[9], bytes[10],
bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]);
return ret;
}

const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
if (*uuid->base64str)
return uuid->base64str;
Expand Down Expand Up @@ -5119,4 +5185,4 @@ int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {

int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) {
return uuid->most_significant_bits;
}
}
26 changes: 25 additions & 1 deletion src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,9 @@ rd_kafka_mock_topic_new(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_topic_t *mtopic;
int i;

mtopic = rd_calloc(1, sizeof(*mtopic));
mtopic = rd_calloc(1, sizeof(*mtopic));
/* Assign random topic id */
mtopic->id = rd_kafka_Uuid_random();
mtopic->name = rd_strdup(topic);
mtopic->cluster = mcluster;

Expand Down Expand Up @@ -671,6 +673,28 @@ rd_kafka_mock_topic_find_by_kstr(const rd_kafka_mock_cluster_t *mcluster,
return NULL;
}

/**
* @brief Find a mock topic by id.
*
* @param mcluster Cluster to search in.
* @param id Topic id to find.
* @return Found topic or NULL.
*
* @locks mcluster->lock MUST be held.
*/
rd_kafka_mock_topic_t *
rd_kafka_mock_topic_find_by_id(const rd_kafka_mock_cluster_t *mcluster,
rd_kafka_Uuid_t id) {
const rd_kafka_mock_topic_t *mtopic;

TAILQ_FOREACH(mtopic, &mcluster->topics, link) {
if (!rd_kafka_Uuid_cmp(mtopic->id, id))
return (rd_kafka_mock_topic_t *)mtopic;
}

return NULL;
}


/**
* @brief Create a topic using default settings.
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ typedef struct rd_kafka_mock_partition_s {
typedef struct rd_kafka_mock_topic_s {
TAILQ_ENTRY(rd_kafka_mock_topic_s) link;
char *name;
rd_kafka_Uuid_t id;

rd_kafka_mock_partition_t *partitions;
int partition_cnt;
Expand Down Expand Up @@ -434,6 +435,11 @@ rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_topic_t *
rd_kafka_mock_topic_find_by_kstr(const rd_kafka_mock_cluster_t *mcluster,
const rd_kafkap_str_t *kname);

rd_kafka_mock_topic_t *
rd_kafka_mock_topic_find_by_id(const rd_kafka_mock_cluster_t *mcluster,
rd_kafka_Uuid_t id);

rd_kafka_mock_broker_t *
rd_kafka_mock_cluster_get_coord(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_coordtype_t KeyType,
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ static RD_INLINE RD_UNUSED int rd_kafka_Uuid_cmp(rd_kafka_Uuid_t a,
(a.least_significant_bits - b.least_significant_bits);
}

rd_kafka_Uuid_t rd_kafka_Uuid_random();

const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid);

/**
* @name Producer ID and Epoch for the Idempotent Producer
* @{
Expand Down

0 comments on commit a6d85bd

Please sign in to comment.