Skip to content

Commit

Permalink
Improve logging of stream delivery errors. (#5230)
Browse files Browse the repository at this point in the history
  • Loading branch information
jason-bragg authored and ReubenBond committed Dec 11, 2018
1 parent 7fdfcd2 commit 8210161
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Orleans.Runtime;
using Orleans.Streams;
using Microsoft.Extensions.Logging;
using System.Threading;

namespace Orleans.Providers.Streams.SimpleMessageStream
{
Expand Down Expand Up @@ -206,15 +207,15 @@ internal Task DeliverItem(StreamId streamId, object item, bool fireAndForgetDeli
continue;
}

Task task = DeliverToRemote(remoteConsumer, streamId, subscriptionKvp.Key, item, optimizeForImmutableData);
Task task = DeliverToRemote(remoteConsumer, streamId, subscriptionKvp.Key, item, optimizeForImmutableData, fireAndForgetDelivery);
if (fireAndForgetDelivery) task.Ignore();
else tasks.Add(task);
}
// If there's no subscriber, presumably we just drop the item on the floor
return fireAndForgetDelivery ? Task.CompletedTask : Task.WhenAll(tasks);
}

private async Task DeliverToRemote(IStreamConsumerExtension remoteConsumer, StreamId streamId, GuidId subscriptionId, object item, bool optimizeForImmutableData)
private async Task DeliverToRemote(IStreamConsumerExtension remoteConsumer, StreamId streamId, GuidId subscriptionId, object item, bool optimizeForImmutableData, bool fireAndForgetDelivery)
{
try
{
Expand All @@ -233,37 +234,76 @@ private async Task DeliverToRemote(IStreamConsumerExtension remoteConsumer, Stre
"Consumer {0} on stream {1} is no longer active - permanently removing Consumer.", remoteConsumer, streamId);
}
}
catch(Exception ex)
{
if (!fireAndForgetDelivery)
{
throw;
}
this.logger.LogWarning(ex, "Failed to deliver message to consumer on {SubscriptionId} for stream {StreamId}.", subscriptionId, streamId);
}
}

internal Task CompleteStream(StreamId streamId, bool fireAndForgetDelivery)
{
var tasks = fireAndForgetDelivery ? null : new List<Task>();
foreach (GuidId subscriptionId in consumers.Keys)
foreach (KeyValuePair<GuidId, Tuple<IStreamConsumerExtension, IStreamFilterPredicateWrapper>> kvp in consumers)
{
var data = consumers[subscriptionId];
IStreamConsumerExtension remoteConsumer = data.Item1;
Task task = remoteConsumer.CompleteStream(subscriptionId);
IStreamConsumerExtension remoteConsumer = kvp.Value.Item1;
GuidId subscriptionId = kvp.Key;
Task task = NotifyComplete(remoteConsumer, subscriptionId, streamId, fireAndForgetDelivery);
if (fireAndForgetDelivery) task.Ignore();
else tasks.Add(task);
}
// If there's no subscriber, presumably we just drop the item on the floor
return fireAndForgetDelivery ? Task.CompletedTask : Task.WhenAll(tasks);
}

private async Task NotifyComplete(IStreamConsumerExtension remoteConsumer, GuidId subscriptionId, StreamId streamId, bool fireAndForgetDelivery)
{
try
{
await remoteConsumer.CompleteStream(subscriptionId);
} catch(Exception ex)
{
if (!fireAndForgetDelivery)
{
throw;
}
this.logger.LogWarning(ex, "Failed to notify consumer of stream completion on {SubscriptionId} for stream {StreamId}.", subscriptionId, streamId);
}
}

internal Task ErrorInStream(StreamId streamId, Exception exc, bool fireAndForgetDelivery)
{
var tasks = fireAndForgetDelivery ? null : new List<Task>();
foreach (GuidId subscriptionId in consumers.Keys)
foreach (KeyValuePair<GuidId, Tuple<IStreamConsumerExtension, IStreamFilterPredicateWrapper>> kvp in consumers)
{
var data = consumers[subscriptionId];
IStreamConsumerExtension remoteConsumer = data.Item1;
Task task = remoteConsumer.ErrorInStream(subscriptionId, exc);
IStreamConsumerExtension remoteConsumer = kvp.Value.Item1;
GuidId subscriptionId = kvp.Key;
Task task = NotifyError(remoteConsumer, subscriptionId, exc, streamId, fireAndForgetDelivery);
if (fireAndForgetDelivery) task.Ignore();
else tasks.Add(task);
}
// If there's no subscriber, presumably we just drop the item on the floor
return fireAndForgetDelivery ? Task.CompletedTask : Task.WhenAll(tasks);
}

private async Task NotifyError(IStreamConsumerExtension remoteConsumer, GuidId subscriptionId, Exception exc, StreamId streamId, bool fireAndForgetDelivery)
{
try
{
await remoteConsumer.ErrorInStream(subscriptionId, exc);
}
catch (Exception ex)
{
if (!fireAndForgetDelivery)
{
throw;
}
this.logger.LogWarning(ex, "Failed to notify consumer of stream error on {SubscriptionId} for stream {StreamId}. Error: {ErrorException}", subscriptionId, streamId, exc);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,13 +606,28 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData, IStreamFil

private async Task<StreamHandshakeToken> DeliverBatchToConsumer(StreamConsumerData consumerData, IBatchContainer batch)
{
StreamHandshakeToken prevToken = consumerData.LastToken;
Task<StreamHandshakeToken> batchDeliveryTask;
try
{
StreamHandshakeToken newToken = await ContextualizedDeliverBatchToConsumer(consumerData, batch);
consumerData.LastToken = StreamHandshakeToken.CreateDeliveyToken(batch.SequenceToken); // this is the currently delivered token
return newToken;
}
catch (Exception ex)
{
this.logger.LogWarning(ex, "Failed to deliver message to consumer on {SubscriptionId} for stream {StreamId}, may retry.", consumerData.SubscriptionId, consumerData.StreamId);
throw;
}
}

/// <summary>
/// Add call context for batch delivery call, then clear context immediately, without giving up turn.
/// </summary>
private Task<StreamHandshakeToken> ContextualizedDeliverBatchToConsumer(StreamConsumerData consumerData, IBatchContainer batch)
{
bool isRequestContextSet = batch.ImportRequestContext();
try
{
batchDeliveryTask = consumerData.StreamConsumer.DeliverBatch(consumerData.SubscriptionId, consumerData.StreamId, batch.AsImmutable(), prevToken);
return consumerData.StreamConsumer.DeliverBatch(consumerData.SubscriptionId, consumerData.StreamId, batch.AsImmutable(), consumerData.LastToken);
}
finally
{
Expand All @@ -622,11 +637,9 @@ private async Task<StreamHandshakeToken> DeliverBatchToConsumer(StreamConsumerDa
RequestContext.Clear();
}
}
StreamHandshakeToken newToken = await batchDeliveryTask;
consumerData.LastToken = StreamHandshakeToken.CreateDeliveyToken(batch.SequenceToken); // this is the currently delivered token
return newToken;
}


private static async Task DeliverErrorToConsumer(StreamConsumerData consumerData, Exception exc, IBatchContainer batch)
{
Task errorDeliveryTask;
Expand Down

0 comments on commit 8210161

Please sign in to comment.