Skip to content

Commit

Permalink
Fix call chain reentrancy (#5145)
Browse files Browse the repository at this point in the history
  • Loading branch information
andhesky authored and sergeybykov committed Dec 3, 2018
1 parent 2a6a69e commit 8d8cd70
Show file tree
Hide file tree
Showing 15 changed files with 489 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private IEnumerable<string> FormatProperty(PropertyInfo property)
}
}

internal class DefaultOptionsFormatterResolver<T> : IOptionFormatterResolver<T>
internal class DefaultOptionsFormatterResolver<T> : IOptionFormatterResolver<T>
where T: class, new()
{
private IOptionsSnapshot<T> optionsSnapshot;
Expand Down
24 changes: 19 additions & 5 deletions src/Orleans.Core/Messaging/RequestInvocationHistory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@ namespace Orleans.Runtime
{
// used for tracking request invocation history for deadlock detection.
[Serializable]
internal sealed class RequestInvocationHistory
internal sealed class RequestInvocationHistory : RequestInvocationHistorySummary
{
public GrainId GrainId { get; private set; }
public ActivationId ActivationId { get; private set; }
public string DebugContext { get; private set; }

public RequestInvocationHistory(GrainId grainId, ActivationId activationId, string debugContext)
public RequestInvocationHistory(GrainId grainId, ActivationId activationId, string debugContext) : base(activationId)
{
this.GrainId = grainId;
this.ActivationId = activationId;
DebugContext = debugContext;
}

public override string ToString()
{
return String.Format("RequestInvocationHistory {0}:{1}:{2}", GrainId, ActivationId, DebugContext);
return $"RequestInvocationHistory {GrainId}:{ActivationId}:{DebugContext}";
}
}
// used for tracking request invocation history for call chain reentrancy
[Serializable]
internal class RequestInvocationHistorySummary
{
public ActivationId ActivationId { get; private set; }

public RequestInvocationHistorySummary(ActivationId activationId)
{
this.ActivationId = activationId;
}

public override string ToString()
{
return $"RequestInvocationHistorySummary {ActivationId}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private static void AddLegacyClusterConfigurationSupport(IServiceCollection serv
var nodeConfig = configuration.GetOrCreateNodeConfigurationForSilo(siloOptions.Value.SiloName);
options.ExcludedGrainTypes.AddRange(nodeConfig.ExcludedGrainTypes);
});

services.AddOptions<SchedulingOptions>()
.Configure<GlobalConfiguration>((options, config) =>
{
Expand Down Expand Up @@ -273,7 +273,7 @@ private static void AddLegacyClusterConfigurationSupport(IServiceCollection serv
{
options.IsRunningAsUnitTest = config.IsRunningAsUnitTest;
});

services.AddOptions<GrainVersioningOptions>()
.Configure<GlobalConfiguration>((options, config) =>
{
Expand Down
42 changes: 17 additions & 25 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Orleans.Runtime
/// </summary>
internal class ActivationData : IGrainActivationContext, IActivationData, IInvokable, IDisposable
{
// This class is used for activations that have extension invokers. It keeps a dictionary of
// This class is used for activations that have extension invokers. It keeps a dictionary of
// invoker objects to use with the activation, and extend the default invoker
// defined for the grain class.
// Note that in all cases we never have more than one copy of an actual invoker;
Expand All @@ -34,7 +34,7 @@ private class ExtensionInvoker : IGrainMethodInvoker, IGrainExtensionMap
// Because calls to ExtensionInvoker are allways made within the activation context,
// we rely on the single-threading guarantee of the runtime and do not protect the map with a lock.
private Dictionary<int, Tuple<IGrainExtension, IGrainExtensionMethodInvoker>> extensionMap; // key is the extension interface ID

/// <summary>
/// Try to add an extension for the specific interface ID.
/// Fail and return false if there is already an extension for that interface ID.
Expand Down Expand Up @@ -171,7 +171,7 @@ internal class GrainActivationContextFactory
private IGrainMethodInvoker lastInvoker;
private IServiceScope serviceScope;
private HashSet<IGrainTimer> timers;

public ActivationData(
ActivationAddress addr,
string genericArguments,
Expand Down Expand Up @@ -264,8 +264,6 @@ internal bool TryGetExtensionHandler(Type extensionType, out IGrainExtension res
return extensionInvoker != null && extensionInvoker.TryGetExtensionHandler(extensionType, out result);
}

public HashSet<ActivationId> RunningRequestsSenders { get; } = new HashSet<ActivationId>();

public ISchedulingContext SchedulingContext { get; }

public string GrainTypeName
Expand Down Expand Up @@ -313,14 +311,14 @@ private static void SetGrainActivationContextInScopedServices(IServiceProvider s
var contextFactory = sp.GetRequiredService<GrainActivationContextFactory>();
contextFactory.Context = context;
}

private Streams.StreamDirectory streamDirectory;
internal Streams.StreamDirectory GetStreamDirectory()
{
return streamDirectory ?? (streamDirectory = new Streams.StreamDirectory());
}

internal bool IsUsingStreams
internal bool IsUsingStreams
{
get { return streamDirectory != null; }
}
Expand All @@ -343,7 +341,7 @@ GrainReference IActivationData.GrainReference
{
get { return GrainReference; }
}

public GrainId Identity
{
get { return Grain; }
Expand Down Expand Up @@ -435,7 +433,7 @@ public void ResetCollectionTicket()
public void SetCollectionTicket(DateTime ticket)
{
if (ticket == default(DateTime)) throw new ArgumentException("default(DateTime) is disallowed", "ticket");
if (CollectionTicket != default(DateTime))
if (CollectionTicket != default(DateTime))
{
throw new InvalidOperationException("call ResetCollectionTicket before calling SetCollectionTicket.");
}
Expand All @@ -449,7 +447,7 @@ public void SetCollectionTicket(DateTime ticket)

// Currently, the only supported multi-activation grain is one using the StatelessWorkerPlacement strategy.
internal bool IsStatelessWorker => this.PlacedUsing is StatelessWorkerPlacement;

/// <summary>
/// Returns a value indicating whether or not this placement strategy requires activations to be registered in
/// the grain directory.
Expand All @@ -471,12 +469,6 @@ public void RecordRunning(Message message)
// Note: This method is always called while holding lock on this activation, so no need for additional locks here

numRunning++;
if (message.Direction != Message.Directions.OneWay
&& message.SendingActivation != null
&& !message.SendingGrain?.IsClient == true)
{
RunningRequestsSenders.Add(message.SendingActivation);
}

if (Running != null) return;

Expand All @@ -490,7 +482,7 @@ public void ResetRunning(Message message)
{
// Note: This method is always called while holding lock on this activation, so no need for additional locks here
numRunning--;
RunningRequestsSenders.Remove(message.SendingActivation);

if (numRunning == 0)
{
becameIdle = DateTime.UtcNow;
Expand Down Expand Up @@ -523,7 +515,7 @@ public void ResetRunning(Message message)

/// <summary>Increment the number of in-flight messages currently being processed.</summary>
public void IncrementInFlightCount() { Interlocked.Increment(ref inFlightCount); }

/// <summary>Decrement the number of in-flight messages currently being processed.</summary>
public void DecrementInFlightCount() { Interlocked.Decrement(ref inFlightCount); }

Expand All @@ -532,14 +524,14 @@ public void ResetRunning(Message message)

/// <summary>Decrement the number of messages currently in the process of being received.</summary>
public void DecrementEnqueuedOnDispatcherCount() { Interlocked.Decrement(ref enqueuedOnDispatcherCount); }

/// <summary>
/// grouped by sending activation: responses first, then sorted by id
/// </summary>
private List<Message> waiting;

public int WaitingCount
{
public int WaitingCount
{
get
{
return waiting == null ? 0 : waiting.Count;
Expand Down Expand Up @@ -602,7 +594,7 @@ public EnqueueMessageResult EnqueueMessage(Message message)
}

/// <summary>
/// Check whether this activation is overloaded.
/// Check whether this activation is overloaded.
/// Returns LimitExceededException if overloaded, otherwise <c>null</c>c>
/// </summary>
/// <param name="log">Logger to use for reporting any overflow condition</param>
Expand All @@ -625,7 +617,7 @@ public LimitExceededException CheckOverloaded(ILogger log)

if (maxRequestsHardLimit > 0 && count > maxRequestsHardLimit) // Hard limit
{
log.Warn(ErrorCode.Catalog_Reject_ActivationTooManyRequests,
log.Warn(ErrorCode.Catalog_Reject_ActivationTooManyRequests,
String.Format("Overload - {0} enqueued requests for activation {1}, exceeding hard limit rejection threshold of {2}",
count, this, maxRequestsHardLimit));

Expand Down Expand Up @@ -699,7 +691,7 @@ public TimeSpan GetIdleness(DateTime now)
{
if (now == default(DateTime))
throw new ArgumentException("default(DateTime) is not allowed; Use DateTime.UtcNow instead.", "now");

return now - becameIdle;
}

Expand Down Expand Up @@ -803,7 +795,7 @@ public void OnTimerDisposed(IGrainTimer orleansTimerInsideGrain)
internal Task WaitForAllTimersToFinish()
{
lock(this)
{
{
if (timers == null)
{
return Task.CompletedTask;
Expand Down
Loading

0 comments on commit 8d8cd70

Please sign in to comment.