Skip to content

Commit aee17f7

Browse files
authored
SQL Filter example [skip ci] (#134)
* SQL Filter example [skip ci] --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 9435502 commit aee17f7

File tree

5 files changed

+132
-2
lines changed

5 files changed

+132
-2
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
99

1010
## Examples
1111

12-
Inside the `docs/Examples` directory you can find examples of how to use the client.
12+
Inside the [docs/Examples](./docs/Examples) directory you can find examples of how to use the client.
1313

1414

1515
## Documentation

docs/Examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ This directory contains examples of how to use the RabbitMQ AMQP 1.0 .NET client
88
- Performance Test [here](./PerformanceTest/). You can tune some parameters in the `Program.cs` file.
99
- OpenTelemetry Integration [here](./OpenTelemetryIntegration/)
1010
- OAuth2 Example [here](./OAuth2)
11-
- Batch Disposition [here](./BatchDispositions)
11+
- Batch Disposition [here](./BatchDispositions)
12+
- Stream Filter [here](./StreamFilter)
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// See https://aka.ms/new-console-template for more information
2+
3+
using System.Diagnostics;
4+
using System.Text;
5+
using RabbitMQ.AMQP.Client;
6+
using RabbitMQ.AMQP.Client.Impl;
7+
using Trace = Amqp.Trace;
8+
using TraceLevel = Amqp.TraceLevel;
9+
10+
Trace.TraceLevel = TraceLevel.Information;
11+
12+
ConsoleTraceListener consoleListener = new();
13+
Trace.TraceListener = (l, f, a) =>
14+
consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}");
15+
16+
Trace.WriteLine(TraceLevel.Information, "Starting the example...");
17+
const string containerId = "sql-filter-example";
18+
19+
IEnvironment environment = AmqpEnvironment.Create(
20+
ConnectionSettingsBuilder.Create().ContainerId(containerId).Build());
21+
22+
IConnection connection = await environment.CreateConnectionAsync().ConfigureAwait(false);
23+
24+
Trace.WriteLine(TraceLevel.Information, $"Connected to the broker {connection} successfully");
25+
26+
// ------------------------------------------------------------------------------------
27+
// The management object is used to declare/delete queues, exchanges, and bindings
28+
IManagement management = connection.Management();
29+
const string queueName = "q_amqp10-client-stream-filter-test";
30+
IQueueSpecification queueSpec = management.Queue(queueName).Type(QueueType.STREAM);
31+
await queueSpec.DeclareAsync().ConfigureAwait(false);
32+
Trace.WriteLine(TraceLevel.Information,
33+
$"Queue {queueName} declared successfully");
34+
// ------------------------------------------------------------------------------------
35+
36+
// ------------------------------------------------------------------------------------
37+
38+
const int total = 10;
39+
IPublisher publisher = await connection.PublisherBuilder().Queue(queueName).BuildAsync().ConfigureAwait(false);
40+
for (int i = 0; i < total; i++)
41+
{
42+
string subject = i % 2 == 0 ? "Order" : "Invoice";
43+
string region = i % 2 == 0 ? "emea" : "us";
44+
IMessage message = new AmqpMessage(Encoding.UTF8.GetBytes($"Hello Filter {i}"))
45+
.Subject(subject)
46+
.Property("region", region);
47+
var pr = await publisher.PublishAsync(message).ConfigureAwait(false);
48+
49+
switch (pr.Outcome.State)
50+
{
51+
case OutcomeState.Accepted:
52+
Trace.WriteLine(TraceLevel.Information, $"[Publisher] Message: {message.BodyAsString()} confirmed");
53+
break;
54+
case OutcomeState.Released:
55+
Trace.WriteLine(TraceLevel.Information, $"[Publisher] Message: {message.BodyAsString()} Released");
56+
break;
57+
58+
case OutcomeState.Rejected:
59+
Trace.WriteLine(TraceLevel.Error,
60+
$"[Publisher] Message: {message.BodyAsString()} Rejected with error: {pr.Outcome.Error}");
61+
break;
62+
default:
63+
throw new ArgumentOutOfRangeException();
64+
}
65+
}
66+
67+
// ------------------------------------------------------------------------------------
68+
IConsumer consumer = await connection.ConsumerBuilder().Queue(queueName).MessageHandler((context, message) =>
69+
{
70+
Trace.WriteLine(TraceLevel.Information,
71+
$"[Consumer Received] Message Body: [{message.BodyAsString()}], " +
72+
$"Subject: [{message.Subject()}], Property: [{message.Property("region")}]");
73+
context.Accept();
74+
return Task.CompletedTask;
75+
}
76+
).Stream().Offset(StreamOffsetSpecification.First).Filter().Subject("&p:Order").Property("region", "emea").Stream()
77+
.Builder()
78+
.BuildAndStartAsync().ConfigureAwait(false);
79+
// close the above consumer to demonstrate SQL filter
80+
Thread.Sleep(1000); // wait a bit to show the difference in output
81+
82+
// ------------------------------------------------------------------------------------
83+
// the same like above but using SQL filter example
84+
85+
IConsumer sqlConsumer = await connection.ConsumerBuilder().Queue(queueName).MessageHandler((context, message) =>
86+
{
87+
Trace.WriteLine(TraceLevel.Information,
88+
$"[SQL Consumer Received] Message Body: [{message.BodyAsString()}], " +
89+
$"Subject: [{message.Subject()}], Property: [{message.Property("region")}]");
90+
context.Accept();
91+
return Task.CompletedTask;
92+
}
93+
).Stream().Offset(StreamOffsetSpecification.First).Filter()
94+
.Sql("properties.subject LIKE 'Order%' AND region = 'emea'").Stream().Builder()
95+
.BuildAndStartAsync().ConfigureAwait(false);
96+
97+
Console.WriteLine("Press [enter] to exit.");
98+
Console.ReadLine();
99+
await consumer.CloseAsync().ConfigureAwait(false);
100+
await sqlConsumer.CloseAsync().ConfigureAwait(false);
101+
await publisher.CloseAsync().ConfigureAwait(false);
102+
await queueSpec.DeleteAsync().ConfigureAwait(false);
103+
await connection.CloseAsync().ConfigureAwait(false);
104+
Trace.WriteLine(TraceLevel.Information, "Connection closed");
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="AMQPNetLite.Core" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<ProjectReference Include="..\..\..\RabbitMQ.AMQP.Client\RabbitMQ.AMQP.Client.csproj" />
16+
</ItemGroup>
17+
18+
</Project>

rabbitmq-amqp-dotnet-client.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuth2", "docs\Examples\OAu
3737
EndProject
3838
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchDispositions", "docs\Examples\BatchDispositions\BatchDispositions.csproj", "{1FF0D53E-B495-4810-8415-E27DED184C9E}"
3939
EndProject
40+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamFilter", "docs\Examples\StreamFilter\StreamFilter.csproj", "{76B4C910-B9AB-479E-BA77-067CCE0CD37C}"
41+
EndProject
4042
Global
4143
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4244
Debug|Any CPU = Debug|Any CPU
@@ -79,6 +81,10 @@ Global
7981
{1FF0D53E-B495-4810-8415-E27DED184C9E}.Debug|Any CPU.Build.0 = Debug|Any CPU
8082
{1FF0D53E-B495-4810-8415-E27DED184C9E}.Release|Any CPU.ActiveCfg = Release|Any CPU
8183
{1FF0D53E-B495-4810-8415-E27DED184C9E}.Release|Any CPU.Build.0 = Release|Any CPU
84+
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
85+
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Debug|Any CPU.Build.0 = Debug|Any CPU
86+
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Release|Any CPU.ActiveCfg = Release|Any CPU
87+
{76B4C910-B9AB-479E-BA77-067CCE0CD37C}.Release|Any CPU.Build.0 = Release|Any CPU
8288
EndGlobalSection
8389
GlobalSection(SolutionProperties) = preSolution
8490
HideSolutionNode = FALSE
@@ -91,5 +97,6 @@ Global
9197
{D74F49FC-2C9A-4227-8988-30925C509388} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
9298
{C1EA4B66-E60E-4945-A4C6-91B433F9BA65} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
9399
{1FF0D53E-B495-4810-8415-E27DED184C9E} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
100+
{76B4C910-B9AB-479E-BA77-067CCE0CD37C} = {9154A0FB-7B2B-413C-A7F5-11ED2E37E93C}
94101
EndGlobalSection
95102
EndGlobal

0 commit comments

Comments
 (0)