-
Notifications
You must be signed in to change notification settings - Fork 46
/
AutofacBusTest.cs
183 lines (153 loc) · 7.2 KB
/
AutofacBusTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Autofac;
using Newtonsoft.Json;
using NsqSharp.Api;
using NsqSharp.Bus;
using NsqSharp.Bus.Configuration;
using NsqSharp.Bus.Configuration.BuiltIn;
using NsqSharp.Tests.Bus.TestFakes;
using NsqSharp.Utils.Extensions;
using NUnit.Framework;
namespace NsqSharp.Tests.Bus
{
#if !RUN_INTEGRATION_TESTS
[TestFixture(IgnoreReason = "NSQD Integration Test")]
#else
[TestFixture]
#endif
public class AutofacBusTest
{
private static readonly NsqdHttpClient _nsqdHttpClient;
private static readonly NsqLookupdHttpClient _nsqLookupdHttpClient;
static AutofacBusTest()
{
_nsqdHttpClient = new NsqdHttpClient("http://127.0.0.1:4151", TimeSpan.FromSeconds(5));
_nsqLookupdHttpClient = new NsqLookupdHttpClient("http://127.0.0.1:4161", TimeSpan.FromSeconds(5));
}
[Test]
public void AutofacContainerTest()
{
string topicName = string.Format("test_autofac_{0}", DateTime.Now.UnixNano());
const string channelName = "test_autofac";
var builder = new ContainerBuilder();
builder.RegisterType<TestDependency>().As<ITestDependency>();
var container = builder.Build();
try
{
var objectBuilder = new AutofacObjectBuilder(container);
var busConfiguration =
new BusConfiguration(
objectBuilder,
new NewtonsoftJsonSerializer(typeof(JsonConverter).Assembly),
new MessageAuditorStub(),
new MessageTypeToTopicDictionary(new Dictionary<Type, string>
{
{ typeof(TestMessage), topicName }
}),
new HandlerTypeToChannelDictionary(new Dictionary<Type, string>
{
{ typeof(TestMessageHandler), channelName }
}),
defaultNsqLookupdHttpEndpoints: new[] { "127.0.0.1:4161" },
defaultThreadsPerHandler: 1,
nsqConfig: new Config
{
LookupdPollJitter = 0,
LookupdPollInterval = TimeSpan.FromSeconds(1)
},
preCreateTopicsAndChannels: true);
BusService.Start(busConfiguration);
var bus = objectBuilder.GetInstance<IBus>();
var testMessage = new TestMessage { Message = "Hello!" };
bus.Send(testMessage);
var json = JsonConvert.SerializeObject(testMessage);
var jsonBytes = Encoding.UTF8.GetBytes(json);
var tuple = TestMessageHandler.GetMessageInfo();
var message = tuple.Item1;
var messageInformation = tuple.Item2;
Assert.IsNotNull(message, "message");
Assert.IsNotNull(messageInformation, "messageInformation");
Assert.AreSame(message, messageInformation.Message);
Assert.IsNotNull(message.Body, "message.Body");
Assert.AreEqual(jsonBytes, message.Body, "message.Body");
Assert.IsInstanceOf<TestMessage>(messageInformation.DeserializedMessageBody);
Assert.AreEqual(testMessage.Message, ((TestMessage)messageInformation.DeserializedMessageBody).Message);
Assert.IsNull(testMessage.ModifiedMessage);
Assert.AreEqual(topicName, messageInformation.Topic, "messageInformation.Topic");
Assert.AreEqual(channelName, messageInformation.Channel, "messageInformation.Channel");
Assert.AreNotEqual(Guid.Empty,
messageInformation.UniqueIdentifier,
"messageInformation.UniqueIdentifier");
}
finally
{
BusService.Stop();
_nsqdHttpClient.DeleteTopic(topicName);
_nsqLookupdHttpClient.DeleteTopic(topicName);
}
}
private class TestMessage
{
public string Message { get; set; }
public string ModifiedMessage { get; set; }
}
private interface ITestDependency
{
void SetModifiedMessage(TestMessage message);
}
// ReSharper disable once ClassNeverInstantiated.Local
private class TestDependency : ITestDependency
{
public void SetModifiedMessage(TestMessage message)
{
if (message == null)
throw new ArgumentNullException("message");
message.ModifiedMessage = string.Format("{0} - from TestDependency", message.Message);
}
}
private class TestMessageHandler : IHandleMessages<TestMessage>
{
private static readonly List<IMessage> _messages = new List<IMessage>();
private static readonly object _messagesLocker = new object();
private static readonly List<ICurrentMessageInformation> _messagesInfos =
new List<ICurrentMessageInformation>();
private static readonly AutoResetEvent _wait = new AutoResetEvent(initialState: false);
private readonly IBus _bus;
private readonly ITestDependency _testDependency;
public TestMessageHandler(IBus bus, ITestDependency testDependency)
{
if (bus == null)
throw new ArgumentNullException("bus");
if (testDependency == null)
throw new ArgumentNullException("testDependency");
_bus = bus;
_testDependency = testDependency;
}
public void Handle(TestMessage message)
{
_testDependency.SetModifiedMessage(message);
Assert.AreEqual(message.Message + " - from TestDependency", message.ModifiedMessage);
var currentMessageInformation = _bus.GetCurrentThreadMessageInformation();
var currentMessage = _bus.CurrentThreadMessage;
lock (_messagesLocker)
{
_messagesInfos.Add(currentMessageInformation);
_messages.Add(currentMessage);
_wait.Set();
}
}
public static Tuple<IMessage, ICurrentMessageInformation> GetMessageInfo()
{
bool signaled = _wait.WaitOne(TimeSpan.FromSeconds(10));
if (!signaled)
throw new Exception("AutoResetEvent not set");
Assert.AreEqual(1, _messages.Count, "_messages.Count");
Assert.AreEqual(1, _messagesInfos.Count, "_messagesInfos.Count");
return new Tuple<IMessage, ICurrentMessageInformation>(_messages[0], _messagesInfos[0]);
}
}
}
}