Skip to content

Commit

Permalink
Cherry-pick post 2.2.0-rc1 fixes for 2.2.0 final #3 (#5254)
Browse files Browse the repository at this point in the history
Add GSI cache maintenance and tests (#5184)
Revert: Fix call chain reentrancy (#5145, #5225) (#5249)
  • Loading branch information
sergeybykov authored and ReubenBond committed Dec 12, 2018
1 parent 3a2be47 commit a561906
Show file tree
Hide file tree
Showing 23 changed files with 450 additions and 478 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private IEnumerable<string> FormatProperty(PropertyInfo property)
}
}

internal class DefaultOptionsFormatterResolver<T> : IOptionFormatterResolver<T>
internal class DefaultOptionsFormatterResolver<T> : IOptionFormatterResolver<T>
where T: class, new()
{
private IOptionsSnapshot<T> optionsSnapshot;
Expand Down
6 changes: 3 additions & 3 deletions src/Orleans.Core/Messaging/RequestInvocationHistory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Orleans.Runtime
{
// used for tracking request invocation history for deadlock detection and call chain reentrancy
// used for tracking request invocation history for deadlock detection.
[Serializable]
internal sealed class RequestInvocationHistory
{
Expand All @@ -14,12 +14,12 @@ public RequestInvocationHistory(GrainId grainId, ActivationId activationId, stri
{
this.GrainId = grainId;
this.ActivationId = activationId;
this.DebugContext = debugContext;
DebugContext = debugContext;
}

public override string ToString()
{
return $"RequestInvocationHistory {GrainId}:{ActivationId}:{DebugContext}";
return String.Format("RequestInvocationHistory {0}:{1}:{2}", GrainId, ActivationId, DebugContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,9 @@ Task<RemoteClusterActivationResponse[]> ProcessActivationRequestBatch(
/// <param name="grainId"></param>
Task ProcessDeletion(GrainId grainId);

/// <summary>
/// Called on remote clusters to ping availability of a silo and determine cluster id.
/// </summary>
Task<string> Ping();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private static void AddLegacyClusterConfigurationSupport(IServiceCollection serv
var nodeConfig = configuration.GetOrCreateNodeConfigurationForSilo(siloOptions.Value.SiloName);
options.ExcludedGrainTypes.AddRange(nodeConfig.ExcludedGrainTypes);
});

services.AddOptions<SchedulingOptions>()
.Configure<GlobalConfiguration>((options, config) =>
{
Expand Down Expand Up @@ -280,7 +280,7 @@ private static void AddLegacyClusterConfigurationSupport(IServiceCollection serv
{
options.IsRunningAsUnitTest = config.IsRunningAsUnitTest;
});

services.AddOptions<GrainVersioningOptions>()
.Configure<GlobalConfiguration>((options, config) =>
{
Expand Down
42 changes: 25 additions & 17 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Orleans.Runtime
/// </summary>
internal class ActivationData : IGrainActivationContext, IActivationData, IInvokable, IDisposable
{
// This class is used for activations that have extension invokers. It keeps a dictionary of
// This class is used for activations that have extension invokers. It keeps a dictionary of
// invoker objects to use with the activation, and extend the default invoker
// defined for the grain class.
// Note that in all cases we never have more than one copy of an actual invoker;
Expand All @@ -34,7 +34,7 @@ private class ExtensionInvoker : IGrainMethodInvoker, IGrainExtensionMap
// Because calls to ExtensionInvoker are allways made within the activation context,
// we rely on the single-threading guarantee of the runtime and do not protect the map with a lock.
private Dictionary<int, Tuple<IGrainExtension, IGrainExtensionMethodInvoker>> extensionMap; // key is the extension interface ID

/// <summary>
/// Try to add an extension for the specific interface ID.
/// Fail and return false if there is already an extension for that interface ID.
Expand Down Expand Up @@ -171,7 +171,7 @@ internal class GrainActivationContextFactory
private IGrainMethodInvoker lastInvoker;
private IServiceScope serviceScope;
private HashSet<IGrainTimer> timers;

public ActivationData(
ActivationAddress addr,
string genericArguments,
Expand Down Expand Up @@ -264,6 +264,8 @@ internal bool TryGetExtensionHandler(Type extensionType, out IGrainExtension res
return extensionInvoker != null && extensionInvoker.TryGetExtensionHandler(extensionType, out result);
}

public HashSet<ActivationId> RunningRequestsSenders { get; } = new HashSet<ActivationId>();

public ISchedulingContext SchedulingContext { get; }

public string GrainTypeName
Expand Down Expand Up @@ -311,14 +313,14 @@ private static void SetGrainActivationContextInScopedServices(IServiceProvider s
var contextFactory = sp.GetRequiredService<GrainActivationContextFactory>();
contextFactory.Context = context;
}

private Streams.StreamDirectory streamDirectory;
internal Streams.StreamDirectory GetStreamDirectory()
{
return streamDirectory ?? (streamDirectory = new Streams.StreamDirectory());
}

internal bool IsUsingStreams
internal bool IsUsingStreams
{
get { return streamDirectory != null; }
}
Expand All @@ -341,7 +343,7 @@ GrainReference IActivationData.GrainReference
{
get { return GrainReference; }
}

public GrainId Identity
{
get { return Grain; }
Expand Down Expand Up @@ -433,7 +435,7 @@ public void ResetCollectionTicket()
public void SetCollectionTicket(DateTime ticket)
{
if (ticket == default(DateTime)) throw new ArgumentException("default(DateTime) is disallowed", "ticket");
if (CollectionTicket != default(DateTime))
if (CollectionTicket != default(DateTime))
{
throw new InvalidOperationException("call ResetCollectionTicket before calling SetCollectionTicket.");
}
Expand All @@ -447,7 +449,7 @@ public void SetCollectionTicket(DateTime ticket)

// Currently, the only supported multi-activation grain is one using the StatelessWorkerPlacement strategy.
internal bool IsStatelessWorker => this.PlacedUsing is StatelessWorkerPlacement;

/// <summary>
/// Returns a value indicating whether or not this placement strategy requires activations to be registered in
/// the grain directory.
Expand All @@ -469,6 +471,12 @@ public void RecordRunning(Message message)
// Note: This method is always called while holding lock on this activation, so no need for additional locks here

numRunning++;
if (message.Direction != Message.Directions.OneWay
&& message.SendingActivation != null
&& !message.SendingGrain?.IsClient == true)
{
RunningRequestsSenders.Add(message.SendingActivation);
}

if (Running != null) return;

Expand All @@ -482,7 +490,7 @@ public void ResetRunning(Message message)
{
// Note: This method is always called while holding lock on this activation, so no need for additional locks here
numRunning--;

RunningRequestsSenders.Remove(message.SendingActivation);
if (numRunning == 0)
{
becameIdle = DateTime.UtcNow;
Expand Down Expand Up @@ -515,7 +523,7 @@ public void ResetRunning(Message message)

/// <summary>Increment the number of in-flight messages currently being processed.</summary>
public void IncrementInFlightCount() { Interlocked.Increment(ref inFlightCount); }

/// <summary>Decrement the number of in-flight messages currently being processed.</summary>
public void DecrementInFlightCount() { Interlocked.Decrement(ref inFlightCount); }

Expand All @@ -524,14 +532,14 @@ public void ResetRunning(Message message)

/// <summary>Decrement the number of messages currently in the process of being received.</summary>
public void DecrementEnqueuedOnDispatcherCount() { Interlocked.Decrement(ref enqueuedOnDispatcherCount); }

/// <summary>
/// grouped by sending activation: responses first, then sorted by id
/// </summary>
private List<Message> waiting;

public int WaitingCount
{
public int WaitingCount
{
get
{
return waiting == null ? 0 : waiting.Count;
Expand Down Expand Up @@ -594,7 +602,7 @@ public EnqueueMessageResult EnqueueMessage(Message message)
}

/// <summary>
/// Check whether this activation is overloaded.
/// Check whether this activation is overloaded.
/// Returns LimitExceededException if overloaded, otherwise <c>null</c>c>
/// </summary>
/// <param name="log">Logger to use for reporting any overflow condition</param>
Expand All @@ -617,7 +625,7 @@ public LimitExceededException CheckOverloaded(ILogger log)

if (maxRequestsHardLimit > 0 && count > maxRequestsHardLimit) // Hard limit
{
log.Warn(ErrorCode.Catalog_Reject_ActivationTooManyRequests,
log.Warn(ErrorCode.Catalog_Reject_ActivationTooManyRequests,
String.Format("Overload - {0} enqueued requests for activation {1}, exceeding hard limit rejection threshold of {2}",
count, this, maxRequestsHardLimit));

Expand Down Expand Up @@ -691,7 +699,7 @@ public TimeSpan GetIdleness(DateTime now)
{
if (now == default(DateTime))
throw new ArgumentException("default(DateTime) is not allowed; Use DateTime.UtcNow instead.", "now");

return now - becameIdle;
}

Expand Down Expand Up @@ -795,7 +803,7 @@ public void OnTimerDisposed(IGrainTimer orleansTimerInsideGrain)
internal Task WaitForAllTimersToFinish()
{
lock(this)
{
{
if (timers == null)
{
return Task.CompletedTask;
Expand Down
Loading

0 comments on commit a561906

Please sign in to comment.