Skip to content

Commit

Permalink
Use rd_kafka_compression_t (#4419)
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj authored Aug 31, 2023
1 parent 5c943ba commit 9e3cf26
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2187,13 +2187,13 @@ static int rd_kafka_mock_handle_PushTelemetry(rd_kafka_mock_connection_t *mconn,
rd_kafka_uuid_t ClientInstanceId;
int32_t SubscriptionId;
rd_bool_t terminating;
rd_kafkap_str_t compression_type;
rd_kafka_compression_t compression_type;
rd_kafkap_bytes_t metrics;

rd_kafka_buf_read_uuid(rkbuf, &ClientInstanceId);
rd_kafka_buf_read_i32(rkbuf, &SubscriptionId);
rd_kafka_buf_read_bool(rkbuf, &terminating);
rd_kafka_buf_read_str(rkbuf, &compression_type);
rd_kafka_buf_read_i8(rkbuf, &compression_type);
rd_kafka_buf_read_kbytes(rkbuf, &metrics);

rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len);
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@
#define RD_KAFKAP_AllocateProducerIds 67

/* Using dummy numbers for now. */
#define RD_KAFKAP_GetTelemetrySubscriptions 68
#define RD_KAFKAP_PushTelemetry 69
#define RD_KAFKAP_GetTelemetrySubscriptions 69
#define RD_KAFKAP_PushTelemetry 70

#define RD_KAFKAP__NUM 70
#define RD_KAFKAP__NUM 71


#endif /* _RDKAFKA_PROTOCOL_H_ */
7 changes: 3 additions & 4 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -5238,7 +5238,7 @@ rd_kafka_PushTelemetryRequest(rd_kafka_broker_t *rkb,
rd_kafka_uuid_t *client_instance_id,
int32_t subscription_id,
rd_bool_t terminating,
const char *compression_type,
const rd_kafka_compression_t compression_type,
const void *metrics,
size_t metrics_size,
char *errstr,
Expand All @@ -5259,16 +5259,15 @@ rd_kafka_PushTelemetryRequest(rd_kafka_broker_t *rkb,
}

size_t len = sizeof(rd_kafka_uuid_t) + sizeof(int32_t) +
sizeof(rd_bool_t) + strlen(compression_type) +
sizeof(rd_bool_t) + sizeof(compression_type) +
metrics_size;
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_PushTelemetry,
1, len, rd_true);

rd_kafka_buf_write_uuid(rkbuf, client_instance_id);
rd_kafka_buf_write_i32(rkbuf, subscription_id);
rd_kafka_buf_write_bool(rkbuf, terminating);
rd_kafka_buf_write_str(rkbuf, compression_type,
strlen(compression_type));
rd_kafka_buf_write_i8(rkbuf, compression_type);

rd_kafkap_bytes_t *metric_bytes =
rd_kafkap_bytes_new(metrics, metrics_size);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ rd_kafka_PushTelemetryRequest(rd_kafka_broker_t *rkb,
rd_kafka_uuid_t *client_instance_id,
int32_t subscription_id,
rd_bool_t terminating,
const char *compression_type,
rd_kafka_compression_t compression_type,
const void *metrics,
size_t metrics_size,
char *errstr,
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
size_t metrics_payload_size;
void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size);
// TODO: Use rd_kafka_compression_t
const char *compression_type = "gzip";
// TODO: Cycle through compression types
rd_kafka_compression_t compression_type = RD_KAFKA_COMPRESSION_GZIP;

rd_kafka_dbg(rk, TELEMETRY, "PUSHSENT",
"Sending PushTelemetryRequest with terminating = %d",
Expand Down

0 comments on commit 9e3cf26

Please sign in to comment.