Skip to content

Commit

Permalink
Revert: Fix call chain reentrancy (#5145, #5225) (#5249)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeybykov authored and ReubenBond committed Dec 12, 2018
1 parent a322ba3 commit 9677f59
Show file tree
Hide file tree
Showing 15 changed files with 212 additions and 465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private IEnumerable<string> FormatProperty(PropertyInfo property)
}
}

internal class DefaultOptionsFormatterResolver<T> : IOptionFormatterResolver<T>
internal class DefaultOptionsFormatterResolver<T> : IOptionFormatterResolver<T>
where T: class, new()
{
private IOptionsSnapshot<T> optionsSnapshot;
Expand Down
6 changes: 3 additions & 3 deletions src/Orleans.Core/Messaging/RequestInvocationHistory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Orleans.Runtime
{
// used for tracking request invocation history for deadlock detection and call chain reentrancy
// used for tracking request invocation history for deadlock detection.
[Serializable]
internal sealed class RequestInvocationHistory
{
Expand All @@ -14,12 +14,12 @@ public RequestInvocationHistory(GrainId grainId, ActivationId activationId, stri
{
this.GrainId = grainId;
this.ActivationId = activationId;
this.DebugContext = debugContext;
DebugContext = debugContext;
}

public override string ToString()
{
return $"RequestInvocationHistory {GrainId}:{ActivationId}:{DebugContext}";
return String.Format("RequestInvocationHistory {0}:{1}:{2}", GrainId, ActivationId, DebugContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private static void AddLegacyClusterConfigurationSupport(IServiceCollection serv
var nodeConfig = configuration.GetOrCreateNodeConfigurationForSilo(siloOptions.Value.SiloName);
options.ExcludedGrainTypes.AddRange(nodeConfig.ExcludedGrainTypes);
});

services.AddOptions<SchedulingOptions>()
.Configure<GlobalConfiguration>((options, config) =>
{
Expand Down Expand Up @@ -280,7 +280,7 @@ private static void AddLegacyClusterConfigurationSupport(IServiceCollection serv
{
options.IsRunningAsUnitTest = config.IsRunningAsUnitTest;
});

services.AddOptions<GrainVersioningOptions>()
.Configure<GlobalConfiguration>((options, config) =>
{
Expand Down
42 changes: 25 additions & 17 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Orleans.Runtime
/// </summary>
internal class ActivationData : IGrainActivationContext, IActivationData, IInvokable, IDisposable
{
// This class is used for activations that have extension invokers. It keeps a dictionary of
// This class is used for activations that have extension invokers. It keeps a dictionary of
// invoker objects to use with the activation, and extend the default invoker
// defined for the grain class.
// Note that in all cases we never have more than one copy of an actual invoker;
Expand All @@ -34,7 +34,7 @@ private class ExtensionInvoker : IGrainMethodInvoker, IGrainExtensionMap
// Because calls to ExtensionInvoker are allways made within the activation context,
// we rely on the single-threading guarantee of the runtime and do not protect the map with a lock.
private Dictionary<int, Tuple<IGrainExtension, IGrainExtensionMethodInvoker>> extensionMap; // key is the extension interface ID

/// <summary>
/// Try to add an extension for the specific interface ID.
/// Fail and return false if there is already an extension for that interface ID.
Expand Down Expand Up @@ -171,7 +171,7 @@ internal class GrainActivationContextFactory
private IGrainMethodInvoker lastInvoker;
private IServiceScope serviceScope;
private HashSet<IGrainTimer> timers;

public ActivationData(
ActivationAddress addr,
string genericArguments,
Expand Down Expand Up @@ -264,6 +264,8 @@ internal bool TryGetExtensionHandler(Type extensionType, out IGrainExtension res
return extensionInvoker != null && extensionInvoker.TryGetExtensionHandler(extensionType, out result);
}

public HashSet<ActivationId> RunningRequestsSenders { get; } = new HashSet<ActivationId>();

public ISchedulingContext SchedulingContext { get; }

public string GrainTypeName
Expand Down Expand Up @@ -311,14 +313,14 @@ private static void SetGrainActivationContextInScopedServices(IServiceProvider s
var contextFactory = sp.GetRequiredService<GrainActivationContextFactory>();
contextFactory.Context = context;
}

private Streams.StreamDirectory streamDirectory;
internal Streams.StreamDirectory GetStreamDirectory()
{
return streamDirectory ?? (streamDirectory = new Streams.StreamDirectory());
}

internal bool IsUsingStreams
internal bool IsUsingStreams
{
get { return streamDirectory != null; }
}
Expand All @@ -341,7 +343,7 @@ GrainReference IActivationData.GrainReference
{
get { return GrainReference; }
}

public GrainId Identity
{
get { return Grain; }
Expand Down Expand Up @@ -433,7 +435,7 @@ public void ResetCollectionTicket()
public void SetCollectionTicket(DateTime ticket)
{
if (ticket == default(DateTime)) throw new ArgumentException("default(DateTime) is disallowed", "ticket");
if (CollectionTicket != default(DateTime))
if (CollectionTicket != default(DateTime))
{
throw new InvalidOperationException("call ResetCollectionTicket before calling SetCollectionTicket.");
}
Expand All @@ -447,7 +449,7 @@ public void SetCollectionTicket(DateTime ticket)

// Currently, the only supported multi-activation grain is one using the StatelessWorkerPlacement strategy.
internal bool IsStatelessWorker => this.PlacedUsing is StatelessWorkerPlacement;

/// <summary>
/// Returns a value indicating whether or not this placement strategy requires activations to be registered in
/// the grain directory.
Expand All @@ -469,6 +471,12 @@ public void RecordRunning(Message message)
// Note: This method is always called while holding lock on this activation, so no need for additional locks here

numRunning++;
if (message.Direction != Message.Directions.OneWay
&& message.SendingActivation != null
&& !message.SendingGrain?.IsClient == true)
{
RunningRequestsSenders.Add(message.SendingActivation);
}

if (Running != null) return;

Expand All @@ -482,7 +490,7 @@ public void ResetRunning(Message message)
{
// Note: This method is always called while holding lock on this activation, so no need for additional locks here
numRunning--;

RunningRequestsSenders.Remove(message.SendingActivation);
if (numRunning == 0)
{
becameIdle = DateTime.UtcNow;
Expand Down Expand Up @@ -515,7 +523,7 @@ public void ResetRunning(Message message)

/// <summary>Increment the number of in-flight messages currently being processed.</summary>
public void IncrementInFlightCount() { Interlocked.Increment(ref inFlightCount); }

/// <summary>Decrement the number of in-flight messages currently being processed.</summary>
public void DecrementInFlightCount() { Interlocked.Decrement(ref inFlightCount); }

Expand All @@ -524,14 +532,14 @@ public void ResetRunning(Message message)

/// <summary>Decrement the number of messages currently in the process of being received.</summary>
public void DecrementEnqueuedOnDispatcherCount() { Interlocked.Decrement(ref enqueuedOnDispatcherCount); }

/// <summary>
/// grouped by sending activation: responses first, then sorted by id
/// </summary>
private List<Message> waiting;

public int WaitingCount
{
public int WaitingCount
{
get
{
return waiting == null ? 0 : waiting.Count;
Expand Down Expand Up @@ -594,7 +602,7 @@ public EnqueueMessageResult EnqueueMessage(Message message)
}

/// <summary>
/// Check whether this activation is overloaded.
/// Check whether this activation is overloaded.
/// Returns LimitExceededException if overloaded, otherwise <c>null</c>c>
/// </summary>
/// <param name="log">Logger to use for reporting any overflow condition</param>
Expand All @@ -617,7 +625,7 @@ public LimitExceededException CheckOverloaded(ILogger log)

if (maxRequestsHardLimit > 0 && count > maxRequestsHardLimit) // Hard limit
{
log.Warn(ErrorCode.Catalog_Reject_ActivationTooManyRequests,
log.Warn(ErrorCode.Catalog_Reject_ActivationTooManyRequests,
String.Format("Overload - {0} enqueued requests for activation {1}, exceeding hard limit rejection threshold of {2}",
count, this, maxRequestsHardLimit));

Expand Down Expand Up @@ -691,7 +699,7 @@ public TimeSpan GetIdleness(DateTime now)
{
if (now == default(DateTime))
throw new ArgumentException("default(DateTime) is not allowed; Use DateTime.UtcNow instead.", "now");

return now - becameIdle;
}

Expand Down Expand Up @@ -795,7 +803,7 @@ public void OnTimerDisposed(IGrainTimer orleansTimerInsideGrain)
internal Task WaitForAllTimersToFinish()
{
lock(this)
{
{
if (timers == null)
{
return Task.CompletedTask;
Expand Down
52 changes: 26 additions & 26 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ internal class NonExistentActivationException : Exception

public NonExistentActivationException() : base("NonExistentActivationException") { }
public NonExistentActivationException(string msg) : base(msg) { }
public NonExistentActivationException(string message, Exception innerException)
public NonExistentActivationException(string message, Exception innerException)
: base(message, innerException) { }

public NonExistentActivationException(string msg, ActivationAddress nonExistentActivation, bool isStatelessWorker)
Expand Down Expand Up @@ -274,7 +274,7 @@ public List<Tuple<GrainId, string, int>> GetGrainStatistics()

// TODO: generic type expansion
var grainTypeName = TypeUtils.GetFullName(data.GrainInstanceType);

Dictionary<GrainId, int> grains;
int n;
if (!counts.TryGetValue(grainTypeName, out grains))
Expand Down Expand Up @@ -347,8 +347,8 @@ public DetailedGrainReport GetDetailedGrainReport(GrainId grain)
}

List<ActivationData> acts = activations.FindTargets(grain);
report.LocalActivations = acts != null ?
acts.Select(activationData => activationData.ToDetailedString()).ToList() :
report.LocalActivations = acts != null ?
acts.Select(activationData => activationData.ToDetailedString()).ToList() :
new List<string>();
return report;
}
Expand Down Expand Up @@ -397,18 +397,18 @@ internal int UnregisterGrainForTesting(GrainId grain)
int numActsBefore = acts.Count;
foreach (var act in acts)
UnregisterMessageTarget(act);

return numActsBefore;
}

internal bool CanInterleave(ActivationId running, Message message)
{
return this.TryGetActivationData(running, out ActivationData target)
&& target.GrainInstance != null
&& this.GrainTypeManager.TryGetData(TypeUtils.GetFullName(target.GrainInstanceType), out GrainTypeData data)
&& (data.IsReentrant
|| (message.GetDeserializedBody(this.serializationManager) is InvokeMethodRequest invokeMethodRequest
&& data.MayInterleave((invokeMethodRequest))));
ActivationData target;
GrainTypeData data;
return TryGetActivationData(running, out target) &&
target.GrainInstance != null &&
GrainTypeManager.TryGetData(TypeUtils.GetFullName(target.GrainInstanceType), out data) &&
(data.IsReentrant || data.MayInterleave((InvokeMethodRequest)message.GetDeserializedBody(this.serializationManager)));
}

public void GetGrainTypeInfo(int typeCode, out string grainClass, out PlacementStrategy placement, out MultiClusterRegistrationStrategy activationStrategy, string genericArguments = null)
Expand Down Expand Up @@ -449,7 +449,7 @@ public ActivationData GetOrCreateActivation(
{
return result;
}

int typeCode = address.Grain.TypeCode;
string actualGrainType = null;
MultiClusterRegistrationStrategy activationStrategy;
Expand Down Expand Up @@ -502,7 +502,7 @@ public ActivationData GetOrCreateActivation(
CounterStatistic.FindOrCreate(StatisticNames.CATALOG_ACTIVATION_NON_EXISTENT_ACTIVATIONS).Increment();
throw new NonExistentActivationException(msg, address, placement is StatelessWorkerPlacement);
}

SetupActivationInstance(result, grainType, genericArguments);
activatedPromise = InitActivation(result, grainType, genericArguments, requestContextData);
return result;
Expand Down Expand Up @@ -714,15 +714,15 @@ private void CreateGrainInstance(string grainTypeName, ActivationData data, stri
data.SetupContext(grainTypeData, this.serviceProvider);

Grain grain = grainCreator.CreateGrainInstance(data);

//if grain implements IStreamSubscriptionObserver, then install stream consumer extension on it
if(grain is IStreamSubscriptionObserver)
InstallStreamConsumerExtension(data, grain as IStreamSubscriptionObserver);

grain.Data = data;
data.SetGrainInstance(grain);
}

activations.IncrementGrainCounter(grainClassName);

if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("CreateGrainInstance {0}{1}", data.Grain, data.ActivationId);
Expand Down Expand Up @@ -932,11 +932,11 @@ private void DestroyActivationAsync(ActivationData activation, MultiTaskCompleti
/// <returns></returns>
// Overall code flow:
// Deactivating state was already set before, in the correct context under lock.
// that means no more new requests will be accepted into this activation and all timer were stopped (no new ticks will be delivered or enqueued)
// that means no more new requests will be accepted into this activation and all timer were stopped (no new ticks will be delivered or enqueued)
// Wait for all already scheduled ticks to finish
// CallGrainDeactivate
// when AsyncDeactivate promise is resolved (NOT when all Deactivate turns are done, which may be orphan tasks):
// Unregister in the directory
// Unregister in the directory
// when all AsyncDeactivate turns are done (Dispatcher.OnActivationCompletedRequest):
// Set Invalid state
// UnregisterMessageTarget -> no new tasks will be enqueue (if an orphan task get enqueud, it is ignored and dropped on the floor).
Expand Down Expand Up @@ -1003,7 +1003,7 @@ private async void FinishDestroyActivations(List<ActivationData> list, int numbe
//logger.Info(ErrorCode.Catalog_DestroyActivations_Done, "Starting FinishDestroyActivations #{0} - with {1} Activations.", number, list.Count);
// step 3 - UnregisterManyAsync
try
{
{
List<ActivationAddress> activationsToDeactivate = list.
Where((ActivationData d) => d.IsUsingGrainDirectory).
Select((ActivationData d) => ActivationAddress.GetAddress(LocalSilo, d.Grain, d.ActivationId)).ToList();
Expand Down Expand Up @@ -1039,7 +1039,7 @@ await scheduler.RunOrQueueTask(() =>

// IMPORTANT: no more awaits and .Ignore after that point.

// Just use this opportunity to invalidate local Cache Entry as well.
// Just use this opportunity to invalidate local Cache Entry as well.
// If this silo is not the grain directory partition for this grain, it may have it in its cache.
try
{
Expand Down Expand Up @@ -1188,7 +1188,7 @@ private async Task<ActivationData> CallGrainDeactivateAndCleanupStreams(Activati
if (TryGetActivationData(activation.ActivationId, out ignore) &&
activation.State == ActivationState.Deactivating)
{
RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake.
RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake.
await activation.Lifecycle.OnStop().WithCancellation(ct);
}
if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.Catalog_AfterCallingDeactivate, "Returned from calling {1} grain's OnDeactivateAsync() method {0}", activation, grainTypeName);
Expand Down Expand Up @@ -1233,7 +1233,7 @@ private struct ActivationRegistrationResult
/// </summary>
public static readonly ActivationRegistrationResult Success = new ActivationRegistrationResult
{
IsSuccess = true
IsSuccess = true
};

public ActivationRegistrationResult(ActivationAddress existingActivationAddress)
Expand All @@ -1242,7 +1242,7 @@ public ActivationRegistrationResult(ActivationAddress existingActivationAddress)
ExistingActivationAddress = existingActivationAddress;
IsSuccess = false;
}

/// <summary>
/// Returns true if this instance represents a successful registration, false otherwise.
/// </summary>
Expand All @@ -1264,13 +1264,13 @@ private async Task<ActivationRegistrationResult> RegisterActivationInGrainDirect
{
ActivationAddress address = activation.Address;

// Currently, the only grain type that is not registered in the Grain Directory is StatelessWorker.
// Currently, the only grain type that is not registered in the Grain Directory is StatelessWorker.
// Among those that are registered in the directory, we currently do not have any multi activations.
if (activation.IsUsingGrainDirectory)
{
var result = await scheduler.RunOrQueueTask(() => directory.RegisterAsync(address, singleActivation:true), this.SchedulingContext);
if (address.Equals(result.Address)) return ActivationRegistrationResult.Success;

return new ActivationRegistrationResult(existingActivationAddress: result.Address);
}
else if (activation.PlacedUsing is StatelessWorkerPlacement stPlacement)
Expand Down Expand Up @@ -1302,7 +1302,7 @@ private async Task<ActivationRegistrationResult> RegisterActivationInGrainDirect
}
}

// We currently don't have any other case for multiple activations except for StatelessWorker.
// We currently don't have any other case for multiple activations except for StatelessWorker.
}

/// <summary>
Expand Down Expand Up @@ -1388,7 +1388,7 @@ private List<ActivationData> TryGetActivationDatas(List<ActivationAddress> addre
}

private void OnSiloStatusChange(SiloAddress updatedSilo, SiloStatus status)
{
{
// ignore joining events and also events on myself.
if (updatedSilo.Equals(LocalSilo)) return;

Expand Down
Loading

0 comments on commit 9677f59

Please sign in to comment.