Skip to content

Commit

Permalink
Optimize getting a single queue/topic/subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
0xced committed Jun 6, 2018
1 parent 3ec0e5d commit e91fdfd
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void CanGetQueue()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(false))
.When(x => x.WhenBuildingModel("testqueue", false))
.When(x => x.WhenBuildingModel("testqueue"))
.Then(x => x.ThenTheQueueIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -36,7 +36,7 @@ public void CanGetQueueWithCaseInsensitiveMatch()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(false))
.When(x => x.WhenBuildingModel("testQUeuE", false))
.When(x => x.WhenBuildingModel("testQUeuE"))
.Then(x => x.ThenTheQueueIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -47,7 +47,7 @@ public void CanGetQueueWithForceFresh()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(true))
.When(x => x.WhenBuildingModel("testqueue", true))
.When(x => x.WhenBuildingModel("testqueue"))
.Then(x => x.ThenTheQueueIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -58,7 +58,7 @@ public void WillFailOnDuplicateQueues()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverviewWithDuplicateQueues())
.When(x => x.WhenBuildingModel("testqueue", true))
.When(x => x.WhenBuildingModel("testqueue"))
.Then(x => x.ThenThereShouldBeAnException())
.BDDfy();
}
Expand Down Expand Up @@ -90,11 +90,11 @@ void GivenThatTheBusMonitorReturnsAnOverviewWithDuplicateQueues()
_busMonitor.GetOverview().Returns(overview);
}

async Task WhenBuildingModel(string queuename, bool forceFresh)
async Task WhenBuildingModel(string queuename)
{
try
{
_result = await _builder.Build(new QueueCriteria(queuename, forceFresh));
_result = await _builder.Build(new QueueCriteria(queuename));
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void CanGetSubscription()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(false))
.When(x => x.WhenBuildingModel("testtopic", "testsubscription", false))
.When(x => x.WhenBuildingModel("testtopic", "testsubscription"))
.Then(x => x.ThenTheSubscriptionIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -36,7 +36,7 @@ public void CanGetSubscriptionWithCaseInsensitiveMatch()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(false))
.When(x => x.WhenBuildingModel("testtopic", "testSuBscription", false))
.When(x => x.WhenBuildingModel("testtopic", "testSuBscription"))
.Then(x => x.ThenTheSubscriptionIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -47,7 +47,7 @@ public void CanGetSubscriptionWithForceFresh()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(true))
.When(x => x.WhenBuildingModel("testtopic", "testsubscription", true))
.When(x => x.WhenBuildingModel("testtopic", "testsubscription"))
.Then(x => x.ThenTheSubscriptionIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -58,7 +58,7 @@ public void WillFailOnDuplicateSubscriptions()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverviewWithDuplicateSubscriptions())
.When(x => x.WhenBuildingModel("testtopic", "testsubscription", false))
.When(x => x.WhenBuildingModel("testtopic", "testsubscription"))
.Then(x => x.ThenThereShouldBeAnException())
.BDDfy();
}
Expand Down Expand Up @@ -96,11 +96,11 @@ void GivenThatTheBusMonitorReturnsAnOverviewWithDuplicateSubscriptions()
_busMonitor.GetOverview().Returns(overview);
}

async Task WhenBuildingModel(string topicname, string subscriptionname, bool forceFresh)
async Task WhenBuildingModel(string topicname, string subscriptionname)
{
try
{
_result = await _builder.Build(new SubscriptionCriteria(topicname, subscriptionname, forceFresh));
_result = await _builder.Build(new SubscriptionCriteria(topicname, subscriptionname));
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void CanGetTopic()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(false))
.When(x => x.WhenBuildingModel("testtopic", false))
.When(x => x.WhenBuildingModel("testtopic"))
.Then(x => x.ThenTheTopicIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -36,7 +36,7 @@ public void CanGetTopicWithCaseInsensitiveMatch()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(false))
.When(x => x.WhenBuildingModel("testToPiC", false))
.When(x => x.WhenBuildingModel("testToPiC"))
.Then(x => x.ThenTheTopicIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -47,7 +47,7 @@ public void CanGetTopicWithForceFresh()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverview(true))
.When(x => x.WhenBuildingModel("testtopic", true))
.When(x => x.WhenBuildingModel("testtopic"))
.Then(x => x.ThenTheTopicIsReturned())
.Then(x => x.ThenThereShouldBeNoException())
.BDDfy();
Expand All @@ -58,7 +58,7 @@ public void WillFailOnDuplicateTopics()
{
this.Given(x => x.GivenABuilder())
.And(x => x.GivenThatTheBusMonitorReturnsAnOverviewWithDuplicateTopics())
.When(x => x.WhenBuildingModel("testtopic", true))
.When(x => x.WhenBuildingModel("testtopic"))
.Then(x => x.ThenThereShouldBeAnException())
.BDDfy();
}
Expand Down Expand Up @@ -96,11 +96,11 @@ void GivenThatTheBusMonitorReturnsAnOverviewWithDuplicateTopics()
_busMonitor.GetOverview().Returns(overview);
}

async Task WhenBuildingModel(string topicname, bool forceFresh)
async Task WhenBuildingModel(string topicname)
{
try
{
_result = await _builder.Build(new TopicCriteria(topicname, forceFresh));
_result = await _builder.Build(new TopicCriteria(topicname));
}
catch (Exception ex)
{
Expand Down
166 changes: 107 additions & 59 deletions src/SbManager/BusHelpers/BusMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ namespace SbManager.BusHelpers
public interface IBusMonitor
{
Task<Overview> GetOverview(bool fresh = false);
Task<Queue> GetQueue(string queueName);
Task<Topic> GetTopic(string topicName);
Task<Subscription> GetSubscription(string topicName, string subscriptionName);
}
public class BusMonitor : IBusMonitor
{
Expand Down Expand Up @@ -43,72 +46,58 @@ public async Task<Overview> GetOverview(bool forceDirty = false)
return _cached;
}

public Task<Queue> GetQueue(string queueName)
{
var queue = MapQueue(_namespaceManager.GetQueue(queueName));
return Task.FromResult(queue);
}

public Task<Topic> GetTopic(string topicName)
{
var topic = MapTopic(_namespaceManager.GetTopic(topicName));
FillTopic(topic);
return Task.FromResult(topic);
}

public Task<Subscription> GetSubscription(string topicName, string subscriptionName)
{
var subscription = MapSubscription(topicName, _namespaceManager.GetSubscription(topicName, subscriptionName));
FillSubscription(subscription, topicName);
return Task.FromResult(subscription);
}

private void FillTopic(Topic topic)
{
topic.Subscriptions = _namespaceManager.GetSubscriptions(topic.Name).Select(e => MapSubscription(topic.Name, e)).ToList();

foreach (var subscription in topic.Subscriptions)
{
FillSubscription(subscription, topic.Name);
}

topic.ActiveMessageCount = topic.Subscriptions.Sum(s => s.ActiveMessageCount);
topic.DeadLetterCount = topic.Subscriptions.Sum(s => s.DeadLetterCount);
topic.ScheduledMessageCount = topic.Subscriptions.Sum(s => s.ScheduledMessageCount);
topic.TransferMessageCount = topic.Subscriptions.Sum(s => s.TransferMessageCount);
topic.DeadTransferMessageCount = topic.Subscriptions.Sum(s => s.DeadTransferMessageCount);
}

private void FillSubscription(Subscription subscription, string topicName)
{
subscription.Rules = _namespaceManager.GetRules(topicName, subscription.Name).Select(MapRule).ToList();
}

private Task<Overview> Fetch()
{
var overview = new Overview
{
Queues = _namespaceManager.GetQueues().Select(e => new Queue
{
Status = e.Status.ToString(),
ActiveMessageCount = e.CountDetails.ActiveMessageCount,
DeadLetterCount = e.CountDetails.DeadLetterMessageCount,
ScheduledMessageCount = e.CountDetails.ScheduledMessageCount,
TransferMessageCount = e.CountDetails.TransferMessageCount,
DeadTransferMessageCount = e.CountDetails.TransferDeadLetterMessageCount,
SizeInBytes = e.SizeInBytes,
AutoDeleteOnIdle = new Time(e.AutoDeleteOnIdle),
DefaultMessageTTL = new Time(e.DefaultMessageTimeToLive),
DuplicateDetectionWindow = new Time(e.DuplicateDetectionHistoryTimeWindow),
LockDuration = new Time(e.LockDuration),
CreatedAt = e.CreatedAt,
UpdatedAt = e.UpdatedAt,
AccessedAt = e.AccessedAt,
Name = e.Path
}).ToList(),
Topics = _namespaceManager.GetTopics().Select(e => new Topic
{
Status = e.Status.ToString(),
Name = e.Path,
SizeInBytes = e.SizeInBytes,
AutoDeleteOnIdle = new Time(e.AutoDeleteOnIdle),
DefaultMessageTTL = new Time(e.DefaultMessageTimeToLive),
DuplicateDetectionWindow = new Time(e.DuplicateDetectionHistoryTimeWindow),
CreatedAt = e.CreatedAt,
UpdatedAt = e.UpdatedAt,
//AccessedAt = e.AccessedAt,
}).ToList(),
Queues = _namespaceManager.GetQueues().Select(MapQueue).ToList(),
Topics = _namespaceManager.GetTopics().Select(MapTopic).ToList()
};

foreach (var topic in overview.Topics)
{
topic.Subscriptions = _namespaceManager.GetSubscriptions(topic.Name).Select(e => new Subscription
{
Status = e.Status.ToString(),
Name = e.Name,
TopicName = topic.Name,
//ActiveMessageCount = e.CountDetails.ActiveMessageCount,
//DeadLetterCount = e.CountDetails.DeadLetterMessageCount,
//ScheduledMessageCount = e.CountDetails.ScheduledMessageCount,
//TransferMessageCount = e.CountDetails.TransferMessageCount,
//DeadTransferMessageCount = e.CountDetails.TransferDeadLetterMessageCount,
AutoDeleteOnIdle = new Time(e.AutoDeleteOnIdle),
DefaultMessageTTL = new Time(e.DefaultMessageTimeToLive),
LockDuration = new Time(e.LockDuration),
CreatedAt = e.CreatedAt,
UpdatedAt = e.UpdatedAt,
AccessedAt = e.AccessedAt,
}).ToList();

foreach (var subscription in topic.Subscriptions)
{
subscription.Rules = _namespaceManager.GetRules(topic.Name, subscription.Name).Select(MapRule).ToList();
}

topic.ActiveMessageCount = topic.Subscriptions.Sum(s => s.ActiveMessageCount);
topic.DeadLetterCount = topic.Subscriptions.Sum(s => s.DeadLetterCount);
topic.ScheduledMessageCount = topic.Subscriptions.Sum(s => s.ScheduledMessageCount);
topic.TransferMessageCount = topic.Subscriptions.Sum(s => s.TransferMessageCount);
topic.DeadTransferMessageCount = topic.Subscriptions.Sum(s => s.DeadTransferMessageCount);
FillTopic(topic);
}

var queueMessageCounts = GetCounts(overview.Queues);
Expand Down Expand Up @@ -145,7 +134,66 @@ private bool Dirty(bool forceDirty)
return forceDirty || (GetTime() > _lastTouch.AddMilliseconds(RefreshTime));
}

private Rule MapRule(RuleDescription r)
private static Queue MapQueue(QueueDescription queue)
{
return new Queue
{
Status = queue.Status.ToString(),
ActiveMessageCount = queue.CountDetails.ActiveMessageCount,
DeadLetterCount = queue.CountDetails.DeadLetterMessageCount,
ScheduledMessageCount = queue.CountDetails.ScheduledMessageCount,
TransferMessageCount = queue.CountDetails.TransferMessageCount,
DeadTransferMessageCount = queue.CountDetails.TransferDeadLetterMessageCount,
SizeInBytes = queue.SizeInBytes,
AutoDeleteOnIdle = new Time(queue.AutoDeleteOnIdle),
DefaultMessageTTL = new Time(queue.DefaultMessageTimeToLive),
DuplicateDetectionWindow = new Time(queue.DuplicateDetectionHistoryTimeWindow),
LockDuration = new Time(queue.LockDuration),
CreatedAt = queue.CreatedAt,
UpdatedAt = queue.UpdatedAt,
AccessedAt = queue.AccessedAt,
Name = queue.Path
};
}

private static Topic MapTopic(TopicDescription topic)
{
return new Topic
{
Status = topic.Status.ToString(),
Name = topic.Path,
SizeInBytes = topic.SizeInBytes,
AutoDeleteOnIdle = new Time(topic.AutoDeleteOnIdle),
DefaultMessageTTL = new Time(topic.DefaultMessageTimeToLive),
DuplicateDetectionWindow = new Time(topic.DuplicateDetectionHistoryTimeWindow),
CreatedAt = topic.CreatedAt,
UpdatedAt = topic.UpdatedAt,
//AccessedAt = topic.AccessedAt,
};
}

private static Subscription MapSubscription(string topicName, SubscriptionDescription subscription)
{
return new Subscription
{
Status = subscription.Status.ToString(),
Name = subscription.Name,
TopicName = topicName,
//ActiveMessageCount = subscription.CountDetails.ActiveMessageCount,
//DeadLetterCount = subscription.CountDetails.DeadLetterMessageCount,
//ScheduledMessageCount = subscription.CountDetails.ScheduledMessageCount,
//TransferMessageCount = subscription.CountDetails.TransferMessageCount,
//DeadTransferMessageCount = subscription.CountDetails.TransferDeadLetterMessageCount,
AutoDeleteOnIdle = new Time(subscription.AutoDeleteOnIdle),
DefaultMessageTTL = new Time(subscription.DefaultMessageTimeToLive),
LockDuration = new Time(subscription.LockDuration),
CreatedAt = subscription.CreatedAt,
UpdatedAt = subscription.UpdatedAt,
AccessedAt = subscription.AccessedAt,
};
}

private static Rule MapRule(RuleDescription r)
{
if (r.Filter is SqlFilter)
{
Expand Down
Loading

0 comments on commit e91fdfd

Please sign in to comment.