Skip to content

Commit

Permalink
Implement receiving messages; Changes to eventhubs so that all eventh…
Browse files Browse the repository at this point in the history
…ubs tests pass (#6254)


* Eventhubs tests pass

* Noise reduction; explain the task which spawns a task

* Update sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>

* PR feedback

---------

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>
  • Loading branch information
LarryOsterman and antkmsft committed Jan 6, 2025
1 parent f934104 commit d9d8276
Show file tree
Hide file tree
Showing 20 changed files with 857 additions and 169 deletions.
18 changes: 18 additions & 0 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,24 @@
"curl-transport"
]
},
{
"name": "x64-static-debug-asan-perftests-rust",
"displayName": "x64 Debug, ASAN Rust AMQP, static With Perf Tests and samples, libcurl+winhttp",
"inherits": [
"x64-static",
"enable-address-sanitizer",
"debug-build",
"enable-tests",
"enable-samples",
"enable-perf",
"rust-amqp",
"winhttp-transport",
"curl-transport"
],
"cacheVariables": {
"DISABLE_AZURE_CORE_OPENTELEMETRY": true
}
},
{
"name": "x64-static-release-perftests-asan",
"displayName": "x64 Release static With Perf Tests and samples, plus ASAN",
Expand Down
143 changes: 122 additions & 21 deletions sdk/core/azure-core-amqp/src/impl/rust_amqp/amqp/message_receiver.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

// Enable declaration of strerror_s.
#define __STDC_WANT_LIB_EXT1__ 1
// cspell: words amqpmessagereceiver amqpmessagereceiveroptions amqp Amqp

//// Enable declaration of strerror_s.
// #define __STDC_WANT_LIB_EXT1__ 1

#include "azure/core/amqp/internal/message_receiver.hpp"

#include "../../../models/private/error_impl.hpp"
#include "../../../models/private/message_impl.hpp"
#include "../../../models/private/source_impl.hpp"
#include "../../../models/private/target_impl.hpp"
#include "../../../models/private/value_impl.hpp"
#include "azure/core/amqp/internal/link.hpp"
#include "azure/core/amqp/internal/models/messaging_values.hpp"
#include "azure/core/amqp/models/amqp_message.hpp"
#include "private/message_receiver_impl.hpp"
#include "private/session_impl.hpp"

#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
Expand All @@ -22,39 +28,85 @@
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;
using namespace Azure::Core::Amqp::_internal;
using namespace Azure::Core::Amqp::Common::_detail;

using namespace Azure::Core::Amqp::RustInterop::_detail;

namespace Azure { namespace Core { namespace Amqp { namespace _detail {
#if ENABLE_UAMQP
void UniqueHandleHelper<MESSAGE_RECEIVER_INSTANCE_TAG>::FreeMessageReceiver(
MESSAGE_RECEIVER_HANDLE value)
void UniqueHandleHelper<RustAmqpMessageReceiver>::FreeMessageReceiver(
RustAmqpMessageReceiver* value)
{
messagereceiver_destroy(value);
amqpmessagereceiver_destroy(value);
}
#endif

template <> struct UniqueHandleHelper<RustInterop::_detail::RustAmqpMessageReceiverOptions>
{
static void FreeMessageReceiverOptions(RustAmqpMessageReceiverOptions* obj);

using type = Core::_internal::
BasicUniqueHandle<RustAmqpMessageReceiverOptions, FreeMessageReceiverOptions>;
};

void UniqueHandleHelper<RustAmqpMessageReceiverOptions>::FreeMessageReceiverOptions(
RustInterop::_detail::RustAmqpMessageReceiverOptions* value)
{
amqpmessagereceiveroptions_destroy(value);
}

}}}} // namespace Azure::Core::Amqp::_detail

namespace Azure { namespace Core { namespace Amqp { namespace _detail {
using UniqueMessageReceiverOptions
= UniqueHandle<RustInterop::_detail::RustAmqpMessageReceiverOptions>;

/** Configure the MessageReceiver for receiving messages from a service instance.
*/
MessageReceiverImpl::MessageReceiverImpl(
std::shared_ptr<_detail::SessionImpl> session,
Models::_internal::MessageSource const& source,
MessageReceiverOptions const& options)
: m_options{options}, m_source{source}, m_session{session}
: m_receiver{amqpmessagereceiver_create()}, m_options{options}, m_source{source}, m_session{
session}
{
}

std::pair<std::shared_ptr<Models::AmqpMessage>, Models::_internal::AmqpError>
MessageReceiverImpl::WaitForIncomingMessage(Context const& context)
{
throw std::runtime_error("Not implemented");
(void)context;
Common::_detail::CallContext callContext(
Common::_detail::GlobalStateHolder::GlobalStateInstance()->GetRuntimeContext(), context);

{

auto message = Models::_detail::AmqpMessageFactory::FromImplementation(
amqpmessagereceiver_receive_message_wait(callContext.GetCallContext(), m_receiver.get()));

return std::make_pair(message, Models::_internal::AmqpError{});
}
}

std::pair<std::shared_ptr<Models::AmqpMessage>, Models::_internal::AmqpError>
MessageReceiverImpl::TryWaitForIncomingMessage()
{
throw std::runtime_error("Not implemented");
Common::_detail::CallContext callContext(
Common::_detail::GlobalStateHolder::GlobalStateInstance()->GetRuntimeContext(), {});

auto message = Models::_detail::AmqpMessageFactory::FromImplementation(
amqpmessagereceiver_receive_message_async_poll(
callContext.GetCallContext(), m_receiver.get()));

if (message)
{
return std::make_pair(message, Models::_internal::AmqpError{});
}
else if (callContext.GetError().empty())
{
return std::make_pair(nullptr, Models::_internal::AmqpError{});
}
else
{
return std::make_pair(nullptr, Models::_internal::AmqpError{});
}
}

MessageReceiverImpl::~MessageReceiverImpl() noexcept
Expand All @@ -67,27 +119,76 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Azure::Core::_internal::AzureNoReturnPath(
"MessageReceiverImpl is being destroyed while open.");
}
if (m_link)
{
m_link.reset();
}
m_messageQueue.Clear();
}

// Nullable<uint32_t> InitialDeliveryCount;

// Nullable<uint64_t> MaxMessageSize;
// uint32_t MaxLinkCredit{};

void MessageReceiverImpl::Open(Azure::Core::Context const& context)
{
if (m_options.AuthenticationRequired)
{
m_session->GetConnection()->AuthenticateAudience(
m_session, static_cast<std::string>(m_source.GetAddress()), context);
}
throw std::runtime_error("Not yet implemented");
}

UniqueMessageReceiverOptions options{amqpmessagereceiveroptions_create()};

InvokeAmqpApi(amqpmessagereceiveroptions_set_name, options, m_options.Name.c_str());

RustReceiverSettleMode settleMode;
switch (m_options.SettleMode)
{
case ReceiverSettleMode::First:
settleMode = RustReceiverSettleMode::First;
break;
case ReceiverSettleMode::Second:
settleMode = RustReceiverSettleMode::Second;
break;
}
InvokeAmqpApi(amqpmessagereceiveroptions_set_receiver_settle_mode, options, settleMode);
InvokeAmqpApi(
amqpmessagereceiveroptions_set_target,
options,
Models::_detail::AmqpTargetFactory::ToImplementation(m_options.MessageTarget));
InvokeAmqpApi(
amqpmessagereceiveroptions_set_properties,
options,
Models::_detail::AmqpValueFactory::ToImplementation(m_options.Properties.AsAmqpValue()));

Common::_detail::CallContext callContext(
Common::_detail::GlobalStateHolder::GlobalStateInstance()->GetRuntimeContext(), context);

if (amqpmessagereceiver_attach(
callContext.GetCallContext(),
m_receiver.get(),
m_session->GetAmqpSession().get(),
Models::_detail::AmqpSourceFactory::ToImplementation(m_source),
options.get()))
{
throw std::runtime_error("Failed to attach message receiver: " + callContext.GetError());
}
else
{
m_receiverOpen = true;
}
}
void MessageReceiverImpl::Close(Context const& context)
{
throw std::runtime_error("Not yet implemented");
(void)context;
}
if (m_receiver)
{
Common::_detail::CallContext callContext(
Common::_detail::GlobalStateHolder::GlobalStateInstance()->GetRuntimeContext(), context);

// Even if the detach fails, we still want to consider the receiver closed.
m_receiverOpen = false;
if (amqpmessagereceiver_detach_and_release(
callContext.GetCallContext(), m_receiver.release()))
{
throw std::runtime_error("Failed to detach message receiver: " + callContext.GetError());
}
}
}
}}}} // namespace Azure::Core::Amqp::_detail
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,26 @@
#pragma once

#include "azure/core/amqp/internal/message_receiver.hpp"
#if ENABLE_UAMQP
#include "link_impl.hpp"
#endif
#include "rust_amqp_wrapper.h"
#include "session_impl.hpp"
#include "unique_handle.hpp"

#include <memory>

namespace Azure { namespace Core { namespace Amqp { namespace _detail {
#if ENABLE_UAMQP
template <> struct UniqueHandleHelper<MESSAGE_RECEIVER_INSTANCE_TAG>

template <> struct UniqueHandleHelper<RustInterop::_detail::RustAmqpMessageReceiver>
{
static void FreeMessageReceiver(MESSAGE_RECEIVER_HANDLE obj);
static void FreeMessageReceiver(RustInterop::_detail::RustAmqpMessageReceiver* obj);

using type
= Core::_internal::BasicUniqueHandle<MESSAGE_RECEIVER_INSTANCE_TAG, FreeMessageReceiver>;
using type = Core::_internal::
BasicUniqueHandle<RustInterop::_detail::RustAmqpMessageReceiver, FreeMessageReceiver>;
};
#endif

}}}} // namespace Azure::Core::Amqp::_detail

namespace Azure { namespace Core { namespace Amqp { namespace _detail {
#if ENABLE_UAMQP
using UniqueMessageReceiver = UniqueHandle<MESSAGE_RECEIVER_INSTANCE_TAG>;
#endif
using UniqueMessageReceiver = UniqueHandle<RustInterop::_detail::RustAmqpMessageReceiver>;
class MessageReceiverFactory final {
public:
static Azure::Core::Amqp::_internal::MessageReceiver CreateFromInternal(
Expand All @@ -39,24 +35,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {

class MessageReceiverImpl final : public std::enable_shared_from_this<MessageReceiverImpl> {
public:
#if ENABLE_UAMQP
MessageReceiverImpl(
std::shared_ptr<_detail::SessionImpl> session,
Models::_internal::MessageSource const& receiverSource,
_internal::MessageReceiverOptions const& options,
_internal::MessageReceiverEvents* receiverEvents = nullptr);
MessageReceiverImpl(
std::shared_ptr<_detail::SessionImpl> session,
_internal::LinkEndpoint& linkEndpoint,
Models::_internal::MessageSource const& receiverSource,
_internal::MessageReceiverOptions const& options,
_internal::MessageReceiverEvents* receiverEvents = nullptr);
#elif ENABLE_RUST_AMQP
MessageReceiverImpl(
std::shared_ptr<_detail::SessionImpl> session,
Models::_internal::MessageSource const& receiverSource,
_internal::MessageReceiverOptions const& options);
#endif
~MessageReceiverImpl() noexcept;

MessageReceiverImpl(MessageReceiverImpl const&) = delete;
Expand All @@ -76,25 +58,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {

private:
bool m_receiverOpen{false};
std::shared_ptr<_detail::LinkImpl> m_link;
UniqueMessageReceiver m_receiver;

_internal::MessageReceiverOptions m_options;
Models::_internal::MessageSource m_source;
std::shared_ptr<_detail::SessionImpl> m_session;
Models::_internal::AmqpError m_savedMessageError{};
_internal::MessageReceiverState m_currentState{};
bool m_deferLinkPolling{false};

bool m_linkPollingEnabled{false};
std::mutex m_mutableState;

Azure::Core::Amqp::Common::_internal::
AsyncOperationQueue<std::shared_ptr<Models::AmqpMessage>, Models::_internal::AmqpError>
m_messageQueue;

// When we close a uAMQP messagereceiver, the link is left in the half closed state. We need to
// wait for the link to be fully closed before we can close the session. This queue will hold
// the close operation until the link is fully closed.
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<Models::_internal::AmqpError>
m_closeQueue;
};
}}}} // namespace Azure::Core::Amqp::_detail
Loading

0 comments on commit d9d8276

Please sign in to comment.