Skip to content

Commit

Permalink
Added baseThread and QueueManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tbayne committed Oct 10, 2016
1 parent 3779403 commit b845a3f
Show file tree
Hide file tree
Showing 10 changed files with 2,955 additions and 2 deletions.
88 changes: 88 additions & 0 deletions BaseThread.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Threading;
using Metrics;
using NLog;

namespace ThreadSupport
{

/// <summary>
/// This BaseThread class creates a BlockingQueue of type ThreadMessage for the given thread. The name of the queue is
/// specified on the constructor.
///
/// Additionally:
/// this class creates and instantiates a thread, using the derived classes Runner() method.
/// this class adds the name of the queue to the queuemanager singleton object
///
/// </summary>
public abstract class BaseThread
{
private volatile bool _running;
private static readonly Logger logger = LogManager.GetCurrentClassLogger();


public string Qname { get; }

public bool Running
{
get { return _running; }
set { _running = value; }
}

private readonly BlockingQueue<ThreadMessage> _myQueue;
public BlockingQueue<ThreadMessage> MyQueue
{
get { return _myQueue; }
}

public string ThreadName { get; }

private Thread _thisThread;
public Thread ThisThread
{
get { return _thisThread; }
}

private static readonly QueueManager qm = QueueManager.Instance;

protected BaseThread(String threadName, String queueName)
{
// Set our name
ThreadName = threadName;

logger.Trace(ThreadName + "|Starting");

// Create our queue
_myQueue = new BlockingQueue<ThreadMessage>();
_myQueue.SetupQueueStats(queueName);

// Name our queue statistics counter
//_myQueue.SetupQueueStats(ThreadName + "Q");

// Add it to the queuemanager so other threads can easily locate this queue
qm.AddQueue(ref _myQueue, queueName);
Qname = queueName;

// Fire up the Runner method from the derived class
ThreadStart threadStarter = Runner;
_thisThread = new Thread(threadStarter);
}

public void Start()
{
_thisThread.Start();
}

protected bool TakeFunc(ThreadMessage tm, int i)
{
//_myQueue.DecrementQueueItemCounter();

if ((tm.Cmd == Defines.ThreadExitMsg))
{
//logger.Trace(ThreadName + "|Received Exit Command");
}
return (tm.Cmd != Defines.ThreadExitMsg);
}
protected abstract void Runner();
}
}
29 changes: 28 additions & 1 deletion BlockingQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Metrics;

namespace ThreadSupport
{
Expand All @@ -11,8 +12,24 @@ public class BlockingQueue<T> : IEnumerable<T>
//private Queue<T> _queue = new Queue<T>();
// ReSharper disable once FieldCanBeMadeReadOnly.Local
private BlockingCollection<T> _queue = new BlockingCollection<T>();
// Setup our Stats
private bool _enableStats = false;

public bool TryTake(int millisecondsTimeout, out T item)
private Counter _inBoundMessagesReceived;
private Meter _inBoundMsgsPerSecond;
private Counter _outBoundMessagesReceived;
private Meter _outBoundMsgsPerSecond;

public void SetupQueueStats(string QueueStatsName)
{
_inBoundMessagesReceived = Metric.Counter(QueueStatsName + "Received Messages", Unit.Custom("Incoming Messages"));
_inBoundMsgsPerSecond = Metric.Meter(QueueStatsName + "Inbound MPS", Unit.Items, TimeUnit.Seconds);
_outBoundMessagesReceived = Metric.Counter(QueueStatsName + "Processed Messages", Unit.Custom("Incoming Messages"));
_outBoundMsgsPerSecond = Metric.Meter(QueueStatsName + " Processed MPS", Unit.Items, TimeUnit.Seconds);
_enableStats = true;
}

public bool TryTake(int millisecondsTimeout, out T item)
{
var result = _queue.TryTake(out item, millisecondsTimeout);
return result;
Expand All @@ -22,6 +39,11 @@ public T Dequeue()
{
T result;
_queue.TryTake(out result);
if (_enableStats)
{
_outBoundMessagesReceived.Increment();
_outBoundMsgsPerSecond.Mark();
}
return result;
}

Expand All @@ -30,6 +52,11 @@ public void Enqueue(T data)
if (data == null) throw new ArgumentNullException(nameof(data));
_queue.TryAdd(data);
Count++;
if (_enableStats)
{
_inBoundMessagesReceived.Increment();
_inBoundMsgsPerSecond.Mark();
}
}

public IEnumerator<T> GetEnumerator()
Expand Down
8 changes: 8 additions & 0 deletions Defines.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ThreadSupport
{
public static class Defines
{
public const int ThreadExitMsg = 1;

}
}
42 changes: 42 additions & 0 deletions NLog.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
autoReload="true"
throwExceptions="false"
internalLogLevel="Off" internalLogFile="c:\temp\nlog-internal.log">

<!-- optional, add some variables
https://github.com/nlog/NLog/wiki/Configuration-file#variables
-->
<variable name="myvar" value="myvalue"/>

<!--
See https://github.com/nlog/nlog/wiki/Configuration-file
for information on customizing logging rules and outputs.
-->
<targets>
<target xsi:type="xsi:type="OutputDebugString" name="ODS" layout="${longdate} ${uppercase:${level}} ${message}"/>
<!--
add your targets here
See https://github.com/nlog/NLog/wiki/Targets for possible targets.
See https://github.com/nlog/NLog/wiki/Layout-Renderers for the possible layout renderers.
-->
<!--
Write events to a file with the date in the filename.
<target xsi:type="File" name="f" fileName="${basedir}/logs/${shortdate}.log"
layout="${longdate} ${uppercase:${level}} ${message}" />
-->
</targets>
<rules>
<!-- add your logging rules here -->
<!--
Write all events with minimal level of Debug (So Debug, Info, Warn, Error and Fatal, but not Trace) to "f"
<logger name="*" minlevel="Debug" writeTo="f" />
-->
<logger name="*" minlevel="Trace" writeTo="ODS" />
</rules>
</nlog>
Loading

0 comments on commit b845a3f

Please sign in to comment.