Skip to content

Commit

Permalink
Integrate Rust 2 step message receive code to C++ (#6349)
Browse files Browse the repository at this point in the history
* Integrate Rust 2 step message receive code to C++

* If receiving a delivery failed, transmit the error to the message channel if at all possible
  • Loading branch information
LarryOsterman committed Mar 3, 2025
1 parent aa19507 commit 91fd49e
Show file tree
Hide file tree
Showing 14 changed files with 675 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ uuid = { workspace = true }

[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tokio = { workspace = true }

[features]
default = ["fe2o3-amqp"]
Expand All @@ -46,7 +47,6 @@ fe2o3-amqp = [
"serde_bytes",
]
cplusplus = []
test_e2e = []

[package.metadata.docs.rs]
features = ["fe2o3-amqp"]
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,66 @@ NOTE: THIS IS NOT A GENERAL PURPOSE AMQP LIBRARY AND SHOULD NOT BE USED AS SUCH.

This crate is part of a collection of crates: for more information please refer to [https://github.com/azure/azure-sdk-for-rust](https://github.com/azure/azure-sdk-for-rust).

## Example
## Testing the AMQP Client.

```rust no_run
use azure_messaging::*;
The AMQP package is tested using the standard `cargo test` command line:

#[tokio::main]
async fn main() -> azure_core::Result<()> {
let eventhubs_namespace = std::env::var("AZURE_EVENTHUB_NAMESPACE").expect("missing AZURE_EVENTHUB_NAMESPACE");
```bash
cargo test --package azure_core_amqp --all-features
```

Certain functionality of the test requires that the azure-amqp TestAmqpBroker be running at the time of the test.

To enable this functionality, there are two ways of installing and running the TestAmqpBroker, Scripted and Manual.

### Scripted Broker Install
For Scripted testing, simply run the powershell script in the sdk/core/azure_core_amqp directory:

```powershell
PS> .\sdk\core\azure_core_amqp\Test-Setup.ps1
```

This will download the TestAmqpBroker, build it and run the executable.

Note that this requires that you have the [.Net SDK](https://dot.net/download) installed on your machine.

You can then run the azure_core_amqp package tests.

Once you have finished running your tests, you run:

```powershell
PS> .\sdk\core\azure_core_amqp\Test-Cleanup.ps1
```

let client = ProducerClient::new(
eventhubs_namespace, credential,
)?;
which will terminate the test broker.

client.send_event("hello world").await?;
### Manual Broker Install
For Manual testing, first clone the azure-amqp repository to a local directory:

Ok(())
}
```bash
cd <Test Working Directory>
git clone https://github.com/Azure/azure-amqp
```

Alternately, you can clone to a specific release in the azure-amqp repository:

```
git clone https://github.com/Azure/azure-amqp.git --branch hotfix
```

Set an environment variable the test AMQP broker should listen on:

```powershell
$env:TEST_BROKER_ADDRESS = 'amqp://127.0.0.1:25672'
```

And launch the test broker:

```powershell
cd azure-amqp/test/TestAmqpBroker
dotnet run -- $env:TEST_BROKER_ADDRESS
```

Now, when you run the cargo tests, the networking functionality of the AMQP APIs will be executed.

License: MIT
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# cspell: ignore JOBID

. "$PSScriptRoot\..\..\..\eng\common\scripts\common.ps1"

Write-Host "Test Broker output:"
Receive-Job -Id $env:TEST_BROKER_JOBID

# Check if the test broker job is still running
$job = Get-Job -Id $env:TEST_BROKER_JOBID
if ($job.State -ne "Running") {
Write-Host "Test broker terminated unexpectedly."
exit 1
}

# Stop the test broker job started in Test-Setup.ps1
Write-Host "Stopping test broker"
Stop-Job -Id $env:TEST_BROKER_JOBID
Remove-Job -Id $env:TEST_BROKER_JOBID
Write-Host "Test broker stopped."
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# cspell: ignore JOBID depsfile


# Load common ES scripts
. "$PSScriptRoot\..\..\..\eng\common\scripts\common.ps1"

# Create the test binary *outside* the repo root to avoid polluting the repo.
$WorkingDirectory = ([System.IO.Path]::Combine($RepoRoot, "../TestArtifacts"))

# Create the working directory if it does not exist.
Write-Host "Using Working Directory $WorkingDirectory"

if (-not (Test-Path $WorkingDirectory)) {
Write-Host "Working directory does not exist, creating working directory: $WorkingDirectory"
New-Item -ItemType Directory -Path $WorkingDirectory
}

Write-Host "Setting current directory to working directory: $WorkingDirectory"
Push-Location -Path $WorkingDirectory

# Clone and build the Test Amqp Broker.
try {

$repositoryUrl = "https://github.com/Azure/azure-amqp.git"
# We would like to use the "hotfix" branch because that is current, but unfortunately it references System.Net.Security version 4.0.0
$repositoryBranch = "master"
$cloneCommand = "git clone $repositoryUrl --branch $repositoryBranch"

Write-Host "Cloning repository from $repositoryUrl..."
Invoke-LoggedCommand $cloneCommand

Set-Location -Path "./azure-amqp/test/TestAmqpBroker"

Invoke-LoggedCommand "dotnet build -p RollForward=LatestMajor --framework net6.0"
if (!$? -ne 0) {
Write-Error "Failed to build TestAmqpBroker."
exit 1
}

Write-Host "Test broker built successfully."

# now that the Test broker has been built, launch the broker on a local address.
$env:TEST_BROKER_ADDRESS = 'amqp://127.0.0.1:25672'

Write-Host "Starting test broker listening on ${env:TEST_BROKER_ADDRESS} ..."

Set-Location -Path $WorkingDirectory/azure-amqp/bin/Debug/TestAmqpBroker/net6.0

$job = dotnet exec ./TestAmqpBroker.dll ${env:TEST_BROKER_ADDRESS} /headless &

$env:TEST_BROKER_JOBID = $job.Id

Write-Host "Waiting for test broker to start..."
Start-Sleep -Seconds 3

Write-Host "Job Output after wait:"
Receive-Job $job.Id

$job = Get-Job -Id $env:TEST_BROKER_JOBID
if ($job.State -ne "Running") {
Write-Host "Test broker failed to start."
exit 1
}
}
finally {
Pop-Location
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.

// These tests assume that the AmqpTestBroker is running on localhost:25672.
// The AmqpTestBroker can be installed by the following steps:
// 1. Clone the repository:
// git clone https://github.com/Azure/azure-amqp
// 2. Build the project:
// dotnet build
// 3. Run the broker:
// dotnet run --project .\test\TestAmqpBroker\TestAmqpBroker.csproj --framework net462 amqp://localhost:25672
// 4. Run the tests (from the root of the azure-sdk-for-rust repository):
// cargo run --example connection --package azure_core_amqp

use azure_core::Url;
use azure_core_amqp::{
connection::{AmqpConnection, AmqpConnectionApis},
value::AmqpSymbol,
};

async fn amqp_connection_open() {
let connection = AmqpConnection::new();

let url = Url::parse("amqp://localhost:25672").unwrap();
connection
.open("test".to_string(), url, None)
.await
.unwrap();
}

async fn amqp_connection_open_with_error() {
let connection = AmqpConnection::new();
let url = Url::parse("amqp://localhost:32767").unwrap();
assert!(connection
.open("test".to_string(), url, None)
.await
.is_err());
}

async fn amqp_connection_close() {
let connection = AmqpConnection::new();
let url = Url::parse("amqp://localhost:25672").unwrap();
connection
.open("test".to_string(), url, None)
.await
.unwrap();
connection.close().await.unwrap();
}

async fn amqp_connection_close_with_error() {
let connection = AmqpConnection::new();
let url = Url::parse("amqp://localhost:25672").unwrap();
connection
.open("test".to_string(), url, None)
.await
.unwrap();
let res = connection
.close_with_error(
AmqpSymbol::from("amqp:internal-error"),
Some("Internal error.".to_string()),
None,
)
.await;
match res {
Ok(_) => {}
Err(err) => {
assert!(err.to_string().contains("Internal error."));
}
}
}

#[tokio::main]
pub async fn main() {
tracing_subscriber::fmt::init();

tokio::join!(
amqp_connection_open(),
amqp_connection_open_with_error(),
amqp_connection_close(),
amqp_connection_close_with_error()
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {
use super::*;

#[test]
fn test_amqp_connection_options_with_max_frame_size() {
fn amqp_connection_options_with_max_frame_size() {
let connection_options = AmqpConnectionOptions {
max_frame_size: Some(1024),
..Default::default()
Expand Down Expand Up @@ -197,7 +197,7 @@ mod tests {
}

#[test]
fn test_amqp_connection_options() {
fn amqp_connection_options() {
let connection_options = AmqpConnectionOptions {
max_frame_size: Some(1024),
channel_max: Some(16),
Expand Down Expand Up @@ -244,4 +244,78 @@ mod tests {
)
);
}

#[tokio::test]
async fn amqp_connection_open() {
let address = std::env::var("TEST_BROKER_ADDRESS");
if address.is_ok() {
let connection = AmqpConnection::new();
let url = Url::parse(&address.unwrap()).unwrap();
connection
.open("test".to_string(), url, None)
.await
.unwrap();
} else {
println!("TEST_BROKER_ADDRESS is not set. Skipping test.");
}
}

#[tokio::test]
async fn amqp_connection_open_with_error() {
let address = std::env::var("TEST_BROKER_ADDRESS");
if address.is_ok() {
let connection = AmqpConnection::new();
let url = Url::parse("amqp://localhost:32767").unwrap();
assert!(connection
.open("test".to_string(), url, None)
.await
.is_err());
} else {
println!("TEST_BROKER_ADDRESS is not set. Skipping test.");
}
}

#[tokio::test]
async fn amqp_connection_close() {
let address = std::env::var("TEST_BROKER_ADDRESS");
if address.is_ok() {
let connection = AmqpConnection::new();
let url = Url::parse(&address.unwrap()).unwrap();
connection
.open("test".to_string(), url, None)
.await
.unwrap();
connection.close().await.unwrap();
} else {
println!("TEST_BROKER_ADDRESS is not set. Skipping test.");
}
}

#[tokio::test]
async fn amqp_connection_close_with_error() {
let address = std::env::var("TEST_BROKER_ADDRESS");
if address.is_ok() {
let connection = AmqpConnection::new();
let url = Url::parse(&address.unwrap()).unwrap();
connection
.open("test".to_string(), url, None)
.await
.unwrap();
let res = connection
.close_with_error(
AmqpSymbol::from("amqp:internal-error"),
Some("Internal error.".to_string()),
None,
)
.await;
match res {
Ok(_) => {}
Err(err) => {
assert!(err.to_string().contains("Internal error."));
}
}
} else {
println!("TEST_BROKER_ADDRESS is not set. Skipping test.");
}
}
}
Loading

0 comments on commit 91fd49e

Please sign in to comment.