Skip to content

Commit

Permalink
Move send channel sample to "Producing" project (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
kandersen82 authored Feb 7, 2025
1 parent 7dc6b0a commit be4cb70
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 93 deletions.
7 changes: 0 additions & 7 deletions DotPulsar.sln
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression", "benchmarks\C
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", "samples\SendChannel\SendChannel.csproj", "{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions", "samples\Extensions\Extensions.csproj", "{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}"
EndProject
Global
Expand Down Expand Up @@ -62,10 +60,6 @@ Global
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0 = Release|Any CPU
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 = Release|Any CPU
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -80,7 +74,6 @@ Global
{14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{040F8253-074D-4977-BDB1-0D9798B52CE2} = {2C57AF4B-0D23-42D7-86FE-80277FD52875}
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
Expand Down
1 change: 1 addition & 0 deletions samples/Producing/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

await Host
.CreateApplicationBuilder(args)
//.AddHostedService<SendChannelWorker>()
.AddHostedService<SendWorker>()
.Build()
.RunAsync();
62 changes: 62 additions & 0 deletions samples/Producing/SendChannelWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Producing;

using DotPulsar;
using DotPulsar.Extensions;
using Extensions;

public class SendChannelWorker : BackgroundService
{
private readonly ILogger _logger;

public SendChannelWorker(ILogger<SendChannelWorker> logger) => _logger = logger;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await using var client = PulsarClient.Builder()
.ExceptionHandler(_logger.PulsarClientException) // Optional
.Build(); // Connecting to pulsar://localhost:6650

await using var producer = client.NewProducer(Schema.String)
.StateChangedHandler(_logger.ProducerChangedState) // Optional
.Topic("persistent://public/default/mytopic")
.Create();

var sendChannel = producer.SendChannel;

var delay = TimeSpan.FromSeconds(5);

_logger.LogInformation($"Will start sending messages every {delay.TotalSeconds} seconds");

while (!stoppingToken.IsCancellationRequested)
{
var data = DateTime.UtcNow.ToLongTimeString();
await sendChannel.Send(data, id =>
{
_logger.LogInformation($"Received acknowledgement message with content: '{data}' and got message id: '{id}'");
return ValueTask.CompletedTask;
}, stoppingToken);
_logger.LogInformation($"Sent message with content: '{data}'");
await Task.Delay(delay, stoppingToken);
}
sendChannel.Complete();

// Wait up to 5 seconds for in flight messages to be delivered before closing
var shutdownCts = new CancellationTokenSource();
shutdownCts.CancelAfter(TimeSpan.FromSeconds(5));
await sendChannel.Completion(shutdownCts.Token);
}
}
73 changes: 0 additions & 73 deletions samples/SendChannel/Program.cs

This file was deleted.

13 changes: 0 additions & 13 deletions samples/SendChannel/SendChannel.csproj

This file was deleted.

0 comments on commit be4cb70

Please sign in to comment.