-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtestreceiver.pas
159 lines (136 loc) · 4.48 KB
/
testreceiver.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
unit TestReceiver;
{$mode ObjFPC}{$H+}
interface
uses
Classes,
SysUtils,
Crt,
IdTCPClient,
NSQFunctions, NSQReceiver, NSQLookup, NSQTypes;
procedure TestNSQTopic;
procedure TestNSQReceiver;
procedure TestNSQLookup;
implementation
var NSQ_TEST_TOPIC: string = 'igraci';
NSQ_TEST_CHANNEL: string = 'dinko-test';
NSQ_TEST_MESSAGE1: string = 'This is some text';
NSQ_TEST_MESSAGE2: string = 'Second Message';
procedure MyNSQCallback(InTimestampNanosecond: Int64;
InAttempts: Int32;
InMessageID: string;
InBody: string;
var OutHowToHandle: TNSQCallbackResponse;
var OutParam: Int32
);
begin
if NSQ_DEBUG then begin
NSQWrite('MyNSQCallback Received message: %s', [InBody]);
end;
OutHowToHandle := nsqCallFIN;
OutParam := 0;
end;
procedure TestNSQTopic;
var Topic: TClsNSQTopic;
Producer: TNSQProducer;
begin
// parse test
Topic := TClsNSQTopic.Create;
Topic.GetTopicData(NSQ_LOOKUP_URL, 'igraci', NSQ_COMPUTER_NAME);
if Topic._producers.GetCount > 0 then begin
Producer := Topic._producers.GetItem(0)._data;
NSQ_IP := Producer.broadcast_address;
NSQ_PORT := Producer.tcp_port;
end;
FreeAndNil(Topic);
end;
procedure SendSomething(InTCPClient: TIdTCPClient; InObject: TObject);
var F: Integer;
MyKey: Char;
begin
F := 0;
while true do begin
if KeyPressed then begin
F := F +1;
MyKey := ReadKey;
crt.ClrScr;
NSQWrite('Press ctrl-c to finish', []);
NSQWrite('Possible options (1-One msg; 2-Many msg, 3-Delay msg, 4-CloseConnection', []);
NSQWrite('****************', []);
NSQWrite(FormatDateTime('yyyy-mm-dd hh:nn:ss.zzz', Now), []);
if (MyKey = ^c) then begin
if InObject is TNSQReceiverThread then begin
NSQWrite('Received ^c for TNSQReceiverThread', []);
(InObject as TNSQReceiverThread).Terminate;
end
else if InObject is TNSQLookupThread then begin
NSQWrite('Received ^c for TNSQLookupThread',[]);
(InObject as TNSQLookupThread).TerminateThread;
end;
break;
end;
if (InTCPClient = nil) OR ((InTCPClient <> nil) and (InTCPClient.Connected = false)) then begin
// lookup takes some time to connect and find producers
if InObject is TNSQReceiverThread then begin
InTCPClient := (InObject as TNSQReceiverThread)._tcpClient;
end
else if InObject is TNSQLookupThread then begin
InTCPClient := (InObject as TNSQLookupThread)._nsqTopic.GetTcpClient;
end;
end;
if InTCPClient <> nil then begin
if InTCPClient.Connected then begin
if MyKey = '1' then begin
NSQWrite('1. Publish one message', []);
NSQWritePUBMessage(InTCPClient, NSQ_TEST_TOPIC, NSQ_TEST_MESSAGE1 + IntToStr(F));
end
else if MyKey = '2' then begin
NSQWrite('2. Publish many messages', []);
NSQWriteMPUBMessage(InTCPClient, NSQ_TEST_TOPIC, [NSQ_TEST_MESSAGE1 + IntToStr(F), NSQ_TEST_MESSAGE2 + IntToStr(F)]);
end
else if MyKey = '3' then begin
NSQWrite('3. Publish message with delay', []);
NSQWriteDPUBMessage(InTCPClient, NSQ_TEST_TOPIC, 10000, NSQ_TEST_MESSAGE1 + IntToStr(F));
end
else if MyKey = '4' then begin
NSQWrite('4. Close connection to queue nicely', []);
NSQWriteCLSMessage(InTCPClient);
InTCPClient := nil;
end;
end
else begin
NSQWrite('Client is not connected ', [])
end;
end
else begin
NSQWrite('Client is nil ', [])
end;
end
end;
end;
procedure TestNSQReceiver;
var Reader: TNSQReceiverThread;
begin
// Create reader
NSQ_DEBUG := true;
Reader := TNSQReceiverThread.Create(NSQ_IP, NSQ_PORT, NSQ_TEST_TOPIC, NSQ_TEST_CHANNEL);
Reader.InstallCallback(@MyNSQCallback);
Reader.Start;
SendSomething(Reader._tcpClient, Reader);
FreeAndNil(Reader);
NSQWrite('', []);
NSQWrite('Finished', []);
end;
procedure TestNSQLookup;
var Lookup: TNSQLookupThread;
begin
// Create reader
NSQ_DEBUG := true;
Lookup := TNSQLookupThread.Create(NSQ_LOOKUP_URL, NSQ_TEST_TOPIC, NSQ_TEST_CHANNEL, 10);
Lookup.InstallDataCallback(@MyNSQCallback);
Lookup.Start;
SendSomething(Lookup._nsqTopic.GetTcpClient, Lookup);
FreeAndNil(Lookup);
NSQWrite('',[]);
NSQWrite('Finished', []);
end;
end.