Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

[wip]: Native Receive #661

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 50 additions & 25 deletions Jasper.sln
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.ConfluentKafka.Tests
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Http", "Http", "{7687741C-5880-464B-A51D-CAA0C0B1CE0D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Http", "src\Jasper.Http\Jasper.Http.csproj", "{AB120B77-376F-4F84-8FAC-297A066E9434}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Http", "src\Jasper.Http\Jasper.Http.csproj", "{AB120B77-376F-4F84-8FAC-297A066E9434}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Http.Testing", "src\Jasper.Http.Testing\Jasper.Http.Testing.csproj", "{39F644C3-832A-471D-8827-BDC6B270F73B}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Http.Testing", "src\Jasper.Http.Testing\Jasper.Http.Testing.csproj", "{39F644C3-832A-471D-8827-BDC6B270F73B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EFPlusSqlServerConsole", "src\EFPlusSqlServerConsole\EFPlusSqlServerConsole.csproj", "{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EFPlusSqlServerConsole", "src\EFPlusSqlServerConsole\EFPlusSqlServerConsole.csproj", "{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jasper.Pulsar", "src\Jasper.Pulsar\Jasper.Pulsar.csproj", "{BB253930-8225-4737-9BB0-6F89A4073225}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Pulsar", "src\Jasper.Pulsar\Jasper.Pulsar.csproj", "{BB253930-8225-4737-9BB0-6F89A4073225}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Jasper.Pulsar.Tests", "src\Jasper.Pulsar.Tests\Jasper.Pulsar.Tests.csproj", "{0A6C2CD0-23AF-45AB-A737-9D1D64693717}"
EndProject
Expand Down Expand Up @@ -345,6 +345,8 @@ Global
{6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}.Release|x64.Build.0 = Release|Any CPU
{6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}.Release|x86.ActiveCfg = Release|Any CPU
{6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A}.Release|x86.Build.0 = Release|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x64.ActiveCfg = Debug|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x64.Build.0 = Debug|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|x86.ActiveCfg = Debug|Any CPU
Expand All @@ -355,8 +357,6 @@ Global
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x64.Build.0 = Release|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x86.ActiveCfg = Release|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Release|x86.Build.0 = Release|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand All @@ -369,6 +369,42 @@ Global
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x64.Build.0 = Release|Any CPU
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x86.ActiveCfg = Release|Any CPU
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A}.Release|x86.Build.0 = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x64.ActiveCfg = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x64.Build.0 = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x86.ActiveCfg = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|x86.Build.0 = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.Build.0 = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x64.ActiveCfg = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x64.Build.0 = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x86.ActiveCfg = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|x86.Build.0 = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x64.ActiveCfg = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x64.Build.0 = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x86.ActiveCfg = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|x86.Build.0 = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.Build.0 = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x64.ActiveCfg = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x64.Build.0 = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x86.ActiveCfg = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|x86.Build.0 = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x64.ActiveCfg = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x64.Build.0 = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x86.ActiveCfg = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|x86.Build.0 = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.Build.0 = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x64.ActiveCfg = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x64.Build.0 = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x86.ActiveCfg = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|x86.Build.0 = Release|Any CPU
{BB253930-8225-4737-9BB0-6F89A4073225}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BB253930-8225-4737-9BB0-6F89A4073225}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BB253930-8225-4737-9BB0-6F89A4073225}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand All @@ -393,20 +429,9 @@ Global
{0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x64.Build.0 = Release|Any CPU
{0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x86.ActiveCfg = Release|Any CPU
{0A6C2CD0-23AF-45AB-A737-9D1D64693717}.Release|x86.Build.0 = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B}.Release|Any CPU.Build.0 = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB120B77-376F-4F84-8FAC-297A066E9434}.Release|Any CPU.Build.0 = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{39F644C3-832A-471D-8827-BDC6B270F73B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{4F18A2E4-5056-48C8-89BA-4837F6F983E4} = {A806095A-9D66-4D55-8662-1FC67E90F6FB}
Expand All @@ -423,19 +448,19 @@ Global
{899902B6-63DB-4FED-ABC7-9AE35CCE1DB6} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B}
{1C7783B1-CC8E-4225-9B9D-30C05A99B912} = {2257A448-52A2-466A-ABC5-BD63018F004A}
{D830F62B-1031-47D5-AF3B-CC48A178FE43} = {166943FC-7D19-4C4A-9E74-02A2CB49CD6B}
{AB120B77-376F-4F84-8FAC-297A066E9434} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D}
{39F644C3-832A-471D-8827-BDC6B270F73B} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D}
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B} = {A806095A-9D66-4D55-8662-1FC67E90F6FB}
{1B86F467-4DC6-4D30-9201-FD1BD44C3271} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{57273C2A-3F16-49B7-AB6C-80C6F44A60FE} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{40670392-73E5-499C-8324-EE40BA6B5A10} = {2257A448-52A2-466A-ABC5-BD63018F004A}
{D09FBD2B-87AD-47CC-9191-5B4E06A48FBC} = {2257A448-52A2-466A-ABC5-BD63018F004A}
{CA4812BF-8580-4891-95FE-518930FCF859} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{6EC1EA66-63C7-4DF6-8DCB-40DFBEA4E07A} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{ABBCB70C-9087-4A0C-A3DB-C9BB0DFAAC5A} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{B8F1BCB3-4A8A-4368-85BA-E69EB879BC5A} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{AB120B77-376F-4F84-8FAC-297A066E9434} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D}
{39F644C3-832A-471D-8827-BDC6B270F73B} = {7687741C-5880-464B-A51D-CAA0C0B1CE0D}
{5AD9CD6C-05B9-4A83-8CFA-EDCFF09E5B3B} = {A806095A-9D66-4D55-8662-1FC67E90F6FB}
{BB253930-8225-4737-9BB0-6F89A4073225} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{0A6C2CD0-23AF-45AB-A737-9D1D64693717} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{1B86F467-4DC6-4D30-9201-FD1BD44C3271} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{57273C2A-3F16-49B7-AB6C-80C6F44A60FE} = {C275B6A4-5CE8-4E1D-84C1-ADD4F959A6E4}
{40670392-73E5-499C-8324-EE40BA6B5A10} = {2257A448-52A2-466A-ABC5-BD63018F004A}
{D09FBD2B-87AD-47CC-9191-5B4E06A48FBC} = {2257A448-52A2-466A-ABC5-BD63018F004A}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D92D723F-44EC-4C1E-AAC3-C162FCEAAA08}
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,13 @@ services:
- "ACCEPT_EULA=Y"
- "SA_PASSWORD=P@55w0rd"
- "MSSQL_PID=Developer"
pulsar:
image: apachepulsar/pulsar
ports:
- "6650:6650"
environment:
- PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone"
64 changes: 38 additions & 26 deletions src/Jasper.AzureServiceBus/Internal/AzureServiceBusListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Baseline;
using Jasper.Logging;
using Jasper.Transports;
Expand All @@ -18,8 +19,6 @@ public class AzureServiceBusListener : IListener
private readonly ITransportLogger _logger;
private readonly ITransportProtocol<Message> _protocol;
private readonly AzureServiceBusTransport _transport;
private IReceiverCallback _callback;


public AzureServiceBusListener(AzureServiceBusEndpoint endpoint, AzureServiceBusTransport transport,
ITransportLogger logger, CancellationToken cancellation)
Expand All @@ -28,8 +27,6 @@ public AzureServiceBusListener(AzureServiceBusEndpoint endpoint, AzureServiceBus
_transport = transport;
_logger = logger;
_cancellation = cancellation;


_protocol = endpoint.Protocol;
Address = endpoint.Uri;
}
Expand All @@ -43,10 +40,38 @@ public void Dispose()
public Uri Address { get; }
public ListeningStatus Status { get; set; }

public void Start(IReceiverCallback callback)
private readonly BufferBlock<(Envelope Envelope, object AckObject)> _buffer = new BufferBlock<(Envelope Envelope, object AckObject)>(new DataflowBlockOptions
{// DO NOT CHANGE THESE SETTINGS THEY ARE IMPORTANT TO LINK RECEIVE DELEGATE WITH CONSUME()
BoundedCapacity = 1,
MaxMessagesPerTask = 1,
EnsureOrdered = true
});

public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume()
{
Start();

while(!_cancellation.IsCancellationRequested)
{
var received = await _buffer.ReceiveAsync(_cancellation);
yield return received;
}
}

public Task Ack((Envelope Envelope, object AckObject) messageInfo)
{
var ackObj = ((Task Ack, Task Nack))messageInfo.AckObject;
return ackObj.Ack;
}

public Task Nack((Envelope Envelope, object AckObject) messageInfo)
{
_callback = callback;
var ackObj = ((Task Ack, Task Nack))messageInfo.AckObject;
return ackObj.Nack;
}

public void Start()
{
var options = new SessionHandlerOptions(handleException);

var connectionString = _transport.ConnectionString;
Expand All @@ -60,26 +85,22 @@ public void Start(IReceiverCallback callback)

if (topicName.IsEmpty())
{
var client = tokenProvider != null
QueueClient queueClient = tokenProvider != null
? new QueueClient(connectionString, queueName, tokenProvider, transportType, receiveMode,
retryPolicy)
: new QueueClient(connectionString, queueName, receiveMode, retryPolicy);

client.RegisterSessionHandler(handleMessage, options);

_clientEntities.Add(client);
queueClient.RegisterSessionHandler(handleMessage, options);
}
else
{
var client = tokenProvider != null
SubscriptionClient subscriptionClient = tokenProvider != null
? new SubscriptionClient(connectionString, topicName, subscriptionName, tokenProvider,
transportType, receiveMode, retryPolicy)
: new SubscriptionClient(connectionString, topicName, subscriptionName,
receiveMode, retryPolicy);

client.RegisterSessionHandler(handleMessage, options);

_clientEntities.Add(client);
subscriptionClient.RegisterSessionHandler(handleMessage, options);
}
}

Expand All @@ -90,9 +111,9 @@ private Task handleException(ExceptionReceivedEventArgs arg)
return Task.CompletedTask;
}

private async Task handleMessage(IMessageSession session, Message message, CancellationToken token)
private async Task handleMessage(IMessageSession session, Message message, CancellationToken cancellationToken)
{
var lockToken = message.SystemProperties.LockToken;
string lockToken = message.SystemProperties.LockToken;

Envelope envelope;

Expand All @@ -110,16 +131,7 @@ private async Task handleMessage(IMessageSession session, Message message, Cance
return;
}

try
{
await _callback.Received(Address, new[] {envelope});
await session.CompleteAsync(lockToken);
}
catch (Exception e)
{
_logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address);
await session.AbandonAsync(lockToken);
}
await _buffer.SendAsync((envelope, (session.CompleteAsync(lockToken), session.AbandonAsync(lockToken))), cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public virtual Message WriteFromEnvelope(Envelope envelope)
ContentType = envelope.ContentType,
ReplyTo = envelope.ReplyUri?.ToString(),
ReplyToSessionId = envelope.CausationId.ToString(),

};

if (envelope.ExecutionTime.HasValue)
Expand Down Expand Up @@ -50,7 +49,6 @@ public virtual Envelope ReadEnvelope(Message message)
CausationId = message.ReplyToSessionId.IsNotEmpty() ? Guid.Parse(message.ReplyToSessionId) : Guid.Empty
};


envelope.ReadPropertiesFromDictionary(message.UserProperties);

return envelope;
Expand Down
35 changes: 14 additions & 21 deletions src/Jasper.ConfluentKafka/Internal/ConfluentKafkaListener.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
Expand All @@ -15,7 +15,6 @@ public class ConfluentKafkaListener : IListener
private readonly IConsumer<byte[], byte[]> _consumer;
private readonly KafkaEndpoint _endpoint;
private readonly ITransportLogger _logger;
private IReceiverCallback _callback;
private readonly ITransportProtocol<Message<byte[], byte[]>> _protocol;
private Task _consumerTask;

Expand All @@ -34,22 +33,25 @@ public void Dispose()
_consumerTask?.Dispose();
}

public Uri Address => _endpoint.Uri;
public ListeningStatus Status { get; set; }

public void Start(IReceiverCallback callback)
public Task Ack((Envelope Envelope, object AckObject) messageInfo)
{
_callback = callback;

_consumer.Subscribe(new []{ _endpoint.TopicName });
var achObj = messageInfo.AckObject as ConsumeResult<byte[], byte[]>;

_consumerTask = ConsumeAsync();
_consumer.Commit(achObj);

_logger.ListeningStatusChange(ListeningStatus.Accepting);
return Task.CompletedTask;
}

private async Task ConsumeAsync()
public Task Nack((Envelope Envelope, object AckObject) messageInfo) => Task.CompletedTask;

public Uri Address => _endpoint.Uri;
public ListeningStatus Status { get; set; }

public async IAsyncEnumerable<(Envelope Envelope, object AckObject)> Consume()
{
_consumer.Subscribe(new[] { _endpoint.TopicName });

while (!_cancellation.IsCancellationRequested)
{
ConsumeResult<byte[], byte[]> message;
Expand Down Expand Up @@ -84,16 +86,7 @@ private async Task ConsumeAsync()
continue;
}

try
{
await _callback.Received(Address, new[] {envelope});

_consumer.Commit(message);
}
catch (Exception e)
{
_logger.LogException(e, envelope.Id, "Error trying to receive a message from " + Address);
}
yield return (envelope, message);
}
}
}
Expand Down
Loading