Skip to content

Commit

Permalink
A few naming and minor fixes
Browse files Browse the repository at this point in the history
- Fixed casing in MultiClusterStatus
- Renamed singleActivation parameter
- Made it explicit when the default MultiClusterRegistrationStrategy is being used (and avoid harmless unused assignment)
- Fixed singleton pattern
  • Loading branch information
jdom committed Dec 4, 2015
1 parent 80c5fe7 commit e307ccb
Show file tree
Hide file tree
Showing 14 changed files with 79 additions and 76 deletions.
17 changes: 3 additions & 14 deletions src/Orleans/GrainDirectory/ClusterLocalRegistration.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.GrainDirectory
{
Expand All @@ -13,23 +9,16 @@ namespace Orleans.GrainDirectory
[Serializable]
internal class ClusterLocalRegistration : MultiClusterRegistrationStrategy
{
private static ClusterLocalRegistration singleton;
private static readonly Lazy<ClusterLocalRegistration> singleton = new Lazy<ClusterLocalRegistration>(() => new ClusterLocalRegistration());

internal static ClusterLocalRegistration Singleton
{
get
{
if (singleton == null)
{
Initialize();
}
return singleton;
}
get { return singleton.Value; }
}

internal static void Initialize()
{
singleton = new ClusterLocalRegistration();
var instance = Singleton;
}

private ClusterLocalRegistration()
Expand Down
6 changes: 2 additions & 4 deletions src/Orleans/GrainDirectory/IGrainDirectory.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Orleans.Runtime;

Expand All @@ -19,11 +17,11 @@ interface IGrainDirectory
/// <para>This method must be called from a scheduler thread.</para>
/// </summary>
/// <param name="address">The address of the new activation.</param>
/// <param name="singleact">If true, use single-activation registration</param>
/// <param name="singleActivation">If true, use single-activation registration</param>
/// <param name="withRetry">Indicates whether or not to retry the operation.</param>
/// <param name="hopcount">Counts recursion depth across silos</param>
/// <returns>The registered address and the version associated with this directory mapping.</returns>
Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleact, int hopcount = 0);
Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleActivation, int hopcount = 0);

/// <summary>
/// Removes the record for an existing activation from the directory service.
Expand Down
7 changes: 2 additions & 5 deletions src/Orleans/GrainDirectory/IGrainRegistrar.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Orleans.Runtime;

Expand All @@ -19,8 +16,9 @@ interface IGrainRegistrar
/// <para>This method must be called from a scheduler thread.</para>
/// </summary>
/// <param name="address">The address of the activation to register.</param>
/// <param name="singleActivation">If true, use single-activation registration</param>
/// <returns>The address registered for the grain's single activation.</returns>
Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleact);
Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleActivation);

/// <summary>
/// Removes the given activation for the grain.
Expand All @@ -36,6 +34,5 @@ interface IGrainRegistrar
/// </summary>
/// <returns></returns>
Task DeleteAsync(GrainId gid);

}
}
36 changes: 23 additions & 13 deletions src/Orleans/GrainDirectory/MultiClusterStatus.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.GrainDirectory
namespace Orleans.GrainDirectory
{
/// <summary>
/// Status of a directory entry with respect to multi-cluster registration
/// </summary>
internal enum MultiClusterStatus
{
OWNED, // Registration is owned by this cluster.
DOUBTFUL, // Failed to contact one or more clusters while registering, so may be a duplicate.
/// <summary>
/// Registration is owned by this cluster.
/// </summary>
Owned,

CACHED, // Cached reference to a registration owned by a remote cluster.
/// <summary>
/// Failed to contact one or more clusters while registering, so may be a duplicate.
/// </summary>
Doubtful,

REQUESTED_OWNERSHIP, // The cluster is in the process of checking remote clusters for existing registrations.
RACE_LOSER, // The cluster lost a race condition.
}
/// <summary>
/// Cached reference to a registration owned by a remote cluster.
/// </summary>
Cached,

/// <summary>
/// The cluster is in the process of checking remote clusters for existing registrations.
/// </summary>
RequestedOwnership,

/// <summary>
/// The cluster lost a race condition.
/// </summary>
RaceLoser,
}
}
2 changes: 1 addition & 1 deletion src/Orleans/IDs/ActivationAddress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static ActivationAddress NewActivationAddress(SiloAddress silo, GrainId g
return GetAddress(silo, grain, activation);
}

public static ActivationAddress GetAddress(SiloAddress silo, GrainId grain, ActivationId activation, MultiClusterStatus status = MultiClusterStatus.OWNED)
public static ActivationAddress GetAddress(SiloAddress silo, GrainId grain, ActivationId activation, MultiClusterStatus status = MultiClusterStatus.Owned)
{
// Silo part is not mandatory
if (grain == null) throw new ArgumentNullException("grain");
Expand Down
6 changes: 3 additions & 3 deletions src/Orleans/SystemTargetInterfaces/IRemoteGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ internal interface IGrainInfo
int VersionTag { get; }
bool SingleInstance { get; }
bool AddActivation(ActivationId act, SiloAddress silo);
ActivationAddress AddSingleActivation(GrainId grain, ActivationId act, SiloAddress silo, MultiClusterStatus registrationStatus = MultiClusterStatus.OWNED);
ActivationAddress AddSingleActivation(GrainId grain, ActivationId act, SiloAddress silo, MultiClusterStatus registrationStatus = MultiClusterStatus.Owned);
bool RemoveActivation(ActivationAddress addr);
bool RemoveActivation(ActivationId act, bool force);
bool Merge(GrainId grain, IGrainInfo other);
Expand All @@ -60,9 +60,9 @@ internal interface IRemoteGrainDirectory : ISystemTarget, IGrainDirectory
/// This method should be called only remotely during handoff.
/// </summary>
/// <param name="addresses">The addresses of the grains to register</param>
/// <param name="singleactivation">whether to use single-activation registration</param>
/// <param name="singleActivation">If true, use single-activation registration</param>
/// <returns></returns>
Task RegisterMany(List<ActivationAddress> addresses, bool singleactivation);
Task RegisterMany(List<ActivationAddress> addresses, bool singleActivation);

/// <summary>
/// Fetch the updated information on the given list of grains.
Expand Down
10 changes: 8 additions & 2 deletions src/OrleansRuntime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,18 @@ public ActivationData GetOrCreateActivation(

int typeCode = address.Grain.GetTypeCode();
string actualGrainType = null;
MultiClusterRegistrationStrategy activationStrategy = MultiClusterRegistrationStrategy.GetDefault();
MultiClusterRegistrationStrategy activationStrategy;

if (typeCode != 0) // special case for Membership grain.
if (typeCode != 0)
{
GetGrainTypeInfo(typeCode, out actualGrainType, out placement, out activationStrategy);
}
else
{
// special case for Membership grain.
placement = SystemPlacement.Singleton;
activationStrategy = MultiClusterRegistrationStrategy.GetDefault();
}

if (newPlacement && !SiloStatusOracle.CurrentStatus.IsTerminating())
{
Expand Down
4 changes: 2 additions & 2 deletions src/OrleansRuntime/GrainDirectory/ClientObserverRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal void ClientAdded(GrainId clientId)
// That way, when we refresh it in the directiry, it's the same one.
var addr = GetClientActivationAddress(clientId);
scheduler.QueueTask(
() => ExecuteWithRetries(() => grainDirectory.RegisterAsync(addr, singleact:false), ErrorCode.ClientRegistrarFailedToRegister, String.Format("Directory.RegisterAsync {0} failed.", addr)),
() => ExecuteWithRetries(() => grainDirectory.RegisterAsync(addr, singleActivation:false), ErrorCode.ClientRegistrarFailedToRegister, String.Format("Directory.RegisterAsync {0} failed.", addr)),
this.SchedulingContext)
.Ignore();
}
Expand Down Expand Up @@ -132,7 +132,7 @@ private async Task OnClientRefreshTimer(object data)
foreach (GrainId clientId in clients)
{
var addr = GetClientActivationAddress(clientId);
Task task = grainDirectory.RegisterAsync(addr, singleact:false).
Task task = grainDirectory.RegisterAsync(addr, singleActivation:false).
LogException(logger, ErrorCode.ClientRegistrarFailedToRegister_2, String.Format("Directory.RegisterAsync {0} failed.", addr));
tasks.Add(task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ internal void ProcessSiloAddEvent(SiloAddress addedSilo)
if (logger.IsVerbose) logger.Verbose("Sending " + splitPartListSingle.Count + " single activation entries to " + addedSilo);
localDirectory.Scheduler.QueueTask(async () =>
{
await localDirectory.GetDirectoryReference(successors[0]).RegisterMany(splitPartListSingle, singleactivation:true);
await localDirectory.GetDirectoryReference(successors[0]).RegisterMany(splitPartListSingle, singleActivation:true);
splitPartListSingle.ForEach(
activationAddress =>
localDirectory.DirectoryPartition.RemoveGrain(activationAddress.Grain));
Expand All @@ -243,7 +243,7 @@ internal void ProcessSiloAddEvent(SiloAddress addedSilo)
if (logger.IsVerbose) logger.Verbose("Sending " + splitPartListMulti.Count + " entries to " + addedSilo);
localDirectory.Scheduler.QueueTask(async () =>
{
await localDirectory.GetDirectoryReference(successors[0]).RegisterMany(splitPartListMulti, singleactivation:false);
await localDirectory.GetDirectoryReference(successors[0]).RegisterMany(splitPartListMulti, singleActivation:false);
splitPartListMulti.ForEach(
activationAddress =>
localDirectory.DirectoryPartition.RemoveGrain(activationAddress.Grain));
Expand Down
10 changes: 5 additions & 5 deletions src/OrleansRuntime/GrainDirectory/GrainDirectoryPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ public bool AddActivation(ActivationId act, SiloAddress silo)
return false;
}
}
Instances[act] = new ActivationInfo(silo, MultiClusterStatus.OWNED);
Instances[act] = new ActivationInfo(silo, MultiClusterStatus.Owned);
VersionTag = rand.Next();
return true;
}

public ActivationAddress AddSingleActivation(GrainId grain, ActivationId act, SiloAddress silo, MultiClusterStatus registrationStatus = MultiClusterStatus.OWNED)
public ActivationAddress AddSingleActivation(GrainId grain, ActivationId act, SiloAddress silo, MultiClusterStatus registrationStatus = MultiClusterStatus.Owned)
{
SingleInstance = true;
if (Instances.Count > 0)
Expand Down Expand Up @@ -194,7 +194,7 @@ public void CacheOrUpdateRemoteClusterRegistration(GrainId grain, ActivationId o
{
Instances.Remove(oldActivation);
}
Instances.Add(activation, new ActivationInfo(silo, MultiClusterStatus.CACHED));
Instances.Add(activation, new ActivationInfo(silo, MultiClusterStatus.Cached));
}

public bool UpdateClusterRegistrationStatus(ActivationId act, MultiClusterStatus status, MultiClusterStatus? comparewith = null)
Expand Down Expand Up @@ -294,7 +294,7 @@ internal virtual int AddActivation(GrainId grain, ActivationId activation, SiloA
/// <param name="silo"></param>
/// <param name="registrationStatus"></param>
/// <returns>The registered ActivationAddress and version associated with this directory mapping</returns>
internal virtual Tuple<ActivationAddress, int> AddSingleActivation(GrainId grain, ActivationId activation, SiloAddress silo, MultiClusterStatus registrationStatus = MultiClusterStatus.OWNED)
internal virtual Tuple<ActivationAddress, int> AddSingleActivation(GrainId grain, ActivationId activation, SiloAddress silo, MultiClusterStatus registrationStatus = MultiClusterStatus.Owned)
{
if (log.IsVerbose3) log.Verbose3("Adding single activation for grain {0}{1}{2}", silo, grain, activation);

Expand Down Expand Up @@ -542,7 +542,7 @@ public void CacheOrUpdateRemoteClusterRegistration(GrainId grain, ActivationId o
else
{
AddSingleActivation(grain, otherClusterAddress.Activation, otherClusterAddress.Silo,
MultiClusterStatus.CACHED);
MultiClusterStatus.Cached);
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/OrleansRuntime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,9 @@ public SiloAddress CheckIfIShouldForward(GrainId grain, int hopcount, string ope
}


public async Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleact, int hopcount)
public async Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleActivation, int hopcount)
{
(singleact ? (hopcount > 0 ? RegistrationsSingleActRemoteReceived : registrationsSingleActIssued)
(singleActivation ? (hopcount > 0 ? RegistrationsSingleActRemoteReceived : registrationsSingleActIssued)
: (hopcount > 0 ? RegistrationsRemoteReceived : registrationsIssued)).Increment();

SiloAddress owner = CalculateTargetSilo(address.Grain);
Expand All @@ -564,21 +564,21 @@ public async Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress

if (forwardaddress == null)
{
(singleact ? RegistrationsSingleActLocal : RegistrationsLocal).Increment();
(singleActivation ? RegistrationsSingleActLocal : RegistrationsLocal).Increment();

// we are the owner
var registrar = RegistrarManager.Instance.GetRegistrarForGrain(address.Grain);

return await registrar.RegisterAsync(address, singleact);
return await registrar.RegisterAsync(address, singleActivation);
}
else
{
(singleact ? RegistrationsSingleActRemoteSent : RegistrationsRemoteSent).Increment();
(singleActivation ? RegistrationsSingleActRemoteSent : RegistrationsRemoteSent).Increment();

// otherwise, notify the owner
Tuple<ActivationAddress, int> returnedAddress = await GetDirectoryReference(forwardaddress).RegisterAsync(address, singleact, hopcount + 1);
Tuple<ActivationAddress, int> returnedAddress = await GetDirectoryReference(forwardaddress).RegisterAsync(address, singleActivation, hopcount + 1);

if (singleact)
if (singleActivation)
{
// Caching optimization:
// cache the result of a successfull RegisterSingleActivation call, only if it is not a duplicate activation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public ClusterLocalRegistrar(GrainDirectoryPartition partition)
DirectoryPartition = partition;
}

public virtual Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleact)
public virtual Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleActivation)
{
if (singleact)
if (singleActivation)
{
var returnedAddress = DirectoryPartition.AddSingleActivation(address.Grain, address.Activation, address.Silo);
return Task.FromResult(returnedAddress);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Orleans.GrainDirectory;

namespace Orleans.Runtime.GrainDirectory
Expand Down Expand Up @@ -30,22 +27,28 @@ public static void InitializeGrainDirectoryManager(LocalGrainDirectory router)
private void Register<TStrategy>(IGrainRegistrar directory)
where TStrategy : MultiClusterRegistrationStrategy
{
registrars.Add(typeof(TStrategy), directory);
this.registrars.Add(typeof(TStrategy), directory);
}

public IGrainRegistrar GetRegistrarForGrain(GrainId gid)
{
string unusedGrainClass;
PlacementStrategy unusedPlacement;
MultiClusterRegistrationStrategy strategy = ClusterLocalRegistration.Singleton; // default
MultiClusterRegistrationStrategy strategy;

var typeCode = gid.GetTypeCode();

if (typeCode != 0) // special case for Membership grain or client grain.
if (typeCode != 0)
{
string unusedGrainClass;
PlacementStrategy unusedPlacement;
GrainTypeManager.Instance.GetTypeInfo(gid.GetTypeCode(), out unusedGrainClass, out unusedPlacement, out strategy);


return registrars[strategy.GetType()];
}
else
{
// special case for Membership grain or client grain.
strategy = ClusterLocalRegistration.Singleton; // default
}

return this.registrars[strategy.GetType()];
}
}
}
10 changes: 5 additions & 5 deletions src/OrleansRuntime/GrainDirectory/RemoteGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ internal RemoteGrainDirectory(LocalGrainDirectory r, GrainId id)
logger = TraceLogger.GetLogger("Orleans.GrainDirectory.CacheValidator", TraceLogger.LoggerType.Runtime);
}

public async Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleact, int hopcount)
public async Task<Tuple<ActivationAddress, int>> RegisterAsync(ActivationAddress address, bool singleActivation, int hopcount)
{
(singleact ? router.RegistrationsSingleActRemoteReceived : router.RegistrationsRemoteReceived).Increment();
(singleActivation ? router.RegistrationsSingleActRemoteReceived : router.RegistrationsRemoteReceived).Increment();

return await router.RegisterAsync(address, singleact, hopcount);
return await router.RegisterAsync(address, singleActivation, hopcount);
}

public Task RegisterMany(List<ActivationAddress> addresses, bool singleact)
public Task RegisterMany(List<ActivationAddress> addresses, bool singleActivation)
{
// validate that this request arrived correctly
//logger.Assert(ErrorCode.Runtime_Error_100140, silo.Matches(router.MyAddress), "destination address != my address");
Expand All @@ -62,7 +62,7 @@ public Task RegisterMany(List<ActivationAddress> addresses, bool singleact)
if (addresses.Count == 0)
return TaskDone.Done;

return Task.WhenAll(addresses.Select(addr => router.RegisterAsync(addr, singleact, 1)));
return Task.WhenAll(addresses.Select(addr => router.RegisterAsync(addr, singleActivation, 1)));
}

public Task UnregisterAsync(ActivationAddress address, bool force, int hopcount)
Expand Down

0 comments on commit e307ccb

Please sign in to comment.