-
Notifications
You must be signed in to change notification settings - Fork 8
/
LocalRest.d
2715 lines (2252 loc) · 79.2 KB
/
LocalRest.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*******************************************************************************
Provides utilities to mock a network in unittests
This module is based on the idea that D `interface`s can be used
to represent a server's API, and that D `class` inheriting this `interface`
are used to define the server's business code,
abstracting away the communication layer.
For example, a server that exposes an API to concatenate two strings would
define the following code:
---
interface API { public string concat (string a, string b); }
class Server : API
{
public override string concat (string a, string b)
{
return a ~ b;
}
}
---
Then one can use "generators" to define how multiple process communicate
together. One such generator, that pioneered this design is `vibe.web.rest`,
which allows to expose such servers as REST APIs.
`localrest` is another generator, which uses message passing and threads
to create a local "network".
The motivation was to create a testing library that could be used to
model a network at a much cheaper cost than spawning processes
(and containers) would be, when doing integration tests.
Control_Interface:
When instantiating a `RemoteAPI`, one has the ability to call foreign
implementations through auto-generated `override`s of the `interface`.
In addition to that, as this library is intended for testing,
a few extra functionalities are offered under a control interface,
accessible under the `ctrl` namespace in the instance.
The control interface allows to make the node unresponsive to one or all
methods, for some defined time or until unblocked, as well as trigger
shutdowns or restart. See the methods for more details.
The `withTimeout` control method can be used to spawn a scoped copy
of the RemoteAPI with a custom configured timeout. The user-provided
delegate will be called with this scoped copy that uses the new timeout.
Shutdown:
The control interface has a shutdown method that can be used to terminate
a node gracefully. When the shutdown request is handled by the node,
the event loop will exit and the thread will terminate. While the destructor
of the node will be called, it might not usable for some actions, for example
because D destructors may not allocate GC memory, or one may want
to perform some test-specific operations, such a logging some data in case of failure.
Therefore, you may provide a shutdown routine in the call to `shutdown`.
It must accept a single argument of the interface type, and will be called
with the implementation object just before the node is destroyed.
If this routine throws, LocalRest will log the error in the console and
proceed with destroying the stack-allocated node.
Note that control requests are asynchronous, hence requests from the node
might be processed / send by the node until the request is actually processed.
There is also a `restart` method which accepts the same callback argument.
Event_Loop:
Server process usually needs to perform some action in an asynchronous way.
Additionally, some actions needs to be completed at a semi-regular interval,
for example based on a timer.
For those use cases, a node should call `runTask` or `sleep`, respectively.
Note that this has the same name (and purpose) as Vibe.d's core primitives.
Users should only ever call Vibe's `runTask` / `sleep` with `vibe.web.rest`,
or only call LocalRest's `runTask` / `sleep` with `RemoteAPI`.
Implementation:
In order for tests to simulate an asynchronous system accurately,
multiple nodes need to be able to run concurrently and asynchronously.
There are two common solutions to this, to use either fibers or threads.
Fibers have the advantage of being simpler to implement and predictable.
Threads have the advantage of more accurately describing an asynchronous
system and thus have the ability to uncover more issues.
When spawning a node, a thread is spawned, a node is instantiated with
the provided arguments, and an event loop waits for messages sent
to the Tid. Messages consist of the sender's Tid, the mangled name
of the function to call (to support overloading) and the arguments,
serialized as a JSON string.
Note:
While this module's original motivation was to test REST nodes,
the only dependency to Vibe.d is actually to it's JSON module,
as Vibe.d is the only available JSON module known to the author
to provide an interface to deserialize composite types.
This dependency is however not enforced by the dub project file,
as users can provide their own serializer (see `geod24.Serialization`).
If the default parameter for serialization is never used,
one's project need not depend on `vibe-d:data`.
Author: Mathias 'Geod24' Lang
License: MIT (See LICENSE.txt)
Copyright: Copyright (c) 2018-2019 Mathias Lang. All rights reserved.
*******************************************************************************/
module geod24.LocalRest;
static import C = geod24.concurrency;
import geod24.Serialization;
import std.datetime.systime : Clock, SysTime;
import std.format;
import std.meta : AliasSeq;
import std.traits : fullyQualifiedName, Parameters, ReturnType;
import core.thread;
import core.time;
/// Data sent by the caller
private struct Command
{
/// Sequence id of Command
size_t id;
/// Method to call
string method;
/// Serialized arguments to the method
SerializedData args;
}
/// Ask the node to exhibit a certain behavior for a given time
private struct TimeCommand
{
/// For how long our remote node apply this behavior
Duration dur;
/// Whether or not affected messages should be dropped
bool drop = false;
/// Whether or not all tasks should be suspended
bool suspend = false;
}
/// Ask the node to shut down
private struct ShutdownCommand
{
/// Any callback to call before the Node's destructor is called
void function (Object) callback;
/// Whether we're restarting or really shutting down
bool restart;
}
/// Filter out requests before they reach a node
private struct FilterAPI
{
/// the mangled symbol name of the function to filter
string func_mangleof;
/// used for debugging
string pretty_func;
}
/// Status of a request
private enum Status
{
/// Request failed
Failed,
/// The request failed to to a client error (4xx style error code)
ClientFailed,
/// Request timed-out
Timeout,
/// Request succeeded
Success
}
/// Data sent by the callee back to the caller
private struct Response
{
/// Final status of a request (failed, timeout, success, etc)
Status status;
/// Response id
size_t id;
/// If `status == Status.Success`, the serialized return value.
/// Otherwise, it contains `Exception.toString()`.
SerializedData data;
}
/// Thrown when the sent request is faulty (e.g. 4xx HTTP error types)
public class ClientException : Exception
{
/// Constructor
public this (string msg,
string file = __FILE__, int line = __LINE__, Exception next = null)
@safe pure nothrow
{
super(msg, file, line, next);
}
}
/// Simple exception to unwind the stack when we need to terminate/restart
private final class ExitException : Exception
{
public bool restart;
this () @safe pure nothrow @nogc
{
super("You should never see this exception - please report a bug");
}
}
/// Simple wrapper to deal with tuples
/// Vibe.d might emit a pragma(msg) when T.length == 0
private struct ArgWrapper (T...)
{
static if (T.length == 0)
size_t dummy;
T args;
}
// very simple & limited variant, to keep it performant.
// should be replaced by a real Variant later
private struct Variant
{
this (Command msg) { this.cmd = msg; this.tag = Variant.Type.command; }
this (Response msg) { this.res = msg; this.tag = Variant.Type.response; }
this (FilterAPI msg) { this.filter = msg; this.tag = Variant.Type.filter; }
this (TimeCommand msg) { this.time = msg; this.tag = Variant.Type.timeCommand; }
this (ShutdownCommand msg) { this.shutdown = msg; this.tag = Variant.Type.shutdownCommand; }
union
{
Command cmd;
Response res;
FilterAPI filter;
TimeCommand time;
ShutdownCommand shutdown;
}
Type tag;
/// Type of a request
enum Type : ubyte
{
command,
response,
filter,
timeCommand,
shutdownCommand,
}
}
private alias CommandChn = C.Channel!Variant;
private alias RespChn = C.Channel!Response;
/// Represents a connection between a server and a client
private class Connection
{
///
this () nothrow
{
this.command_chn = new CommandChn();
this.resp_chn = new RespChn();
}
/*******************************************************************************
Send a message over the Connection
Params:
T = Type of the message, should be support by the `Variant` type
msg = Message to be sent
Returns:
Success/failure
*******************************************************************************/
bool sendCommand (T) (T msg) @trusted
{
bool ret;
if (isMainThread())
scheduler.start({ ret = this.command_chn.write(Variant(msg)); });
else
ret = this.command_chn.write(Variant(msg));
return ret;
}
/*******************************************************************************
Get a unique id for a `Command` to be sent from this `Connection`
Returns:
Unique Command id
*******************************************************************************/
size_t getNextCommandId () @safe
{
return this.next_cmd_id++;
}
/*******************************************************************************
Wait for a `Response` with specific id
Params:
resp_id = Response id to wait for
timeout = timeout duration for the operation
Returns:
Response
*******************************************************************************/
Response waitResponse (size_t resp_id, Duration timeout) @trusted
{
if (isMainThread())
{
Response res;
scheduler.start({
// Loop until we get the Response we are looking for
while (this.resp_chn.read(res, timeout))
if (res.id == resp_id)
return;
res = Response(Status.Timeout, resp_id, SerializedData("Request timed-out"));
});
return res;
}
else
{
// Response may already be ready
if (auto existing_res = (resp_id in this.waiting_list))
return existing_res.res;
// Block while waiting the Response
auto blocker = C.thisScheduler().new FiberBlocker();
this.waiting_list[resp_id] = Waiting(blocker);
if (blocker.wait(timeout))
return this.waiting_list[resp_id].res;
else
return Response(Status.Timeout, resp_id, SerializedData("Request timed-out"));
}
}
/*******************************************************************************
Notify the task waiting for a Response
Params:
res = Newly arrived Response
*******************************************************************************/
void notifyWaiter (Response res)
{
// Is the waiting Fiber already blocked?
if (auto waiter = (res.id in this.waiting_list))
{
waiter.res = res;
waiter.blocker.notify();
}
else // Fiber is not yet blocked, create an entry for the related Fiber to use
this.waiting_list[res.id] = Waiting(null, res);
}
/*******************************************************************************
Close the `Channel`s associated with this `Connection`. Blocked `waitResponse`
calls will timeout and blocked `sendCommand` calls will fail
*******************************************************************************/
void close ()
{
this.command_chn.close();
this.resp_chn.close();
}
///
private struct Waiting
{
C.FiberScheduler.FiberBlocker blocker;
Response res;
}
/// List of Fibers waiting for a Response from this Connection
private Waiting[size_t] waiting_list;
/// Next Command ID
private size_t next_cmd_id;
/// Channel to send `Command`s to
private CommandChn command_chn;
/// Channel to read `Response`s from
private RespChn resp_chn;
}
private struct AwaitingMessage
{
/// Message
public Variant var;
/// Originating `Connection`
public Connection conn;
}
// Used for controling filtering / sleep within the server implementation
private struct Control
{
public FilterAPI filter; // filter specific messages
public SysTime sleep_until; // sleep until this time
public bool drop; // drop messages if sleeping
public bool suspend; // suspend all other tasks
bool isSleeping () const @safe /* nothrow: Not `nothrow` on Windows */
{
return this.sleep_until != SysTime.init
&& Clock.currTime < this.sleep_until;
}
}
/// `Channel` type that the nodes will listen for new `Connection`s
public alias BindChn = C.Channel!Connection;
/// Thread local outgoing `Connection` list
private Connection[RespChn] outgoing_conns;
/// We need a scheduler to simulate an event loop and to be re-entrant
private C.FiberScheduler scheduler;
/***********************************************************************
Check if the current context is running inside the main thread and
intialize the thread local fiber scheduler if it is not initialized
Returns:
If the context is the main thread or not
***********************************************************************/
private bool isMainThread () @trusted nothrow
{
// we are in the main thread
if (scheduler is null)
scheduler = new C.FiberScheduler;
return Fiber.getThis() is null;
}
/*******************************************************************************
Provide eventloop-like functionalities
Since nodes instantiated via this modules are Vibe.d server,
they expect the ability to run an asynchronous task ,
usually provided by `vibe.core.core : runTask`.
In order for them to properly work, we need to integrate them to our event
loop by providing the ability to spawn a task, and wait on some condition,
optionally with a timeout.
The following functions do that.
Note that those facilities are not available from the main thread,
while is supposed to do tests and doesn't have a scheduler.
*******************************************************************************/
public void runTask (void delegate() dg) nothrow
{
assert(scheduler !is null, "Cannot call this delegate from the main thread");
scheduler.spawn(dg);
}
/// Ditto
public void sleep (Duration timeout) nothrow
{
assert(!isMainThread(), "Cannot call this function from the main thread");
// Duration.init (0.seconds) is infinite timeout, ignore
if (timeout == Duration.init)
return;
scope blocker = scheduler.new FiberBlocker();
blocker.wait(timeout);
}
/*******************************************************************************
Run an asynchronous task after a given time.
The task will first run after the given `timeout`, and
can either repeat or run only once (the default).
Works similarly to Vibe.d's `setTimer`.
Params:
timeout = Determines the minimum amount of time that elapses before
the timer fires.
dg = If non-null, this delegate will be called when the timer fires
periodic = Speficies if the timer fires repeatedly or only once
Returns:
A `Timer` instance with the ability to control the timer
*******************************************************************************/
public Timer setTimer (Duration timeout, void delegate() @safe nothrow dg,
bool periodic = false) @safe nothrow
{
assert(scheduler !is null, "Cannot call this delegate from the main thread");
Timer timer = createTimer(dg);
timer.rearm(timeout, periodic);
return timer;
}
/*******************************************************************************
Creates a new timer without arming it
Works similarly to Vibe.d's `createTimer`
Params:
dg = If non-null, this delegate will be called when the timer fires
Returns:
A `Timer` instance with the ability to control the timer
*******************************************************************************/
public Timer createTimer (void delegate() @safe nothrow dg) @safe nothrow
{
assert(dg !is null, "Cannot call this delegate if null");
return new Timer(dg);
}
/// Simple timer
public final class Timer
{
private Duration timeout;
private void delegate () @safe nothrow dg;
// Whether this timer is repeating
private bool periodic;
// Whether this timer was stopped
private bool stopped;
// Whether this timer has a Fiber running
private bool running;
// Whether this timer is waiting for timeout
private bool _pending;
// Whether this timer is rearmed in its handler
private bool rearmed;
@safe nothrow:
public this (void delegate() @safe nothrow dg)
{
this.dg = dg;
this.stopped = true;
this.rearmed = false;
}
// Run a delegate after timeout, and until this.periodic is false
private void run () nothrow
{
scope (exit)
{
this.running = false;
this._pending = false;
}
do
{
this.rearmed = false;
this._pending = true;
() @trusted
{
sleep(timeout);
} ();
this._pending = false;
if (this.stopped)
return;
dg();
} while (this.periodic || this.rearmed);
}
/// Stop the timer. The next time this timer's fiber wakes up
/// it will exit the run() function.
public void stop ()
{
this.stopped = true;
this.periodic = false;
}
/// Rearm a timer
public void rearm (Duration timeout, bool periodic)
{
this.timeout = timeout;
this.periodic = periodic;
this.stopped = false;
if (!this.running)
{
this.running = true;
() @trusted
{
scheduler.schedule(&run);
} ();
}
else
this.rearmed = true;
}
/// True if timer is yet to fire
@property bool pending ()
{
return this._pending;
}
}
/*******************************************************************************
A reference to the "listening" connection of a remote thread
When a remote thread starts, it initially listens for new connection
(similarly to `bind` in C). When a new connection is started, it creates
a separate channel for communication (similar to `accept` in C).
This newly-created channel is what `RemoteAPI` wraps.
The channel / link / connection to the original listener, which is the only
one able to establish a connection, is what this data structure wraps.
*******************************************************************************/
public struct Listener (API)
{
/// Internal data, do not use
package BindChn data;
}
/*******************************************************************************
A reference to an alread-instantiated node
This class serves the same purpose as a `RestInterfaceClient`:
it is a client for an already instantiated rest `API` interface.
In order to instantiate a new server (in a remote thread), use the static
`spawn` function.
Serialization:
In order to support custom serialization policy, one can change the
`Serializer` parameter. This parameter is expected to be either a
template or an aggregate with two static methods, but no explicit
limitation is put on the type.
See `geod24.Serialization`'s documentation for more informations.
Params:
API = The interface defining the API to implement
S = An aggregate which follows the requirement explained above.
*******************************************************************************/
public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API
{
static assert (!serializerInvalidReason!(S).length, serializerInvalidReason!S);
/***************************************************************************
Instantiate a node and start it
This is usually called from the main thread, which will start all the
nodes and then start to process request.
In order to have a connected network, no nodes in any thread should have
a different reference to the same node.
In practice, this means there should only be one `Tid` per "address".
Note:
When the `RemoteAPI` returned by this function is finalized,
the child thread will be shut down.
Params:
Impl = Type of the implementation to instantiate
args = Arguments to the object's constructor
timeout = (optional) timeout to use with requests
file = Path to the file that called this function (for diagnostic)
line = Line number tied to the `file` parameter
Returns:
A `RemoteAPI` owning the node reference
***************************************************************************/
public static RemoteAPI spawn (Impl) (
CtorParams!Impl args, Duration timeout = 5.seconds,
string file = __FILE__, int line = __LINE__)
{
auto chn = new BindChn();
new Thread(
{
spawned!(Impl)(chn, file, line, args);
}).start();
return new RemoteAPI(Listener!API(chn), timeout);
}
/// Helper template to get the constructor's parameters
private static template CtorParams (Impl)
{
static if (is(typeof(Impl.__ctor)))
private alias CtorParams = Parameters!(Impl.__ctor);
else
private alias CtorParams = AliasSeq!();
}
/***************************************************************************
Handler function
Performs the dispatch from `cmd` to the proper `node` function,
provided the function is not filtered.
Params:
cmd = the command to run (contains the method name and the arguments)
node = the node to invoke the method on
filter = used for filtering API calls (returns default response)
resp_chn = `Channel` to send the `Response` to
***************************************************************************/
private static void handleCommand (Command cmd, API node, FilterAPI filter, RespChn resp_chn)
{
switch (cmd.method)
{
static foreach (member; __traits(allMembers, API))
static foreach (ovrld; __traits(getOverloads, API, member))
{
mixin(
q{
case `%2$s`:
Response res = Response(Status.Failed, cmd.id);
// Provide informative message in case of filtered method
if (cmd.method == filter.func_mangleof)
res.data = SerializedData(format("Filtered method '%%s'", filter.pretty_func));
else
{
auto args = S.deserialize!(ArgWrapper!(Parameters!ovrld))(
cmd.args.getS!S);
try
{
static if (!is(ReturnType!ovrld == void))
res.data = SerializedData(S.serialize(node.%1$s(args.args)));
else
node.%1$s(args.args);
res.status = Status.Success;
}
catch (Exception e)
{
res.status = Status.ClientFailed;
res.data = SerializedData(e.toString());
}
}
resp_chn.write(res);
return;
}.format(member, ovrld.mangleof));
}
default:
resp_chn.write(Response(Status.ClientFailed, cmd.id, SerializedData("Method not found")));
break;
}
}
/***************************************************************************
Main dispatch function
This function receive string-serialized messages from the calling thread,
which is a struct with the method's mangleof, and the method's arguments
as a tuple, serialized to a JSON string.
Params:
Implementation = Type of the implementation to instantiate
bind_chn = The channel on which to "listen" to receive new "connections"
file = Path to the file that spawned this node
line = Line number in the `file` that spawned this node
cargs = Arguments to `Implementation`'s constructor
***************************************************************************/
private static void spawned (Implementation) (
BindChn bind_chn, string file, int line, CtorParams!Implementation cargs)
nothrow
{
import std.algorithm : each;
import std.range;
scope exc = new ExitException();
// The list of `Connection` we are listening to,
// equivalent to the list of fd in a select / epoll.
Connection[CommandChn] incoming_conns;
void runNode ()
{
scheduler = new C.FiberScheduler;
C.thisScheduler(scheduler);
scope node = new Implementation(cargs);
// Control the node behavior
Control control;
// we need to keep track of messages which were ignored when
// node.sleep() was used, and then handle each message in sequence.
AwaitingMessage[] await_msgs;
scheduler.start(() {
C.SelectEntry[] read_list;
C.SelectEntry[] write_list;
while (true)
{
Connection new_conn;
Response res;
Variant msg;
read_list.length = 0;
assumeSafeAppend(read_list);
write_list.length = 0;
assumeSafeAppend(write_list);
foreach (ref conn; incoming_conns)
if (!conn.command_chn.isClosed())
read_list ~= C.SelectEntry(conn.command_chn, &msg);
foreach (ref conn; outgoing_conns)
if (!conn.resp_chn.isClosed())
read_list ~= C.SelectEntry(conn.resp_chn, &res);
read_list ~= C.SelectEntry(bind_chn, &new_conn);
auto sel_ret = C.select(read_list, write_list, 10.msecs);
if (control.suspend && !control.isSleeping())
{
C.thisScheduler().exitCriticalSection();
control.suspend = false;
}
if (await_msgs.length > 0 && !control.isSleeping())
{
scheduler.spawn({
auto prev_await_msgs = await_msgs;
foreach (ref await_msg; prev_await_msgs)
handleCommand(await_msg.var.cmd, node, control.filter, await_msg.conn.resp_chn);
});
await_msgs.length = 0;
}
if (!sel_ret.success)
continue;
// Bind chn
if (cast(BindChn) read_list[sel_ret.id].selectable)
{
incoming_conns[new_conn.command_chn] = new_conn;
}
// Command
else if (auto comm_chn = cast(CommandChn) read_list[sel_ret.id].selectable)
{
auto curr_conn = incoming_conns[comm_chn];
switch (msg.tag)
{
case Variant.Type.command:
Command cmd = msg.cmd;
if (!control.isSleeping())
scheduler.spawn({
handleCommand(cmd, node, control.filter, curr_conn.resp_chn);
});
else if (!control.drop)
await_msgs ~= AwaitingMessage(msg, curr_conn);
break;
case Variant.Type.shutdownCommand:
ShutdownCommand e = msg.shutdown;
if (e.callback !is null)
e.callback(node);
if (!e.restart)
{
bind_chn.close();
incoming_conns.each!((conn) => conn.close());
}
outgoing_conns.each!((conn) => conn.close());
outgoing_conns.clear();
exc.restart = e.restart;
throw exc;
case Variant.Type.timeCommand:
TimeCommand s = msg.time;
control.sleep_until = Clock.currTime + s.dur;
control.drop = s.drop;
control.suspend = s.suspend;
if (control.suspend)
C.thisScheduler().enterCriticalSection();
break;
case Variant.Type.filter:
FilterAPI filter_api = msg.filter;
control.filter = filter_api;
break;
default:
assert(0, "Got invalid Variant.Type: " ~ msg.tag);
}
}
else if (auto resp_chn = cast(RespChn) read_list[sel_ret.id].selectable)
{
// Response
outgoing_conns[resp_chn].notifyWaiter(res);
}
}
});
}
try
{
while (true)
{
try runNode();
// We use this exception to exit the event loop
catch (ExitException e)
{
if (!e.restart)
break;
}
}
}
catch (Throwable t)
{
import core.stdc.stdio, core.stdc.stdlib, std.stdio;
printf("#### FATAL ERROR: %.*s\n", cast(int) t.msg.length, t.msg.ptr);
printf("This node was started at %.*s:%d\n",
cast(int) file.length, file.ptr, line);
printf("This most likely means that the node crashed due to an uncaught exception\n");
printf("If not, please file a bug at https://github.com/Geod24/localrest/\n");
try writeln("Full error: ", t);
catch (Exception e) { /* Nothing more we can do at this point */ }
// Workaround Github CI issue:
// https://github.com/actions/runner/issues/1235
fflush(core.stdc.stdio.stdout);
abort();
}
}
/// Timeout to use when issuing requests
private const Duration timeout;
/// Main Channel that this Node will listen for incoming messages
private BindChn bind_chn;
/// Connection between this instance and the node main thread
private Connection conn;
/***************************************************************************
Create an instante of a client
This connects to an already instantiated node.
In order to instantiate a node, see the static `spawn` function.
Params:
listener = The listener used to connect to the node (most frequently
obtained by calling `geod24.concurrency.locate`)
timeout = any timeout to use
***************************************************************************/
public this (IncomingAPI) (Listener!IncomingAPI listener, Duration timeout = 5.seconds)
@trusted nothrow
if (is(IncomingAPI : API))
{
import std.exception : assumeWontThrow;
this.bind_chn = listener.data;
this.timeout = timeout;
assert(bind_chn);
// Create a new Connection, register it and send it to the peer
// TODO: What to do when bind_chn is closed?
this.conn = new Connection();
outgoing_conns[this.conn.resp_chn] = this.conn;
if (isMainThread())
assumeWontThrow(scheduler.start({ bind_chn.write(this.conn); }));
else
bind_chn.write(this.conn);
}
/***************************************************************************
Introduce a namespace to avoid name clashes
The only way we have a name conflict is if someone exposes `ctrl`,
in which case they will be served an error along the following line:
LocalRest.d(...): Error: function `RemoteAPI!(...).ctrl` conflicts
with mixin RemoteAPI!(...).ControlInterface!() at LocalRest.d(...)
***************************************************************************/
public mixin ControlInterface!() ctrl;
/// Ditto