diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index ae426e06..3592dd43 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -27,6 +27,9 @@ target_link_libraries(amqp_listen PRIVATE examples-common rabbitmq::rabbitmq) add_executable(amqp_producer amqp_producer.c) target_link_libraries(amqp_producer PRIVATE examples-common rabbitmq::rabbitmq) +add_executable(amqp_confirm_select amqp_confirm_select.c) +target_link_libraries(amqp_confirm_select PRIVATE examples-common rabbitmq::rabbitmq) + add_executable(amqp_connect_timeout amqp_connect_timeout.c) target_link_libraries(amqp_connect_timeout PRIVATE examples-common rabbitmq::rabbitmq) diff --git a/examples/amqp_confirm_select.c b/examples/amqp_confirm_select.c new file mode 100644 index 00000000..9327fe71 --- /dev/null +++ b/examples/amqp_confirm_select.c @@ -0,0 +1,189 @@ +// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors. +// SPDX-License-Identifier: mit + +#include +#include +#include +#include +#include + +#include +#include + +#include "utils.h" + +#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__))) +#ifndef WINVER +#define WINVER 0x0502 +#endif +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#include +#else +#include +#endif + +#define SUMMARY_EVERY_US 5000 + +static void send_batch(amqp_connection_state_t conn, char const *queue_name, + int rate_limit, int message_count) { + uint64_t start_time = now_microseconds(); + int i; + int sent = 0; + int previous_sent = 0; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; + + char message[256]; + amqp_bytes_t message_bytes; + + for (i = 0; i < (int)sizeof(message); i++) { + message[i] = i & 0xff; + } + + message_bytes.len = sizeof(message); + message_bytes.bytes = message; + + for (i = 0; i < message_count; i++) { + uint64_t now = now_microseconds(); + + die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), + amqp_cstring_bytes(queue_name), 0, 0, NULL, + message_bytes), + "Publishing"); + sent++; + if (now > next_summary_time) { + int countOverInterval = sent - previous_sent; + double intervalRate = + countOverInterval / ((now - previous_report_time) / 1000000.0); + printf("%d ms: Sent %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, sent, countOverInterval, + (int)intervalRate); + + previous_sent = sent; + previous_report_time = now; + next_summary_time += SUMMARY_EVERY_US; + } + + while (((i * 1000000.0) / (now - start_time)) > rate_limit) { + microsleep(2000); + now = now_microseconds(); + } + } + + { + uint64_t stop_time = now_microseconds(); + int total_delta = (int)(stop_time - start_time); + + printf("PRODUCER - Message count: %d\n", message_count); + printf("Total time, milliseconds: %d\n", total_delta / 1000); + printf("Overall messages-per-second: %g\n", + (message_count / (total_delta / 1000000.0))); + } +} + +#define CONSUME_TIMEOUT_USEC 100 +#define WAITING_TIMEOUT_USEC (30 * 1000) +void wait_for_acks(amqp_connection_state_t conn) { + uint64_t start_time = now_microseconds(); + struct timeval timeout = {0, CONSUME_TIMEOUT_USEC}; + uint64_t now = 0; + amqp_publisher_confirm_t result = {}; + + for (;;) { + amqp_rpc_reply_t ret; + + now = now_microseconds(); + + if (now > start_time + WAITING_TIMEOUT_USEC) { + return; + } + + amqp_maybe_release_buffers(conn); + ret = amqp_publisher_confirm_wait(conn, &timeout, &result); + + if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type) { + if (AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) { + fprintf(stderr, "An unexpected method was received\n"); + return; + } else if (AMQP_STATUS_TIMEOUT == ret.library_error) { + // Timeout means you're done; no publisher confirms were waiting! + return; + } else { + die_on_amqp_error(ret, "Waiting for publisher confirmation"); + } + } + + switch (result.method) { + case AMQP_BASIC_ACK_METHOD: + fprintf(stderr, "Got an ACK!\n"); + fprintf(stderr, "Here's the ACK:\n"); + fprintf(stderr, "\tdelivery_tag: «%" PRIu64 "»\n", + result.payload.ack.delivery_tag); + fprintf(stderr, "\tmultiple: «%d»\n", result.payload.ack.multiple); + break; + case AMQP_BASIC_NACK_METHOD: + fprintf(stderr, "NACK\n"); + break; + case AMQP_BASIC_REJECT_METHOD: + fprintf(stderr, "REJECT\n"); + break; + default: + fprintf(stderr, "Unexpected method «%s» is.\n", + amqp_method_name(result.method)); + }; + } +} + +int main(int argc, char const *const *argv) { + char const *hostname; + int port, status; + int rate_limit; + int message_count; + amqp_socket_t *socket = NULL; + amqp_connection_state_t conn; + + if (argc < 5) { + fprintf(stderr, + "Usage: amqp_producer host port rate_limit message_count\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + rate_limit = atoi(argv[3]); + message_count = atoi(argv[4]); + + conn = amqp_new_connection(); + + socket = amqp_tcp_socket_new(conn); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + // Enable confirm_select + amqp_confirm_select(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Enable confirm-select"); + + send_batch(conn, "test queue", rate_limit, message_count); + + wait_for_acks(conn); + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/include/rabbitmq-c/amqp.h b/include/rabbitmq-c/amqp.h index c9cd0eee..dde976dd 100644 --- a/include/rabbitmq-c/amqp.h +++ b/include/rabbitmq-c/amqp.h @@ -2433,6 +2433,54 @@ AMQP_EXPORT int AMQP_CALL amqp_set_rpc_timeout(amqp_connection_state_t state, const struct timeval *timeout); +/** + * Possible payload permutations for publisher confirms. + **/ +typedef union amqp_publisher_confirm_payload_t_ { + amqp_basic_ack_t ack; /* basic.ack */ + amqp_basic_nack_t nack; /* basic.nack */ + amqp_basic_reject_t reject; /* basic.reject */ +} amqp_publisher_confirm_payload_t; + +/** + * Return information from publisher confirm wait + **/ +typedef struct amqp_publisher_confirm_t_ { + amqp_publisher_confirm_payload_t payload; /* The response payload; check the `method` value to see which value you should use in the union */ + amqp_channel_t channel; /* The channel where the confirmation was received */ + amqp_method_number_t method; /* The method which was received */ +} amqp_publisher_confirm_t; + +/** + * amqp_publisher_confirm_wait + * + * Wait for a publisher confirm when one or more channel is in select mode. + * If the response has a `reply_type` of `AMQP_RESPONSE_LIBRARY_EXCEPTION` _and_ + * the `library_error` is `AMQP_STATUS_UNEXPECTED_STATE`, then the frame + * received was not an ack. + * + * In the event that there are no publisher confirms received during the + * allotted time, `reply_type` will be `AMQP_RESPONSE_LIBRARY_EXCEPTION` + * and the `library_error` will be `AMQP_STATUS_TIMEOUT`. + * + * When a publisher confirm is received, `reply_type` will equal + * `AMQP_RESPONSE_NORMAL`, and the `result` out parameter will + * contain all of the information you need: + * + * - The `channel` will identify which channel the publisher confirm was received on + * - The `method` will tell you whether this is an `ack`, `nack`, or `reject` + * - The `payload` is a union, and based on the `method` it will use one of `amqp_basic_ack_t`, `amqp_basic_nack_t`, or `amqp_basic_reject_t` + * + * \param [in] state connection state + * \param [in] timeout when waiting for the frame. Passing NULL will result in + * blocking behavior + * \param [out] The result of the publisher confirm wait. + */ +AMQP_EXPORT +amqp_rpc_reply_t AMQP_CALL amqp_publisher_confirm_wait( + amqp_connection_state_t state, const struct timeval *timeout, + amqp_publisher_confirm_t *result); + AMQP_END_DECLS #endif /* RABBITMQ_C_RABBITMQ_C_H */ diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 37a75e9e..7b70a193 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -364,3 +364,58 @@ int amqp_set_rpc_timeout(amqp_connection_state_t state, } return AMQP_STATUS_OK; } + +amqp_rpc_reply_t amqp_publisher_confirm_wait(amqp_connection_state_t state, + const struct timeval *timeout, + amqp_publisher_confirm_t *result) { + int res; + amqp_frame_t frame; + amqp_rpc_reply_t ret; + + memset(&ret, 0x0, sizeof(ret)); + memset(result, 0x0, sizeof(amqp_publisher_confirm_t)); + + res = amqp_simple_wait_frame_noblock(state, &frame, timeout); + + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + return ret; + } else if (AMQP_FRAME_METHOD != frame.frame_type || + (AMQP_BASIC_ACK_METHOD != frame.payload.method.id && + AMQP_BASIC_NACK_METHOD != frame.payload.method.id && + AMQP_BASIC_REJECT_METHOD != frame.payload.method.id)) { + amqp_put_back_frame(state, &frame); + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + return ret; + } + + switch (frame.payload.method.id) { + case AMQP_BASIC_ACK_METHOD: + memcpy(&(result->payload.ack), frame.payload.method.decoded, + sizeof(amqp_basic_ack_t)); + break; + + case AMQP_BASIC_NACK_METHOD: + memcpy(&(result->payload.nack), frame.payload.method.decoded, + sizeof(amqp_basic_nack_t)); + break; + + case AMQP_BASIC_REJECT_METHOD: + memcpy(&(result->payload.reject), frame.payload.method.decoded, + sizeof(amqp_basic_reject_t)); + break; + + default: + amqp_put_back_frame(state, &frame); + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNSUPPORTED; + return ret; + } + result->method = frame.payload.method.id; + result->channel = frame.channel; + ret.reply_type = AMQP_RESPONSE_NORMAL; + + return ret; +}