Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a function to wait for publisher confirms #841

Merged
merged 19 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ target_link_libraries(amqp_listen PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_producer amqp_producer.c)
target_link_libraries(amqp_producer PRIVATE examples-common rabbitmq::rabbitmq)

add_executable(amqp_confirm_select amqp_confirm_select.c)
target_link_libraries(amqp_confirm_select PRIVATE examples-common rabbitmq::rabbitmq)

add_executable(amqp_connect_timeout amqp_connect_timeout.c)
target_link_libraries(amqp_connect_timeout PRIVATE examples-common rabbitmq::rabbitmq)

Expand Down
174 changes: 174 additions & 0 deletions examples/amqp_confirm_select.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>

#include "rabbitmq-c/framing.h"
#include "utils.h"

#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
#ifndef WINVER
#define WINVER 0x0502
#endif
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <winsock2.h>
#else
#include <sys/time.h>
#endif

#define SUMMARY_EVERY_US 5000

static void send_batch(amqp_connection_state_t conn, char const *queue_name,
int rate_limit, int message_count) {
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

char message[256];
amqp_bytes_t message_bytes;

for (i = 0; i < (int)sizeof(message); i++) {
message[i] = i & 0xff;
}

message_bytes.len = sizeof(message);
message_bytes.bytes = message;

for (i = 0; i < message_count; i++) {
uint64_t now = now_microseconds();

die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"),
amqp_cstring_bytes(queue_name), 0, 0, NULL,
message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate =
countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
(int)(now - start_time) / 1000, sent, countOverInterval,
(int)intervalRate);

previous_sent = sent;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}

while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
microsleep(2000);
now = now_microseconds();
}
}

{
uint64_t stop_time = now_microseconds();
int total_delta = (int)(stop_time - start_time);

printf("PRODUCER - Message count: %d\n", message_count);
printf("Total time, milliseconds: %d\n", total_delta / 1000);
printf("Overall messages-per-second: %g\n",
(message_count / (total_delta / 1000000.0)));
}
}

#define CONSUME_TIMEOUT_USEC 100
#define WAITING_TIMEOUT_USEC (30 * 1000)
void wait_for_acks(amqp_connection_state_t conn) {
uint64_t start_time = now_microseconds();
amqp_basic_ack_t ack;
struct timeval timeout = {0, CONSUME_TIMEOUT_USEC};
uint64_t now;
amqp_envelope_t envelope;

for (;;) {
amqp_rpc_reply_t ret;

now = now_microseconds();

if (now > start_time + WAITING_TIMEOUT_USEC) {
return;
}

amqp_maybe_release_buffers(conn);
ret = amqp_publisher_confirm_wait(conn, &envelope, &ack, &timeout);

if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type) {
if (AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
fprintf(stderr, "An unexpected method was received\n");
return;
} else {
die_on_amqp_error(ret, "Waiting for publisher confirmation");
}
}

fprintf(stderr, "Got an ACK!\n");
fprintf(stderr, "Here's the ACK:\n");
fprintf(stderr, "\tdelivery_tag: «%" PRIu64 "»\n", ack.delivery_tag);
fprintf(stderr, "\tmultiple: «%d»\n", ack.multiple);
}
}

int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
int rate_limit;
int message_count;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;

if (argc < 5) {
fprintf(stderr,
"Usage: amqp_producer host port rate_limit message_count\n");
return 1;
}

hostname = argv[1];
port = atoi(argv[2]);
rate_limit = atoi(argv[3]);
message_count = atoi(argv[4]);

conn = amqp_new_connection();

socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}

status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}

die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

// Enable confirm_select
amqp_confirm_select(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Enable confirm-select");

send_batch(conn, "test queue", rate_limit, message_count);

wait_for_acks(conn);
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}
1 change: 0 additions & 1 deletion examples/amqp_ssl_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ int main(int argc, char const *const *argv) {
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
die_on_error(amqp_uninitialize_ssl_library(), "Uninitializing SSL library");
manchicken marked this conversation as resolved.
Show resolved Hide resolved

printf("Done\n");
return 0;
Expand Down
20 changes: 20 additions & 0 deletions include/rabbitmq-c/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,26 @@ AMQP_EXPORT
int AMQP_CALL amqp_set_rpc_timeout(amqp_connection_state_t state,
const struct timeval *timeout);


/**
* amqp_publisher_confirm_wait
*
* Wait for a publisher confirm when the connection is in select mode.
alanxz marked this conversation as resolved.
Show resolved Hide resolved
alanxz marked this conversation as resolved.
Show resolved Hide resolved
* If the response has a `reply_type` of `AMQP_RESPONSE_LIBRARY_EXCEPTION` _and_
* the `library_error` is `AMQP_STATUS_UNEXPECTED_STATE`, then the frame
* received was likely not an ack.
*
* \param [in] state connection state
* \param [out] pointer to where the ACK details should go
* \param [in] timeout when waiting for the frame. Passing NULL will result in
* blocking behavior
* \returns amqp_rpc_reply_t *
*/
AMQP_EXPORT
amqp_rpc_reply_t AMQP_CALL amqp_publisher_confirm_wait(
alanxz marked this conversation as resolved.
Show resolved Hide resolved
amqp_connection_state_t state, amqp_envelope_t *envelope,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you describe the intended purpose of the envelope parameter?

Seems like this may not be required.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's the only thing that'll tell you which channel the ACK came through. The code for the amqp_basic_ack_t is correct per spec, naturally, but it can be helpful to know which channel the ACK came on.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you have any more questions or requests for change on this?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all you need is the channel id, then use amqp_channel_t* as an output parameter.

Alternatively consider using an amqp_channel_t as an input parameter, then only wait for the confirm on this channel.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the pulling the channel as an output parameter, and I'll combine that change with the method out param as well. I think making it filter by channel ID may be outside the scope of what I'm doing here, and the channel out param will allow the caller to easily implement that if they want to.

amqp_basic_ack_t *ack, const struct timeval *timeout);
alanxz marked this conversation as resolved.
Show resolved Hide resolved

AMQP_END_DECLS

#endif /* RABBITMQ_C_RABBITMQ_C_H */
37 changes: 37 additions & 0 deletions librabbitmq/amqp_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,40 @@ int amqp_set_rpc_timeout(amqp_connection_state_t state,
}
return AMQP_STATUS_OK;
}

amqp_rpc_reply_t amqp_publisher_confirm_wait(amqp_connection_state_t state,
alanxz marked this conversation as resolved.
Show resolved Hide resolved
amqp_envelope_t *envelope,
amqp_basic_ack_t *ack,
const struct timeval *timeout) {

manchicken marked this conversation as resolved.
Show resolved Hide resolved
int res;
amqp_frame_t frame;
amqp_rpc_reply_t ret;
amqp_basic_ack_t *ptr_to_ack;

memset(&ret, 0, sizeof(ret));
memset(envelope, 0, sizeof(*envelope));
memset(ack, 0, sizeof(*ack));

res = amqp_simple_wait_frame_noblock(state, &frame, timeout);

if (AMQP_STATUS_OK != res) {
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = res;

} else if (AMQP_FRAME_METHOD != frame.frame_type ||
AMQP_BASIC_ACK_METHOD != frame.payload.method.id) {
amqp_put_back_frame(state, &frame);
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;

} else {
ptr_to_ack = frame.payload.method.decoded;
memcpy(ack, ptr_to_ack, sizeof(*ack));
envelope->channel = frame.channel;
envelope->delivery_tag = ptr_to_ack->delivery_tag;
ret.reply_type = AMQP_RESPONSE_NORMAL;
}

return ret;
}