-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStreamDataProducerPoll.cs
137 lines (121 loc) · 3.98 KB
/
StreamDataProducerPoll.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*==========================================================================;
*
* This file is part of LATINO. See http://latino.sf.net
*
* File: StreamDataProducerPoll.cs
* Desc: Stream data producer base class (polling)
* Created: Dec-2010
*
* Author: Miha Grcar
*
* License: MIT (http://opensource.org/licenses/MIT)
*
***************************************************************************/
using System;
using System.Threading;
namespace Latino.Workflows
{
/* .-----------------------------------------------------------------------
|
| Class StreamDataProducerPoll
|
'-----------------------------------------------------------------------
*/
/// <summary>
/// Base class for stream data producers that obtain data by polling.
/// A dedicated thread repeatedly calls <see cref="ProduceData"/>, hands the
/// result to DispatchData and then waits <see cref="TimeBetweenPolls"/>
/// milliseconds before polling again.
/// </summary>
public abstract class StreamDataProducerPoll : StreamDataProducer
{
    // Longest single sleep (in milliseconds) while waiting; bounds how long
    // the polling thread takes to notice a stop request.
    private const int MaxSleepSlice = 500;

    private int mTimeBetweenPolls = 1;
    private bool mRandomDelayAtStart = false;
    private Random mRng = new Random();
    // Written by Stop()/Start() on the caller's thread and read by the polling
    // thread; volatile so that the polling thread observes the change.
    protected volatile bool mStopped = false;
    private Thread mThread = null;

    public StreamDataProducerPoll(string loggerBaseName) : base(loggerBaseName)
    {
    }

    public StreamDataProducerPoll(Type loggerType) : base(loggerType)
    {
    }

    /// <summary>
    /// When true, the polling thread waits a random time in the range
    /// [0, TimeBetweenPolls) milliseconds before the first poll.
    /// </summary>
    public bool RandomDelayAtStart
    {
        get { return mRandomDelayAtStart; }
        set { mRandomDelayAtStart = value; }
    }

    /// <summary>
    /// Time to wait between two consecutive polls, in milliseconds.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
    public int TimeBetweenPolls
    {
        get { return mTimeBetweenPolls; }
        set
        {
            Utils.ThrowException(value < 0 ? new ArgumentOutOfRangeException("TimeBetweenPolls") : null);
            mTimeBetweenPolls = value;
        }
    }

    // Sleeps for the given number of milliseconds in short slices, returning
    // early as soon as a stop is requested.
    private void SleepUnlessStopped(int milliseconds)
    {
        // Stopwatch is monotonic, so wall-clock adjustments cannot distort the wait
        System.Diagnostics.Stopwatch elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (!mStopped)
        {
            long remaining = milliseconds - elapsed.ElapsedMilliseconds;
            if (remaining <= 0) { return; }
            // never sleep past the deadline
            Thread.Sleep((int)Math.Min(MaxSleepSlice, remaining));
        }
    }

    // Waits out the current TimeBetweenPolls in short slices, returning early
    // as soon as a stop is requested.
    private void WaitForNextPoll()
    {
        System.Diagnostics.Stopwatch elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (!mStopped)
        {
            // the interval is re-read on every pass so that a change made to
            // TimeBetweenPolls during the wait is honored
            long remaining = mTimeBetweenPolls - elapsed.ElapsedMilliseconds;
            if (remaining <= 0) { return; }
            // never sleep past the deadline
            Thread.Sleep((int)Math.Min(MaxSleepSlice, remaining));
        }
    }

    // Body of the polling thread: optional random start delay, then
    // produce / dispatch / wait until a stop is requested.
    private void ProduceDataLoop()
    {
        if (mRandomDelayAtStart)
        {
            // interruptible, so Stop() and Dispose() do not have to sit out the delay
            SleepUnlessStopped(mRng.Next(0, mTimeBetweenPolls));
        }
        while (!mStopped)
        {
            try
            {
                // produce and dispatch data
                object data = ProduceData();
                DispatchData(data);
            }
            catch (Exception exc)
            {
                // a failed poll is logged; polling continues with the next cycle
                mLogger.Error("ProduceDataLoop", exc);
            }
            WaitForNextPoll();
        }
        mLogger.Info("ProduceDataLoop", "Stopped.");
    }

    /// <summary>
    /// Produces the next piece of data. Called once per polling cycle on the
    /// polling thread; an exception thrown here is logged and does not end the loop.
    /// </summary>
    /// <returns>The object that is passed on to DispatchData.</returns>
    protected abstract object ProduceData();

    // *** IDataProducer interface implementation ***

    /// <summary>
    /// Starts the polling thread. Does nothing if the thread is still alive.
    /// </summary>
    public override void Start()
    {
        if (!IsRunning)
        {
            mLogger.Debug("Start", "Starting ...");
            mThread = new Thread(new ThreadStart(ProduceDataLoop));
            // clear the stop request before the new thread begins to poll
            mStopped = false;
            mThread.Start();
            mLogger.Debug("Start", "Started.");
        }
    }

    /// <summary>
    /// Requests the polling thread to stop. Returns immediately; the thread
    /// exits once it notices the request.
    /// </summary>
    public override void Stop()
    {
        if (IsRunning)
        {
            mLogger.Debug("Stop", "Stopping ...");
            mStopped = true;
        }
    }

    /// <summary>
    /// True while the polling thread exists and is alive.
    /// </summary>
    public override bool IsRunning
    {
        get { return mThread != null && mThread.IsAlive; }
    }

    // *** IDisposable interface implementation ***

    /// <summary>
    /// Stops the producer and blocks until the polling thread has exited.
    /// </summary>
    public override void Dispose()
    {
        mLogger.Debug("Dispose", "Disposing ...");
        if (IsRunning)
        {
            Stop();
            // wait for the polling thread to finish its current cycle and exit
            while (IsRunning) { Thread.Sleep(100); }
        }
        mLogger.Debug("Dispose", "Disposed.");
    }
}
}